]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // |
2 | // detail/impl/select_reactor.ipp | |
3 | // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
4 | // | |
92f5a8d4 | 5 | // Copyright (c) 2003-2019 Christopher M. Kohlhoff (chris at kohlhoff dot com) |
7c673cae FG |
6 | // |
7 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
8 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
9 | // | |
10 | ||
11 | #ifndef BOOST_ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP | |
12 | #define BOOST_ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP | |
13 | ||
14 | #if defined(_MSC_VER) && (_MSC_VER >= 1200) | |
15 | # pragma once | |
16 | #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) | |
17 | ||
18 | #include <boost/asio/detail/config.hpp> | |
19 | ||
20 | #if defined(BOOST_ASIO_HAS_IOCP) \ | |
21 | || (!defined(BOOST_ASIO_HAS_DEV_POLL) \ | |
22 | && !defined(BOOST_ASIO_HAS_EPOLL) \ | |
23 | && !defined(BOOST_ASIO_HAS_KQUEUE) \ | |
24 | && !defined(BOOST_ASIO_WINDOWS_RUNTIME)) | |
25 | ||
7c673cae FG |
26 | #include <boost/asio/detail/fd_set_adapter.hpp> |
27 | #include <boost/asio/detail/select_reactor.hpp> | |
28 | #include <boost/asio/detail/signal_blocker.hpp> | |
29 | #include <boost/asio/detail/socket_ops.hpp> | |
30 | ||
31 | #include <boost/asio/detail/push_options.hpp> | |
32 | ||
33 | namespace boost { | |
34 | namespace asio { | |
35 | namespace detail { | |
36 | ||
b32b8144 FG |
37 | #if defined(BOOST_ASIO_HAS_IOCP) |
38 | class select_reactor::thread_function | |
39 | { | |
40 | public: | |
41 | explicit thread_function(select_reactor* r) | |
42 | : this_(r) | |
43 | { | |
44 | } | |
45 | ||
46 | void operator()() | |
47 | { | |
48 | this_->run_thread(); | |
49 | } | |
50 | ||
51 | private: | |
52 | select_reactor* this_; | |
53 | }; | |
54 | #endif // defined(BOOST_ASIO_HAS_IOCP) | |
55 | ||
56 | select_reactor::select_reactor(boost::asio::execution_context& ctx) | |
57 | : execution_context_service_base<select_reactor>(ctx), | |
58 | scheduler_(use_service<scheduler_type>(ctx)), | |
7c673cae FG |
59 | mutex_(), |
60 | interrupter_(), | |
61 | #if defined(BOOST_ASIO_HAS_IOCP) | |
62 | stop_thread_(false), | |
63 | thread_(0), | |
64 | #endif // defined(BOOST_ASIO_HAS_IOCP) | |
65 | shutdown_(false) | |
66 | { | |
67 | #if defined(BOOST_ASIO_HAS_IOCP) | |
68 | boost::asio::detail::signal_blocker sb; | |
b32b8144 | 69 | thread_ = new boost::asio::detail::thread(thread_function(this)); |
7c673cae FG |
70 | #endif // defined(BOOST_ASIO_HAS_IOCP) |
71 | } | |
72 | ||
73 | select_reactor::~select_reactor() | |
74 | { | |
b32b8144 | 75 | shutdown(); |
7c673cae FG |
76 | } |
77 | ||
b32b8144 | 78 | void select_reactor::shutdown() |
7c673cae FG |
79 | { |
80 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
81 | shutdown_ = true; | |
82 | #if defined(BOOST_ASIO_HAS_IOCP) | |
83 | stop_thread_ = true; | |
84 | #endif // defined(BOOST_ASIO_HAS_IOCP) | |
85 | lock.unlock(); | |
86 | ||
87 | #if defined(BOOST_ASIO_HAS_IOCP) | |
88 | if (thread_) | |
89 | { | |
90 | interrupter_.interrupt(); | |
91 | thread_->join(); | |
92 | delete thread_; | |
93 | thread_ = 0; | |
94 | } | |
95 | #endif // defined(BOOST_ASIO_HAS_IOCP) | |
96 | ||
97 | op_queue<operation> ops; | |
98 | ||
99 | for (int i = 0; i < max_ops; ++i) | |
100 | op_queue_[i].get_all_operations(ops); | |
101 | ||
102 | timer_queues_.get_all_timers(ops); | |
103 | ||
b32b8144 | 104 | scheduler_.abandon_operations(ops); |
7c673cae FG |
105 | } |
106 | ||
b32b8144 FG |
107 | void select_reactor::notify_fork( |
108 | boost::asio::execution_context::fork_event fork_ev) | |
7c673cae | 109 | { |
b32b8144 | 110 | if (fork_ev == boost::asio::execution_context::fork_child) |
7c673cae FG |
111 | interrupter_.recreate(); |
112 | } | |
113 | ||
114 | void select_reactor::init_task() | |
115 | { | |
b32b8144 | 116 | scheduler_.init_task(); |
7c673cae FG |
117 | } |
118 | ||
119 | int select_reactor::register_descriptor(socket_type, | |
120 | select_reactor::per_descriptor_data&) | |
121 | { | |
122 | return 0; | |
123 | } | |
124 | ||
125 | int select_reactor::register_internal_descriptor( | |
126 | int op_type, socket_type descriptor, | |
127 | select_reactor::per_descriptor_data&, reactor_op* op) | |
128 | { | |
129 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
130 | ||
131 | op_queue_[op_type].enqueue_operation(descriptor, op); | |
132 | interrupter_.interrupt(); | |
133 | ||
134 | return 0; | |
135 | } | |
136 | ||
137 | void select_reactor::move_descriptor(socket_type, | |
138 | select_reactor::per_descriptor_data&, | |
139 | select_reactor::per_descriptor_data&) | |
140 | { | |
141 | } | |
142 | ||
143 | void select_reactor::start_op(int op_type, socket_type descriptor, | |
144 | select_reactor::per_descriptor_data&, reactor_op* op, | |
145 | bool is_continuation, bool) | |
146 | { | |
147 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
148 | ||
149 | if (shutdown_) | |
150 | { | |
151 | post_immediate_completion(op, is_continuation); | |
152 | return; | |
153 | } | |
154 | ||
155 | bool first = op_queue_[op_type].enqueue_operation(descriptor, op); | |
b32b8144 | 156 | scheduler_.work_started(); |
7c673cae FG |
157 | if (first) |
158 | interrupter_.interrupt(); | |
159 | } | |
160 | ||
161 | void select_reactor::cancel_ops(socket_type descriptor, | |
162 | select_reactor::per_descriptor_data&) | |
163 | { | |
164 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
165 | cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted); | |
166 | } | |
167 | ||
168 | void select_reactor::deregister_descriptor(socket_type descriptor, | |
169 | select_reactor::per_descriptor_data&, bool) | |
170 | { | |
171 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
172 | cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted); | |
173 | } | |
174 | ||
175 | void select_reactor::deregister_internal_descriptor( | |
176 | socket_type descriptor, select_reactor::per_descriptor_data&) | |
177 | { | |
178 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
179 | op_queue<operation> ops; | |
180 | for (int i = 0; i < max_ops; ++i) | |
181 | op_queue_[i].cancel_operations(descriptor, ops); | |
182 | } | |
183 | ||
b32b8144 FG |
184 | void select_reactor::cleanup_descriptor_data( |
185 | select_reactor::per_descriptor_data&) | |
186 | { | |
187 | } | |
188 | ||
189 | void select_reactor::run(long usec, op_queue<operation>& ops) | |
7c673cae FG |
190 | { |
191 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
192 | ||
193 | #if defined(BOOST_ASIO_HAS_IOCP) | |
194 | // Check if the thread is supposed to stop. | |
195 | if (stop_thread_) | |
196 | return; | |
197 | #endif // defined(BOOST_ASIO_HAS_IOCP) | |
198 | ||
199 | // Set up the descriptor sets. | |
200 | for (int i = 0; i < max_select_ops; ++i) | |
201 | fd_sets_[i].reset(); | |
202 | fd_sets_[read_op].set(interrupter_.read_descriptor()); | |
203 | socket_type max_fd = 0; | |
204 | bool have_work_to_do = !timer_queues_.all_empty(); | |
205 | for (int i = 0; i < max_select_ops; ++i) | |
206 | { | |
207 | have_work_to_do = have_work_to_do || !op_queue_[i].empty(); | |
208 | fd_sets_[i].set(op_queue_[i], ops); | |
209 | if (fd_sets_[i].max_descriptor() > max_fd) | |
210 | max_fd = fd_sets_[i].max_descriptor(); | |
211 | } | |
212 | ||
213 | #if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__) | |
214 | // Connection operations on Windows use both except and write fd_sets. | |
215 | have_work_to_do = have_work_to_do || !op_queue_[connect_op].empty(); | |
216 | fd_sets_[write_op].set(op_queue_[connect_op], ops); | |
217 | if (fd_sets_[write_op].max_descriptor() > max_fd) | |
218 | max_fd = fd_sets_[write_op].max_descriptor(); | |
219 | fd_sets_[except_op].set(op_queue_[connect_op], ops); | |
220 | if (fd_sets_[except_op].max_descriptor() > max_fd) | |
221 | max_fd = fd_sets_[except_op].max_descriptor(); | |
222 | #endif // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__) | |
223 | ||
224 | // We can return immediately if there's no work to do and the reactor is | |
225 | // not supposed to block. | |
b32b8144 | 226 | if (!usec && !have_work_to_do) |
7c673cae FG |
227 | return; |
228 | ||
229 | // Determine how long to block while waiting for events. | |
230 | timeval tv_buf = { 0, 0 }; | |
b32b8144 | 231 | timeval* tv = usec ? get_timeout(usec, tv_buf) : &tv_buf; |
7c673cae FG |
232 | |
233 | lock.unlock(); | |
234 | ||
235 | // Block on the select call until descriptors become ready. | |
236 | boost::system::error_code ec; | |
237 | int retval = socket_ops::select(static_cast<int>(max_fd + 1), | |
238 | fd_sets_[read_op], fd_sets_[write_op], fd_sets_[except_op], tv, ec); | |
239 | ||
240 | // Reset the interrupter. | |
241 | if (retval > 0 && fd_sets_[read_op].is_set(interrupter_.read_descriptor())) | |
242 | { | |
243 | interrupter_.reset(); | |
244 | --retval; | |
245 | } | |
246 | ||
247 | lock.lock(); | |
248 | ||
249 | // Dispatch all ready operations. | |
250 | if (retval > 0) | |
251 | { | |
252 | #if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__) | |
253 | // Connection operations on Windows use both except and write fd_sets. | |
254 | fd_sets_[except_op].perform(op_queue_[connect_op], ops); | |
255 | fd_sets_[write_op].perform(op_queue_[connect_op], ops); | |
256 | #endif // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__) | |
257 | ||
258 | // Exception operations must be processed first to ensure that any | |
259 | // out-of-band data is read before normal data. | |
260 | for (int i = max_select_ops - 1; i >= 0; --i) | |
261 | fd_sets_[i].perform(op_queue_[i], ops); | |
262 | } | |
263 | timer_queues_.get_ready_timers(ops); | |
264 | } | |
265 | ||
266 | void select_reactor::interrupt() | |
267 | { | |
268 | interrupter_.interrupt(); | |
269 | } | |
270 | ||
271 | #if defined(BOOST_ASIO_HAS_IOCP) | |
272 | void select_reactor::run_thread() | |
273 | { | |
274 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
275 | while (!stop_thread_) | |
276 | { | |
277 | lock.unlock(); | |
278 | op_queue<operation> ops; | |
279 | run(true, ops); | |
b32b8144 | 280 | scheduler_.post_deferred_completions(ops); |
7c673cae FG |
281 | lock.lock(); |
282 | } | |
283 | } | |
7c673cae FG |
284 | #endif // defined(BOOST_ASIO_HAS_IOCP) |
285 | ||
286 | void select_reactor::do_add_timer_queue(timer_queue_base& queue) | |
287 | { | |
288 | mutex::scoped_lock lock(mutex_); | |
289 | timer_queues_.insert(&queue); | |
290 | } | |
291 | ||
292 | void select_reactor::do_remove_timer_queue(timer_queue_base& queue) | |
293 | { | |
294 | mutex::scoped_lock lock(mutex_); | |
295 | timer_queues_.erase(&queue); | |
296 | } | |
297 | ||
b32b8144 | 298 | timeval* select_reactor::get_timeout(long usec, timeval& tv) |
7c673cae FG |
299 | { |
300 | // By default we will wait no longer than 5 minutes. This will ensure that | |
301 | // any changes to the system clock are detected after no longer than this. | |
b32b8144 FG |
302 | const long max_usec = 5 * 60 * 1000 * 1000; |
303 | usec = timer_queues_.wait_duration_usec( | |
304 | (usec < 0 || max_usec < usec) ? max_usec : usec); | |
7c673cae FG |
305 | tv.tv_sec = usec / 1000000; |
306 | tv.tv_usec = usec % 1000000; | |
307 | return &tv; | |
308 | } | |
309 | ||
310 | void select_reactor::cancel_ops_unlocked(socket_type descriptor, | |
311 | const boost::system::error_code& ec) | |
312 | { | |
313 | bool need_interrupt = false; | |
314 | op_queue<operation> ops; | |
315 | for (int i = 0; i < max_ops; ++i) | |
316 | need_interrupt = op_queue_[i].cancel_operations( | |
317 | descriptor, ops, ec) || need_interrupt; | |
b32b8144 | 318 | scheduler_.post_deferred_completions(ops); |
7c673cae FG |
319 | if (need_interrupt) |
320 | interrupter_.interrupt(); | |
321 | } | |
322 | ||
323 | } // namespace detail | |
324 | } // namespace asio | |
325 | } // namespace boost | |
326 | ||
327 | #include <boost/asio/detail/pop_options.hpp> | |
328 | ||
329 | #endif // defined(BOOST_ASIO_HAS_IOCP) | |
330 | // || (!defined(BOOST_ASIO_HAS_DEV_POLL) | |
331 | // && !defined(BOOST_ASIO_HAS_EPOLL) | |
332 | // && !defined(BOOST_ASIO_HAS_KQUEUE) | |
333 | // && !defined(BOOST_ASIO_WINDOWS_RUNTIME)) | |
334 | ||
335 | #endif // BOOST_ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP |