posix-aio-compat.c
/*
 * QEMU posix-aio emulation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2. See
 * the COPYING file in the top-level directory.
 *
 */

#include <sys/ioctl.h>
#include <signal.h>     /* sigfillset, sigprocmask, kill */
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include "osdep.h"
#include "qemu-common.h"

#include "posix-aio-compat.h"

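/*
 * A simple thread pool: qemu_paio_submit() queues requests on
 * request_list and worker threads pull them off.  Workers are spawned
 * on demand up to max_threads, and an idle worker exits after waiting
 * ten seconds without finding new work.  Completion is reported back
 * to the main process by raising the per-request signal (ev_signo).
 */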
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id;
static pthread_attr_t attr;
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
static TAILQ_HEAD(, qemu_paiocb) request_list;

static void die2(int err, const char *what)
{
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
    abort();
}

static void die(const char *what)
{
    die2(errno, what);
}

static void mutex_lock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_lock(mutex);
    if (ret) die2(ret, "pthread_mutex_lock");
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_unlock(mutex);
    if (ret) die2(ret, "pthread_mutex_unlock");
}

static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                          const struct timespec *ts)
{
    int ret = pthread_cond_timedwait(cond, mutex, ts);
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
    return ret;
}

static void cond_signal(pthread_cond_t *cond)
{
    int ret = pthread_cond_signal(cond);
    if (ret) die2(ret, "pthread_cond_signal");
}

static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int ret = pthread_create(thread, attr, start_routine, arg);
    if (ret) die2(ret, "pthread_create");
}

/* Returns the ioctl result, or -errno, so the type must be signed. */
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
    if (ret == -1)
        return -errno;
    return ret;
}

/*
 * Check if we need to copy the data in the aiocb into a new
 * properly aligned buffer.
 */
static int aiocb_needs_copy(struct qemu_paiocb *aiocb)
{
    if (aiocb->aio_flags & QEMU_AIO_SECTOR_ALIGNED) {
        int i;

        for (i = 0; i < aiocb->aio_niov; i++)
            if ((uintptr_t) aiocb->aio_iov[i].iov_base % 512)
                return 1;
    }

    return 0;
}

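/*
 * Transfer a contiguous buffer with pread()/pwrite(), retrying on
 * EINTR and looping over short transfers.  Returns the number of
 * bytes moved, or -errno on the first failure.
 */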
static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
{
    ssize_t offset = 0;
    ssize_t len;

    while (offset < aiocb->aio_nbytes) {
        if (aiocb->aio_type == QEMU_PAIO_WRITE)
            len = pwrite(aiocb->aio_fildes,
                         (const char *)buf + offset,
                         aiocb->aio_nbytes - offset,
                         aiocb->aio_offset + offset);
        else
            len = pread(aiocb->aio_fildes,
                        buf + offset,
                        aiocb->aio_nbytes - offset,
                        aiocb->aio_offset + offset);

        if (len == -1 && errno == EINTR)
            continue;
        else if (len == -1) {
            offset = -errno;
            break;
        } else if (len == 0)
            break;

        offset += len;
    }

    return offset;
}

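/*
 * Handle a read or write request.  A single, properly aligned iovec
 * can go straight to pread()/pwrite(); anything else is staged through
 * a 512-byte-aligned bounce buffer so it can still be issued as one
 * plain pread()/pwrite() call.
 */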
static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
{
    ssize_t nbytes;
    char *buf;

    if (!aiocb_needs_copy(aiocb) && aiocb->aio_niov == 1) {
        /*
         * If there is just a single buffer, and it is properly aligned
         * we can just use plain pread/pwrite without any problems.
         */
        return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
    }

    /*
     * OK, we have to do it the hard way, copy all segments into
     * a single aligned buffer.
     */
    buf = qemu_memalign(512, aiocb->aio_nbytes);
    if (aiocb->aio_type == QEMU_PAIO_WRITE) {
        char *p = buf;
        int i;

        for (i = 0; i < aiocb->aio_niov; ++i) {
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
            p += aiocb->aio_iov[i].iov_len;
        }
    }

    nbytes = handle_aiocb_rw_linear(aiocb, buf);
    if (aiocb->aio_type != QEMU_PAIO_WRITE) {
        /* Scatter the data read back out into the caller's iovec. */
        char *p = buf;
        size_t count = aiocb->aio_nbytes, copy;
        int i;

        for (i = 0; i < aiocb->aio_niov && count; ++i) {
            copy = count;
            if (copy > aiocb->aio_iov[i].iov_len)
                copy = aiocb->aio_iov[i].iov_len;
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
            p += copy;
            count -= copy;
        }
    }
    qemu_vfree(buf);

    return nbytes;
}

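/*
 * Worker thread: with all signals blocked, repeatedly pull a request
 * off request_list, service it, store the result in aiocb->ret, and
 * notify the main process via kill().  If no work arrives within the
 * ten-second cond_timedwait() window, the thread retires itself.
 */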
static void *aio_thread(void *unused)
{
    pid_t pid;
    sigset_t set;

    pid = getpid();

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");

    while (1) {
        struct qemu_paiocb *aiocb;
        ssize_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        while (TAILQ_EMPTY(&request_list) && ret != ETIMEDOUT) {
            ret = cond_timedwait(&cond, &lock, &ts);
        }

        if (TAILQ_EMPTY(&request_list))
            break;

        aiocb = TAILQ_FIRST(&request_list);
        TAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->active = 1;
        idle_threads--;
        mutex_unlock(&lock);

        switch (aiocb->aio_type) {
        case QEMU_PAIO_READ:
        case QEMU_PAIO_WRITE:
            ret = handle_aiocb_rw(aiocb);
            break;
        case QEMU_PAIO_IOCTL:
            ret = handle_aiocb_ioctl(aiocb);
            break;
        default:
            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
            ret = -EINVAL;
            break;
        }

        mutex_lock(&lock);
        aiocb->ret = ret;
        idle_threads++;
        mutex_unlock(&lock);

        if (kill(pid, aiocb->ev_signo)) die("kill");
    }

    idle_threads--;
    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}

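/* Called with lock held; the counters it bumps are lock-protected. */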
static void spawn_thread(void)
{
    cur_threads++;
    idle_threads++;
    thread_create(&thread_id, &attr, aio_thread, NULL);
}

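/*
 * One-time setup: workers are created detached, so nobody ever joins
 * them.  The aioinit argument is accepted for interface compatibility
 * but is not used here.
 */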
int qemu_paio_init(struct qemu_paioinit *aioinit)
{
    int ret;

    ret = pthread_attr_init(&attr);
    if (ret) die2(ret, "pthread_attr_init");

    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (ret) die2(ret, "pthread_attr_setdetachstate");

    TAILQ_INIT(&request_list);

    return 0;
}

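/*
 * Queue a request for the worker pool.  A new thread is spawned only
 * if every existing worker is busy and the pool is below max_threads;
 * the condition variable is signalled after dropping the lock, so a
 * woken worker need not immediately block on it.
 */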
static int qemu_paio_submit(struct qemu_paiocb *aiocb, int type)
{
    aiocb->aio_type = type;
    aiocb->ret = -EINPROGRESS;
    aiocb->active = 0;
    mutex_lock(&lock);
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    cond_signal(&cond);

    return 0;
}

int qemu_paio_read(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, QEMU_PAIO_READ);
}

int qemu_paio_write(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, QEMU_PAIO_WRITE);
}

int qemu_paio_ioctl(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, QEMU_PAIO_IOCTL);
}

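/*
 * Completion is polled: qemu_paio_return() reads aiocb->ret under the
 * lock, and qemu_paio_error() translates it into aio_error()-style
 * results (0 on success, a positive errno on failure, or EINPROGRESS
 * while the request is still queued or running).
 */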
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}

int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t ret = qemu_paio_return(aiocb);

    if (ret < 0)
        ret = -ret;
    else
        ret = 0;

    return ret;
}

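/*
 * Mirror aio_cancel() semantics: a request still sitting on
 * request_list can be yanked off and marked -ECANCELED; once a worker
 * has claimed it (active == 1) it can only be reported as still in
 * flight or already done.
 */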
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
{
    int ret;

    mutex_lock(&lock);
    if (!aiocb->active) {
        TAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->ret = -ECANCELED;
        ret = QEMU_PAIO_CANCELED;
    } else if (aiocb->ret == -EINPROGRESS)
        ret = QEMU_PAIO_NOTCANCELED;
    else
        ret = QEMU_PAIO_ALLDONE;
    mutex_unlock(&lock);

    return ret;
}
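
/*
 * Usage sketch (illustrative only, not part of this file): a caller is
 * expected to install a handler for the completion signal, fill in a
 * struct qemu_paiocb, submit it, and poll qemu_paio_error() until it
 * stops returning EINPROGRESS.  Field names follow their use above;
 * SIGUSR2 and the 512-byte buffer are arbitrary choices for the example.
 *
 *     struct qemu_paiocb cb;
 *     struct iovec iov = { .iov_base = buf, .iov_len = 512 };
 *
 *     memset(&cb, 0, sizeof(cb));
 *     cb.aio_fildes = fd;             // open file descriptor
 *     cb.aio_iov    = &iov;           // scatter/gather list
 *     cb.aio_niov   = 1;
 *     cb.aio_nbytes = 512;
 *     cb.aio_offset = 0;
 *     cb.ev_signo   = SIGUSR2;        // raised on completion
 *
 *     qemu_paio_read(&cb);
 *     while (qemu_paio_error(&cb) == EINPROGRESS)
 *         ;                           // or block until ev_signo arrives
 *     if (qemu_paio_error(&cb) == 0)
 *         nread = qemu_paio_return(&cb);
 */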