]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // |
2 | // detail/impl/dev_poll_reactor.ipp | |
3 | // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
4 | // | |
5 | // Copyright (c) 2003-2016 Christopher M. Kohlhoff (chris at kohlhoff dot com) | |
6 | // | |
7 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
8 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
9 | // | |
10 | ||
11 | #ifndef BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP | |
12 | #define BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP | |
13 | ||
14 | #if defined(_MSC_VER) && (_MSC_VER >= 1200) | |
15 | # pragma once | |
16 | #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) | |
17 | ||
18 | #include <boost/asio/detail/config.hpp> | |
19 | ||
20 | #if defined(BOOST_ASIO_HAS_DEV_POLL) | |
21 | ||
22 | #include <boost/asio/detail/dev_poll_reactor.hpp> | |
23 | #include <boost/asio/detail/assert.hpp> | |
24 | #include <boost/asio/detail/throw_error.hpp> | |
25 | #include <boost/asio/error.hpp> | |
26 | ||
27 | #include <boost/asio/detail/push_options.hpp> | |
28 | ||
29 | namespace boost { | |
30 | namespace asio { | |
31 | namespace detail { | |
32 | ||
// Construct the reactor: open the /dev/poll driver and register the
// self-pipe interrupter so that interrupt() can wake a blocked run().
// Note: dev_poll_fd_ is initialized before interrupter_, so the driver is
// guaranteed open by the time the interrupter's fd is registered below.
dev_poll_reactor::dev_poll_reactor(boost::asio::io_service& io_service)
  : boost::asio::detail::service_base<dev_poll_reactor>(io_service),
    io_service_(use_service<io_service_impl>(io_service)),
    mutex_(),
    dev_poll_fd_(do_dev_poll_create()),
    interrupter_(),
    shutdown_(false)
{
  // Add the interrupter's descriptor to /dev/poll. Writing a pollfd to the
  // /dev/poll fd is how interest registrations are made with the driver.
  ::pollfd ev = { 0, 0, 0 };
  ev.fd = interrupter_.read_descriptor();
  ev.events = POLLIN | POLLERR;
  ev.revents = 0;
  // NOTE(review): the write's return value is ignored here; presumably a
  // failure only costs the ability to interrupt the reactor — confirm.
  ::write(dev_poll_fd_, &ev, sizeof(ev));
}
48 | ||
// Destroy the reactor: abandon all outstanding work, then close the
// /dev/poll descriptor.
dev_poll_reactor::~dev_poll_reactor()
{
  shutdown_service();
  ::close(dev_poll_fd_);
}
54 | ||
55 | void dev_poll_reactor::shutdown_service() | |
56 | { | |
57 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
58 | shutdown_ = true; | |
59 | lock.unlock(); | |
60 | ||
61 | op_queue<operation> ops; | |
62 | ||
63 | for (int i = 0; i < max_ops; ++i) | |
64 | op_queue_[i].get_all_operations(ops); | |
65 | ||
66 | timer_queues_.get_all_timers(ops); | |
67 | ||
68 | io_service_.abandon_operations(ops); | |
69 | } | |
70 | ||
// Handle notification of a fork. In the child process the inherited
// /dev/poll descriptor and interrupter cannot be shared with the parent,
// so both are recreated and every descriptor is re-registered.
void dev_poll_reactor::fork_service(boost::asio::io_service::fork_event fork_ev)
{
  if (fork_ev == boost::asio::io_service::fork_child)
  {
    detail::mutex::scoped_lock lock(mutex_);

    // Replace the inherited /dev/poll descriptor with a fresh one.
    if (dev_poll_fd_ != -1)
      ::close(dev_poll_fd_);
    dev_poll_fd_ = -1;
    dev_poll_fd_ = do_dev_poll_create();

    interrupter_.recreate();

    // Add the interrupter's descriptor to /dev/poll.
    ::pollfd ev = { 0, 0, 0 };
    ev.fd = interrupter_.read_descriptor();
    ev.events = POLLIN | POLLERR;
    ev.revents = 0;
    ::write(dev_poll_fd_, &ev, sizeof(ev));

    // Re-register all descriptors with /dev/poll. The changes will be written
    // to the /dev/poll descriptor the next time the reactor is run.
    for (int i = 0; i < max_ops; ++i)
    {
      reactor_op_queue<socket_type>::iterator iter = op_queue_[i].begin();
      reactor_op_queue<socket_type>::iterator end = op_queue_[i].end();
      for (; iter != end; ++iter)
      {
        // Accumulate the interest mask: each queue index (read/write/except)
        // contributes its corresponding poll event for this descriptor.
        ::pollfd& pending_ev = add_pending_event_change(iter->first);
        pending_ev.events |= POLLERR | POLLHUP;
        switch (i)
        {
        case read_op: pending_ev.events |= POLLIN; break;
        case write_op: pending_ev.events |= POLLOUT; break;
        case except_op: pending_ev.events |= POLLPRI; break;
        default: break;
        }
      }
    }
    // Wake the reactor so it flushes the pending event changes.
    interrupter_.interrupt();
  }
}
113 | ||
// Forward task initialisation to the owning io_service implementation.
void dev_poll_reactor::init_task()
{
  io_service_.init_task();
}
118 | ||
// Register a descriptor with the reactor. /dev/poll needs no per-descriptor
// state up front (interest is written lazily in start_op), so this is a
// deliberate no-op that always reports success.
int dev_poll_reactor::register_descriptor(socket_type, per_descriptor_data&)
{
  return 0;
}
123 | ||
124 | int dev_poll_reactor::register_internal_descriptor(int op_type, | |
125 | socket_type descriptor, per_descriptor_data&, reactor_op* op) | |
126 | { | |
127 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
128 | ||
129 | op_queue_[op_type].enqueue_operation(descriptor, op); | |
130 | ::pollfd& ev = add_pending_event_change(descriptor); | |
131 | ev.events = POLLERR | POLLHUP; | |
132 | switch (op_type) | |
133 | { | |
134 | case read_op: ev.events |= POLLIN; break; | |
135 | case write_op: ev.events |= POLLOUT; break; | |
136 | case except_op: ev.events |= POLLPRI; break; | |
137 | default: break; | |
138 | } | |
139 | interrupter_.interrupt(); | |
140 | ||
141 | return 0; | |
142 | } | |
143 | ||
// Move descriptor registration from one data object to another. This
// reactor keeps no per-descriptor data, so there is nothing to move.
void dev_poll_reactor::move_descriptor(socket_type,
    dev_poll_reactor::per_descriptor_data&,
    dev_poll_reactor::per_descriptor_data&)
{
}
149 | ||
// Start a new asynchronous operation on the given descriptor. May complete
// the operation immediately (speculatively, or because the reactor has been
// shut down); otherwise queues it and schedules a /dev/poll interest change.
void dev_poll_reactor::start_op(int op_type, socket_type descriptor,
    dev_poll_reactor::per_descriptor_data&, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  boost::asio::detail::mutex::scoped_lock lock(mutex_);

  // After shutdown, operations are not run through the reactor; they are
  // posted straight to the io_service for completion.
  if (shutdown_)
  {
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (allow_speculative)
  {
    // Try the operation immediately, but only when doing so cannot reorder
    // it ahead of work already queued for this descriptor: a read must not
    // jump an outstanding except-op (out-of-band data takes priority), and
    // no op may jump an earlier op of the same type.
    if (op_type != read_op || !op_queue_[except_op].has_operation(descriptor))
    {
      if (!op_queue_[op_type].has_operation(descriptor))
      {
        if (op->perform())
        {
          // Speculative run succeeded; hand the completed op to the
          // io_service without holding the reactor lock.
          lock.unlock();
          io_service_.post_immediate_completion(op, is_continuation);
          return;
        }
      }
    }
  }

  bool first = op_queue_[op_type].enqueue_operation(descriptor, op);
  io_service_.work_started();
  // Only the first operation of a given type changes the interest set; a
  // later one is covered by the registration already in place.
  if (first)
  {
    // Build the full interest mask from every queue that still has work
    // for this descriptor, then wake the reactor to flush the change.
    ::pollfd& ev = add_pending_event_change(descriptor);
    ev.events = POLLERR | POLLHUP;
    if (op_type == read_op
        || op_queue_[read_op].has_operation(descriptor))
      ev.events |= POLLIN;
    if (op_type == write_op
        || op_queue_[write_op].has_operation(descriptor))
      ev.events |= POLLOUT;
    if (op_type == except_op
        || op_queue_[except_op].has_operation(descriptor))
      ev.events |= POLLPRI;
    interrupter_.interrupt();
  }
}
196 | ||
// Cancel all operations associated with the given descriptor. The handlers
// are invoked with the operation_aborted error.
void dev_poll_reactor::cancel_ops(socket_type descriptor,
    dev_poll_reactor::per_descriptor_data&)
{
  boost::asio::detail::mutex::scoped_lock lock(mutex_);
  cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted);
}
203 | ||
// Deregister a descriptor: schedule its removal from /dev/poll and abort
// all of its outstanding operations.
void dev_poll_reactor::deregister_descriptor(socket_type descriptor,
    dev_poll_reactor::per_descriptor_data&, bool)
{
  boost::asio::detail::mutex::scoped_lock lock(mutex_);

  // Remove the descriptor from /dev/poll. POLLREMOVE is the driver's
  // deregistration command; the change is flushed on the next run, so wake
  // the reactor.
  ::pollfd& ev = add_pending_event_change(descriptor);
  ev.events = POLLREMOVE;
  interrupter_.interrupt();

  // Cancel any outstanding operations associated with the descriptor.
  cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted);
}
217 | ||
// Deregister an internal descriptor and destroy (not post) its operations.
void dev_poll_reactor::deregister_internal_descriptor(
    socket_type descriptor, dev_poll_reactor::per_descriptor_data&)
{
  boost::asio::detail::mutex::scoped_lock lock(mutex_);

  // Remove the descriptor from /dev/poll. Since this function is only called
  // during a fork, we can apply the change immediately (write it to the
  // driver now) rather than queueing it as a pending event change.
  ::pollfd ev = { 0, 0, 0 };
  ev.fd = descriptor;
  ev.events = POLLREMOVE;
  ev.revents = 0;
  ::write(dev_poll_fd_, &ev, sizeof(ev));

  // Destroy all operations associated with the descriptor. The cancelled
  // ops are collected into a local queue and simply dropped; their handlers
  // are never posted.
  op_queue<operation> ops;
  boost::system::error_code ec;
  for (int i = 0; i < max_ops; ++i)
    op_queue_[i].cancel_operations(descriptor, ops, ec);
}
237 | ||
// Run the reactor once: flush pending /dev/poll interest changes, poll for
// events (blocking up to get_timeout() ms when `block` is true), and append
// every ready operation and expired timer to `ops` for the caller to run.
void dev_poll_reactor::run(bool block, op_queue<operation>& ops)
{
  boost::asio::detail::mutex::scoped_lock lock(mutex_);

  // We can return immediately if there's no work to do and the reactor is
  // not supposed to block.
  if (!block && op_queue_[read_op].empty() && op_queue_[write_op].empty()
      && op_queue_[except_op].empty() && timer_queues_.all_empty())
    return;

  // Write the pending event registration changes to the /dev/poll descriptor.
  std::size_t events_size = sizeof(::pollfd) * pending_event_changes_.size();
  if (events_size > 0)
  {
    errno = 0;
    int result = ::write(dev_poll_fd_,
        &pending_event_changes_[0], events_size);
    if (result != static_cast<int>(events_size))
    {
      // A short or failed write means registration failed: abort every
      // operation on every descriptor in the batch with the write error.
      boost::system::error_code ec = boost::system::error_code(
          errno, boost::asio::error::get_system_category());
      for (std::size_t i = 0; i < pending_event_changes_.size(); ++i)
      {
        int descriptor = pending_event_changes_[i].fd;
        for (int j = 0; j < max_ops; ++j)
          op_queue_[j].cancel_operations(descriptor, ops, ec);
      }
    }
    pending_event_changes_.clear();
    pending_event_change_index_.clear();
  }

  // Compute the timeout while still holding the lock, then release it for
  // the duration of the (potentially blocking) poll.
  int timeout = block ? get_timeout() : 0;
  lock.unlock();

  // Block on the /dev/poll descriptor.
  ::pollfd events[128] = { { 0, 0, 0 } };
  ::dvpoll dp = { 0, 0, 0 };
  dp.dp_fds = events;
  dp.dp_nfds = 128;
  dp.dp_timeout = timeout;
  int num_events = ::ioctl(dev_poll_fd_, DP_POLL, &dp);

  lock.lock();

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    int descriptor = events[i].fd;
    if (descriptor == interrupter_.read_descriptor())
    {
      // Woken deliberately; just drain the interrupter.
      interrupter_.reset();
    }
    else
    {
      bool more_reads = false;
      bool more_writes = false;
      bool more_except = false;

      // Exception operations must be processed first to ensure that any
      // out-of-band data is read before normal data.
      if (events[i].events & (POLLPRI | POLLERR | POLLHUP))
        more_except =
          op_queue_[except_op].perform_operations(descriptor, ops);
      else
        more_except = op_queue_[except_op].has_operation(descriptor);

      if (events[i].events & (POLLIN | POLLERR | POLLHUP))
        more_reads = op_queue_[read_op].perform_operations(descriptor, ops);
      else
        more_reads = op_queue_[read_op].has_operation(descriptor);

      if (events[i].events & (POLLOUT | POLLERR | POLLHUP))
        more_writes = op_queue_[write_op].perform_operations(descriptor, ops);
      else
        more_writes = op_queue_[write_op].has_operation(descriptor);

      if ((events[i].events & (POLLERR | POLLHUP)) != 0
            && !more_except && !more_reads && !more_writes)
      {
        // If we have an event and no operations associated with the
        // descriptor then we need to delete the descriptor from /dev/poll.
        // The poll operation can produce POLLHUP or POLLERR events when there
        // is no operation pending, so if we do not remove the descriptor we
        // can end up in a tight polling loop.
        ::pollfd ev = { 0, 0, 0 };
        ev.fd = descriptor;
        ev.events = POLLREMOVE;
        ev.revents = 0;
        ::write(dev_poll_fd_, &ev, sizeof(ev));
      }
      else
      {
        // Re-arm the descriptor with the interest set implied by the
        // operations still outstanding. /dev/poll registrations are not
        // one-shot, but the mask may have shrunk after performing ops.
        ::pollfd ev = { 0, 0, 0 };
        ev.fd = descriptor;
        ev.events = POLLERR | POLLHUP;
        if (more_reads)
          ev.events |= POLLIN;
        if (more_writes)
          ev.events |= POLLOUT;
        if (more_except)
          ev.events |= POLLPRI;
        ev.revents = 0;
        int result = ::write(dev_poll_fd_, &ev, sizeof(ev));
        if (result != sizeof(ev))
        {
          // Re-registration failed: abort this descriptor's operations.
          boost::system::error_code ec(errno,
              boost::asio::error::get_system_category());
          for (int j = 0; j < max_ops; ++j)
            op_queue_[j].cancel_operations(descriptor, ops, ec);
        }
      }
    }
  }
  timer_queues_.get_ready_timers(ops);
}
354 | ||
// Wake the reactor if it is blocked inside run().
void dev_poll_reactor::interrupt()
{
  interrupter_.interrupt();
}
359 | ||
360 | int dev_poll_reactor::do_dev_poll_create() | |
361 | { | |
362 | int fd = ::open("/dev/poll", O_RDWR); | |
363 | if (fd == -1) | |
364 | { | |
365 | boost::system::error_code ec(errno, | |
366 | boost::asio::error::get_system_category()); | |
367 | boost::asio::detail::throw_error(ec, "/dev/poll"); | |
368 | } | |
369 | return fd; | |
370 | } | |
371 | ||
// Add a timer queue to the set serviced by this reactor.
void dev_poll_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}
377 | ||
// Remove a timer queue from the set serviced by this reactor.
void dev_poll_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}
383 | ||
384 | int dev_poll_reactor::get_timeout() | |
385 | { | |
386 | // By default we will wait no longer than 5 minutes. This will ensure that | |
387 | // any changes to the system clock are detected after no longer than this. | |
388 | return timer_queues_.wait_duration_msec(5 * 60 * 1000); | |
389 | } | |
390 | ||
391 | void dev_poll_reactor::cancel_ops_unlocked(socket_type descriptor, | |
392 | const boost::system::error_code& ec) | |
393 | { | |
394 | bool need_interrupt = false; | |
395 | op_queue<operation> ops; | |
396 | for (int i = 0; i < max_ops; ++i) | |
397 | need_interrupt = op_queue_[i].cancel_operations( | |
398 | descriptor, ops, ec) || need_interrupt; | |
399 | io_service_.post_deferred_completions(ops); | |
400 | if (need_interrupt) | |
401 | interrupter_.interrupt(); | |
402 | } | |
403 | ||
// Return a reference to the pending /dev/poll change entry for the given
// descriptor, creating one (with fd set and revents zeroed) if none exists.
// The caller fills in .events; the batch is flushed by run().
::pollfd& dev_poll_reactor::add_pending_event_change(int descriptor)
{
  hash_map<int, std::size_t>::iterator iter
    = pending_event_change_index_.find(descriptor);
  if (iter == pending_event_change_index_.end())
  {
    std::size_t index = pending_event_changes_.size();
    // Reserve capacity up front so the push_back below cannot reallocate
    // (and so cannot throw), keeping the index map and the vector
    // consistent even if the map insert is the operation that fails.
    pending_event_changes_.reserve(pending_event_changes_.size() + 1);
    pending_event_change_index_.insert(std::make_pair(descriptor, index));
    pending_event_changes_.push_back(::pollfd());
    pending_event_changes_[index].fd = descriptor;
    // .events is left zeroed for the caller to populate.
    pending_event_changes_[index].revents = 0;
    return pending_event_changes_[index];
  }
  else
  {
    // Descriptor already has a queued change; callers OR into or overwrite
    // its existing .events mask.
    return pending_event_changes_[iter->second];
  }
}
423 | ||
424 | } // namespace detail | |
425 | } // namespace asio | |
426 | } // namespace boost | |
427 | ||
428 | #include <boost/asio/detail/pop_options.hpp> | |
429 | ||
430 | #endif // defined(BOOST_ASIO_HAS_DEV_POLL) | |
431 | ||
432 | #endif // BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP |