X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=posix-aio-compat.c;h=c4116e30f2e2391762b5528cd46a179d53843ba0;hb=d25f89c9e91d6c46b85969922411a211a6347a7d;hp=5ea197f6686144dee61f9939a0fed30539827301;hpb=9ef91a677110ec200d7b2904fc4bcae5a77329ad;p=qemu.git diff --git a/posix-aio-compat.c b/posix-aio-compat.c index 5ea197f66..c4116e30f 100644 --- a/posix-aio-compat.c +++ b/posix-aio-compat.c @@ -17,14 +17,15 @@ #include #include #include -#include #include #include #include -#include "sys-queue.h" +#include "qemu-queue.h" #include "osdep.h" +#include "sysemu.h" #include "qemu-common.h" +#include "trace.h" #include "block_int.h" #include "block/raw-posix-aio.h" @@ -35,7 +36,7 @@ struct qemu_paiocb { int aio_fildes; union { struct iovec *aio_iov; - void *aio_ioctl_buf; + void *aio_ioctl_buf; }; int aio_niov; size_t aio_nbytes; @@ -43,11 +44,13 @@ struct qemu_paiocb { int ev_signo; off_t aio_offset; - TAILQ_ENTRY(qemu_paiocb) node; + QTAILQ_ENTRY(qemu_paiocb) node; int aio_type; ssize_t ret; int active; struct qemu_paiocb *next; + + int async_context_id; }; typedef struct PosixAioState { @@ -63,7 +66,7 @@ static pthread_attr_t attr; static int max_threads = 64; static int cur_threads = 0; static int idle_threads = 0; -static TAILQ_HEAD(, qemu_paiocb) request_list; +static QTAILQ_HEAD(, qemu_paiocb) request_list; #ifdef CONFIG_PREADV static int preadv_present = 1; @@ -115,23 +118,33 @@ static void thread_create(pthread_t *thread, pthread_attr_t *attr, if (ret) die2(ret, "pthread_create"); } -static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb) +static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb) { - int ret; - - ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf); - if (ret == -1) - return -errno; - - /* - * This looks weird, but the aio code only consideres a request - * successfull if it has written the number full number of bytes. - * - * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command, - * so in fact we return the ioctl command here to make posix_aio_read() - * happy.. - */ - return aiocb->aio_nbytes; + int ret; + + ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf); + if (ret == -1) + return -errno; + + /* + * This looks weird, but the aio code only consideres a request + * successful if it has written the number full number of bytes. + * + * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command, + * so in fact we return the ioctl command here to make posix_aio_read() + * happy.. + */ + return aiocb->aio_nbytes; +} + +static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb) +{ + int ret; + + ret = qemu_fdatasync(aiocb->aio_fildes); + if (ret == -1) + return -errno; + return 0; } #ifdef CONFIG_PREADV @@ -164,7 +177,7 @@ qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset) #endif -static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb) +static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb) { size_t offset = 0; ssize_t len; @@ -187,10 +200,10 @@ static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb) return len; } -static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf) +static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf) { - size_t offset = 0; - size_t len; + ssize_t offset = 0; + ssize_t len; while (offset < aiocb->aio_nbytes) { if (aiocb->aio_type & QEMU_AIO_WRITE) @@ -218,9 +231,9 @@ static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf) return offset; } -static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb) +static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb) { - size_t nbytes; + ssize_t nbytes; char *buf; if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) { @@ -237,10 +250,10 @@ static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb) * Try preadv/pwritev first and fall back to linearizing the * buffer if it's not supported. */ - if (preadv_present) { + if (preadv_present) { nbytes = handle_aiocb_rw_vector(aiocb); if (nbytes == aiocb->aio_nbytes) - return nbytes; + return nbytes; if (nbytes < 0 && nbytes != -ENOSYS) return nbytes; preadv_present = 0; @@ -257,7 +270,7 @@ static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb) * Ok, we have to do it the hard way, copy all segments into * a single aligned buffer. */ - buf = qemu_memalign(512, aiocb->aio_nbytes); + buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes); if (aiocb->aio_type & QEMU_AIO_WRITE) { char *p = buf; int i; @@ -291,17 +304,12 @@ static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb) static void *aio_thread(void *unused) { pid_t pid; - sigset_t set; pid = getpid(); - /* block all signals */ - if (sigfillset(&set)) die("sigfillset"); - if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask"); - while (1) { struct qemu_paiocb *aiocb; - size_t ret = 0; + ssize_t ret = 0; qemu_timeval tv; struct timespec ts; @@ -311,43 +319,45 @@ static void *aio_thread(void *unused) mutex_lock(&lock); - while (TAILQ_EMPTY(&request_list) && + while (QTAILQ_EMPTY(&request_list) && !(ret == ETIMEDOUT)) { + idle_threads++; ret = cond_timedwait(&cond, &lock, &ts); + idle_threads--; } - if (TAILQ_EMPTY(&request_list)) + if (QTAILQ_EMPTY(&request_list)) break; - aiocb = TAILQ_FIRST(&request_list); - TAILQ_REMOVE(&request_list, aiocb, node); + aiocb = QTAILQ_FIRST(&request_list); + QTAILQ_REMOVE(&request_list, aiocb, node); aiocb->active = 1; - idle_threads--; mutex_unlock(&lock); switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) { case QEMU_AIO_READ: case QEMU_AIO_WRITE: - ret = handle_aiocb_rw(aiocb); - break; + ret = handle_aiocb_rw(aiocb); + break; + case QEMU_AIO_FLUSH: + ret = handle_aiocb_flush(aiocb); + break; case QEMU_AIO_IOCTL: - ret = handle_aiocb_ioctl(aiocb); - break; - default: - fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type); - ret = -EINVAL; - break; - } + ret = handle_aiocb_ioctl(aiocb); + break; + default: + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type); + ret = -EINVAL; + break; + } mutex_lock(&lock); aiocb->ret = ret; - idle_threads++; mutex_unlock(&lock); if (kill(pid, aiocb->ev_signo)) die("kill failed"); } - idle_threads--; cur_threads--; mutex_unlock(&lock); @@ -356,9 +366,17 @@ static void *aio_thread(void *unused) static void spawn_thread(void) { + sigset_t set, oldset; + cur_threads++; - idle_threads++; + + /* block all signals */ + if (sigfillset(&set)) die("sigfillset"); + if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask"); + thread_create(&thread_id, &attr, aio_thread, NULL); + + if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore"); } static void qemu_paio_submit(struct qemu_paiocb *aiocb) @@ -368,7 +386,7 @@ static void qemu_paio_submit(struct qemu_paiocb *aiocb) mutex_lock(&lock); if (idle_threads == 0 && cur_threads < max_threads) spawn_thread(); - TAILQ_INSERT_TAIL(&request_list, aiocb, node); + QTAILQ_INSERT_TAIL(&request_list, aiocb, node); mutex_unlock(&lock); cond_signal(&cond); } @@ -396,36 +414,33 @@ static int qemu_paio_error(struct qemu_paiocb *aiocb) return ret; } -static void posix_aio_read(void *opaque) +static int posix_aio_process_queue(void *opaque) { PosixAioState *s = opaque; struct qemu_paiocb *acb, **pacb; int ret; - ssize_t len; - - /* read all bytes from signal pipe */ - for (;;) { - char bytes[16]; - - len = read(s->rfd, bytes, sizeof(bytes)); - if (len == -1 && errno == EINTR) - continue; /* try again */ - if (len == sizeof(bytes)) - continue; /* more to read */ - break; - } + int result = 0; + int async_context_id = get_async_context_id(); for(;;) { pacb = &s->first_aio; for(;;) { acb = *pacb; if (!acb) - goto the_end; + return result; + + /* we're only interested in requests in the right context */ + if (acb->async_context_id != async_context_id) { + pacb = &acb->next; + continue; + } + ret = qemu_paio_error(acb); if (ret == ECANCELED) { /* remove the request */ *pacb = acb->next; qemu_aio_release(acb); + result = 1; } else if (ret != EINPROGRESS) { /* end of aio */ if (ret == 0) { @@ -437,18 +452,43 @@ static void posix_aio_read(void *opaque) } else { ret = -ret; } + + trace_paio_complete(acb, acb->common.opaque, ret); + /* remove the request */ *pacb = acb->next; /* call the callback */ acb->common.cb(acb->common.opaque, ret); qemu_aio_release(acb); + result = 1; break; } else { pacb = &acb->next; } } } - the_end: ; + + return result; +} + +static void posix_aio_read(void *opaque) +{ + PosixAioState *s = opaque; + ssize_t len; + + /* read all bytes from signal pipe */ + for (;;) { + char bytes[16]; + + len = read(s->rfd, bytes, sizeof(bytes)); + if (len == -1 && errno == EINTR) + continue; /* try again */ + if (len == sizeof(bytes)) + continue; /* more to read */ + break; + } + + posix_aio_process_queue(s); } static int posix_aio_flush(void *opaque) @@ -463,8 +503,11 @@ static void aio_signal_handler(int signum) { if (posix_aio_state) { char byte = 0; + ssize_t ret; - write(posix_aio_state->wfd, &byte, sizeof(byte)); + ret = write(posix_aio_state->wfd, &byte, sizeof(byte)); + if (ret < 0 && errno != EAGAIN) + die("write()"); } qemu_service_io(); @@ -494,9 +537,11 @@ static void paio_cancel(BlockDriverAIOCB *blockacb) struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb; int active = 0; + trace_paio_cancel(acb, acb->common.opaque); + mutex_lock(&lock); if (!acb->active) { - TAILQ_REMOVE(&request_list, acb, node); + QTAILQ_REMOVE(&request_list, acb, node); acb->ret = -ECANCELED; } else if (acb->ret == -EINPROGRESS) { active = 1; @@ -518,7 +563,7 @@ static AIOPool raw_aio_pool = { .cancel = paio_cancel, }; -BlockDriverAIOCB *paio_submit(BlockDriverState *bs, void *aio_ctx, int fd, +BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, BlockDriverCompletionFunc *cb, void *opaque, int type) { @@ -530,14 +575,19 @@ BlockDriverAIOCB *paio_submit(BlockDriverState *bs, void *aio_ctx, int fd, acb->aio_type = type; acb->aio_fildes = fd; acb->ev_signo = SIGUSR2; - acb->aio_iov = qiov->iov; - acb->aio_niov = qiov->niov; + acb->async_context_id = get_async_context_id(); + + if (qiov) { + acb->aio_iov = qiov->iov; + acb->aio_niov = qiov->niov; + } acb->aio_nbytes = nb_sectors * 512; acb->aio_offset = sector_num * 512; acb->next = posix_aio_state->first_aio; posix_aio_state->first_aio = acb; + trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); qemu_paio_submit(acb); return &acb->common; } @@ -554,6 +604,7 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd, acb->aio_type = QEMU_AIO_IOCTL; acb->aio_fildes = fd; acb->ev_signo = SIGUSR2; + acb->async_context_id = get_async_context_id(); acb->aio_offset = 0; acb->aio_ioctl_buf = buf; acb->aio_ioctl_cmd = req; @@ -565,7 +616,7 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd, return &acb->common; } -void *paio_init(void) +int paio_init(void) { struct sigaction act; PosixAioState *s; @@ -573,7 +624,7 @@ void *paio_init(void) int ret; if (posix_aio_state) - return posix_aio_state; + return 0; s = qemu_malloc(sizeof(PosixAioState)); @@ -583,9 +634,9 @@ void *paio_init(void) sigaction(SIGUSR2, &act, NULL); s->first_aio = NULL; - if (pipe(fds) == -1) { + if (qemu_pipe(fds) == -1) { fprintf(stderr, "failed to create pipe\n"); - return NULL; + return -1; } s->rfd = fds[0]; @@ -594,7 +645,8 @@ void *paio_init(void) fcntl(s->rfd, F_SETFL, O_NONBLOCK); fcntl(s->wfd, F_SETFL, O_NONBLOCK); - qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s); + qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, + posix_aio_process_queue, s); ret = pthread_attr_init(&attr); if (ret) @@ -604,9 +656,8 @@ void *paio_init(void) if (ret) die2(ret, "pthread_attr_setdetachstate"); - TAILQ_INIT(&request_list); + QTAILQ_INIT(&request_list); posix_aio_state = s; - - return posix_aio_state; + return 0; }