]> git.proxmox.com Git - mirror_qemu.git/blob - util/aio-posix.c
iothread: add aio-max-batch parameter
[mirror_qemu.git] / util / aio-posix.c
1 /*
2 * QEMU aio implementation
3 *
4 * Copyright IBM, Corp. 2008
5 *
6 * Authors:
7 * Anthony Liguori <aliguori@us.ibm.com>
8 *
9 * This work is licensed under the terms of the GNU GPL, version 2. See
10 * the COPYING file in the top-level directory.
11 *
12 * Contributions after 2012-01-13 are licensed under the terms of the
13 * GNU GPL, version 2 or (at your option) any later version.
14 */
15
16 #include "qemu/osdep.h"
17 #include "block/block.h"
18 #include "qemu/main-loop.h"
19 #include "qemu/rcu.h"
20 #include "qemu/rcu_queue.h"
21 #include "qemu/sockets.h"
22 #include "qemu/cutils.h"
23 #include "trace.h"
24 #include "aio-posix.h"
25
/*
 * Stop userspace polling on a handler once it has gone this long (in
 * nanoseconds) without a successful poll.
 */
#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)
28
29 bool aio_poll_disabled(AioContext *ctx)
30 {
31 return qatomic_read(&ctx->poll_disable_cnt);
32 }
33
34 void aio_add_ready_handler(AioHandlerList *ready_list,
35 AioHandler *node,
36 int revents)
37 {
38 QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
39 node->pfd.revents = revents;
40 QLIST_INSERT_HEAD(ready_list, node, node_ready);
41 }
42
43 static AioHandler *find_aio_handler(AioContext *ctx, int fd)
44 {
45 AioHandler *node;
46
47 QLIST_FOREACH(node, &ctx->aio_handlers, node) {
48 if (node->pfd.fd == fd) {
49 if (!QLIST_IS_INSERTED(node, node_deleted)) {
50 return node;
51 }
52 }
53 }
54
55 return NULL;
56 }
57
58 static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
59 {
60 /* If the GSource is in the process of being destroyed then
61 * g_source_remove_poll() causes an assertion failure. Skip
62 * removal in that case, because glib cleans up its state during
63 * destruction anyway.
64 */
65 if (!g_source_is_destroyed(&ctx->source)) {
66 g_source_remove_poll(&ctx->source, &node->pfd);
67 }
68
69 node->pfd.revents = 0;
70
71 /* If the fd monitor has already marked it deleted, leave it alone */
72 if (QLIST_IS_INSERTED(node, node_deleted)) {
73 return false;
74 }
75
76 /* If a read is in progress, just mark the node as deleted */
77 if (qemu_lockcnt_count(&ctx->list_lock)) {
78 QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
79 return false;
80 }
81 /* Otherwise, delete it for real. We can't just mark it as
82 * deleted because deleted nodes are only cleaned up while
83 * no one is walking the handlers list.
84 */
85 QLIST_SAFE_REMOVE(node, node_poll);
86 QLIST_REMOVE(node, node);
87 return true;
88 }
89
90 void aio_set_fd_handler(AioContext *ctx,
91 int fd,
92 bool is_external,
93 IOHandler *io_read,
94 IOHandler *io_write,
95 AioPollFn *io_poll,
96 void *opaque)
97 {
98 AioHandler *node;
99 AioHandler *new_node = NULL;
100 bool is_new = false;
101 bool deleted = false;
102 int poll_disable_change;
103
104 qemu_lockcnt_lock(&ctx->list_lock);
105
106 node = find_aio_handler(ctx, fd);
107
108 /* Are we deleting the fd handler? */
109 if (!io_read && !io_write && !io_poll) {
110 if (node == NULL) {
111 qemu_lockcnt_unlock(&ctx->list_lock);
112 return;
113 }
114 /* Clean events in order to unregister fd from the ctx epoll. */
115 node->pfd.events = 0;
116
117 poll_disable_change = -!node->io_poll;
118 } else {
119 poll_disable_change = !io_poll - (node && !node->io_poll);
120 if (node == NULL) {
121 is_new = true;
122 }
123 /* Alloc and insert if it's not already there */
124 new_node = g_new0(AioHandler, 1);
125
126 /* Update handler with latest information */
127 new_node->io_read = io_read;
128 new_node->io_write = io_write;
129 new_node->io_poll = io_poll;
130 new_node->opaque = opaque;
131 new_node->is_external = is_external;
132
133 if (is_new) {
134 new_node->pfd.fd = fd;
135 } else {
136 new_node->pfd = node->pfd;
137 }
138 g_source_add_poll(&ctx->source, &new_node->pfd);
139
140 new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
141 new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
142
143 QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
144 }
145
146 /* No need to order poll_disable_cnt writes against other updates;
147 * the counter is only used to avoid wasting time and latency on
148 * iterated polling when the system call will be ultimately necessary.
149 * Changing handlers is a rare event, and a little wasted polling until
150 * the aio_notify below is not an issue.
151 */
152 qatomic_set(&ctx->poll_disable_cnt,
153 qatomic_read(&ctx->poll_disable_cnt) + poll_disable_change);
154
155 ctx->fdmon_ops->update(ctx, node, new_node);
156 if (node) {
157 deleted = aio_remove_fd_handler(ctx, node);
158 }
159 qemu_lockcnt_unlock(&ctx->list_lock);
160 aio_notify(ctx);
161
162 if (deleted) {
163 g_free(node);
164 }
165 }
166
167 void aio_set_fd_poll(AioContext *ctx, int fd,
168 IOHandler *io_poll_begin,
169 IOHandler *io_poll_end)
170 {
171 AioHandler *node = find_aio_handler(ctx, fd);
172
173 if (!node) {
174 return;
175 }
176
177 node->io_poll_begin = io_poll_begin;
178 node->io_poll_end = io_poll_end;
179 }
180
181 void aio_set_event_notifier(AioContext *ctx,
182 EventNotifier *notifier,
183 bool is_external,
184 EventNotifierHandler *io_read,
185 AioPollFn *io_poll)
186 {
187 aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
188 (IOHandler *)io_read, NULL, io_poll, notifier);
189 }
190
191 void aio_set_event_notifier_poll(AioContext *ctx,
192 EventNotifier *notifier,
193 EventNotifierHandler *io_poll_begin,
194 EventNotifierHandler *io_poll_end)
195 {
196 aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
197 (IOHandler *)io_poll_begin,
198 (IOHandler *)io_poll_end);
199 }
200
201 static bool poll_set_started(AioContext *ctx, bool started)
202 {
203 AioHandler *node;
204 bool progress = false;
205
206 if (started == ctx->poll_started) {
207 return false;
208 }
209
210 ctx->poll_started = started;
211
212 qemu_lockcnt_inc(&ctx->list_lock);
213 QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
214 IOHandler *fn;
215
216 if (QLIST_IS_INSERTED(node, node_deleted)) {
217 continue;
218 }
219
220 if (started) {
221 fn = node->io_poll_begin;
222 } else {
223 fn = node->io_poll_end;
224 }
225
226 if (fn) {
227 fn(node->opaque);
228 }
229
230 /* Poll one last time in case ->io_poll_end() raced with the event */
231 if (!started) {
232 progress = node->io_poll(node->opaque) || progress;
233 }
234 }
235 qemu_lockcnt_dec(&ctx->list_lock);
236
237 return progress;
238 }
239
240
241 bool aio_prepare(AioContext *ctx)
242 {
243 /* Poll mode cannot be used with glib's event loop, disable it. */
244 poll_set_started(ctx, false);
245
246 return false;
247 }
248
249 bool aio_pending(AioContext *ctx)
250 {
251 AioHandler *node;
252 bool result = false;
253
254 /*
255 * We have to walk very carefully in case aio_set_fd_handler is
256 * called while we're walking.
257 */
258 qemu_lockcnt_inc(&ctx->list_lock);
259
260 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
261 int revents;
262
263 revents = node->pfd.revents & node->pfd.events;
264 if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
265 aio_node_check(ctx, node->is_external)) {
266 result = true;
267 break;
268 }
269 if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
270 aio_node_check(ctx, node->is_external)) {
271 result = true;
272 break;
273 }
274 }
275 qemu_lockcnt_dec(&ctx->list_lock);
276
277 return result;
278 }
279
280 static void aio_free_deleted_handlers(AioContext *ctx)
281 {
282 AioHandler *node;
283
284 if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
285 return;
286 }
287 if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
288 return; /* we are nested, let the parent do the freeing */
289 }
290
291 while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
292 QLIST_REMOVE(node, node);
293 QLIST_REMOVE(node, node_deleted);
294 QLIST_SAFE_REMOVE(node, node_poll);
295 g_free(node);
296 }
297
298 qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
299 }
300
301 static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
302 {
303 bool progress = false;
304 int revents;
305
306 revents = node->pfd.revents & node->pfd.events;
307 node->pfd.revents = 0;
308
309 /*
310 * Start polling AioHandlers when they become ready because activity is
311 * likely to continue. Note that starvation is theoretically possible when
312 * fdmon_supports_polling(), but only until the fd fires for the first
313 * time.
314 */
315 if (!QLIST_IS_INSERTED(node, node_deleted) &&
316 !QLIST_IS_INSERTED(node, node_poll) &&
317 node->io_poll) {
318 trace_poll_add(ctx, node, node->pfd.fd, revents);
319 if (ctx->poll_started && node->io_poll_begin) {
320 node->io_poll_begin(node->opaque);
321 }
322 QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
323 }
324
325 if (!QLIST_IS_INSERTED(node, node_deleted) &&
326 (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
327 aio_node_check(ctx, node->is_external) &&
328 node->io_read) {
329 node->io_read(node->opaque);
330
331 /* aio_notify() does not count as progress */
332 if (node->opaque != &ctx->notifier) {
333 progress = true;
334 }
335 }
336 if (!QLIST_IS_INSERTED(node, node_deleted) &&
337 (revents & (G_IO_OUT | G_IO_ERR)) &&
338 aio_node_check(ctx, node->is_external) &&
339 node->io_write) {
340 node->io_write(node->opaque);
341 progress = true;
342 }
343
344 return progress;
345 }
346
347 /*
348 * If we have a list of ready handlers then this is more efficient than
349 * scanning all handlers with aio_dispatch_handlers().
350 */
351 static bool aio_dispatch_ready_handlers(AioContext *ctx,
352 AioHandlerList *ready_list)
353 {
354 bool progress = false;
355 AioHandler *node;
356
357 while ((node = QLIST_FIRST(ready_list))) {
358 QLIST_REMOVE(node, node_ready);
359 progress = aio_dispatch_handler(ctx, node) || progress;
360 }
361
362 return progress;
363 }
364
365 /* Slower than aio_dispatch_ready_handlers() but only used via glib */
366 static bool aio_dispatch_handlers(AioContext *ctx)
367 {
368 AioHandler *node, *tmp;
369 bool progress = false;
370
371 QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
372 progress = aio_dispatch_handler(ctx, node) || progress;
373 }
374
375 return progress;
376 }
377
378 void aio_dispatch(AioContext *ctx)
379 {
380 qemu_lockcnt_inc(&ctx->list_lock);
381 aio_bh_poll(ctx);
382 aio_dispatch_handlers(ctx);
383 aio_free_deleted_handlers(ctx);
384 qemu_lockcnt_dec(&ctx->list_lock);
385
386 timerlistgroup_run_timers(&ctx->tlg);
387 }
388
389 static bool run_poll_handlers_once(AioContext *ctx,
390 int64_t now,
391 int64_t *timeout)
392 {
393 bool progress = false;
394 AioHandler *node;
395 AioHandler *tmp;
396
397 QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
398 if (aio_node_check(ctx, node->is_external) &&
399 node->io_poll(node->opaque)) {
400 node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
401
402 /*
403 * Polling was successful, exit try_poll_mode immediately
404 * to adjust the next polling time.
405 */
406 *timeout = 0;
407 if (node->opaque != &ctx->notifier) {
408 progress = true;
409 }
410 }
411
412 /* Caller handles freeing deleted nodes. Don't do it here. */
413 }
414
415 return progress;
416 }
417
418 static bool fdmon_supports_polling(AioContext *ctx)
419 {
420 return ctx->fdmon_ops->need_wait != aio_poll_disabled;
421 }
422
423 static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now)
424 {
425 AioHandler *node;
426 AioHandler *tmp;
427 bool progress = false;
428
429 /*
430 * File descriptor monitoring implementations without userspace polling
431 * support suffer from starvation when a subset of handlers is polled
432 * because fds will not be processed in a timely fashion. Don't remove
433 * idle poll handlers.
434 */
435 if (!fdmon_supports_polling(ctx)) {
436 return false;
437 }
438
439 QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
440 if (node->poll_idle_timeout == 0LL) {
441 node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
442 } else if (now >= node->poll_idle_timeout) {
443 trace_poll_remove(ctx, node, node->pfd.fd);
444 node->poll_idle_timeout = 0LL;
445 QLIST_SAFE_REMOVE(node, node_poll);
446 if (ctx->poll_started && node->io_poll_end) {
447 node->io_poll_end(node->opaque);
448
449 /*
450 * Final poll in case ->io_poll_end() races with an event.
451 * Nevermind about re-adding the handler in the rare case where
452 * this causes progress.
453 */
454 progress = node->io_poll(node->opaque) || progress;
455 }
456 }
457 }
458
459 return progress;
460 }
461
462 /* run_poll_handlers:
463 * @ctx: the AioContext
464 * @max_ns: maximum time to poll for, in nanoseconds
465 *
466 * Polls for a given time.
467 *
468 * Note that the caller must have incremented ctx->list_lock.
469 *
470 * Returns: true if progress was made, false otherwise
471 */
472 static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
473 {
474 bool progress;
475 int64_t start_time, elapsed_time;
476
477 assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
478
479 trace_run_poll_handlers_begin(ctx, max_ns, *timeout);
480
481 /*
482 * Optimization: ->io_poll() handlers often contain RCU read critical
483 * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
484 * -> rcu_read_lock() -> ... sequences with expensive memory
485 * synchronization primitives. Make the entire polling loop an RCU
486 * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
487 * are cheap.
488 */
489 RCU_READ_LOCK_GUARD();
490
491 start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
492 do {
493 progress = run_poll_handlers_once(ctx, start_time, timeout);
494 elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
495 max_ns = qemu_soonest_timeout(*timeout, max_ns);
496 assert(!(max_ns && progress));
497 } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));
498
499 if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) {
500 *timeout = 0;
501 progress = true;
502 }
503
504 /* If time has passed with no successful polling, adjust *timeout to
505 * keep the same ending time.
506 */
507 if (*timeout != -1) {
508 *timeout -= MIN(*timeout, elapsed_time);
509 }
510
511 trace_run_poll_handlers_end(ctx, progress, *timeout);
512 return progress;
513 }
514
515 /* try_poll_mode:
516 * @ctx: the AioContext
517 * @timeout: timeout for blocking wait, computed by the caller and updated if
518 * polling succeeds.
519 *
520 * Note that the caller must have incremented ctx->list_lock.
521 *
522 * Returns: true if progress was made, false otherwise
523 */
524 static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
525 {
526 int64_t max_ns;
527
528 if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
529 return false;
530 }
531
532 max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
533 if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
534 poll_set_started(ctx, true);
535
536 if (run_poll_handlers(ctx, max_ns, timeout)) {
537 return true;
538 }
539 }
540
541 if (poll_set_started(ctx, false)) {
542 *timeout = 0;
543 return true;
544 }
545
546 return false;
547 }
548
549 bool aio_poll(AioContext *ctx, bool blocking)
550 {
551 AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
552 int ret = 0;
553 bool progress;
554 bool use_notify_me;
555 int64_t timeout;
556 int64_t start = 0;
557
558 /*
559 * There cannot be two concurrent aio_poll calls for the same AioContext (or
560 * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
561 * We rely on this below to avoid slow locked accesses to ctx->notify_me.
562 *
563 * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
564 * is special in that it runs in the main thread, but that thread's context
565 * is qemu_aio_context.
566 */
567 assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
568 qemu_get_aio_context() : ctx));
569
570 qemu_lockcnt_inc(&ctx->list_lock);
571
572 if (ctx->poll_max_ns) {
573 start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
574 }
575
576 timeout = blocking ? aio_compute_timeout(ctx) : 0;
577 progress = try_poll_mode(ctx, &timeout);
578 assert(!(timeout && progress));
579
580 /*
581 * aio_notify can avoid the expensive event_notifier_set if
582 * everything (file descriptors, bottom halves, timers) will
583 * be re-evaluated before the next blocking poll(). This is
584 * already true when aio_poll is called with blocking == false;
585 * if blocking == true, it is only true after poll() returns,
586 * so disable the optimization now.
587 */
588 use_notify_me = timeout != 0;
589 if (use_notify_me) {
590 qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
591 /*
592 * Write ctx->notify_me before reading ctx->notified. Pairs with
593 * smp_mb in aio_notify().
594 */
595 smp_mb();
596
597 /* Don't block if aio_notify() was called */
598 if (qatomic_read(&ctx->notified)) {
599 timeout = 0;
600 }
601 }
602
603 /* If polling is allowed, non-blocking aio_poll does not need the
604 * system call---a single round of run_poll_handlers_once suffices.
605 */
606 if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
607 ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
608 }
609
610 if (use_notify_me) {
611 /* Finish the poll before clearing the flag. */
612 qatomic_store_release(&ctx->notify_me,
613 qatomic_read(&ctx->notify_me) - 2);
614 }
615
616 aio_notify_accept(ctx);
617
618 /* Adjust polling time */
619 if (ctx->poll_max_ns) {
620 int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;
621
622 if (block_ns <= ctx->poll_ns) {
623 /* This is the sweet spot, no adjustment needed */
624 } else if (block_ns > ctx->poll_max_ns) {
625 /* We'd have to poll for too long, poll less */
626 int64_t old = ctx->poll_ns;
627
628 if (ctx->poll_shrink) {
629 ctx->poll_ns /= ctx->poll_shrink;
630 } else {
631 ctx->poll_ns = 0;
632 }
633
634 trace_poll_shrink(ctx, old, ctx->poll_ns);
635 } else if (ctx->poll_ns < ctx->poll_max_ns &&
636 block_ns < ctx->poll_max_ns) {
637 /* There is room to grow, poll longer */
638 int64_t old = ctx->poll_ns;
639 int64_t grow = ctx->poll_grow;
640
641 if (grow == 0) {
642 grow = 2;
643 }
644
645 if (ctx->poll_ns) {
646 ctx->poll_ns *= grow;
647 } else {
648 ctx->poll_ns = 4000; /* start polling at 4 microseconds */
649 }
650
651 if (ctx->poll_ns > ctx->poll_max_ns) {
652 ctx->poll_ns = ctx->poll_max_ns;
653 }
654
655 trace_poll_grow(ctx, old, ctx->poll_ns);
656 }
657 }
658
659 progress |= aio_bh_poll(ctx);
660
661 if (ret > 0) {
662 progress |= aio_dispatch_ready_handlers(ctx, &ready_list);
663 }
664
665 aio_free_deleted_handlers(ctx);
666
667 qemu_lockcnt_dec(&ctx->list_lock);
668
669 progress |= timerlistgroup_run_timers(&ctx->tlg);
670
671 return progress;
672 }
673
674 void aio_context_setup(AioContext *ctx)
675 {
676 ctx->fdmon_ops = &fdmon_poll_ops;
677 ctx->epollfd = -1;
678
679 /* Use the fastest fd monitoring implementation if available */
680 if (fdmon_io_uring_setup(ctx)) {
681 return;
682 }
683
684 fdmon_epoll_setup(ctx);
685 }
686
687 void aio_context_destroy(AioContext *ctx)
688 {
689 fdmon_io_uring_destroy(ctx);
690 fdmon_epoll_disable(ctx);
691 aio_free_deleted_handlers(ctx);
692 }
693
694 void aio_context_use_g_source(AioContext *ctx)
695 {
696 /*
697 * Disable io_uring when the glib main loop is used because it doesn't
698 * support mixed glib/aio_poll() usage. It relies on aio_poll() being
699 * called regularly so that changes to the monitored file descriptors are
700 * submitted, otherwise a list of pending fd handlers builds up.
701 */
702 fdmon_io_uring_destroy(ctx);
703 aio_free_deleted_handlers(ctx);
704 }
705
706 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
707 int64_t grow, int64_t shrink, Error **errp)
708 {
709 /* No thread synchronization here, it doesn't matter if an incorrect value
710 * is used once.
711 */
712 ctx->poll_max_ns = max_ns;
713 ctx->poll_ns = 0;
714 ctx->poll_grow = grow;
715 ctx->poll_shrink = shrink;
716
717 aio_notify(ctx);
718 }
719
720 void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
721 Error **errp)
722 {
723 /*
724 * No thread synchronization here, it doesn't matter if an incorrect value
725 * is used once.
726 */
727 ctx->aio_max_batch = max_batch;
728
729 aio_notify(ctx);
730 }