]> git.proxmox.com Git - qemu.git/blobdiff - posix-aio-compat.c
vfio-pci: Update slow path INTx algorithm
[qemu.git] / posix-aio-compat.c
index 7b862b5400377b932c374de0fc9eb049e7520330..96e4daf5059fedae6ca8f4d6ed8292562e0ce4a5 100644 (file)
@@ -9,6 +9,8 @@
  * This work is licensed under the terms of the GNU GPL, version 2.  See
  * the COPYING file in the top-level directory.
  *
+ * Contributions after 2012-01-13 are licensed under the terms of the
+ * GNU GPL, version 2 or (at your option) any later version.
  */
 
 #include <sys/ioctl.h>
 #include <unistd.h>
 #include <errno.h>
 #include <time.h>
-#include <signal.h>
 #include <string.h>
 #include <stdlib.h>
 #include <stdio.h>
 
 #include "qemu-queue.h"
 #include "osdep.h"
+#include "sysemu.h"
 #include "qemu-common.h"
 #include "trace.h"
 #include "block_int.h"
+#include "iov.h"
 
 #include "block/raw-posix-aio.h"
 
+static void do_spawn_thread(void);
 
 struct qemu_paiocb {
     BlockDriverAIOCB common;
@@ -41,7 +45,6 @@ struct qemu_paiocb {
     int aio_niov;
     size_t aio_nbytes;
 #define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
-    int ev_signo;
     off_t aio_offset;
 
     QTAILQ_ENTRY(qemu_paiocb) node;
@@ -49,8 +52,6 @@ struct qemu_paiocb {
     ssize_t ret;
     int active;
     struct qemu_paiocb *next;
-
-    int async_context_id;
 };
 
 typedef struct PosixAioState {
@@ -66,6 +67,9 @@ static pthread_attr_t attr;
 static int max_threads = 64;
 static int cur_threads = 0;
 static int idle_threads = 0;
+static int new_threads = 0;     /* backlog of threads we need to create */
+static int pending_threads = 0; /* threads created but not running yet */
+static QEMUBH *new_thread_bh;
 static QTAILQ_HEAD(, qemu_paiocb) request_list;
 
 #ifdef CONFIG_PREADV
@@ -127,8 +131,8 @@ static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
         return -errno;
 
     /*
-     * This looks weird, but the aio code only consideres a request
-     * successful if it has written the number full number of bytes.
+     * This looks weird, but the aio code only considers a request
+     * successful if it has written the full number of bytes.
      *
      * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
      * so in fact we return the ioctl command here to make posix_aio_read()
@@ -179,7 +183,6 @@ qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
 
 static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
 {
-    size_t offset = 0;
     ssize_t len;
 
     do {
@@ -187,12 +190,12 @@ static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
             len = qemu_pwritev(aiocb->aio_fildes,
                                aiocb->aio_iov,
                                aiocb->aio_niov,
-                               aiocb->aio_offset + offset);
+                               aiocb->aio_offset);
          else
             len = qemu_preadv(aiocb->aio_fildes,
                               aiocb->aio_iov,
                               aiocb->aio_niov,
-                              aiocb->aio_offset + offset);
+                              aiocb->aio_offset);
     } while (len == -1 && errno == EINTR);
 
     if (len == -1)
@@ -200,6 +203,12 @@ static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
     return len;
 }
 
+/*
+ * Read/writes the data to/from a given linear buffer.
+ *
+ * Returns the number of bytes handles or -errno in case of an error. Short
+ * reads are only returned if the end of the file is reached.
+ */
 static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
 {
     ssize_t offset = 0;
@@ -301,11 +310,14 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
+static void posix_aio_notify_event(void);
+
 static void *aio_thread(void *unused)
 {
-    pid_t pid;
-
-    pid = getpid();
+    mutex_lock(&lock);
+    pending_threads--;
+    mutex_unlock(&lock);
+    do_spawn_thread();
 
     while (1) {
         struct qemu_paiocb *aiocb;
@@ -321,7 +333,9 @@ static void *aio_thread(void *unused)
 
         while (QTAILQ_EMPTY(&request_list) &&
                !(ret == ETIMEDOUT)) {
+            idle_threads++;
             ret = cond_timedwait(&cond, &lock, &ts);
+            idle_threads--;
         }
 
         if (QTAILQ_EMPTY(&request_list))
@@ -330,11 +344,20 @@ static void *aio_thread(void *unused)
         aiocb = QTAILQ_FIRST(&request_list);
         QTAILQ_REMOVE(&request_list, aiocb, node);
         aiocb->active = 1;
-        idle_threads--;
         mutex_unlock(&lock);
 
         switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
         case QEMU_AIO_READ:
+            ret = handle_aiocb_rw(aiocb);
+            if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
+                /* A short read means that we have reached EOF. Pad the buffer
+                 * with zeros for bytes after EOF. */
+                iov_memset(aiocb->aio_iov, aiocb->aio_niov, ret,
+                           0, aiocb->aio_nbytes - ret);
+
+                ret = aiocb->aio_nbytes;
+            }
+            break;
         case QEMU_AIO_WRITE:
             ret = handle_aiocb_rw(aiocb);
             break;
@@ -352,25 +375,31 @@ static void *aio_thread(void *unused)
 
         mutex_lock(&lock);
         aiocb->ret = ret;
-        idle_threads++;
         mutex_unlock(&lock);
 
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+        posix_aio_notify_event();
     }
 
-    idle_threads--;
     cur_threads--;
     mutex_unlock(&lock);
 
     return NULL;
 }
 
-static void spawn_thread(void)
+static void do_spawn_thread(void)
 {
     sigset_t set, oldset;
 
-    cur_threads++;
-    idle_threads++;
+    mutex_lock(&lock);
+    if (!new_threads) {
+        mutex_unlock(&lock);
+        return;
+    }
+
+    new_threads--;
+    pending_threads++;
+
+    mutex_unlock(&lock);
 
     /* block all signals */
     if (sigfillset(&set)) die("sigfillset");
@@ -381,6 +410,27 @@ static void spawn_thread(void)
     if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
 }
 
+static void spawn_thread_bh_fn(void *opaque)
+{
+    do_spawn_thread();
+}
+
+static void spawn_thread(void)
+{
+    cur_threads++;
+    new_threads++;
+    /* If there are threads being created, they will spawn new workers, so
+     * we don't spend time creating many threads in a loop holding a mutex or
+     * starving the current vcpu.
+     *
+     * If there are no idle threads, ask the main thread to create one, so we
+     * inherit the correct affinity instead of the vcpu affinity.
+     */
+    if (!pending_threads) {
+        qemu_bh_schedule(new_thread_bh);
+    }
+}
+
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
     aiocb->ret = -EINPROGRESS;
@@ -416,33 +466,37 @@ static int qemu_paio_error(struct qemu_paiocb *aiocb)
     return ret;
 }
 
-static int posix_aio_process_queue(void *opaque)
+static void posix_aio_read(void *opaque)
 {
     PosixAioState *s = opaque;
     struct qemu_paiocb *acb, **pacb;
     int ret;
-    int result = 0;
-    int async_context_id = get_async_context_id();
+    ssize_t len;
+
+    /* read all bytes from signal pipe */
+    for (;;) {
+        char bytes[16];
+
+        len = read(s->rfd, bytes, sizeof(bytes));
+        if (len == -1 && errno == EINTR)
+            continue; /* try again */
+        if (len == sizeof(bytes))
+            continue; /* more to read */
+        break;
+    }
 
     for(;;) {
         pacb = &s->first_aio;
         for(;;) {
             acb = *pacb;
             if (!acb)
-                return result;
-
-            /* we're only interested in requests in the right context */
-            if (acb->async_context_id != async_context_id) {
-                pacb = &acb->next;
-                continue;
-            }
+                return;
 
             ret = qemu_paio_error(acb);
             if (ret == ECANCELED) {
                 /* remove the request */
                 *pacb = acb->next;
                 qemu_aio_release(acb);
-                result = 1;
             } else if (ret != EINPROGRESS) {
                 /* end of aio */
                 if (ret == 0) {
@@ -454,40 +508,20 @@ static int posix_aio_process_queue(void *opaque)
                 } else {
                     ret = -ret;
                 }
+
+                trace_paio_complete(acb, acb->common.opaque, ret);
+
                 /* remove the request */
                 *pacb = acb->next;
                 /* call the callback */
                 acb->common.cb(acb->common.opaque, ret);
                 qemu_aio_release(acb);
-                result = 1;
                 break;
             } else {
                 pacb = &acb->next;
             }
         }
     }
-
-    return result;
-}
-
-static void posix_aio_read(void *opaque)
-{
-    PosixAioState *s = opaque;
-    ssize_t len;
-
-    /* read all bytes from signal pipe */
-    for (;;) {
-        char bytes[16];
-
-        len = read(s->rfd, bytes, sizeof(bytes));
-        if (len == -1 && errno == EINTR)
-            continue; /* try again */
-        if (len == sizeof(bytes))
-            continue; /* more to read */
-        break;
-    }
-
-    posix_aio_process_queue(s);
 }
 
 static int posix_aio_flush(void *opaque)
@@ -498,18 +532,14 @@ static int posix_aio_flush(void *opaque)
 
 static PosixAioState *posix_aio_state;
 
-static void aio_signal_handler(int signum)
+static void posix_aio_notify_event(void)
 {
-    if (posix_aio_state) {
-        char byte = 0;
-        ssize_t ret;
-
-        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
-        if (ret < 0 && errno != EAGAIN)
-            die("write()");
-    }
+    char byte = 0;
+    ssize_t ret;
 
-    qemu_service_io();
+    ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
+    if (ret < 0 && errno != EAGAIN)
+        die("write()");
 }
 
 static void paio_remove(struct qemu_paiocb *acb)
@@ -536,6 +566,8 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
     int active = 0;
 
+    trace_paio_cancel(acb, acb->common.opaque);
+
     mutex_lock(&lock);
     if (!acb->active) {
         QTAILQ_REMOVE(&request_list, acb, node);
@@ -567,12 +599,8 @@ BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
     struct qemu_paiocb *acb;
 
     acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
-    if (!acb)
-        return NULL;
     acb->aio_type = type;
     acb->aio_fildes = fd;
-    acb->ev_signo = SIGUSR2;
-    acb->async_context_id = get_async_context_id();
 
     if (qiov) {
         acb->aio_iov = qiov->iov;
@@ -596,12 +624,8 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
     struct qemu_paiocb *acb;
 
     acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
-    if (!acb)
-        return NULL;
     acb->aio_type = QEMU_AIO_IOCTL;
     acb->aio_fildes = fd;
-    acb->ev_signo = SIGUSR2;
-    acb->async_context_id = get_async_context_id();
     acb->aio_offset = 0;
     acb->aio_ioctl_buf = buf;
     acb->aio_ioctl_cmd = req;
@@ -615,7 +639,6 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
 
 int paio_init(void)
 {
-    struct sigaction act;
     PosixAioState *s;
     int fds[2];
     int ret;
@@ -623,16 +646,12 @@ int paio_init(void)
     if (posix_aio_state)
         return 0;
 
-    s = qemu_malloc(sizeof(PosixAioState));
-
-    sigfillset(&act.sa_mask);
-    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
-    act.sa_handler = aio_signal_handler;
-    sigaction(SIGUSR2, &act, NULL);
+    s = g_malloc(sizeof(PosixAioState));
 
     s->first_aio = NULL;
     if (qemu_pipe(fds) == -1) {
         fprintf(stderr, "failed to create pipe\n");
+        g_free(s);
         return -1;
     }
 
@@ -642,8 +661,7 @@ int paio_init(void)
     fcntl(s->rfd, F_SETFL, O_NONBLOCK);
     fcntl(s->wfd, F_SETFL, O_NONBLOCK);
 
-    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
-        posix_aio_process_queue, s);
+    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
 
     ret = pthread_attr_init(&attr);
     if (ret)
@@ -654,6 +672,7 @@ int paio_init(void)
         die2(ret, "pthread_attr_setdetachstate");
 
     QTAILQ_INIT(&request_list);
+    new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
 
     posix_aio_state = s;
     return 0;