2 // detail/impl/kqueue_reactor.ipp
3 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 // Copyright (c) 2003-2016 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 // Copyright (c) 2005 Stefan Arentz (stefan at soze dot com)
8 // Distributed under the Boost Software License, Version 1.0. (See accompanying
9 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
12 #ifndef BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
13 #define BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
15 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
17 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
19 #include <boost/asio/detail/config.hpp>
21 #if defined(BOOST_ASIO_HAS_KQUEUE)
23 #include <boost/asio/detail/kqueue_reactor.hpp>
24 #include <boost/asio/detail/throw_error.hpp>
25 #include <boost/asio/error.hpp>
27 #include <boost/asio/detail/push_options.hpp>
#if defined(__NetBSD__)
// NetBSD declares the udata field of struct kevent as intptr_t rather than
// void*, so the pointer argument must be converted through a cast.
# define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
    EV_SET(ev, ident, filt, flags, fflags, data, \
      reinterpret_cast<intptr_t>(static_cast<void*>(udata)))
#else
# define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
    EV_SET(ev, ident, filt, flags, fflags, data, udata)
#endif
42 kqueue_reactor::kqueue_reactor(boost::asio::io_service& io_service)
43 : boost::asio::detail::service_base<kqueue_reactor>(io_service),
44 io_service_(use_service<io_service_impl>(io_service)),
46 kqueue_fd_(do_kqueue_create()),
50 struct kevent events[1];
51 BOOST_ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
52 EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
53 if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
55 boost::system::error_code error(errno,
56 boost::asio::error::get_system_category());
57 boost::asio::detail::throw_error(error);
61 kqueue_reactor::~kqueue_reactor()
66 void kqueue_reactor::shutdown_service()
68 mutex::scoped_lock lock(mutex_);
72 op_queue<operation> ops;
74 while (descriptor_state* state = registered_descriptors_.first())
76 for (int i = 0; i < max_ops; ++i)
77 ops.push(state->op_queue_[i]);
78 state->shutdown_ = true;
79 registered_descriptors_.free(state);
82 timer_queues_.get_all_timers(ops);
84 io_service_.abandon_operations(ops);
87 void kqueue_reactor::fork_service(boost::asio::io_service::fork_event fork_ev)
89 if (fork_ev == boost::asio::io_service::fork_child)
91 // The kqueue descriptor is automatically closed in the child.
93 kqueue_fd_ = do_kqueue_create();
95 interrupter_.recreate();
97 struct kevent events[2];
98 BOOST_ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
99 EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
100 if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
102 boost::system::error_code ec(errno,
103 boost::asio::error::get_system_category());
104 boost::asio::detail::throw_error(ec, "kqueue interrupter registration");
107 // Re-register all descriptors with kqueue.
108 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
109 for (descriptor_state* state = registered_descriptors_.first();
110 state != 0; state = state->next_)
112 if (state->num_kevents_ > 0)
114 BOOST_ASIO_KQUEUE_EV_SET(&events[0], state->descriptor_,
115 EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state);
116 BOOST_ASIO_KQUEUE_EV_SET(&events[1], state->descriptor_,
117 EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state);
118 if (::kevent(kqueue_fd_, events, state->num_kevents_, 0, 0, 0) == -1)
120 boost::system::error_code ec(errno,
121 boost::asio::error::get_system_category());
122 boost::asio::detail::throw_error(ec, "kqueue re-registration");
129 void kqueue_reactor::init_task()
131 io_service_.init_task();
134 int kqueue_reactor::register_descriptor(socket_type descriptor,
135 kqueue_reactor::per_descriptor_data& descriptor_data)
137 descriptor_data = allocate_descriptor_state();
139 mutex::scoped_lock lock(descriptor_data->mutex_);
141 descriptor_data->descriptor_ = descriptor;
142 descriptor_data->num_kevents_ = 0;
143 descriptor_data->shutdown_ = false;
148 int kqueue_reactor::register_internal_descriptor(
149 int op_type, socket_type descriptor,
150 kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
152 descriptor_data = allocate_descriptor_state();
154 mutex::scoped_lock lock(descriptor_data->mutex_);
156 descriptor_data->descriptor_ = descriptor;
157 descriptor_data->num_kevents_ = 1;
158 descriptor_data->shutdown_ = false;
159 descriptor_data->op_queue_[op_type].push(op);
161 struct kevent events[1];
162 BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
163 EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
164 if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
170 void kqueue_reactor::move_descriptor(socket_type,
171 kqueue_reactor::per_descriptor_data& target_descriptor_data,
172 kqueue_reactor::per_descriptor_data& source_descriptor_data)
174 target_descriptor_data = source_descriptor_data;
175 source_descriptor_data = 0;
178 void kqueue_reactor::start_op(int op_type, socket_type descriptor,
179 kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
180 bool is_continuation, bool allow_speculative)
182 if (!descriptor_data)
184 op->ec_ = boost::asio::error::bad_descriptor;
185 post_immediate_completion(op, is_continuation);
189 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
191 if (descriptor_data->shutdown_)
193 post_immediate_completion(op, is_continuation);
197 if (descriptor_data->op_queue_[op_type].empty())
199 static const int num_kevents[max_ops] = { 1, 2, 1 };
201 if (allow_speculative
202 && (op_type != read_op
203 || descriptor_data->op_queue_[except_op].empty()))
207 descriptor_lock.unlock();
208 io_service_.post_immediate_completion(op, is_continuation);
212 if (descriptor_data->num_kevents_ < num_kevents[op_type])
214 struct kevent events[2];
215 BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
216 EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
217 BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
218 EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
219 if (::kevent(kqueue_fd_, events, num_kevents[op_type], 0, 0, 0) != -1)
221 descriptor_data->num_kevents_ = num_kevents[op_type];
225 op->ec_ = boost::system::error_code(errno,
226 boost::asio::error::get_system_category());
227 io_service_.post_immediate_completion(op, is_continuation);
234 if (descriptor_data->num_kevents_ < num_kevents[op_type])
235 descriptor_data->num_kevents_ = num_kevents[op_type];
237 struct kevent events[2];
238 BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
239 EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
240 BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
241 EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
242 ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
246 descriptor_data->op_queue_[op_type].push(op);
247 io_service_.work_started();
250 void kqueue_reactor::cancel_ops(socket_type,
251 kqueue_reactor::per_descriptor_data& descriptor_data)
253 if (!descriptor_data)
256 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
258 op_queue<operation> ops;
259 for (int i = 0; i < max_ops; ++i)
261 while (reactor_op* op = descriptor_data->op_queue_[i].front())
263 op->ec_ = boost::asio::error::operation_aborted;
264 descriptor_data->op_queue_[i].pop();
269 descriptor_lock.unlock();
271 io_service_.post_deferred_completions(ops);
274 void kqueue_reactor::deregister_descriptor(socket_type descriptor,
275 kqueue_reactor::per_descriptor_data& descriptor_data, bool closing)
277 if (!descriptor_data)
280 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
282 if (!descriptor_data->shutdown_)
286 // The descriptor will be automatically removed from the kqueue when it
291 struct kevent events[2];
292 BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
293 EVFILT_READ, EV_DELETE, 0, 0, 0);
294 BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
295 EVFILT_WRITE, EV_DELETE, 0, 0, 0);
296 ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
299 op_queue<operation> ops;
300 for (int i = 0; i < max_ops; ++i)
302 while (reactor_op* op = descriptor_data->op_queue_[i].front())
304 op->ec_ = boost::asio::error::operation_aborted;
305 descriptor_data->op_queue_[i].pop();
310 descriptor_data->descriptor_ = -1;
311 descriptor_data->shutdown_ = true;
313 descriptor_lock.unlock();
315 free_descriptor_state(descriptor_data);
318 io_service_.post_deferred_completions(ops);
322 void kqueue_reactor::deregister_internal_descriptor(socket_type descriptor,
323 kqueue_reactor::per_descriptor_data& descriptor_data)
325 if (!descriptor_data)
328 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
330 if (!descriptor_data->shutdown_)
332 struct kevent events[2];
333 BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
334 EVFILT_READ, EV_DELETE, 0, 0, 0);
335 BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
336 EVFILT_WRITE, EV_DELETE, 0, 0, 0);
337 ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
339 op_queue<operation> ops;
340 for (int i = 0; i < max_ops; ++i)
341 ops.push(descriptor_data->op_queue_[i]);
343 descriptor_data->descriptor_ = -1;
344 descriptor_data->shutdown_ = true;
346 descriptor_lock.unlock();
348 free_descriptor_state(descriptor_data);
353 void kqueue_reactor::run(bool block, op_queue<operation>& ops)
355 mutex::scoped_lock lock(mutex_);
357 // Determine how long to block while waiting for events.
358 timespec timeout_buf = { 0, 0 };
359 timespec* timeout = block ? get_timeout(timeout_buf) : &timeout_buf;
363 // Block on the kqueue descriptor.
364 struct kevent events[128];
365 int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);
367 // Dispatch the waiting events.
368 for (int i = 0; i < num_events; ++i)
370 void* ptr = reinterpret_cast<void*>(events[i].udata);
371 if (ptr == &interrupter_)
373 interrupter_.reset();
377 descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
378 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
380 if (events[i].filter == EVFILT_WRITE
381 && descriptor_data->num_kevents_ == 2
382 && descriptor_data->op_queue_[write_op].empty())
384 // Some descriptor types, like serial ports, don't seem to support
385 // EV_CLEAR with EVFILT_WRITE. Since we have no pending write
386 // operations we'll remove the EVFILT_WRITE registration here so that
387 // we don't end up in a tight spin.
388 struct kevent delete_events[1];
389 BOOST_ASIO_KQUEUE_EV_SET(&delete_events[0],
390 descriptor_data->descriptor_, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
391 ::kevent(kqueue_fd_, delete_events, 1, 0, 0, 0);
392 descriptor_data->num_kevents_ = 1;
395 // Exception operations must be processed first to ensure that any
396 // out-of-band data is read before normal data.
397 #if defined(__NetBSD__)
398 static const unsigned int filter[max_ops] =
400 static const int filter[max_ops] =
402 { EVFILT_READ, EVFILT_WRITE, EVFILT_READ };
403 for (int j = max_ops - 1; j >= 0; --j)
405 if (events[i].filter == filter[j])
407 if (j != except_op || events[i].flags & EV_OOBAND)
409 while (reactor_op* op = descriptor_data->op_queue_[j].front())
411 if (events[i].flags & EV_ERROR)
413 op->ec_ = boost::system::error_code(
414 static_cast<int>(events[i].data),
415 boost::asio::error::get_system_category());
416 descriptor_data->op_queue_[j].pop();
421 descriptor_data->op_queue_[j].pop();
434 timer_queues_.get_ready_timers(ops);
437 void kqueue_reactor::interrupt()
439 interrupter_.interrupt();
442 int kqueue_reactor::do_kqueue_create()
447 boost::system::error_code ec(errno,
448 boost::asio::error::get_system_category());
449 boost::asio::detail::throw_error(ec, "kqueue");
454 kqueue_reactor::descriptor_state* kqueue_reactor::allocate_descriptor_state()
456 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
457 return registered_descriptors_.alloc();
460 void kqueue_reactor::free_descriptor_state(kqueue_reactor::descriptor_state* s)
462 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
463 registered_descriptors_.free(s);
466 void kqueue_reactor::do_add_timer_queue(timer_queue_base& queue)
468 mutex::scoped_lock lock(mutex_);
469 timer_queues_.insert(&queue);
472 void kqueue_reactor::do_remove_timer_queue(timer_queue_base& queue)
474 mutex::scoped_lock lock(mutex_);
475 timer_queues_.erase(&queue);
478 timespec* kqueue_reactor::get_timeout(timespec& ts)
480 // By default we will wait no longer than 5 minutes. This will ensure that
481 // any changes to the system clock are detected after no longer than this.
482 long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
483 ts.tv_sec = usec / 1000000;
484 ts.tv_nsec = (usec % 1000000) * 1000;
488 } // namespace detail
492 #undef BOOST_ASIO_KQUEUE_EV_SET
494 #include <boost/asio/detail/pop_options.hpp>
496 #endif // defined(BOOST_ASIO_HAS_KQUEUE)
498 #endif // BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP