]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // |
2 | // detail/impl/select_reactor.ipp | |
3 | // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
4 | // | |
5 | // Copyright (c) 2003-2016 Christopher M. Kohlhoff (chris at kohlhoff dot com) | |
6 | // | |
7 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
8 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
9 | // | |
10 | ||
11 | #ifndef BOOST_ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP | |
12 | #define BOOST_ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP | |
13 | ||
14 | #if defined(_MSC_VER) && (_MSC_VER >= 1200) | |
15 | # pragma once | |
16 | #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) | |
17 | ||
18 | #include <boost/asio/detail/config.hpp> | |
19 | ||
20 | #if defined(BOOST_ASIO_HAS_IOCP) \ | |
21 | || (!defined(BOOST_ASIO_HAS_DEV_POLL) \ | |
22 | && !defined(BOOST_ASIO_HAS_EPOLL) \ | |
23 | && !defined(BOOST_ASIO_HAS_KQUEUE) \ | |
24 | && !defined(BOOST_ASIO_WINDOWS_RUNTIME)) | |
25 | ||
26 | #include <boost/asio/detail/bind_handler.hpp> | |
27 | #include <boost/asio/detail/fd_set_adapter.hpp> | |
28 | #include <boost/asio/detail/select_reactor.hpp> | |
29 | #include <boost/asio/detail/signal_blocker.hpp> | |
30 | #include <boost/asio/detail/socket_ops.hpp> | |
31 | ||
32 | #include <boost/asio/detail/push_options.hpp> | |
33 | ||
34 | namespace boost { | |
35 | namespace asio { | |
36 | namespace detail { | |
37 | ||
38 | select_reactor::select_reactor(boost::asio::io_service& io_service) | |
39 | : boost::asio::detail::service_base<select_reactor>(io_service), | |
40 | io_service_(use_service<io_service_impl>(io_service)), | |
41 | mutex_(), | |
42 | interrupter_(), | |
43 | #if defined(BOOST_ASIO_HAS_IOCP) | |
44 | stop_thread_(false), | |
45 | thread_(0), | |
46 | #endif // defined(BOOST_ASIO_HAS_IOCP) | |
47 | shutdown_(false) | |
48 | { | |
49 | #if defined(BOOST_ASIO_HAS_IOCP) | |
50 | boost::asio::detail::signal_blocker sb; | |
51 | thread_ = new boost::asio::detail::thread( | |
52 | bind_handler(&select_reactor::call_run_thread, this)); | |
53 | #endif // defined(BOOST_ASIO_HAS_IOCP) | |
54 | } | |
55 | ||
56 | select_reactor::~select_reactor() | |
57 | { | |
58 | shutdown_service(); | |
59 | } | |
60 | ||
61 | void select_reactor::shutdown_service() | |
62 | { | |
63 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
64 | shutdown_ = true; | |
65 | #if defined(BOOST_ASIO_HAS_IOCP) | |
66 | stop_thread_ = true; | |
67 | #endif // defined(BOOST_ASIO_HAS_IOCP) | |
68 | lock.unlock(); | |
69 | ||
70 | #if defined(BOOST_ASIO_HAS_IOCP) | |
71 | if (thread_) | |
72 | { | |
73 | interrupter_.interrupt(); | |
74 | thread_->join(); | |
75 | delete thread_; | |
76 | thread_ = 0; | |
77 | } | |
78 | #endif // defined(BOOST_ASIO_HAS_IOCP) | |
79 | ||
80 | op_queue<operation> ops; | |
81 | ||
82 | for (int i = 0; i < max_ops; ++i) | |
83 | op_queue_[i].get_all_operations(ops); | |
84 | ||
85 | timer_queues_.get_all_timers(ops); | |
86 | ||
87 | io_service_.abandon_operations(ops); | |
88 | } | |
89 | ||
90 | void select_reactor::fork_service(boost::asio::io_service::fork_event fork_ev) | |
91 | { | |
92 | if (fork_ev == boost::asio::io_service::fork_child) | |
93 | interrupter_.recreate(); | |
94 | } | |
95 | ||
96 | void select_reactor::init_task() | |
97 | { | |
98 | io_service_.init_task(); | |
99 | } | |
100 | ||
101 | int select_reactor::register_descriptor(socket_type, | |
102 | select_reactor::per_descriptor_data&) | |
103 | { | |
104 | return 0; | |
105 | } | |
106 | ||
107 | int select_reactor::register_internal_descriptor( | |
108 | int op_type, socket_type descriptor, | |
109 | select_reactor::per_descriptor_data&, reactor_op* op) | |
110 | { | |
111 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
112 | ||
113 | op_queue_[op_type].enqueue_operation(descriptor, op); | |
114 | interrupter_.interrupt(); | |
115 | ||
116 | return 0; | |
117 | } | |
118 | ||
119 | void select_reactor::move_descriptor(socket_type, | |
120 | select_reactor::per_descriptor_data&, | |
121 | select_reactor::per_descriptor_data&) | |
122 | { | |
123 | } | |
124 | ||
125 | void select_reactor::start_op(int op_type, socket_type descriptor, | |
126 | select_reactor::per_descriptor_data&, reactor_op* op, | |
127 | bool is_continuation, bool) | |
128 | { | |
129 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
130 | ||
131 | if (shutdown_) | |
132 | { | |
133 | post_immediate_completion(op, is_continuation); | |
134 | return; | |
135 | } | |
136 | ||
137 | bool first = op_queue_[op_type].enqueue_operation(descriptor, op); | |
138 | io_service_.work_started(); | |
139 | if (first) | |
140 | interrupter_.interrupt(); | |
141 | } | |
142 | ||
143 | void select_reactor::cancel_ops(socket_type descriptor, | |
144 | select_reactor::per_descriptor_data&) | |
145 | { | |
146 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
147 | cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted); | |
148 | } | |
149 | ||
150 | void select_reactor::deregister_descriptor(socket_type descriptor, | |
151 | select_reactor::per_descriptor_data&, bool) | |
152 | { | |
153 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
154 | cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted); | |
155 | } | |
156 | ||
157 | void select_reactor::deregister_internal_descriptor( | |
158 | socket_type descriptor, select_reactor::per_descriptor_data&) | |
159 | { | |
160 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
161 | op_queue<operation> ops; | |
162 | for (int i = 0; i < max_ops; ++i) | |
163 | op_queue_[i].cancel_operations(descriptor, ops); | |
164 | } | |
165 | ||
166 | void select_reactor::run(bool block, op_queue<operation>& ops) | |
167 | { | |
168 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
169 | ||
170 | #if defined(BOOST_ASIO_HAS_IOCP) | |
171 | // Check if the thread is supposed to stop. | |
172 | if (stop_thread_) | |
173 | return; | |
174 | #endif // defined(BOOST_ASIO_HAS_IOCP) | |
175 | ||
176 | // Set up the descriptor sets. | |
177 | for (int i = 0; i < max_select_ops; ++i) | |
178 | fd_sets_[i].reset(); | |
179 | fd_sets_[read_op].set(interrupter_.read_descriptor()); | |
180 | socket_type max_fd = 0; | |
181 | bool have_work_to_do = !timer_queues_.all_empty(); | |
182 | for (int i = 0; i < max_select_ops; ++i) | |
183 | { | |
184 | have_work_to_do = have_work_to_do || !op_queue_[i].empty(); | |
185 | fd_sets_[i].set(op_queue_[i], ops); | |
186 | if (fd_sets_[i].max_descriptor() > max_fd) | |
187 | max_fd = fd_sets_[i].max_descriptor(); | |
188 | } | |
189 | ||
190 | #if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__) | |
191 | // Connection operations on Windows use both except and write fd_sets. | |
192 | have_work_to_do = have_work_to_do || !op_queue_[connect_op].empty(); | |
193 | fd_sets_[write_op].set(op_queue_[connect_op], ops); | |
194 | if (fd_sets_[write_op].max_descriptor() > max_fd) | |
195 | max_fd = fd_sets_[write_op].max_descriptor(); | |
196 | fd_sets_[except_op].set(op_queue_[connect_op], ops); | |
197 | if (fd_sets_[except_op].max_descriptor() > max_fd) | |
198 | max_fd = fd_sets_[except_op].max_descriptor(); | |
199 | #endif // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__) | |
200 | ||
201 | // We can return immediately if there's no work to do and the reactor is | |
202 | // not supposed to block. | |
203 | if (!block && !have_work_to_do) | |
204 | return; | |
205 | ||
206 | // Determine how long to block while waiting for events. | |
207 | timeval tv_buf = { 0, 0 }; | |
208 | timeval* tv = block ? get_timeout(tv_buf) : &tv_buf; | |
209 | ||
210 | lock.unlock(); | |
211 | ||
212 | // Block on the select call until descriptors become ready. | |
213 | boost::system::error_code ec; | |
214 | int retval = socket_ops::select(static_cast<int>(max_fd + 1), | |
215 | fd_sets_[read_op], fd_sets_[write_op], fd_sets_[except_op], tv, ec); | |
216 | ||
217 | // Reset the interrupter. | |
218 | if (retval > 0 && fd_sets_[read_op].is_set(interrupter_.read_descriptor())) | |
219 | { | |
220 | interrupter_.reset(); | |
221 | --retval; | |
222 | } | |
223 | ||
224 | lock.lock(); | |
225 | ||
226 | // Dispatch all ready operations. | |
227 | if (retval > 0) | |
228 | { | |
229 | #if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__) | |
230 | // Connection operations on Windows use both except and write fd_sets. | |
231 | fd_sets_[except_op].perform(op_queue_[connect_op], ops); | |
232 | fd_sets_[write_op].perform(op_queue_[connect_op], ops); | |
233 | #endif // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__) | |
234 | ||
235 | // Exception operations must be processed first to ensure that any | |
236 | // out-of-band data is read before normal data. | |
237 | for (int i = max_select_ops - 1; i >= 0; --i) | |
238 | fd_sets_[i].perform(op_queue_[i], ops); | |
239 | } | |
240 | timer_queues_.get_ready_timers(ops); | |
241 | } | |
242 | ||
243 | void select_reactor::interrupt() | |
244 | { | |
245 | interrupter_.interrupt(); | |
246 | } | |
247 | ||
248 | #if defined(BOOST_ASIO_HAS_IOCP) | |
249 | void select_reactor::run_thread() | |
250 | { | |
251 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
252 | while (!stop_thread_) | |
253 | { | |
254 | lock.unlock(); | |
255 | op_queue<operation> ops; | |
256 | run(true, ops); | |
257 | io_service_.post_deferred_completions(ops); | |
258 | lock.lock(); | |
259 | } | |
260 | } | |
261 | ||
262 | void select_reactor::call_run_thread(select_reactor* reactor) | |
263 | { | |
264 | reactor->run_thread(); | |
265 | } | |
266 | #endif // defined(BOOST_ASIO_HAS_IOCP) | |
267 | ||
268 | void select_reactor::do_add_timer_queue(timer_queue_base& queue) | |
269 | { | |
270 | mutex::scoped_lock lock(mutex_); | |
271 | timer_queues_.insert(&queue); | |
272 | } | |
273 | ||
274 | void select_reactor::do_remove_timer_queue(timer_queue_base& queue) | |
275 | { | |
276 | mutex::scoped_lock lock(mutex_); | |
277 | timer_queues_.erase(&queue); | |
278 | } | |
279 | ||
280 | timeval* select_reactor::get_timeout(timeval& tv) | |
281 | { | |
282 | // By default we will wait no longer than 5 minutes. This will ensure that | |
283 | // any changes to the system clock are detected after no longer than this. | |
284 | long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000); | |
285 | tv.tv_sec = usec / 1000000; | |
286 | tv.tv_usec = usec % 1000000; | |
287 | return &tv; | |
288 | } | |
289 | ||
290 | void select_reactor::cancel_ops_unlocked(socket_type descriptor, | |
291 | const boost::system::error_code& ec) | |
292 | { | |
293 | bool need_interrupt = false; | |
294 | op_queue<operation> ops; | |
295 | for (int i = 0; i < max_ops; ++i) | |
296 | need_interrupt = op_queue_[i].cancel_operations( | |
297 | descriptor, ops, ec) || need_interrupt; | |
298 | io_service_.post_deferred_completions(ops); | |
299 | if (need_interrupt) | |
300 | interrupter_.interrupt(); | |
301 | } | |
302 | ||
303 | } // namespace detail | |
304 | } // namespace asio | |
305 | } // namespace boost | |
306 | ||
307 | #include <boost/asio/detail/pop_options.hpp> | |
308 | ||
309 | #endif // defined(BOOST_ASIO_HAS_IOCP) | |
310 | // || (!defined(BOOST_ASIO_HAS_DEV_POLL) | |
311 | // && !defined(BOOST_ASIO_HAS_EPOLL) | |
312 | // && !defined(BOOST_ASIO_HAS_KQUEUE) | |
313 | // && !defined(BOOST_ASIO_WINDOWS_RUNTIME)) | |
314 | ||
315 | #endif // BOOST_ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP |