]> git.proxmox.com Git - qemu.git/blobdiff - posix-aio-compat.c
list MST as pci layer maintainer
[qemu.git] / posix-aio-compat.c
index 498cc1f4cbdf1afe562df546a4f077e96a6efc56..dc14f5355d47f0bb12c00eac697431d8c60ad51e 100644 (file)
@@ -22,7 +22,7 @@
 #include <stdlib.h>
 #include <stdio.h>
 
-#include "sys-queue.h"
+#include "qemu-queue.h"
 #include "osdep.h"
 #include "qemu-common.h"
 #include "block_int.h"
@@ -43,11 +43,13 @@ struct qemu_paiocb {
     int ev_signo;
     off_t aio_offset;
 
-    TAILQ_ENTRY(qemu_paiocb) node;
+    QTAILQ_ENTRY(qemu_paiocb) node;
     int aio_type;
     ssize_t ret;
     int active;
     struct qemu_paiocb *next;
+
+    int async_context_id;
 };
 
 typedef struct PosixAioState {
@@ -63,7 +65,7 @@ static pthread_attr_t attr;
 static int max_threads = 64;
 static int cur_threads = 0;
 static int idle_threads = 0;
-static TAILQ_HEAD(, qemu_paiocb) request_list;
+static QTAILQ_HEAD(, qemu_paiocb) request_list;
 
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
@@ -115,7 +117,7 @@ static void thread_create(pthread_t *thread, pthread_attr_t *attr,
     if (ret) die2(ret, "pthread_create");
 }
 
-static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
 {
        int ret;
 
@@ -134,11 +136,11 @@ static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
        return aiocb->aio_nbytes;
 }
 
-static size_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
 {
     int ret;
 
-    ret = fdatasync(aiocb->aio_fildes);
+    ret = qemu_fdatasync(aiocb->aio_fildes);
     if (ret == -1)
         return -errno;
     return 0;
@@ -174,7 +176,7 @@ qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
 
 #endif
 
-static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
 {
     size_t offset = 0;
     ssize_t len;
@@ -197,10 +199,10 @@ static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
     return len;
 }
 
-static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
+static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
 {
-    size_t offset = 0;
-    size_t len;
+    ssize_t offset = 0;
+    ssize_t len;
 
     while (offset < aiocb->aio_nbytes) {
          if (aiocb->aio_type & QEMU_AIO_WRITE)
@@ -228,9 +230,9 @@ static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
     return offset;
 }
 
-static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
 {
-    size_t nbytes;
+    ssize_t nbytes;
     char *buf;
 
     if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
@@ -301,17 +303,12 @@ static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
 static void *aio_thread(void *unused)
 {
     pid_t pid;
-    sigset_t set;
 
     pid = getpid();
 
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");
-
     while (1) {
         struct qemu_paiocb *aiocb;
-        size_t ret = 0;
+        ssize_t ret = 0;
         qemu_timeval tv;
         struct timespec ts;
 
@@ -321,16 +318,16 @@ static void *aio_thread(void *unused)
 
         mutex_lock(&lock);
 
-        while (TAILQ_EMPTY(&request_list) &&
+        while (QTAILQ_EMPTY(&request_list) &&
                !(ret == ETIMEDOUT)) {
             ret = cond_timedwait(&cond, &lock, &ts);
         }
 
-        if (TAILQ_EMPTY(&request_list))
+        if (QTAILQ_EMPTY(&request_list))
             break;
 
-        aiocb = TAILQ_FIRST(&request_list);
-        TAILQ_REMOVE(&request_list, aiocb, node);
+        aiocb = QTAILQ_FIRST(&request_list);
+        QTAILQ_REMOVE(&request_list, aiocb, node);
         aiocb->active = 1;
         idle_threads--;
         mutex_unlock(&lock);
@@ -369,9 +366,18 @@ static void *aio_thread(void *unused)
 
 static void spawn_thread(void)
 {
+    sigset_t set, oldset;
+
     cur_threads++;
     idle_threads++;
+
+    /* block all signals */
+    if (sigfillset(&set)) die("sigfillset");
+    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
+
     thread_create(&thread_id, &attr, aio_thread, NULL);
+
+    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
 }
 
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
@@ -381,7 +387,7 @@ static void qemu_paio_submit(struct qemu_paiocb *aiocb)
     mutex_lock(&lock);
     if (idle_threads == 0 && cur_threads < max_threads)
         spawn_thread();
-    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
+    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
     mutex_unlock(&lock);
     cond_signal(&cond);
 }
@@ -409,36 +415,33 @@ static int qemu_paio_error(struct qemu_paiocb *aiocb)
     return ret;
 }
 
-static void posix_aio_read(void *opaque)
+static int posix_aio_process_queue(void *opaque)
 {
     PosixAioState *s = opaque;
     struct qemu_paiocb *acb, **pacb;
     int ret;
-    ssize_t len;
-
-    /* read all bytes from signal pipe */
-    for (;;) {
-        char bytes[16];
-
-        len = read(s->rfd, bytes, sizeof(bytes));
-        if (len == -1 && errno == EINTR)
-            continue; /* try again */
-        if (len == sizeof(bytes))
-            continue; /* more to read */
-        break;
-    }
+    int result = 0;
+    int async_context_id = get_async_context_id();
 
     for(;;) {
         pacb = &s->first_aio;
         for(;;) {
             acb = *pacb;
             if (!acb)
-                goto the_end;
+                return result;
+
+            /* we're only interested in requests in the right context */
+            if (acb->async_context_id != async_context_id) {
+                pacb = &acb->next;
+                continue;
+            }
+
             ret = qemu_paio_error(acb);
             if (ret == ECANCELED) {
                 /* remove the request */
                 *pacb = acb->next;
                 qemu_aio_release(acb);
+                result = 1;
             } else if (ret != EINPROGRESS) {
                 /* end of aio */
                 if (ret == 0) {
@@ -455,13 +458,35 @@ static void posix_aio_read(void *opaque)
                 /* call the callback */
                 acb->common.cb(acb->common.opaque, ret);
                 qemu_aio_release(acb);
+                result = 1;
                 break;
             } else {
                 pacb = &acb->next;
             }
         }
     }
- the_end: ;
+
+    return result;
+}
+
+static void posix_aio_read(void *opaque)
+{
+    PosixAioState *s = opaque;
+    ssize_t len;
+
+    /* read all bytes from signal pipe */
+    for (;;) {
+        char bytes[16];
+
+        len = read(s->rfd, bytes, sizeof(bytes));
+        if (len == -1 && errno == EINTR)
+            continue; /* try again */
+        if (len == sizeof(bytes))
+            continue; /* more to read */
+        break;
+    }
+
+    posix_aio_process_queue(s);
 }
 
 static int posix_aio_flush(void *opaque)
@@ -509,7 +534,7 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
 
     mutex_lock(&lock);
     if (!acb->active) {
-        TAILQ_REMOVE(&request_list, acb, node);
+        QTAILQ_REMOVE(&request_list, acb, node);
         acb->ret = -ECANCELED;
     } else if (acb->ret == -EINPROGRESS) {
         active = 1;
@@ -531,7 +556,7 @@ static AIOPool raw_aio_pool = {
     .cancel             = paio_cancel,
 };
 
-BlockDriverAIOCB *paio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
+BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
         BlockDriverCompletionFunc *cb, void *opaque, int type)
 {
@@ -543,6 +568,8 @@ BlockDriverAIOCB *paio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
     acb->aio_type = type;
     acb->aio_fildes = fd;
     acb->ev_signo = SIGUSR2;
+    acb->async_context_id = get_async_context_id();
+
     if (qiov) {
         acb->aio_iov = qiov->iov;
         acb->aio_niov = qiov->niov;
@@ -580,7 +607,7 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
     return &acb->common;
 }
 
-void *paio_init(void)
+int paio_init(void)
 {
     struct sigaction act;
     PosixAioState *s;
@@ -588,7 +615,7 @@ void *paio_init(void)
     int ret;
 
     if (posix_aio_state)
-        return posix_aio_state;
+        return 0;
 
     s = qemu_malloc(sizeof(PosixAioState));
 
@@ -598,9 +625,9 @@ void *paio_init(void)
     sigaction(SIGUSR2, &act, NULL);
 
     s->first_aio = NULL;
-    if (pipe(fds) == -1) {
+    if (qemu_pipe(fds) == -1) {
         fprintf(stderr, "failed to create pipe\n");
-        return NULL;
+        return -1;
     }
 
     s->rfd = fds[0];
@@ -609,7 +636,8 @@ void *paio_init(void)
     fcntl(s->rfd, F_SETFL, O_NONBLOCK);
     fcntl(s->wfd, F_SETFL, O_NONBLOCK);
 
-    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
+    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
+        posix_aio_process_queue, s);
 
     ret = pthread_attr_init(&attr);
     if (ret)
@@ -619,9 +647,8 @@ void *paio_init(void)
     if (ret)
         die2(ret, "pthread_attr_setdetachstate");
 
-    TAILQ_INIT(&request_list);
+    QTAILQ_INIT(&request_list);
 
     posix_aio_state = s;
-
-    return posix_aio_state;
+    return 0;
 }