]>
Commit | Line | Data |
---|---|---|
1 | /* | |
2 | * Linux native AIO support. | |
3 | * | |
4 | * Copyright (C) 2009 IBM, Corp. | |
5 | * Copyright (C) 2009 Red Hat, Inc. | |
6 | * | |
7 | * This work is licensed under the terms of the GNU GPL, version 2 or later. | |
8 | * See the COPYING file in the top-level directory. | |
9 | */ | |
10 | #include "qemu/osdep.h" | |
11 | #include "block/aio.h" | |
12 | #include "qemu/queue.h" | |
13 | #include "block/block.h" | |
14 | #include "block/raw-aio.h" | |
15 | #include "qemu/event_notifier.h" | |
16 | #include "qemu/coroutine.h" | |
17 | #include "qapi/error.h" | |
18 | ||
19 | #include <libaio.h> | |
20 | ||
21 | /* | |
22 | * Queue size (per-device). | |
23 | * | |
24 | * XXX: eventually we need to communicate this to the guest and/or make it | |
25 | * tunable by the guest. If we get more outstanding requests at a time | |
26 | * than this we will get EAGAIN from io_submit which is communicated to | |
27 | * the guest as an I/O error. | |
28 | */ | |
29 | #define MAX_EVENTS 128 | |
30 | ||
31 | struct qemu_laiocb { | |
32 | Coroutine *co; | |
33 | LinuxAioState *ctx; | |
34 | struct iocb iocb; | |
35 | ssize_t ret; | |
36 | size_t nbytes; | |
37 | QEMUIOVector *qiov; | |
38 | bool is_read; | |
39 | QSIMPLEQ_ENTRY(qemu_laiocb) next; | |
40 | }; | |
41 | ||
42 | typedef struct { | |
43 | int plugged; | |
44 | unsigned int in_queue; | |
45 | unsigned int in_flight; | |
46 | bool blocked; | |
47 | QSIMPLEQ_HEAD(, qemu_laiocb) pending; | |
48 | } LaioQueue; | |
49 | ||
50 | struct LinuxAioState { | |
51 | AioContext *aio_context; | |
52 | ||
53 | io_context_t ctx; | |
54 | EventNotifier e; | |
55 | ||
56 | /* io queue for submit at batch. Protected by AioContext lock. */ | |
57 | LaioQueue io_q; | |
58 | ||
59 | /* I/O completion processing. Only runs in I/O thread. */ | |
60 | QEMUBH *completion_bh; | |
61 | int event_idx; | |
62 | int event_max; | |
63 | }; | |
64 | ||
65 | static void ioq_submit(LinuxAioState *s); | |
66 | ||
67 | static inline ssize_t io_event_ret(struct io_event *ev) | |
68 | { | |
69 | return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res); | |
70 | } | |
71 | ||
72 | /* | |
73 | * Completes an AIO request. | |
74 | */ | |
75 | static void qemu_laio_process_completion(struct qemu_laiocb *laiocb) | |
76 | { | |
77 | int ret; | |
78 | ||
79 | ret = laiocb->ret; | |
80 | if (ret != -ECANCELED) { | |
81 | if (ret == laiocb->nbytes) { | |
82 | ret = 0; | |
83 | } else if (ret >= 0) { | |
84 | /* Short reads mean EOF, pad with zeros. */ | |
85 | if (laiocb->is_read) { | |
86 | qemu_iovec_memset(laiocb->qiov, ret, 0, | |
87 | laiocb->qiov->size - ret); | |
88 | } else { | |
89 | ret = -ENOSPC; | |
90 | } | |
91 | } | |
92 | } | |
93 | ||
94 | laiocb->ret = ret; | |
95 | ||
96 | /* | |
97 | * If the coroutine is already entered it must be in ioq_submit() and | |
98 | * will notice laio->ret has been filled in when it eventually runs | |
99 | * later. Coroutines cannot be entered recursively so avoid doing | |
100 | * that! | |
101 | */ | |
102 | if (!qemu_coroutine_entered(laiocb->co)) { | |
103 | aio_co_wake(laiocb->co); | |
104 | } | |
105 | } | |
106 | ||
107 | /** | |
108 | * aio_ring buffer which is shared between userspace and kernel. | |
109 | * | |
110 | * This copied from linux/fs/aio.c, common header does not exist | |
111 | * but AIO exists for ages so we assume ABI is stable. | |
112 | */ | |
113 | struct aio_ring { | |
114 | unsigned id; /* kernel internal index number */ | |
115 | unsigned nr; /* number of io_events */ | |
116 | unsigned head; /* Written to by userland or by kernel. */ | |
117 | unsigned tail; | |
118 | ||
119 | unsigned magic; | |
120 | unsigned compat_features; | |
121 | unsigned incompat_features; | |
122 | unsigned header_length; /* size of aio_ring */ | |
123 | ||
124 | struct io_event io_events[0]; | |
125 | }; | |
126 | ||
127 | /** | |
128 | * io_getevents_peek: | |
129 | * @ctx: AIO context | |
130 | * @events: pointer on events array, output value | |
131 | ||
132 | * Returns the number of completed events and sets a pointer | |
133 | * on events array. This function does not update the internal | |
134 | * ring buffer, only reads head and tail. When @events has been | |
135 | * processed io_getevents_commit() must be called. | |
136 | */ | |
137 | static inline unsigned int io_getevents_peek(io_context_t ctx, | |
138 | struct io_event **events) | |
139 | { | |
140 | struct aio_ring *ring = (struct aio_ring *)ctx; | |
141 | unsigned int head = ring->head, tail = ring->tail; | |
142 | unsigned int nr; | |
143 | ||
144 | nr = tail >= head ? tail - head : ring->nr - head; | |
145 | *events = ring->io_events + head; | |
146 | /* To avoid speculative loads of s->events[i] before observing tail. | |
147 | Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */ | |
148 | smp_rmb(); | |
149 | ||
150 | return nr; | |
151 | } | |
152 | ||
153 | /** | |
154 | * io_getevents_commit: | |
155 | * @ctx: AIO context | |
156 | * @nr: the number of events on which head should be advanced | |
157 | * | |
158 | * Advances head of a ring buffer. | |
159 | */ | |
160 | static inline void io_getevents_commit(io_context_t ctx, unsigned int nr) | |
161 | { | |
162 | struct aio_ring *ring = (struct aio_ring *)ctx; | |
163 | ||
164 | if (nr) { | |
165 | ring->head = (ring->head + nr) % ring->nr; | |
166 | } | |
167 | } | |
168 | ||
169 | /** | |
170 | * io_getevents_advance_and_peek: | |
171 | * @ctx: AIO context | |
172 | * @events: pointer on events array, output value | |
173 | * @nr: the number of events on which head should be advanced | |
174 | * | |
175 | * Advances head of a ring buffer and returns number of elements left. | |
176 | */ | |
177 | static inline unsigned int | |
178 | io_getevents_advance_and_peek(io_context_t ctx, | |
179 | struct io_event **events, | |
180 | unsigned int nr) | |
181 | { | |
182 | io_getevents_commit(ctx, nr); | |
183 | return io_getevents_peek(ctx, events); | |
184 | } | |
185 | ||
186 | /** | |
187 | * qemu_laio_process_completions: | |
188 | * @s: AIO state | |
189 | * | |
190 | * Fetches completed I/O requests and invokes their callbacks. | |
191 | * | |
192 | * The function is somewhat tricky because it supports nested event loops, for | |
193 | * example when a request callback invokes aio_poll(). In order to do this, | |
194 | * indices are kept in LinuxAioState. Function schedules BH completion so it | |
195 | * can be called again in a nested event loop. When there are no events left | |
196 | * to complete the BH is being canceled. | |
197 | */ | |
198 | static void qemu_laio_process_completions(LinuxAioState *s) | |
199 | { | |
200 | struct io_event *events; | |
201 | ||
202 | /* Reschedule so nested event loops see currently pending completions */ | |
203 | qemu_bh_schedule(s->completion_bh); | |
204 | ||
205 | while ((s->event_max = io_getevents_advance_and_peek(s->ctx, &events, | |
206 | s->event_idx))) { | |
207 | for (s->event_idx = 0; s->event_idx < s->event_max; ) { | |
208 | struct iocb *iocb = events[s->event_idx].obj; | |
209 | struct qemu_laiocb *laiocb = | |
210 | container_of(iocb, struct qemu_laiocb, iocb); | |
211 | ||
212 | laiocb->ret = io_event_ret(&events[s->event_idx]); | |
213 | ||
214 | /* Change counters one-by-one because we can be nested. */ | |
215 | s->io_q.in_flight--; | |
216 | s->event_idx++; | |
217 | qemu_laio_process_completion(laiocb); | |
218 | } | |
219 | } | |
220 | ||
221 | qemu_bh_cancel(s->completion_bh); | |
222 | ||
223 | /* If we are nested we have to notify the level above that we are done | |
224 | * by setting event_max to zero, upper level will then jump out of it's | |
225 | * own `for` loop. If we are the last all counters droped to zero. */ | |
226 | s->event_max = 0; | |
227 | s->event_idx = 0; | |
228 | } | |
229 | ||
230 | static void qemu_laio_process_completions_and_submit(LinuxAioState *s) | |
231 | { | |
232 | aio_context_acquire(s->aio_context); | |
233 | qemu_laio_process_completions(s); | |
234 | ||
235 | if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { | |
236 | ioq_submit(s); | |
237 | } | |
238 | aio_context_release(s->aio_context); | |
239 | } | |
240 | ||
241 | static void qemu_laio_completion_bh(void *opaque) | |
242 | { | |
243 | LinuxAioState *s = opaque; | |
244 | ||
245 | qemu_laio_process_completions_and_submit(s); | |
246 | } | |
247 | ||
248 | static void qemu_laio_completion_cb(EventNotifier *e) | |
249 | { | |
250 | LinuxAioState *s = container_of(e, LinuxAioState, e); | |
251 | ||
252 | if (event_notifier_test_and_clear(&s->e)) { | |
253 | qemu_laio_process_completions_and_submit(s); | |
254 | } | |
255 | } | |
256 | ||
257 | static bool qemu_laio_poll_cb(void *opaque) | |
258 | { | |
259 | EventNotifier *e = opaque; | |
260 | LinuxAioState *s = container_of(e, LinuxAioState, e); | |
261 | struct io_event *events; | |
262 | ||
263 | if (!io_getevents_peek(s->ctx, &events)) { | |
264 | return false; | |
265 | } | |
266 | ||
267 | qemu_laio_process_completions_and_submit(s); | |
268 | return true; | |
269 | } | |
270 | ||
271 | static void ioq_init(LaioQueue *io_q) | |
272 | { | |
273 | QSIMPLEQ_INIT(&io_q->pending); | |
274 | io_q->plugged = 0; | |
275 | io_q->in_queue = 0; | |
276 | io_q->in_flight = 0; | |
277 | io_q->blocked = false; | |
278 | } | |
279 | ||
280 | static void ioq_submit(LinuxAioState *s) | |
281 | { | |
282 | int ret, len; | |
283 | struct qemu_laiocb *aiocb; | |
284 | struct iocb *iocbs[MAX_EVENTS]; | |
285 | QSIMPLEQ_HEAD(, qemu_laiocb) completed; | |
286 | ||
287 | do { | |
288 | if (s->io_q.in_flight >= MAX_EVENTS) { | |
289 | break; | |
290 | } | |
291 | len = 0; | |
292 | QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) { | |
293 | iocbs[len++] = &aiocb->iocb; | |
294 | if (s->io_q.in_flight + len >= MAX_EVENTS) { | |
295 | break; | |
296 | } | |
297 | } | |
298 | ||
299 | ret = io_submit(s->ctx, len, iocbs); | |
300 | if (ret == -EAGAIN) { | |
301 | break; | |
302 | } | |
303 | if (ret < 0) { | |
304 | /* Fail the first request, retry the rest */ | |
305 | aiocb = QSIMPLEQ_FIRST(&s->io_q.pending); | |
306 | QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next); | |
307 | s->io_q.in_queue--; | |
308 | aiocb->ret = ret; | |
309 | qemu_laio_process_completion(aiocb); | |
310 | continue; | |
311 | } | |
312 | ||
313 | s->io_q.in_flight += ret; | |
314 | s->io_q.in_queue -= ret; | |
315 | aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb); | |
316 | QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed); | |
317 | } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending)); | |
318 | s->io_q.blocked = (s->io_q.in_queue > 0); | |
319 | ||
320 | if (s->io_q.in_flight) { | |
321 | /* We can try to complete something just right away if there are | |
322 | * still requests in-flight. */ | |
323 | qemu_laio_process_completions(s); | |
324 | /* | |
325 | * Even we have completed everything (in_flight == 0), the queue can | |
326 | * have still pended requests (in_queue > 0). We do not attempt to | |
327 | * repeat submission to avoid IO hang. The reason is simple: s->e is | |
328 | * still set and completion callback will be called shortly and all | |
329 | * pended requests will be submitted from there. | |
330 | */ | |
331 | } | |
332 | } | |
333 | ||
334 | void laio_io_plug(BlockDriverState *bs, LinuxAioState *s) | |
335 | { | |
336 | s->io_q.plugged++; | |
337 | } | |
338 | ||
339 | void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s) | |
340 | { | |
341 | assert(s->io_q.plugged); | |
342 | if (--s->io_q.plugged == 0 && | |
343 | !s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { | |
344 | ioq_submit(s); | |
345 | } | |
346 | } | |
347 | ||
348 | static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset, | |
349 | int type) | |
350 | { | |
351 | LinuxAioState *s = laiocb->ctx; | |
352 | struct iocb *iocbs = &laiocb->iocb; | |
353 | QEMUIOVector *qiov = laiocb->qiov; | |
354 | ||
355 | switch (type) { | |
356 | case QEMU_AIO_WRITE: | |
357 | io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset); | |
358 | break; | |
359 | case QEMU_AIO_READ: | |
360 | io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset); | |
361 | break; | |
362 | /* Currently Linux kernel does not support other operations */ | |
363 | default: | |
364 | fprintf(stderr, "%s: invalid AIO request type 0x%x.\n", | |
365 | __func__, type); | |
366 | return -EIO; | |
367 | } | |
368 | io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e)); | |
369 | ||
370 | QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next); | |
371 | s->io_q.in_queue++; | |
372 | if (!s->io_q.blocked && | |
373 | (!s->io_q.plugged || | |
374 | s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) { | |
375 | ioq_submit(s); | |
376 | } | |
377 | ||
378 | return 0; | |
379 | } | |
380 | ||
381 | int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd, | |
382 | uint64_t offset, QEMUIOVector *qiov, int type) | |
383 | { | |
384 | int ret; | |
385 | struct qemu_laiocb laiocb = { | |
386 | .co = qemu_coroutine_self(), | |
387 | .nbytes = qiov->size, | |
388 | .ctx = s, | |
389 | .ret = -EINPROGRESS, | |
390 | .is_read = (type == QEMU_AIO_READ), | |
391 | .qiov = qiov, | |
392 | }; | |
393 | ||
394 | ret = laio_do_submit(fd, &laiocb, offset, type); | |
395 | if (ret < 0) { | |
396 | return ret; | |
397 | } | |
398 | ||
399 | if (laiocb.ret == -EINPROGRESS) { | |
400 | qemu_coroutine_yield(); | |
401 | } | |
402 | return laiocb.ret; | |
403 | } | |
404 | ||
405 | void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context) | |
406 | { | |
407 | aio_set_event_notifier(old_context, &s->e, false, NULL, NULL); | |
408 | qemu_bh_delete(s->completion_bh); | |
409 | s->aio_context = NULL; | |
410 | } | |
411 | ||
412 | void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context) | |
413 | { | |
414 | s->aio_context = new_context; | |
415 | s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s); | |
416 | aio_set_event_notifier(new_context, &s->e, false, | |
417 | qemu_laio_completion_cb, | |
418 | qemu_laio_poll_cb); | |
419 | } | |
420 | ||
421 | LinuxAioState *laio_init(Error **errp) | |
422 | { | |
423 | int rc; | |
424 | LinuxAioState *s; | |
425 | ||
426 | s = g_malloc0(sizeof(*s)); | |
427 | rc = event_notifier_init(&s->e, false); | |
428 | if (rc < 0) { | |
429 | error_setg_errno(errp, -rc, "failed to to initialize event notifier"); | |
430 | goto out_free_state; | |
431 | } | |
432 | ||
433 | rc = io_setup(MAX_EVENTS, &s->ctx); | |
434 | if (rc < 0) { | |
435 | error_setg_errno(errp, -rc, "failed to create linux AIO context"); | |
436 | goto out_close_efd; | |
437 | } | |
438 | ||
439 | ioq_init(&s->io_q); | |
440 | ||
441 | return s; | |
442 | ||
443 | out_close_efd: | |
444 | event_notifier_cleanup(&s->e); | |
445 | out_free_state: | |
446 | g_free(s); | |
447 | return NULL; | |
448 | } | |
449 | ||
450 | void laio_cleanup(LinuxAioState *s) | |
451 | { | |
452 | event_notifier_cleanup(&s->e); | |
453 | ||
454 | if (io_destroy(s->ctx) != 0) { | |
455 | fprintf(stderr, "%s: destroy AIO context %p failed\n", | |
456 | __func__, &s->ctx); | |
457 | } | |
458 | g_free(s); | |
459 | } |