// detail/impl/epoll_reactor.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#ifndef BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP
#define BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#if defined(BOOST_ASIO_HAS_EPOLL)

#include <cstddef>
#include <sys/epoll.h>
#include <boost/asio/detail/epoll_reactor.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>

#if defined(BOOST_ASIO_HAS_TIMERFD)
# include <sys/timerfd.h>
#endif // defined(BOOST_ASIO_HAS_TIMERFD)

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace detail {
38 epoll_reactor::epoll_reactor(boost::asio::execution_context& ctx)
39 : execution_context_service_base<epoll_reactor>(ctx),
40 scheduler_(use_service<scheduler>(ctx)),
41 mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
42 REACTOR_REGISTRATION, scheduler_.concurrency_hint())),
44 epoll_fd_(do_epoll_create()),
45 timer_fd_(do_timerfd_create()),
47 registered_descriptors_mutex_(mutex_.enabled())
49 // Add the interrupter's descriptor to epoll.
50 epoll_event ev = { 0, { 0 } };
51 ev.events = EPOLLIN | EPOLLERR | EPOLLET;
52 ev.data.ptr = &interrupter_;
53 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
54 interrupter_.interrupt();
56 // Add the timer descriptor to epoll.
59 ev.events = EPOLLIN | EPOLLERR;
60 ev.data.ptr = &timer_fd_;
61 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
65 epoll_reactor::~epoll_reactor()
73 void epoll_reactor::shutdown()
75 mutex::scoped_lock lock(mutex_);
79 op_queue<operation> ops;
81 while (descriptor_state* state = registered_descriptors_.first())
83 for (int i = 0; i < max_ops; ++i)
84 ops.push(state->op_queue_[i]);
85 state->shutdown_ = true;
86 registered_descriptors_.free(state);
89 timer_queues_.get_all_timers(ops);
91 scheduler_.abandon_operations(ops);
94 void epoll_reactor::notify_fork(
95 boost::asio::execution_context::fork_event fork_ev)
97 if (fork_ev == boost::asio::execution_context::fork_child)
102 epoll_fd_ = do_epoll_create();
107 timer_fd_ = do_timerfd_create();
109 interrupter_.recreate();
111 // Add the interrupter's descriptor to epoll.
112 epoll_event ev = { 0, { 0 } };
113 ev.events = EPOLLIN | EPOLLERR | EPOLLET;
114 ev.data.ptr = &interrupter_;
115 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
116 interrupter_.interrupt();
118 // Add the timer descriptor to epoll.
121 ev.events = EPOLLIN | EPOLLERR;
122 ev.data.ptr = &timer_fd_;
123 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
128 // Re-register all descriptors with epoll.
129 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
130 for (descriptor_state* state = registered_descriptors_.first();
131 state != 0; state = state->next_)
133 ev.events = state->registered_events_;
135 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, state->descriptor_, &ev);
138 boost::system::error_code ec(errno,
139 boost::asio::error::get_system_category());
140 boost::asio::detail::throw_error(ec, "epoll re-registration");
146 void epoll_reactor::init_task()
148 scheduler_.init_task();
151 int epoll_reactor::register_descriptor(socket_type descriptor,
152 epoll_reactor::per_descriptor_data& descriptor_data)
154 descriptor_data = allocate_descriptor_state();
156 BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
157 context(), static_cast<uintmax_t>(descriptor),
158 reinterpret_cast<uintmax_t>(descriptor_data)));
161 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
163 descriptor_data->reactor_ = this;
164 descriptor_data->descriptor_ = descriptor;
165 descriptor_data->shutdown_ = false;
166 for (int i = 0; i < max_ops; ++i)
167 descriptor_data->try_speculative_[i] = true;
170 epoll_event ev = { 0, { 0 } };
171 ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
172 descriptor_data->registered_events_ = ev.events;
173 ev.data.ptr = descriptor_data;
174 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
179 // This file descriptor type is not supported by epoll. However, if it is
180 // a regular file then operations on it will not block. We will allow
181 // this descriptor to be used and fail later if an operation on it would
182 // otherwise require a trip through the reactor.
183 descriptor_data->registered_events_ = 0;
192 int epoll_reactor::register_internal_descriptor(
193 int op_type, socket_type descriptor,
194 epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
196 descriptor_data = allocate_descriptor_state();
198 BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
199 context(), static_cast<uintmax_t>(descriptor),
200 reinterpret_cast<uintmax_t>(descriptor_data)));
203 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
205 descriptor_data->reactor_ = this;
206 descriptor_data->descriptor_ = descriptor;
207 descriptor_data->shutdown_ = false;
208 descriptor_data->op_queue_[op_type].push(op);
209 for (int i = 0; i < max_ops; ++i)
210 descriptor_data->try_speculative_[i] = true;
213 epoll_event ev = { 0, { 0 } };
214 ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
215 descriptor_data->registered_events_ = ev.events;
216 ev.data.ptr = descriptor_data;
217 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
224 void epoll_reactor::move_descriptor(socket_type,
225 epoll_reactor::per_descriptor_data& target_descriptor_data,
226 epoll_reactor::per_descriptor_data& source_descriptor_data)
228 target_descriptor_data = source_descriptor_data;
229 source_descriptor_data = 0;
232 void epoll_reactor::start_op(int op_type, socket_type descriptor,
233 epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
234 bool is_continuation, bool allow_speculative)
236 if (!descriptor_data)
238 op->ec_ = boost::asio::error::bad_descriptor;
239 post_immediate_completion(op, is_continuation);
243 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
245 if (descriptor_data->shutdown_)
247 post_immediate_completion(op, is_continuation);
251 if (descriptor_data->op_queue_[op_type].empty())
253 if (allow_speculative
254 && (op_type != read_op
255 || descriptor_data->op_queue_[except_op].empty()))
257 if (descriptor_data->try_speculative_[op_type])
259 if (reactor_op::status status = op->perform())
261 if (status == reactor_op::done_and_exhausted)
262 if (descriptor_data->registered_events_ != 0)
263 descriptor_data->try_speculative_[op_type] = false;
264 descriptor_lock.unlock();
265 scheduler_.post_immediate_completion(op, is_continuation);
270 if (descriptor_data->registered_events_ == 0)
272 op->ec_ = boost::asio::error::operation_not_supported;
273 scheduler_.post_immediate_completion(op, is_continuation);
277 if (op_type == write_op)
279 if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
281 epoll_event ev = { 0, { 0 } };
282 ev.events = descriptor_data->registered_events_ | EPOLLOUT;
283 ev.data.ptr = descriptor_data;
284 if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
286 descriptor_data->registered_events_ |= ev.events;
290 op->ec_ = boost::system::error_code(errno,
291 boost::asio::error::get_system_category());
292 scheduler_.post_immediate_completion(op, is_continuation);
298 else if (descriptor_data->registered_events_ == 0)
300 op->ec_ = boost::asio::error::operation_not_supported;
301 scheduler_.post_immediate_completion(op, is_continuation);
306 if (op_type == write_op)
308 descriptor_data->registered_events_ |= EPOLLOUT;
311 epoll_event ev = { 0, { 0 } };
312 ev.events = descriptor_data->registered_events_;
313 ev.data.ptr = descriptor_data;
314 epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
318 descriptor_data->op_queue_[op_type].push(op);
319 scheduler_.work_started();
322 void epoll_reactor::cancel_ops(socket_type,
323 epoll_reactor::per_descriptor_data& descriptor_data)
325 if (!descriptor_data)
328 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
330 op_queue<operation> ops;
331 for (int i = 0; i < max_ops; ++i)
333 while (reactor_op* op = descriptor_data->op_queue_[i].front())
335 op->ec_ = boost::asio::error::operation_aborted;
336 descriptor_data->op_queue_[i].pop();
341 descriptor_lock.unlock();
343 scheduler_.post_deferred_completions(ops);
346 void epoll_reactor::deregister_descriptor(socket_type descriptor,
347 epoll_reactor::per_descriptor_data& descriptor_data, bool closing)
349 if (!descriptor_data)
352 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
354 if (!descriptor_data->shutdown_)
358 // The descriptor will be automatically removed from the epoll set when
361 else if (descriptor_data->registered_events_ != 0)
363 epoll_event ev = { 0, { 0 } };
364 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
367 op_queue<operation> ops;
368 for (int i = 0; i < max_ops; ++i)
370 while (reactor_op* op = descriptor_data->op_queue_[i].front())
372 op->ec_ = boost::asio::error::operation_aborted;
373 descriptor_data->op_queue_[i].pop();
378 descriptor_data->descriptor_ = -1;
379 descriptor_data->shutdown_ = true;
381 descriptor_lock.unlock();
383 BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
384 context(), static_cast<uintmax_t>(descriptor),
385 reinterpret_cast<uintmax_t>(descriptor_data)));
387 scheduler_.post_deferred_completions(ops);
389 // Leave descriptor_data set so that it will be freed by the subsequent
390 // call to cleanup_descriptor_data.
394 // We are shutting down, so prevent cleanup_descriptor_data from freeing
395 // the descriptor_data object and let the destructor free it instead.
400 void epoll_reactor::deregister_internal_descriptor(socket_type descriptor,
401 epoll_reactor::per_descriptor_data& descriptor_data)
403 if (!descriptor_data)
406 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
408 if (!descriptor_data->shutdown_)
410 epoll_event ev = { 0, { 0 } };
411 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
413 op_queue<operation> ops;
414 for (int i = 0; i < max_ops; ++i)
415 ops.push(descriptor_data->op_queue_[i]);
417 descriptor_data->descriptor_ = -1;
418 descriptor_data->shutdown_ = true;
420 descriptor_lock.unlock();
422 BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
423 context(), static_cast<uintmax_t>(descriptor),
424 reinterpret_cast<uintmax_t>(descriptor_data)));
426 // Leave descriptor_data set so that it will be freed by the subsequent
427 // call to cleanup_descriptor_data.
431 // We are shutting down, so prevent cleanup_descriptor_data from freeing
432 // the descriptor_data object and let the destructor free it instead.
437 void epoll_reactor::cleanup_descriptor_data(
438 per_descriptor_data& descriptor_data)
442 free_descriptor_state(descriptor_data);
447 void epoll_reactor::run(long usec, op_queue<operation>& ops)
449 // This code relies on the fact that the scheduler queues the reactor task
450 // behind all descriptor operations generated by this function. This means,
451 // that by the time we reach this point, any previously returned descriptor
452 // operations have already been dequeued. Therefore it is now safe for us to
453 // reuse and return them for the scheduler to queue again.
455 // Calculate timeout. Check the timer queues only if timerfd is not in use.
461 timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
464 mutex::scoped_lock lock(mutex_);
465 timeout = get_timeout(timeout);
469 // Block on the epoll descriptor.
470 epoll_event events[128];
471 int num_events = epoll_wait(epoll_fd_, events, 128, timeout);
473 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
474 // Trace the waiting events.
475 for (int i = 0; i < num_events; ++i)
477 void* ptr = events[i].data.ptr;
478 if (ptr == &interrupter_)
482 # if defined(BOOST_ASIO_HAS_TIMERFD)
483 else if (ptr == &timer_fd_)
487 # endif // defined(BOOST_ASIO_HAS_TIMERFD)
490 unsigned event_mask = 0;
491 if ((events[i].events & EPOLLIN) != 0)
492 event_mask |= BOOST_ASIO_HANDLER_REACTOR_READ_EVENT;
493 if ((events[i].events & EPOLLOUT))
494 event_mask |= BOOST_ASIO_HANDLER_REACTOR_WRITE_EVENT;
495 if ((events[i].events & (EPOLLERR | EPOLLHUP)) != 0)
496 event_mask |= BOOST_ASIO_HANDLER_REACTOR_ERROR_EVENT;
497 BOOST_ASIO_HANDLER_REACTOR_EVENTS((context(),
498 reinterpret_cast<uintmax_t>(ptr), event_mask));
501 #endif // defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
503 #if defined(BOOST_ASIO_HAS_TIMERFD)
504 bool check_timers = (timer_fd_ == -1);
505 #else // defined(BOOST_ASIO_HAS_TIMERFD)
506 bool check_timers = true;
507 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
509 // Dispatch the waiting events.
510 for (int i = 0; i < num_events; ++i)
512 void* ptr = events[i].data.ptr;
513 if (ptr == &interrupter_)
515 // No need to reset the interrupter since we're leaving the descriptor
516 // in a ready-to-read state and relying on edge-triggered notifications
517 // to make it so that we only get woken up when the descriptor's epoll
518 // registration is updated.
520 #if defined(BOOST_ASIO_HAS_TIMERFD)
523 #else // defined(BOOST_ASIO_HAS_TIMERFD)
525 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
527 #if defined(BOOST_ASIO_HAS_TIMERFD)
528 else if (ptr == &timer_fd_)
532 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
535 // The descriptor operation doesn't count as work in and of itself, so we
536 // don't call work_started() here. This still allows the scheduler to
537 // stop if the only remaining operations are descriptor operations.
538 descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
539 if (!ops.is_enqueued(descriptor_data))
541 descriptor_data->set_ready_events(events[i].events);
542 ops.push(descriptor_data);
546 descriptor_data->add_ready_events(events[i].events);
553 mutex::scoped_lock common_lock(mutex_);
554 timer_queues_.get_ready_timers(ops);
556 #if defined(BOOST_ASIO_HAS_TIMERFD)
559 itimerspec new_timeout;
560 itimerspec old_timeout;
561 int flags = get_timeout(new_timeout);
562 timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
564 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
568 void epoll_reactor::interrupt()
570 epoll_event ev = { 0, { 0 } };
571 ev.events = EPOLLIN | EPOLLERR | EPOLLET;
572 ev.data.ptr = &interrupter_;
573 epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev);
576 int epoll_reactor::do_epoll_create()
578 #if defined(EPOLL_CLOEXEC)
579 int fd = epoll_create1(EPOLL_CLOEXEC);
580 #else // defined(EPOLL_CLOEXEC)
583 #endif // defined(EPOLL_CLOEXEC)
585 if (fd == -1 && (errno == EINVAL || errno == ENOSYS))
587 fd = epoll_create(epoll_size);
589 ::fcntl(fd, F_SETFD, FD_CLOEXEC);
594 boost::system::error_code ec(errno,
595 boost::asio::error::get_system_category());
596 boost::asio::detail::throw_error(ec, "epoll");
602 int epoll_reactor::do_timerfd_create()
604 #if defined(BOOST_ASIO_HAS_TIMERFD)
605 # if defined(TFD_CLOEXEC)
606 int fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
607 # else // defined(TFD_CLOEXEC)
610 # endif // defined(TFD_CLOEXEC)
612 if (fd == -1 && errno == EINVAL)
614 fd = timerfd_create(CLOCK_MONOTONIC, 0);
616 ::fcntl(fd, F_SETFD, FD_CLOEXEC);
620 #else // defined(BOOST_ASIO_HAS_TIMERFD)
622 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
625 epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()
627 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
628 return registered_descriptors_.alloc(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
629 REACTOR_IO, scheduler_.concurrency_hint()));
632 void epoll_reactor::free_descriptor_state(epoll_reactor::descriptor_state* s)
634 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
635 registered_descriptors_.free(s);
638 void epoll_reactor::do_add_timer_queue(timer_queue_base& queue)
640 mutex::scoped_lock lock(mutex_);
641 timer_queues_.insert(&queue);
644 void epoll_reactor::do_remove_timer_queue(timer_queue_base& queue)
646 mutex::scoped_lock lock(mutex_);
647 timer_queues_.erase(&queue);
650 void epoll_reactor::update_timeout()
652 #if defined(BOOST_ASIO_HAS_TIMERFD)
655 itimerspec new_timeout;
656 itimerspec old_timeout;
657 int flags = get_timeout(new_timeout);
658 timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
661 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
665 int epoll_reactor::get_timeout(int msec)
667 // By default we will wait no longer than 5 minutes. This will ensure that
668 // any changes to the system clock are detected after no longer than this.
669 const int max_msec = 5 * 60 * 1000;
670 return timer_queues_.wait_duration_msec(
671 (msec < 0 || max_msec < msec) ? max_msec : msec);
#if defined(BOOST_ASIO_HAS_TIMERFD)
// Compute the timerfd setting for the next timer expiry, capped at 5
// minutes. Returns the flags for timerfd_settime: a zero duration (no
// pending timers) is encoded as an absolute time of 1ns in the past
// (TFD_TIMER_ABSTIME) so the timerfd fires immediately rather than being
// disarmed.
int epoll_reactor::get_timeout(itimerspec& ts)
{
  ts.it_interval.tv_sec = 0;
  ts.it_interval.tv_nsec = 0;

  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  ts.it_value.tv_sec = usec / 1000000;
  ts.it_value.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;

  return usec ? 0 : TFD_TIMER_ABSTIME;
}
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
688 struct epoll_reactor::perform_io_cleanup_on_block_exit
690 explicit perform_io_cleanup_on_block_exit(epoll_reactor* r)
691 : reactor_(r), first_op_(0)
695 ~perform_io_cleanup_on_block_exit()
699 // Post the remaining completed operations for invocation.
701 reactor_->scheduler_.post_deferred_completions(ops_);
703 // A user-initiated operation has completed, but there's no need to
704 // explicitly call work_finished() here. Instead, we'll take advantage of
705 // the fact that the scheduler will call work_finished() once we return.
709 // No user-initiated operations have completed, so we need to compensate
710 // for the work_finished() call that the scheduler will make once this
711 // operation returns.
712 reactor_->scheduler_.compensating_work_started();
716 epoll_reactor* reactor_;
717 op_queue<operation> ops_;
718 operation* first_op_;
721 epoll_reactor::descriptor_state::descriptor_state(bool locking)
722 : operation(&epoll_reactor::descriptor_state::do_complete),
727 operation* epoll_reactor::descriptor_state::perform_io(uint32_t events)
730 perform_io_cleanup_on_block_exit io_cleanup(reactor_);
731 mutex::scoped_lock descriptor_lock(mutex_, mutex::scoped_lock::adopt_lock);
733 // Exception operations must be processed first to ensure that any
734 // out-of-band data is read before normal data.
735 static const int flag[max_ops] = { EPOLLIN, EPOLLOUT, EPOLLPRI };
736 for (int j = max_ops - 1; j >= 0; --j)
738 if (events & (flag[j] | EPOLLERR | EPOLLHUP))
740 try_speculative_[j] = true;
741 while (reactor_op* op = op_queue_[j].front())
743 if (reactor_op::status status = op->perform())
746 io_cleanup.ops_.push(op);
747 if (status == reactor_op::done_and_exhausted)
749 try_speculative_[j] = false;
759 // The first operation will be returned for completion now. The others will
760 // be posted for later by the io_cleanup object's destructor.
761 io_cleanup.first_op_ = io_cleanup.ops_.front();
762 io_cleanup.ops_.pop();
763 return io_cleanup.first_op_;
766 void epoll_reactor::descriptor_state::do_complete(
767 void* owner, operation* base,
768 const boost::system::error_code& ec, std::size_t bytes_transferred)
772 descriptor_state* descriptor_data = static_cast<descriptor_state*>(base);
773 uint32_t events = static_cast<uint32_t>(bytes_transferred);
774 if (operation* op = descriptor_data->perform_io(events))
776 op->complete(owner, ec, 0);
781 } // namespace detail
785 #include <boost/asio/detail/pop_options.hpp>
787 #endif // defined(BOOST_ASIO_HAS_EPOLL)
789 #endif // BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP