/* posix-aio-compat.c (from qemu.git) */
1 /*
2 * QEMU posix-aio emulation
3 *
4 * Copyright IBM, Corp. 2008
5 *
6 * Authors:
7 * Anthony Liguori <aliguori@us.ibm.com>
8 *
9 * This work is licensed under the terms of the GNU GPL, version 2. See
10 * the COPYING file in the top-level directory.
11 *
12 */
13
14 #include <sys/ioctl.h>
15 #include <pthread.h>
16 #include <unistd.h>
17 #include <errno.h>
18 #include <time.h>
19 #include <string.h>
20 #include <stdlib.h>
21 #include <stdio.h>
22 #include "osdep.h"
23 #include "qemu-common.h"
24
25 #include "posix-aio-compat.h"
26
/* Pool-wide state.  `lock` protects the thread counters and the request
 * list; `cond` is signalled whenever a new request is queued. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id;        /* id of the most recently spawned worker */
static pthread_attr_t attr;        /* detached-thread attr, set up in qemu_paio_init() */
static int max_threads = 64;       /* upper bound on pool size */
static int cur_threads = 0;        /* workers currently alive */
static int idle_threads = 0;       /* workers waiting for work */
static TAILQ_HEAD(, qemu_paiocb) request_list;  /* queued, not-yet-active requests */

#ifdef HAVE_PREADV
/* Cleared at runtime if preadv/pwritev turns out to be unsupported. */
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif
41
/*
 * Report the failing call together with the error string for `err`,
 * then terminate.  For pthread-style APIs that return the error number
 * instead of setting errno.
 */
static void die2(int err, const char *what)
{
    const char *reason = strerror(err);

    fprintf(stderr, "%s failed: %s\n", what, reason);
    abort();
}
47
/* Variant of die2() for calls that report failure through errno. */
static void die(const char *what)
{
    die2(errno, what);
}
52
/* pthread_mutex_lock() wrapper that aborts on any failure. */
static void mutex_lock(pthread_mutex_t *mutex)
{
    int err;

    err = pthread_mutex_lock(mutex);
    if (err != 0) {
        die2(err, "pthread_mutex_lock");
    }
}
58
/* pthread_mutex_unlock() wrapper that aborts on any failure. */
static void mutex_unlock(pthread_mutex_t *mutex)
{
    int err;

    err = pthread_mutex_unlock(mutex);
    if (err != 0) {
        die2(err, "pthread_mutex_unlock");
    }
}
64
/*
 * pthread_cond_timedwait() wrapper: a timeout is passed back to the
 * caller (ETIMEDOUT); every other failure aborts.  Returns 0 or
 * ETIMEDOUT.
 */
static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                          struct timespec *ts)
{
    int err;

    err = pthread_cond_timedwait(cond, mutex, ts);
    if (err != 0 && err != ETIMEDOUT) {
        die2(err, "pthread_cond_timedwait");
    }
    return err;
}
72
/* pthread_cond_signal() wrapper that aborts on any failure. */
static void cond_signal(pthread_cond_t *cond)
{
    int err;

    err = pthread_cond_signal(cond);
    if (err != 0) {
        die2(err, "pthread_cond_signal");
    }
}
78
/* pthread_create() wrapper that aborts on any failure. */
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int err;

    err = pthread_create(thread, attr, start_routine, arg);
    if (err != 0) {
        die2(err, "pthread_create");
    }
}
85
86 static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
87 {
88 int ret;
89
90 ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
91 if (ret == -1)
92 return -errno;
93 return ret;
94 }
95
#ifdef HAVE_PREADV

/* Thin wrappers so the rest of the file can call preadv/pwritev
 * unconditionally; when the host lacks them, the fallbacks below
 * report -ENOSYS and the caller linearizes the buffer instead. */

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return preadv(fd, iov, nr_iov, offset);
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return pwritev(fd, iov, nr_iov, offset);
}

#else

/* Host has no preadv/pwritev: always report "not implemented". */

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

#endif
125
126 /*
127 * Check if we need to copy the data in the aiocb into a new
128 * properly aligned buffer.
129 */
130 static int aiocb_needs_copy(struct qemu_paiocb *aiocb)
131 {
132 if (aiocb->aio_flags & QEMU_AIO_SECTOR_ALIGNED) {
133 int i;
134
135 for (i = 0; i < aiocb->aio_niov; i++)
136 if ((uintptr_t) aiocb->aio_iov[i].iov_base % 512)
137 return 1;
138 }
139
140 return 0;
141 }
142
143 static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
144 {
145 size_t offset = 0;
146 ssize_t len;
147
148 do {
149 if (aiocb->aio_type == QEMU_PAIO_WRITE)
150 len = qemu_pwritev(aiocb->aio_fildes,
151 aiocb->aio_iov,
152 aiocb->aio_niov,
153 aiocb->aio_offset + offset);
154 else
155 len = qemu_preadv(aiocb->aio_fildes,
156 aiocb->aio_iov,
157 aiocb->aio_niov,
158 aiocb->aio_offset + offset);
159 } while (len == -1 && errno == EINTR);
160
161 if (len == -1)
162 return -errno;
163 return len;
164 }
165
166 static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
167 {
168 size_t offset = 0;
169 size_t len;
170
171 while (offset < aiocb->aio_nbytes) {
172 if (aiocb->aio_type == QEMU_PAIO_WRITE)
173 len = pwrite(aiocb->aio_fildes,
174 (const char *)buf + offset,
175 aiocb->aio_nbytes - offset,
176 aiocb->aio_offset + offset);
177 else
178 len = pread(aiocb->aio_fildes,
179 buf + offset,
180 aiocb->aio_nbytes - offset,
181 aiocb->aio_offset + offset);
182
183 if (len == -1 && errno == EINTR)
184 continue;
185 else if (len == -1) {
186 offset = -errno;
187 break;
188 } else if (len == 0)
189 break;
190
191 offset += len;
192 }
193
194 return offset;
195 }
196
197 static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
198 {
199 size_t nbytes;
200 char *buf;
201
202 if (!aiocb_needs_copy(aiocb)) {
203 /*
204 * If there is just a single buffer, and it is properly aligned
205 * we can just use plain pread/pwrite without any problems.
206 */
207 if (aiocb->aio_niov == 1)
208 return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
209
210 /*
211 * We have more than one iovec, and all are properly aligned.
212 *
213 * Try preadv/pwritev first and fall back to linearizing the
214 * buffer if it's not supported.
215 */
216 if (preadv_present) {
217 nbytes = handle_aiocb_rw_vector(aiocb);
218 if (nbytes == aiocb->aio_nbytes)
219 return nbytes;
220 if (nbytes < 0 && nbytes != -ENOSYS)
221 return nbytes;
222 preadv_present = 0;
223 }
224
225 /*
226 * XXX(hch): short read/write. no easy way to handle the reminder
227 * using these interfaces. For now retry using plain
228 * pread/pwrite?
229 */
230 }
231
232 /*
233 * Ok, we have to do it the hard way, copy all segments into
234 * a single aligned buffer.
235 */
236 buf = qemu_memalign(512, aiocb->aio_nbytes);
237 if (aiocb->aio_type == QEMU_PAIO_WRITE) {
238 char *p = buf;
239 int i;
240
241 for (i = 0; i < aiocb->aio_niov; ++i) {
242 memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
243 p += aiocb->aio_iov[i].iov_len;
244 }
245 }
246
247 nbytes = handle_aiocb_rw_linear(aiocb, buf);
248 if (aiocb->aio_type != QEMU_PAIO_WRITE) {
249 char *p = buf;
250 size_t count = aiocb->aio_nbytes, copy;
251 int i;
252
253 for (i = 0; i < aiocb->aio_niov && count; ++i) {
254 copy = count;
255 if (copy > aiocb->aio_iov[i].iov_len)
256 copy = aiocb->aio_iov[i].iov_len;
257 memcpy(aiocb->aio_iov[i].iov_base, p, copy);
258 p += copy;
259 count -= copy;
260 }
261 }
262 qemu_vfree(buf);
263
264 return nbytes;
265 }
266
/*
 * Worker thread main loop: repeatedly pop a request off request_list,
 * execute it, publish the result, and notify the main process via a
 * signal.  A worker that finds the queue empty for ~10 seconds exits,
 * shrinking the pool.
 */
static void *aio_thread(void *unused)
{
    pid_t pid;
    sigset_t set;

    pid = getpid();

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");

    while (1) {
        struct qemu_paiocb *aiocb;
        /* Reused for both the cond_timedwait() status and the I/O
         * result; negative errno values round-trip through size_t. */
        size_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        /* Absolute deadline ~10s from now for the idle wait below. */
        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        /* Wait for work until the deadline passes. */
        while (TAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            ret = cond_timedwait(&cond, &lock, &ts);
        }

        /* Timed out with nothing queued: leave the loop still holding
         * `lock`, so the counter updates below are protected. */
        if (TAILQ_EMPTY(&request_list))
            break;

        aiocb = TAILQ_FIRST(&request_list);
        TAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->active = 1;          /* past the point of cancellation */
        idle_threads--;
        mutex_unlock(&lock);

        /* Execute the request with the lock dropped. */
        switch (aiocb->aio_type) {
        case QEMU_PAIO_READ:
        case QEMU_PAIO_WRITE:
            ret = handle_aiocb_rw(aiocb);
            break;
        case QEMU_PAIO_IOCTL:
            ret = handle_aiocb_ioctl(aiocb);
            break;
        default:
            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
            ret = -EINVAL;
            break;
        }

        /* Publish the result under the lock, then signal completion. */
        mutex_lock(&lock);
        aiocb->ret = ret;
        idle_threads++;
        mutex_unlock(&lock);

        if (kill(pid, aiocb->ev_signo)) die("kill failed");
    }

    /* Idle timeout: retire this worker (lock still held from above). */
    idle_threads--;
    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}
332
333 static void spawn_thread(void)
334 {
335 cur_threads++;
336 idle_threads++;
337 thread_create(&thread_id, &attr, aio_thread, NULL);
338 }
339
340 int qemu_paio_init(struct qemu_paioinit *aioinit)
341 {
342 int ret;
343
344 ret = pthread_attr_init(&attr);
345 if (ret) die2(ret, "pthread_attr_init");
346
347 ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
348 if (ret) die2(ret, "pthread_attr_setdetachstate");
349
350 TAILQ_INIT(&request_list);
351
352 return 0;
353 }
354
355 static int qemu_paio_submit(struct qemu_paiocb *aiocb, int type)
356 {
357 aiocb->aio_type = type;
358 aiocb->ret = -EINPROGRESS;
359 aiocb->active = 0;
360 mutex_lock(&lock);
361 if (idle_threads == 0 && cur_threads < max_threads)
362 spawn_thread();
363 TAILQ_INSERT_TAIL(&request_list, aiocb, node);
364 mutex_unlock(&lock);
365 cond_signal(&cond);
366
367 return 0;
368 }
369
/* Submit `aiocb` as an asynchronous read request. */
int qemu_paio_read(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, QEMU_PAIO_READ);
}
374
/* Submit `aiocb` as an asynchronous write request. */
int qemu_paio_write(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, QEMU_PAIO_WRITE);
}
379
/* Submit `aiocb` as an asynchronous ioctl request. */
int qemu_paio_ioctl(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, QEMU_PAIO_IOCTL);
}
384
385 ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
386 {
387 ssize_t ret;
388
389 mutex_lock(&lock);
390 ret = aiocb->ret;
391 mutex_unlock(&lock);
392
393 return ret;
394 }
395
/*
 * aio_error()-style status: a positive errno value on failure
 * (EINPROGRESS while still pending), 0 on success.
 */
int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t err = qemu_paio_return(aiocb);

    return err < 0 ? -err : 0;
}
407
408 int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
409 {
410 int ret;
411
412 mutex_lock(&lock);
413 if (!aiocb->active) {
414 TAILQ_REMOVE(&request_list, aiocb, node);
415 aiocb->ret = -ECANCELED;
416 ret = QEMU_PAIO_CANCELED;
417 } else if (aiocb->ret == -EINPROGRESS)
418 ret = QEMU_PAIO_NOTCANCELED;
419 else
420 ret = QEMU_PAIO_ALLDONE;
421 mutex_unlock(&lock);
422
423 return ret;
424 }