qemu.git / posix-aio-compat.c - "Avoid thundering herd problem"
/*
 * QEMU posix-aio emulation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 */

#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <signal.h>     /* sigfillset, sigprocmask, kill */
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include "osdep.h"

#include "posix-aio-compat.h"

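/*
 * Overview: POSIX AIO is emulated with a dynamically sized pool of
 * worker threads.  qemu_paio_read()/qemu_paio_write() queue a request
 * on request_list; an idle worker dequeues it, performs the transfer
 * with pread()/pwrite(), stores the outcome in aiocb->ret and notifies
 * the submitter by raising aiocb->ev_signo.  Workers that stay idle
 * for ten seconds exit on their own.
 */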
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id;
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
static TAILQ_HEAD(, qemu_paiocb) request_list;

static void die2(int err, const char *what)
{
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
    abort();
}

static void die(const char *what)
{
    die2(errno, what);
}

static void mutex_lock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_lock(mutex);
    if (ret) die2(ret, "pthread_mutex_lock");
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_unlock(mutex);
    if (ret) die2(ret, "pthread_mutex_unlock");
}

static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                          struct timespec *ts)
{
    int ret = pthread_cond_timedwait(cond, mutex, ts);
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
    return ret;
}

static void cond_signal(pthread_cond_t *cond)
{
    int ret = pthread_cond_signal(cond);
    if (ret) die2(ret, "pthread_cond_signal");
}

static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int ret = pthread_create(thread, attr, start_routine, arg);
    if (ret) die2(ret, "pthread_create");
}

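/*
 * Worker main loop: wait (with a ten second timeout) for work, dequeue
 * one request, drop the lock for the duration of the transfer, publish
 * the result and signal completion.  A worker that times out with an
 * empty queue exits, shrinking the pool while the device is idle.
 */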
static void *aio_thread(void *unused)
{
    sigset_t set;

    /* block all signals: completion signals must be delivered to the
     * submitting thread, never to a worker */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");

    while (1) {
        struct qemu_paiocb *aiocb;
        size_t offset;
        int ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        while (TAILQ_EMPTY(&request_list) && ret != ETIMEDOUT) {
            ret = cond_timedwait(&cond, &lock, &ts);
        }

        /* Re-check the queue rather than the wait status: a request can
         * arrive just as the timed wait expires, and it must not be
         * stranded by an exiting worker. */
        if (TAILQ_EMPTY(&request_list))
            break;

        aiocb = TAILQ_FIRST(&request_list);
        TAILQ_REMOVE(&request_list, aiocb, node);

        offset = 0;
        aiocb->active = 1;

        idle_threads--;
        mutex_unlock(&lock);

        /* retry short transfers until the whole request is done */
        while (offset < aiocb->aio_nbytes) {
            ssize_t len;

            if (aiocb->is_write)
                len = pwrite(aiocb->aio_fildes,
                             (const char *)aiocb->aio_buf + offset,
                             aiocb->aio_nbytes - offset,
                             aiocb->aio_offset + offset);
            else
                len = pread(aiocb->aio_fildes,
                            (char *)aiocb->aio_buf + offset,
                            aiocb->aio_nbytes - offset,
                            aiocb->aio_offset + offset);

            if (len == -1 && errno == EINTR)
                continue;
            else if (len == -1) {
                /* the negative errno survives the round trip through
                 * the unsigned offset into the ssize_t aiocb->ret */
                offset = -errno;
                break;
            } else if (len == 0)
                break;

            offset += len;
        }

        mutex_lock(&lock);
        aiocb->ret = offset;
        idle_threads++;
        mutex_unlock(&lock);

        if (kill(getpid(), aiocb->ev_signo)) die("kill failed");
    }

    idle_threads--;
    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}

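/*
 * Workers are created detached: nothing ever joins them, and an idle
 * worker reaps itself through the timeout path above.  spawn_thread()
 * is called with lock held, so the thread counters stay consistent
 * with the queue the new worker will serve.
 */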
static void spawn_thread(void)
{
    int ret;
    pthread_attr_t attr;

    cur_threads++;
    idle_threads++;

    ret = pthread_attr_init(&attr);
    if (ret) die2(ret, "pthread_attr_init");
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (ret) die2(ret, "pthread_attr_setdetachstate");
    thread_create(&thread_id, &attr, aio_thread, NULL);
    ret = pthread_attr_destroy(&attr);
    if (ret) die2(ret, "pthread_attr_destroy");
}

int qemu_paio_init(struct qemu_paioinit *aioinit)
{
    TAILQ_INIT(&request_list);

    return 0;
}

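/*
 * Submission path.  The thundering herd problem is avoided in two
 * ways: a new worker is spawned only when no idle worker exists, and
 * cond_signal() (rather than a broadcast) wakes exactly one waiter per
 * queued request, after the lock has been dropped.
 */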
static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
{
    aiocb->is_write = is_write;
    aiocb->ret = -EINPROGRESS;
    aiocb->active = 0;
    mutex_lock(&lock);
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    cond_signal(&cond);

    return 0;
}

int qemu_paio_read(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, 0);
}

int qemu_paio_write(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, 1);
}

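/*
 * Completion-status helpers.  aiocb->ret holds -EINPROGRESS while the
 * request is queued or in flight, a negative errno on failure, and the
 * byte count transferred on success; reading it under lock keeps the
 * value coherent with the worker's final store.
 */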
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}

int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t ret = qemu_paio_return(aiocb);

    if (ret < 0)
        ret = -ret;
    else
        ret = 0;

    return ret;
}

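/*
 * Cancellation can only succeed while the request is still queued:
 * once a worker has marked it active the I/O is already in flight and
 * the caller must wait for it to finish (or find it already done).
 */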
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
{
    int ret;

    mutex_lock(&lock);
    if (!aiocb->active) {
        TAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->ret = -ECANCELED;
        ret = QEMU_PAIO_CANCELED;
    } else if (aiocb->ret == -EINPROGRESS)
        ret = QEMU_PAIO_NOTCANCELED;
    else
        ret = QEMU_PAIO_ALLDONE;
    mutex_unlock(&lock);

    return ret;
}
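
/*
 * A minimal usage sketch (illustrative only: the qemu_paiocb fields are
 * taken from their use above, while the SIGUSR1 choice, the fd and the
 * handle() consumer are assumptions of this example, not part of the
 * API).  SIGUSR1 is blocked first so that sigsuspend() can wait for the
 * completion signal without a wakeup race:
 *
 *     static void on_aio(int sig) { }        wakes sigsuspend()
 *
 *     char buf[4096];
 *     struct qemu_paiocb cb = { 0 };
 *     sigset_t blocked, old;
 *
 *     signal(SIGUSR1, on_aio);
 *     sigemptyset(&blocked);
 *     sigaddset(&blocked, SIGUSR1);
 *     sigprocmask(SIG_BLOCK, &blocked, &old);
 *
 *     cb.aio_fildes = fd;
 *     cb.aio_buf    = buf;
 *     cb.aio_nbytes = sizeof(buf);
 *     cb.aio_offset = 0;
 *     cb.ev_signo   = SIGUSR1;
 *
 *     qemu_paio_read(&cb);
 *     while (qemu_paio_error(&cb) == EINPROGRESS)
 *         sigsuspend(&old);                   wait for cb.ev_signo
 *
 *     if (qemu_paio_error(&cb) == 0)
 *         handle(buf, qemu_paio_return(&cb));
 */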