util/aio-posix.c (QEMU, mirror_qemu.git on git.proxmox.com; blob as of "aio-posix: make AioHandler deletion O(1)")
/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "block/block.h"
#include "qemu/rcu.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
#ifdef CONFIG_EPOLL_CREATE1
#include <sys/epoll.h>
#endif

struct AioHandler
{
    GPollFD pfd;
    IOHandler *io_read;
    IOHandler *io_write;
    AioPollFn *io_poll;
    IOHandler *io_poll_begin;
    IOHandler *io_poll_end;
    void *opaque;
    bool is_external;
    QLIST_ENTRY(AioHandler) node;
    QLIST_ENTRY(AioHandler) node_deleted;
};
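/*
 * Handlers are reachable through ctx->aio_handlers via @node.  When a handler
 * is removed while the list is in use, it is not freed immediately: it is
 * additionally linked into ctx->deleted_aio_handlers via @node_deleted,
 * walkers skip it by checking QLIST_IS_INSERTED(node, node_deleted), and
 * aio_free_deleted_handlers() reclaims it once the list is quiescent.
 */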

#ifdef CONFIG_EPOLL_CREATE1

/* The fd number threshold to switch to epoll */
#define EPOLL_ENABLE_THRESHOLD 64

static void aio_epoll_disable(AioContext *ctx)
{
    ctx->epoll_enabled = false;
    if (!ctx->epoll_available) {
        return;
    }
    ctx->epoll_available = false;
    close(ctx->epollfd);
}

static inline int epoll_events_from_pfd(int pfd_events)
{
    return (pfd_events & G_IO_IN ? EPOLLIN : 0) |
           (pfd_events & G_IO_OUT ? EPOLLOUT : 0) |
           (pfd_events & G_IO_HUP ? EPOLLHUP : 0) |
           (pfd_events & G_IO_ERR ? EPOLLERR : 0);
}

static bool aio_epoll_try_enable(AioContext *ctx)
{
    AioHandler *node;
    struct epoll_event event;

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int r;
        if (QLIST_IS_INSERTED(node, node_deleted) || !node->pfd.events) {
            continue;
        }
        event.events = epoll_events_from_pfd(node->pfd.events);
        event.data.ptr = node;
        r = epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, node->pfd.fd, &event);
        if (r) {
            return false;
        }
    }
    ctx->epoll_enabled = true;
    return true;
}

static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new)
{
    struct epoll_event event;
    int r;
    int ctl;

    if (!ctx->epoll_enabled) {
        return;
    }
    if (!node->pfd.events) {
        ctl = EPOLL_CTL_DEL;
    } else {
        event.data.ptr = node;
        event.events = epoll_events_from_pfd(node->pfd.events);
        ctl = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    }

    r = epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event);
    if (r) {
        aio_epoll_disable(ctx);
    }
}

static int aio_epoll(AioContext *ctx, int64_t timeout)
{
    GPollFD pfd = {
        .fd = ctx->epollfd,
        .events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR,
    };
    AioHandler *node;
    int i, ret = 0;
    struct epoll_event events[128];

    if (timeout > 0) {
        ret = qemu_poll_ns(&pfd, 1, timeout);
        if (ret > 0) {
            timeout = 0;
        }
    }
    if (timeout <= 0 || ret > 0) {
        ret = epoll_wait(ctx->epollfd, events,
                         ARRAY_SIZE(events),
                         timeout);
        if (ret <= 0) {
            goto out;
        }
        for (i = 0; i < ret; i++) {
            int ev = events[i].events;
            node = events[i].data.ptr;
            node->pfd.revents = (ev & EPOLLIN ? G_IO_IN : 0) |
                (ev & EPOLLOUT ? G_IO_OUT : 0) |
                (ev & EPOLLHUP ? G_IO_HUP : 0) |
                (ev & EPOLLERR ? G_IO_ERR : 0);
        }
    }
out:
    return ret;
}
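/*
 * Note on aio_epoll() above: a positive @timeout is in nanoseconds and is
 * consumed by qemu_poll_ns() on the epoll fd itself, so epoll_wait() only
 * ever runs with a timeout of 0 or -1 and never misinterprets a nanosecond
 * value as milliseconds.
 */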

static bool aio_epoll_enabled(AioContext *ctx)
{
    /* Fall back to ppoll when external clients are disabled. */
    return !aio_external_disabled(ctx) && ctx->epoll_enabled;
}

static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds,
                                 unsigned npfd, int64_t timeout)
{
    if (!ctx->epoll_available) {
        return false;
    }
    if (aio_epoll_enabled(ctx)) {
        return true;
    }
    if (npfd >= EPOLL_ENABLE_THRESHOLD) {
        if (aio_epoll_try_enable(ctx)) {
            return true;
        } else {
            aio_epoll_disable(ctx);
        }
    }
    return false;
}
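/*
 * A context starts out using ppoll(2); epoll(7) is only enabled once the
 * number of polled fds reaches EPOLL_ENABLE_THRESHOLD.  If registering the
 * existing handlers with epoll fails, epoll is disabled again for this
 * context and ppoll keeps being used.
 */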

#else

static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new)
{
}

static int aio_epoll(AioContext *ctx, int64_t timeout)
{
    assert(false);
}

static bool aio_epoll_enabled(AioContext *ctx)
{
    return false;
}

static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds,
                                 unsigned npfd, int64_t timeout)
{
    return false;
}

#endif

static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd) {
            if (!QLIST_IS_INSERTED(node, node_deleted)) {
                return node;
            }
        }
    }

    return NULL;
}

static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /* If the GSource is in the process of being destroyed then
     * g_source_remove_poll() causes an assertion failure.  Skip
     * removal in that case, because glib cleans up its state during
     * destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    /* If a read is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        node->pfd.revents = 0;
        return false;
    }
    /* Otherwise, delete it for real.  We can't just mark it as
     * deleted because deleted nodes are only cleaned up while
     * no one is walking the handlers list.
     */
    QLIST_REMOVE(node, node);
    return true;
}
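/*
 * aio_remove_fd_handler() returns true when @node was unlinked and can be
 * freed immediately by the caller; it returns false when freeing had to be
 * deferred because the handler list is currently being walked (the
 * ctx->list_lock count is non-zero).  Deferred nodes sit on
 * ctx->deleted_aio_handlers until aio_free_deleted_handlers() reclaims them.
 */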

void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        void *opaque)
{
    AioHandler *node;
    AioHandler *new_node = NULL;
    bool is_new = false;
    bool deleted = false;
    int poll_disable_change;

    qemu_lockcnt_lock(&ctx->list_lock);

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write && !io_poll) {
        if (node == NULL) {
            qemu_lockcnt_unlock(&ctx->list_lock);
            return;
        }
        /* Clean events in order to unregister fd from the ctx epoll. */
        node->pfd.events = 0;

        poll_disable_change = -!node->io_poll;
    } else {
        poll_disable_change = !io_poll - (node && !node->io_poll);
        if (node == NULL) {
            is_new = true;
        }
        /* Alloc and insert if it's not already there */
        new_node = g_new0(AioHandler, 1);

        /* Update handler with latest information */
        new_node->io_read = io_read;
        new_node->io_write = io_write;
        new_node->io_poll = io_poll;
        new_node->opaque = opaque;
        new_node->is_external = is_external;

        if (is_new) {
            new_node->pfd.fd = fd;
        } else {
            new_node->pfd = node->pfd;
        }
        g_source_add_poll(&ctx->source, &new_node->pfd);

        new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
    }
    if (node) {
        deleted = aio_remove_fd_handler(ctx, node);
    }

    /* No need to order poll_disable_cnt writes against other updates;
     * the counter is only used to avoid wasting time and latency on
     * iterated polling when the system call will be ultimately necessary.
     * Changing handlers is a rare event, and a little wasted polling until
     * the aio_notify below is not an issue.
     */
    atomic_set(&ctx->poll_disable_cnt,
               atomic_read(&ctx->poll_disable_cnt) + poll_disable_change);

    if (new_node) {
        aio_epoll_update(ctx, new_node, is_new);
    } else if (node) {
        /* Unregister deleted fd_handler */
        aio_epoll_update(ctx, node, false);
    }
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);

    if (deleted) {
        g_free(node);
    }
}
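/*
 * Illustrative use (not part of this file): a caller monitors an fd for
 * readability and later removes the handler by passing NULL callbacks.
 * "my_read_cb" and "my_state" are placeholder names.
 *
 *     aio_set_fd_handler(ctx, fd, true, my_read_cb, NULL, NULL, my_state);
 *     ...
 *     aio_set_fd_handler(ctx, fd, true, NULL, NULL, NULL, NULL);
 *
 * Note that an update always allocates a new AioHandler and retires the old
 * one through aio_remove_fd_handler(), so concurrent walkers never see a
 * half-updated handler.
 */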

void aio_set_fd_poll(AioContext *ctx, int fd,
                     IOHandler *io_poll_begin,
                     IOHandler *io_poll_end)
{
    AioHandler *node = find_aio_handler(ctx, fd);

    if (!node) {
        return;
    }

    node->io_poll_begin = io_poll_begin;
    node->io_poll_end = io_poll_end;
}

void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            bool is_external,
                            EventNotifierHandler *io_read,
                            AioPollFn *io_poll)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
                       (IOHandler *)io_read, NULL, io_poll, notifier);
}

void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
                    (IOHandler *)io_poll_begin,
                    (IOHandler *)io_poll_end);
}

static void poll_set_started(AioContext *ctx, bool started)
{
    AioHandler *node;

    if (started == ctx->poll_started) {
        return;
    }

    ctx->poll_started = started;

    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        IOHandler *fn;

        if (QLIST_IS_INSERTED(node, node_deleted)) {
            continue;
        }

        if (started) {
            fn = node->io_poll_begin;
        } else {
            fn = node->io_poll_end;
        }

        if (fn) {
            fn(node->opaque);
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);
}
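/*
 * poll_set_started() notifies every handler when the context enters or
 * leaves busy-polling mode: io_poll_begin() runs when polling starts and
 * io_poll_end() when it stops.  Handlers can use these hooks, for example,
 * to suppress their event notifier while io_poll() is being called in a
 * tight loop and re-arm it before the context goes back to sleep.
 */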


bool aio_prepare(AioContext *ctx)
{
    /* Poll mode cannot be used with glib's event loop, disable it. */
    poll_set_started(ctx, false);

    return false;
}

bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return result;
}

static void aio_free_deleted_handlers(AioContext *ctx)
{
    AioHandler *node;

    if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
        return;
    }
    if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        return; /* we are nested, let the parent do the freeing */
    }

    while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
        QLIST_REMOVE(node, node);
        QLIST_REMOVE(node, node_deleted);
        g_free(node);
    }

    qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}
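/*
 * Freeing only happens in the outermost list walker: if other readers still
 * hold ctx->list_lock references, qemu_lockcnt_dec_if_lock() fails and the
 * nested caller leaves reclamation to its parent; otherwise the list_lock is
 * taken and the deleted handlers can be unlinked and freed safely.
 */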

static bool aio_dispatch_handlers(AioContext *ctx)
{
    AioHandler *node, *tmp;
    bool progress = false;

    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        node->pfd.revents = 0;

        if (!QLIST_IS_INSERTED(node, node_deleted) &&
            (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
            aio_node_check(ctx, node->is_external) &&
            node->io_read) {
            node->io_read(node->opaque);

            /* aio_notify() does not count as progress */
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }
        if (!QLIST_IS_INSERTED(node, node_deleted) &&
            (revents & (G_IO_OUT | G_IO_ERR)) &&
            aio_node_check(ctx, node->is_external) &&
            node->io_write) {
            node->io_write(node->opaque);
            progress = true;
        }
    }

    return progress;
}

void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx);
    aio_free_deleted_handlers(ctx);
    qemu_lockcnt_dec(&ctx->list_lock);

    timerlistgroup_run_timers(&ctx->tlg);
}

/* These thread-local variables are used only in a small part of aio_poll
 * around the call to the poll() system call.  In particular they are not
 * used while aio_poll is performing callbacks, which makes it much easier
 * to think about reentrancy!
 *
 * Stack-allocated arrays would be perfect but they have size limitations;
 * heap allocation is expensive enough that we want to reuse arrays across
 * calls to aio_poll().  And because poll() has to be called without holding
 * any lock, the arrays cannot be stored in AioContext.  Thread-local data
 * has none of the disadvantages of these three options.
 */
static __thread GPollFD *pollfds;
static __thread AioHandler **nodes;
static __thread unsigned npfd, nalloc;
static __thread Notifier pollfds_cleanup_notifier;

static void pollfds_cleanup(Notifier *n, void *unused)
{
    g_assert(npfd == 0);
    g_free(pollfds);
    g_free(nodes);
    nalloc = 0;
}

static void add_pollfd(AioHandler *node)
{
    if (npfd == nalloc) {
        if (nalloc == 0) {
            pollfds_cleanup_notifier.notify = pollfds_cleanup;
            qemu_thread_atexit_add(&pollfds_cleanup_notifier);
            nalloc = 8;
        } else {
            g_assert(nalloc <= INT_MAX);
            nalloc *= 2;
        }
        pollfds = g_renew(GPollFD, pollfds, nalloc);
        nodes = g_renew(AioHandler *, nodes, nalloc);
    }
    nodes[npfd] = node;
    pollfds[npfd] = (GPollFD) {
        .fd = node->pfd.fd,
        .events = node->pfd.events,
    };
    npfd++;
}

static bool run_poll_handlers_once(AioContext *ctx, int64_t *timeout)
{
    bool progress = false;
    AioHandler *node;

    /*
     * Optimization: ->io_poll() handlers often contain RCU read critical
     * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
     * -> rcu_read_lock() -> ... sequences with expensive memory
     * synchronization primitives.  Make the entire polling loop an RCU
     * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
     * are cheap.
     */
    RCU_READ_LOCK_GUARD();

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (!QLIST_IS_INSERTED(node, node_deleted) && node->io_poll &&
            aio_node_check(ctx, node->is_external) &&
            node->io_poll(node->opaque)) {
            /*
             * Polling was successful, exit try_poll_mode immediately
             * to adjust the next polling time.
             */
            *timeout = 0;
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }

        /* Caller handles freeing deleted nodes.  Don't do it here. */
    }

    return progress;
}

/* run_poll_handlers:
 * @ctx: the AioContext
 * @max_ns: maximum time to poll for, in nanoseconds
 *
 * Polls for a given time.
 *
 * Note that ctx->notify_me must be non-zero so this function can detect
 * aio_notify().
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
{
    bool progress;
    int64_t start_time, elapsed_time;

    assert(ctx->notify_me);
    assert(qemu_lockcnt_count(&ctx->list_lock) > 0);

    trace_run_poll_handlers_begin(ctx, max_ns, *timeout);

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    do {
        progress = run_poll_handlers_once(ctx, timeout);
        elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
        max_ns = qemu_soonest_timeout(*timeout, max_ns);
        assert(!(max_ns && progress));
    } while (elapsed_time < max_ns && !atomic_read(&ctx->poll_disable_cnt));

    /* If time has passed with no successful polling, adjust *timeout to
     * keep the same ending time.
     */
    if (*timeout != -1) {
        *timeout -= MIN(*timeout, elapsed_time);
    }

    trace_run_poll_handlers_end(ctx, progress, *timeout);
    return progress;
}

/* try_poll_mode:
 * @ctx: the AioContext
 * @timeout: timeout for blocking wait, computed by the caller and updated if
 *           polling succeeds.
 *
 * ctx->notify_me must be non-zero so this function can detect aio_notify().
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
{
    int64_t max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);

    if (max_ns && !atomic_read(&ctx->poll_disable_cnt)) {
        poll_set_started(ctx, true);

        if (run_poll_handlers(ctx, max_ns, timeout)) {
            return true;
        }
    }

    poll_set_started(ctx, false);

    /* Even if we don't run busy polling, try polling once in case it can make
     * progress and the caller will be able to avoid ppoll(2)/epoll_wait(2).
     */
    return run_poll_handlers_once(ctx, timeout);
}

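/*
 * aio_poll() runs one iteration of the event loop for @ctx:
 *
 *  1. Compute the blocking timeout and try poll mode (see try_poll_mode()).
 *  2. If a system call is still needed, build pollfds[] and wait with
 *     ppoll(2), or with epoll_wait(2) once epoll has been enabled.
 *  3. Adapt ctx->poll_ns based on how long the wait actually blocked.
 *  4. Dispatch bottom halves, fd handlers and timers, and free any handlers
 *     whose deletion had been deferred.
 *
 * Must be called from the AioContext's home thread.
 */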
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    int i;
    int ret = 0;
    bool progress;
    int64_t timeout;
    int64_t start = 0;

    assert(in_aio_context_home_thread(ctx));

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        atomic_add(&ctx->notify_me, 2);
    }

    qemu_lockcnt_inc(&ctx->list_lock);

    if (ctx->poll_max_ns) {
        start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;
    progress = try_poll_mode(ctx, &timeout);
    assert(!(timeout && progress));

    /* If polling is allowed, non-blocking aio_poll does not need the
     * system call---a single round of run_poll_handlers_once suffices.
     */
    if (timeout || atomic_read(&ctx->poll_disable_cnt)) {
        assert(npfd == 0);

        /* fill pollfds */

        if (!aio_epoll_enabled(ctx)) {
            QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
                if (!QLIST_IS_INSERTED(node, node_deleted) && node->pfd.events
                    && aio_node_check(ctx, node->is_external)) {
                    add_pollfd(node);
                }
            }
        }

        /* wait until next event */
        if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
            npfd = 0; /* pollfds[] is not being used */
            ret = aio_epoll(ctx, timeout);
        } else {
            ret = qemu_poll_ns(pollfds, npfd, timeout);
        }
    }

    if (blocking) {
        atomic_sub(&ctx->notify_me, 2);
        aio_notify_accept(ctx);
    }

    /* Adjust polling time */
    if (ctx->poll_max_ns) {
        int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;

        if (block_ns <= ctx->poll_ns) {
            /* This is the sweet spot, no adjustment needed */
        } else if (block_ns > ctx->poll_max_ns) {
            /* We'd have to poll for too long, poll less */
            int64_t old = ctx->poll_ns;

            if (ctx->poll_shrink) {
                ctx->poll_ns /= ctx->poll_shrink;
            } else {
                ctx->poll_ns = 0;
            }

            trace_poll_shrink(ctx, old, ctx->poll_ns);
        } else if (ctx->poll_ns < ctx->poll_max_ns &&
                   block_ns < ctx->poll_max_ns) {
            /* There is room to grow, poll longer */
            int64_t old = ctx->poll_ns;
            int64_t grow = ctx->poll_grow;

            if (grow == 0) {
                grow = 2;
            }

            if (ctx->poll_ns) {
                ctx->poll_ns *= grow;
            } else {
                ctx->poll_ns = 4000; /* start polling at 4 microseconds */
            }

            if (ctx->poll_ns > ctx->poll_max_ns) {
                ctx->poll_ns = ctx->poll_max_ns;
            }

            trace_poll_grow(ctx, old, ctx->poll_ns);
        }
    }
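    /*
     * Worked example (assuming poll_grow and poll_shrink are left at 0, i.e.
     * grow factor 2 and shrink-to-zero): with poll_max_ns = 32000, poll_ns
     * ramps up 0 -> 4000 -> 8000 -> 16000 -> 32000 as long as waits keep
     * taking longer than the current poll_ns while staying under
     * poll_max_ns, and drops straight back to 0 as soon as a wait blocks
     * for more than 32000 ns.
     */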

    /* if we have any readable fds, dispatch event */
    if (ret > 0) {
        for (i = 0; i < npfd; i++) {
            nodes[i]->pfd.revents = pollfds[i].revents;
        }
    }

    npfd = 0;

    progress |= aio_bh_poll(ctx);

    if (ret > 0) {
        progress |= aio_dispatch_handlers(ctx);
    }

    aio_free_deleted_handlers(ctx);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}

void aio_context_setup(AioContext *ctx)
{
#ifdef CONFIG_EPOLL_CREATE1
    assert(!ctx->epollfd);
    ctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
    if (ctx->epollfd == -1) {
        fprintf(stderr, "Failed to create epoll instance: %s", strerror(errno));
        ctx->epoll_available = false;
    } else {
        ctx->epoll_available = true;
    }
#endif
}

void aio_context_destroy(AioContext *ctx)
{
#ifdef CONFIG_EPOLL_CREATE1
    aio_epoll_disable(ctx);
#endif
}
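/*
 * Poll parameters can be adjusted at run time (for an IOThread they are
 * typically exposed as the poll-max-ns, poll-grow and poll-shrink
 * properties): @max_ns bounds busy-wait polling, @grow and @shrink control
 * how aggressively ctx->poll_ns adapts, and a value of 0 selects the
 * built-in behavior used in aio_poll() above.
 */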
void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    /* No thread synchronization here, it doesn't matter if an incorrect value
     * is used once.
     */
    ctx->poll_max_ns = max_ns;
    ctx->poll_ns = 0;
    ctx->poll_grow = grow;
    ctx->poll_shrink = shrink;

    aio_notify(ctx);
}