//
// detail/impl/epoll_reactor.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP
#define BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#if defined(BOOST_ASIO_HAS_EPOLL)

#include <cstddef>
#include <sys/epoll.h>
#include <boost/asio/detail/epoll_reactor.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>

#if defined(BOOST_ASIO_HAS_TIMERFD)
# include <sys/timerfd.h>
#endif // defined(BOOST_ASIO_HAS_TIMERFD)

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace detail {

epoll_reactor::epoll_reactor(boost::asio::execution_context& ctx)
  : execution_context_service_base<epoll_reactor>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          REACTOR_REGISTRATION, scheduler_.concurrency_hint())),
    interrupter_(),
    epoll_fd_(do_epoll_create()),
    timer_fd_(do_timerfd_create()),
    shutdown_(false),
    registered_descriptors_mutex_(mutex_.enabled())
{
  // Add the interrupter's descriptor to epoll.
  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  ev.data.ptr = &interrupter_;
  epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
  interrupter_.interrupt();
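  // Note: the interrupter is deliberately left in a ready-to-read state.
  // Because it is registered edge-triggered (EPOLLET), epoll_wait() will only
  // report it again once interrupt() re-arms the registration with
  // EPOLL_CTL_MOD (see epoll_reactor::interrupt() below).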

  // Add the timer descriptor to epoll.
  if (timer_fd_ != -1)
  {
    ev.events = EPOLLIN | EPOLLERR;
    ev.data.ptr = &timer_fd_;
    epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
  }
}

epoll_reactor::~epoll_reactor()
{
  if (epoll_fd_ != -1)
    close(epoll_fd_);
  if (timer_fd_ != -1)
    close(timer_fd_);
}

void epoll_reactor::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  while (descriptor_state* state = registered_descriptors_.first())
  {
    for (int i = 0; i < max_ops; ++i)
      ops.push(state->op_queue_[i]);
    state->shutdown_ = true;
    registered_descriptors_.free(state);
  }

  timer_queues_.get_all_timers(ops);

  scheduler_.abandon_operations(ops);
}

void epoll_reactor::notify_fork(
    boost::asio::execution_context::fork_event fork_ev)
{
  if (fork_ev == boost::asio::execution_context::fork_child)
  {
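    // The epoll and timer file descriptors created before the fork refer to
    // kernel objects shared with the parent process, so the child closes and
    // recreates them, then rebuilds its registrations against the new epoll
    // instance below.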
    if (epoll_fd_ != -1)
      ::close(epoll_fd_);
    epoll_fd_ = -1;
    epoll_fd_ = do_epoll_create();

    if (timer_fd_ != -1)
      ::close(timer_fd_);
    timer_fd_ = -1;
    timer_fd_ = do_timerfd_create();

    interrupter_.recreate();

    // Add the interrupter's descriptor to epoll.
    epoll_event ev = { 0, { 0 } };
    ev.events = EPOLLIN | EPOLLERR | EPOLLET;
    ev.data.ptr = &interrupter_;
    epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
    interrupter_.interrupt();

    // Add the timer descriptor to epoll.
    if (timer_fd_ != -1)
    {
      ev.events = EPOLLIN | EPOLLERR;
      ev.data.ptr = &timer_fd_;
      epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
    }

    update_timeout();

    // Re-register all descriptors with epoll.
    mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
    for (descriptor_state* state = registered_descriptors_.first();
        state != 0; state = state->next_)
    {
      ev.events = state->registered_events_;
      ev.data.ptr = state;
      int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, state->descriptor_, &ev);
      if (result != 0)
      {
        boost::system::error_code ec(errno,
            boost::asio::error::get_system_category());
        boost::asio::detail::throw_error(ec, "epoll re-registration");
      }
    }
  }
}

void epoll_reactor::init_task()
{
  scheduler_.init_task();
}

int epoll_reactor::register_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  descriptor_data = allocate_descriptor_state();

  BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
        context(), static_cast<uintmax_t>(descriptor),
        reinterpret_cast<uintmax_t>(descriptor_data)));

  {
    mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

    descriptor_data->reactor_ = this;
    descriptor_data->descriptor_ = descriptor;
    descriptor_data->shutdown_ = false;
    for (int i = 0; i < max_ops; ++i)
      descriptor_data->try_speculative_[i] = true;
  }

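  // Descriptors are registered edge-triggered for read, error, hang-up and
  // priority events up front. EPOLLOUT is added later, on demand, the first
  // time start_op() needs to wait for writability.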
  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
  descriptor_data->registered_events_ = ev.events;
  ev.data.ptr = descriptor_data;
  int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  if (result != 0)
  {
    if (errno == EPERM)
    {
      // This file descriptor type is not supported by epoll. However, if it is
      // a regular file then operations on it will not block. We will allow
      // this descriptor to be used and fail later if an operation on it would
      // otherwise require a trip through the reactor.
      descriptor_data->registered_events_ = 0;
      return 0;
    }
    return errno;
  }

  return 0;
}

int epoll_reactor::register_internal_descriptor(
    int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
{
  descriptor_data = allocate_descriptor_state();

  BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
        context(), static_cast<uintmax_t>(descriptor),
        reinterpret_cast<uintmax_t>(descriptor_data)));

  {
    mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

    descriptor_data->reactor_ = this;
    descriptor_data->descriptor_ = descriptor;
    descriptor_data->shutdown_ = false;
    descriptor_data->op_queue_[op_type].push(op);
    for (int i = 0; i < max_ops; ++i)
      descriptor_data->try_speculative_[i] = true;
  }

  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
  descriptor_data->registered_events_ = ev.events;
  ev.data.ptr = descriptor_data;
  int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  if (result != 0)
    return errno;

  return 0;
}

void epoll_reactor::move_descriptor(socket_type,
    epoll_reactor::per_descriptor_data& target_descriptor_data,
    epoll_reactor::per_descriptor_data& source_descriptor_data)
{
  target_descriptor_data = source_descriptor_data;
  source_descriptor_data = 0;
}

void epoll_reactor::start_op(int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  if (!descriptor_data)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty())
  {
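    // When no operation of this type is already queued, the operation may be
    // attempted immediately ("speculatively") to avoid a trip through epoll.
    // Read operations are only tried speculatively when no exception
    // (out-of-band) operation is pending, so urgent data is not bypassed.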
    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
      if (descriptor_data->try_speculative_[op_type])
      {
        if (reactor_op::status status = op->perform())
        {
          if (status == reactor_op::done_and_exhausted)
            if (descriptor_data->registered_events_ != 0)
              descriptor_data->try_speculative_[op_type] = false;
          descriptor_lock.unlock();
          scheduler_.post_immediate_completion(op, is_continuation);
          return;
        }
      }

      if (descriptor_data->registered_events_ == 0)
      {
        op->ec_ = boost::asio::error::operation_not_supported;
        scheduler_.post_immediate_completion(op, is_continuation);
        return;
      }

      if (op_type == write_op)
      {
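        // EPOLLOUT is only added to the epoll registration the first time a
        // write operation actually has to wait for the descriptor to become
        // writable.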
        if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
        {
          epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }
          else
          {
            op->ec_ = boost::system::error_code(errno,
                boost::asio::error::get_system_category());
            scheduler_.post_immediate_completion(op, is_continuation);
            return;
          }
        }
      }
    }
    else if (descriptor_data->registered_events_ == 0)
    {
      op->ec_ = boost::asio::error::operation_not_supported;
      scheduler_.post_immediate_completion(op, is_continuation);
      return;
    }
    else
    {
      if (op_type == write_op)
      {
        descriptor_data->registered_events_ |= EPOLLOUT;
      }

      epoll_event ev = { 0, { 0 } };
      ev.events = descriptor_data->registered_events_;
      ev.data.ptr = descriptor_data;
      epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  scheduler_.work_started();
}

void epoll_reactor::cancel_ops(socket_type,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  op_queue<operation> ops;
  for (int i = 0; i < max_ops; ++i)
  {
    while (reactor_op* op = descriptor_data->op_queue_[i].front())
    {
      op->ec_ = boost::asio::error::operation_aborted;
      descriptor_data->op_queue_[i].pop();
      ops.push(op);
    }
  }

  descriptor_lock.unlock();

  scheduler_.post_deferred_completions(ops);
}

void epoll_reactor::deregister_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, bool closing)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    if (closing)
    {
      // The descriptor will be automatically removed from the epoll set when
      // it is closed.
    }
    else if (descriptor_data->registered_events_ != 0)
    {
      epoll_event ev = { 0, { 0 } };
      epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
    }

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
    {
      while (reactor_op* op = descriptor_data->op_queue_[i].front())
      {
        op->ec_ = boost::asio::error::operation_aborted;
        descriptor_data->op_queue_[i].pop();
        ops.push(op);
      }
    }

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
          context(), static_cast<uintmax_t>(descriptor),
          reinterpret_cast<uintmax_t>(descriptor_data)));

    scheduler_.post_deferred_completions(ops);

    // Leave descriptor_data set so that it will be freed by the subsequent
    // call to cleanup_descriptor_data.
  }
  else
  {
    // We are shutting down, so prevent cleanup_descriptor_data from freeing
    // the descriptor_data object and let the destructor free it instead.
    descriptor_data = 0;
  }
}

void epoll_reactor::deregister_internal_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    epoll_event ev = { 0, { 0 } };
    epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
      ops.push(descriptor_data->op_queue_[i]);

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
          context(), static_cast<uintmax_t>(descriptor),
          reinterpret_cast<uintmax_t>(descriptor_data)));

    // Leave descriptor_data set so that it will be freed by the subsequent
    // call to cleanup_descriptor_data.
  }
  else
  {
    // We are shutting down, so prevent cleanup_descriptor_data from freeing
    // the descriptor_data object and let the destructor free it instead.
    descriptor_data = 0;
  }
}

void epoll_reactor::cleanup_descriptor_data(
    per_descriptor_data& descriptor_data)
{
  if (descriptor_data)
  {
    free_descriptor_state(descriptor_data);
    descriptor_data = 0;
  }
}

void epoll_reactor::run(long usec, op_queue<operation>& ops)
{
  // This code relies on the fact that the scheduler queues the reactor task
  // behind all descriptor operations generated by this function. This means
  // that, by the time we reach this point, any previously returned descriptor
  // operations have already been dequeued. Therefore it is now safe for us to
  // reuse and return them for the scheduler to queue again.

  // Calculate timeout. Check the timer queues only if timerfd is not in use.
  int timeout;
  if (usec == 0)
    timeout = 0;
  else
  {
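    // Round the requested wait up to a whole number of milliseconds for
    // epoll_wait(); a negative value means block indefinitely.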
    timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
    if (timer_fd_ == -1)
    {
      mutex::scoped_lock lock(mutex_);
      timeout = get_timeout(timeout);
    }
  }

  // Block on the epoll descriptor.
  epoll_event events[128];
  int num_events = epoll_wait(epoll_fd_, events, 128, timeout);

#if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  // Trace the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = events[i].data.ptr;
    if (ptr == &interrupter_)
    {
      // Ignore.
    }
# if defined(BOOST_ASIO_HAS_TIMERFD)
    else if (ptr == &timer_fd_)
    {
      // Ignore.
    }
# endif // defined(BOOST_ASIO_HAS_TIMERFD)
    else
    {
      unsigned event_mask = 0;
      if ((events[i].events & EPOLLIN) != 0)
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_READ_EVENT;
      if ((events[i].events & EPOLLOUT))
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_WRITE_EVENT;
      if ((events[i].events & (EPOLLERR | EPOLLHUP)) != 0)
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_ERROR_EVENT;
      BOOST_ASIO_HANDLER_REACTOR_EVENTS((context(),
            reinterpret_cast<uintmax_t>(ptr), event_mask));
    }
  }
#endif // defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)

#if defined(BOOST_ASIO_HAS_TIMERFD)
  bool check_timers = (timer_fd_ == -1);
#else // defined(BOOST_ASIO_HAS_TIMERFD)
  bool check_timers = true;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = events[i].data.ptr;
    if (ptr == &interrupter_)
    {
      // No need to reset the interrupter since we're leaving the descriptor
      // in a ready-to-read state and relying on edge-triggered notifications
      // to make it so that we only get woken up when the descriptor's epoll
      // registration is updated.

#if defined(BOOST_ASIO_HAS_TIMERFD)
      if (timer_fd_ == -1)
        check_timers = true;
#else // defined(BOOST_ASIO_HAS_TIMERFD)
      check_timers = true;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
    }
#if defined(BOOST_ASIO_HAS_TIMERFD)
    else if (ptr == &timer_fd_)
    {
      check_timers = true;
    }
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
    else
    {
      // The descriptor operation doesn't count as work in and of itself, so we
      // don't call work_started() here. This still allows the scheduler to
      // stop if the only remaining operations are descriptor operations.
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      if (!ops.is_enqueued(descriptor_data))
      {
        descriptor_data->set_ready_events(events[i].events);
        ops.push(descriptor_data);
      }
      else
      {
        descriptor_data->add_ready_events(events[i].events);
      }
    }
  }

  if (check_timers)
  {
    mutex::scoped_lock common_lock(mutex_);
    timer_queues_.get_ready_timers(ops);

#if defined(BOOST_ASIO_HAS_TIMERFD)
    if (timer_fd_ != -1)
    {
      itimerspec new_timeout;
      itimerspec old_timeout;
      int flags = get_timeout(new_timeout);
      timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
    }
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
  }
}

void epoll_reactor::interrupt()
{
  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  ev.data.ptr = &interrupter_;
  epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev);
}

int epoll_reactor::do_epoll_create()
{
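  // Prefer epoll_create1() with EPOLL_CLOEXEC; fall back to epoll_create()
  // plus FD_CLOEXEC on kernels or C libraries where epoll_create1() is
  // unavailable or unimplemented.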
#if defined(EPOLL_CLOEXEC)
  int fd = epoll_create1(EPOLL_CLOEXEC);
#else // defined(EPOLL_CLOEXEC)
  int fd = -1;
  errno = EINVAL;
#endif // defined(EPOLL_CLOEXEC)

  if (fd == -1 && (errno == EINVAL || errno == ENOSYS))
  {
    fd = epoll_create(epoll_size);
    if (fd != -1)
      ::fcntl(fd, F_SETFD, FD_CLOEXEC);
  }

  if (fd == -1)
  {
    boost::system::error_code ec(errno,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "epoll");
  }

  return fd;
}

int epoll_reactor::do_timerfd_create()
{
#if defined(BOOST_ASIO_HAS_TIMERFD)
# if defined(TFD_CLOEXEC)
  int fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
# else // defined(TFD_CLOEXEC)
  int fd = -1;
  errno = EINVAL;
# endif // defined(TFD_CLOEXEC)

  if (fd == -1 && errno == EINVAL)
  {
    fd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (fd != -1)
      ::fcntl(fd, F_SETFD, FD_CLOEXEC);
  }

  return fd;
#else // defined(BOOST_ASIO_HAS_TIMERFD)
  return -1;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
}

epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
        REACTOR_IO, scheduler_.concurrency_hint()));
}

void epoll_reactor::free_descriptor_state(epoll_reactor::descriptor_state* s)
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  registered_descriptors_.free(s);
}

void epoll_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void epoll_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

void epoll_reactor::update_timeout()
{
#if defined(BOOST_ASIO_HAS_TIMERFD)
  if (timer_fd_ != -1)
  {
    itimerspec new_timeout;
    itimerspec old_timeout;
    int flags = get_timeout(new_timeout);
    timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
    return;
  }
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
  interrupt();
}

int epoll_reactor::get_timeout(int msec)
{
  // By default we will wait no longer than 5 minutes. This will ensure that
  // any changes to the system clock are detected after no longer than this.
  const int max_msec = 5 * 60 * 1000;
  return timer_queues_.wait_duration_msec(
      (msec < 0 || max_msec < msec) ? max_msec : msec);
}

#if defined(BOOST_ASIO_HAS_TIMERFD)
int epoll_reactor::get_timeout(itimerspec& ts)
{
  ts.it_interval.tv_sec = 0;
  ts.it_interval.tv_nsec = 0;

  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
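  // When a timer is already due (usec == 0), an absolute expiry of one
  // nanosecond past the epoch is used so that the timerfd fires immediately;
  // a relative value of zero would disarm the timer instead.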
  ts.it_value.tv_sec = usec / 1000000;
  ts.it_value.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;

  return usec ? 0 : TFD_TIMER_ABSTIME;
}
#endif // defined(BOOST_ASIO_HAS_TIMERFD)

struct epoll_reactor::perform_io_cleanup_on_block_exit
{
  explicit perform_io_cleanup_on_block_exit(epoll_reactor* r)
    : reactor_(r), first_op_(0)
  {
  }

  ~perform_io_cleanup_on_block_exit()
  {
    if (first_op_)
    {
      // Post the remaining completed operations for invocation.
      if (!ops_.empty())
        reactor_->scheduler_.post_deferred_completions(ops_);

      // A user-initiated operation has completed, but there's no need to
      // explicitly call work_finished() here. Instead, we'll take advantage of
      // the fact that the scheduler will call work_finished() once we return.
    }
    else
    {
      // No user-initiated operations have completed, so we need to compensate
      // for the work_finished() call that the scheduler will make once this
      // operation returns.
      reactor_->scheduler_.compensating_work_started();
    }
  }

  epoll_reactor* reactor_;
  op_queue<operation> ops_;
  operation* first_op_;
};

epoll_reactor::descriptor_state::descriptor_state(bool locking)
  : operation(&epoll_reactor::descriptor_state::do_complete),
    mutex_(locking)
{
}

operation* epoll_reactor::descriptor_state::perform_io(uint32_t events)
{
  mutex_.lock();
  perform_io_cleanup_on_block_exit io_cleanup(reactor_);
  mutex::scoped_lock descriptor_lock(mutex_, mutex::scoped_lock::adopt_lock);

  // Exception operations must be processed first to ensure that any
  // out-of-band data is read before normal data.
  static const int flag[max_ops] = { EPOLLIN, EPOLLOUT, EPOLLPRI };
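  // EPOLLERR and EPOLLHUP are folded into every check below, so an error or
  // hang-up wakes the operations in all queues and lets them report the
  // failure themselves.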
  for (int j = max_ops - 1; j >= 0; --j)
  {
    if (events & (flag[j] | EPOLLERR | EPOLLHUP))
    {
      try_speculative_[j] = true;
      while (reactor_op* op = op_queue_[j].front())
      {
        if (reactor_op::status status = op->perform())
        {
          op_queue_[j].pop();
          io_cleanup.ops_.push(op);
          if (status == reactor_op::done_and_exhausted)
          {
            try_speculative_[j] = false;
            break;
          }
        }
        else
          break;
      }
    }
  }

  // The first operation will be returned for completion now. The others will
  // be posted for later by the io_cleanup object's destructor.
  io_cleanup.first_op_ = io_cleanup.ops_.front();
  io_cleanup.ops_.pop();
  return io_cleanup.first_op_;
}

void epoll_reactor::descriptor_state::do_complete(
    void* owner, operation* base,
    const boost::system::error_code& ec, std::size_t bytes_transferred)
{
  if (owner)
  {
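    // The ready event mask is passed through the bytes_transferred argument
    // when the scheduler invokes this descriptor_state as an operation (see
    // the set_ready_events()/add_ready_events() calls in run()).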
    descriptor_state* descriptor_data = static_cast<descriptor_state*>(base);
    uint32_t events = static_cast<uint32_t>(bytes_transferred);
    if (operation* op = descriptor_data->perform_io(events))
    {
      op->complete(owner, ec, 0);
    }
  }
}

} // namespace detail
} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#endif // defined(BOOST_ASIO_HAS_EPOLL)

#endif // BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP