]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // |
2 | // detail/impl/dev_poll_reactor.ipp | |
3 | // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
4 | // | |
1e59de90 | 5 | // Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com) |
7c673cae FG |
6 | // |
7 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
8 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
9 | // | |
10 | ||
11 | #ifndef BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP | |
12 | #define BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP | |
13 | ||
14 | #if defined(_MSC_VER) && (_MSC_VER >= 1200) | |
15 | # pragma once | |
16 | #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) | |
17 | ||
18 | #include <boost/asio/detail/config.hpp> | |
19 | ||
20 | #if defined(BOOST_ASIO_HAS_DEV_POLL) | |
21 | ||
22 | #include <boost/asio/detail/dev_poll_reactor.hpp> | |
23 | #include <boost/asio/detail/assert.hpp> | |
1e59de90 | 24 | #include <boost/asio/detail/scheduler.hpp> |
7c673cae FG |
25 | #include <boost/asio/detail/throw_error.hpp> |
26 | #include <boost/asio/error.hpp> | |
27 | ||
28 | #include <boost/asio/detail/push_options.hpp> | |
29 | ||
30 | namespace boost { | |
31 | namespace asio { | |
32 | namespace detail { | |
33 | ||
b32b8144 FG |
34 | dev_poll_reactor::dev_poll_reactor(boost::asio::execution_context& ctx) |
35 | : boost::asio::detail::execution_context_service_base<dev_poll_reactor>(ctx), | |
36 | scheduler_(use_service<scheduler>(ctx)), | |
7c673cae FG |
37 | mutex_(), |
38 | dev_poll_fd_(do_dev_poll_create()), | |
39 | interrupter_(), | |
40 | shutdown_(false) | |
41 | { | |
42 | // Add the interrupter's descriptor to /dev/poll. | |
43 | ::pollfd ev = { 0, 0, 0 }; | |
44 | ev.fd = interrupter_.read_descriptor(); | |
45 | ev.events = POLLIN | POLLERR; | |
46 | ev.revents = 0; | |
47 | ::write(dev_poll_fd_, &ev, sizeof(ev)); | |
48 | } | |
49 | ||
50 | dev_poll_reactor::~dev_poll_reactor() | |
51 | { | |
b32b8144 | 52 | shutdown(); |
7c673cae FG |
53 | ::close(dev_poll_fd_); |
54 | } | |
55 | ||
b32b8144 | 56 | void dev_poll_reactor::shutdown() |
7c673cae FG |
57 | { |
58 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
59 | shutdown_ = true; | |
60 | lock.unlock(); | |
61 | ||
62 | op_queue<operation> ops; | |
63 | ||
64 | for (int i = 0; i < max_ops; ++i) | |
65 | op_queue_[i].get_all_operations(ops); | |
66 | ||
67 | timer_queues_.get_all_timers(ops); | |
68 | ||
b32b8144 | 69 | scheduler_.abandon_operations(ops); |
7c673cae FG |
70 | } |
71 | ||
b32b8144 FG |
72 | void dev_poll_reactor::notify_fork( |
73 | boost::asio::execution_context::fork_event fork_ev) | |
7c673cae | 74 | { |
b32b8144 | 75 | if (fork_ev == boost::asio::execution_context::fork_child) |
7c673cae FG |
76 | { |
77 | detail::mutex::scoped_lock lock(mutex_); | |
78 | ||
79 | if (dev_poll_fd_ != -1) | |
80 | ::close(dev_poll_fd_); | |
81 | dev_poll_fd_ = -1; | |
82 | dev_poll_fd_ = do_dev_poll_create(); | |
83 | ||
84 | interrupter_.recreate(); | |
85 | ||
86 | // Add the interrupter's descriptor to /dev/poll. | |
87 | ::pollfd ev = { 0, 0, 0 }; | |
88 | ev.fd = interrupter_.read_descriptor(); | |
89 | ev.events = POLLIN | POLLERR; | |
90 | ev.revents = 0; | |
91 | ::write(dev_poll_fd_, &ev, sizeof(ev)); | |
92 | ||
93 | // Re-register all descriptors with /dev/poll. The changes will be written | |
94 | // to the /dev/poll descriptor the next time the reactor is run. | |
95 | for (int i = 0; i < max_ops; ++i) | |
96 | { | |
97 | reactor_op_queue<socket_type>::iterator iter = op_queue_[i].begin(); | |
98 | reactor_op_queue<socket_type>::iterator end = op_queue_[i].end(); | |
99 | for (; iter != end; ++iter) | |
100 | { | |
101 | ::pollfd& pending_ev = add_pending_event_change(iter->first); | |
102 | pending_ev.events |= POLLERR | POLLHUP; | |
103 | switch (i) | |
104 | { | |
105 | case read_op: pending_ev.events |= POLLIN; break; | |
106 | case write_op: pending_ev.events |= POLLOUT; break; | |
107 | case except_op: pending_ev.events |= POLLPRI; break; | |
108 | default: break; | |
109 | } | |
110 | } | |
111 | } | |
112 | interrupter_.interrupt(); | |
113 | } | |
114 | } | |
115 | ||
116 | void dev_poll_reactor::init_task() | |
117 | { | |
b32b8144 | 118 | scheduler_.init_task(); |
7c673cae FG |
119 | } |
120 | ||
121 | int dev_poll_reactor::register_descriptor(socket_type, per_descriptor_data&) | |
122 | { | |
123 | return 0; | |
124 | } | |
125 | ||
126 | int dev_poll_reactor::register_internal_descriptor(int op_type, | |
127 | socket_type descriptor, per_descriptor_data&, reactor_op* op) | |
128 | { | |
129 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
130 | ||
131 | op_queue_[op_type].enqueue_operation(descriptor, op); | |
132 | ::pollfd& ev = add_pending_event_change(descriptor); | |
133 | ev.events = POLLERR | POLLHUP; | |
134 | switch (op_type) | |
135 | { | |
136 | case read_op: ev.events |= POLLIN; break; | |
137 | case write_op: ev.events |= POLLOUT; break; | |
138 | case except_op: ev.events |= POLLPRI; break; | |
139 | default: break; | |
140 | } | |
141 | interrupter_.interrupt(); | |
142 | ||
143 | return 0; | |
144 | } | |
145 | ||
146 | void dev_poll_reactor::move_descriptor(socket_type, | |
147 | dev_poll_reactor::per_descriptor_data&, | |
148 | dev_poll_reactor::per_descriptor_data&) | |
149 | { | |
150 | } | |
151 | ||
152 | void dev_poll_reactor::start_op(int op_type, socket_type descriptor, | |
153 | dev_poll_reactor::per_descriptor_data&, reactor_op* op, | |
154 | bool is_continuation, bool allow_speculative) | |
155 | { | |
156 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
157 | ||
158 | if (shutdown_) | |
159 | { | |
160 | post_immediate_completion(op, is_continuation); | |
161 | return; | |
162 | } | |
163 | ||
164 | if (allow_speculative) | |
165 | { | |
166 | if (op_type != read_op || !op_queue_[except_op].has_operation(descriptor)) | |
167 | { | |
168 | if (!op_queue_[op_type].has_operation(descriptor)) | |
169 | { | |
170 | if (op->perform()) | |
171 | { | |
172 | lock.unlock(); | |
b32b8144 | 173 | scheduler_.post_immediate_completion(op, is_continuation); |
7c673cae FG |
174 | return; |
175 | } | |
176 | } | |
177 | } | |
178 | } | |
179 | ||
180 | bool first = op_queue_[op_type].enqueue_operation(descriptor, op); | |
b32b8144 | 181 | scheduler_.work_started(); |
7c673cae FG |
182 | if (first) |
183 | { | |
184 | ::pollfd& ev = add_pending_event_change(descriptor); | |
185 | ev.events = POLLERR | POLLHUP; | |
186 | if (op_type == read_op | |
187 | || op_queue_[read_op].has_operation(descriptor)) | |
188 | ev.events |= POLLIN; | |
189 | if (op_type == write_op | |
190 | || op_queue_[write_op].has_operation(descriptor)) | |
191 | ev.events |= POLLOUT; | |
192 | if (op_type == except_op | |
193 | || op_queue_[except_op].has_operation(descriptor)) | |
194 | ev.events |= POLLPRI; | |
195 | interrupter_.interrupt(); | |
196 | } | |
197 | } | |
198 | ||
199 | void dev_poll_reactor::cancel_ops(socket_type descriptor, | |
200 | dev_poll_reactor::per_descriptor_data&) | |
201 | { | |
202 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
203 | cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted); | |
204 | } | |
205 | ||
1e59de90 TL |
206 | void dev_poll_reactor::cancel_ops_by_key(socket_type descriptor, |
207 | dev_poll_reactor::per_descriptor_data&, | |
208 | int op_type, void* cancellation_key) | |
209 | { | |
210 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
211 | op_queue<operation> ops; | |
212 | bool need_interrupt = op_queue_[op_type].cancel_operations_by_key( | |
213 | descriptor, ops, cancellation_key, boost::asio::error::operation_aborted); | |
214 | scheduler_.post_deferred_completions(ops); | |
215 | if (need_interrupt) | |
216 | interrupter_.interrupt(); | |
217 | } | |
218 | ||
7c673cae FG |
219 | void dev_poll_reactor::deregister_descriptor(socket_type descriptor, |
220 | dev_poll_reactor::per_descriptor_data&, bool) | |
221 | { | |
222 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
223 | ||
224 | // Remove the descriptor from /dev/poll. | |
225 | ::pollfd& ev = add_pending_event_change(descriptor); | |
226 | ev.events = POLLREMOVE; | |
227 | interrupter_.interrupt(); | |
228 | ||
229 | // Cancel any outstanding operations associated with the descriptor. | |
230 | cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted); | |
231 | } | |
232 | ||
233 | void dev_poll_reactor::deregister_internal_descriptor( | |
234 | socket_type descriptor, dev_poll_reactor::per_descriptor_data&) | |
235 | { | |
236 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
237 | ||
238 | // Remove the descriptor from /dev/poll. Since this function is only called | |
239 | // during a fork, we can apply the change immediately. | |
240 | ::pollfd ev = { 0, 0, 0 }; | |
241 | ev.fd = descriptor; | |
242 | ev.events = POLLREMOVE; | |
243 | ev.revents = 0; | |
244 | ::write(dev_poll_fd_, &ev, sizeof(ev)); | |
245 | ||
246 | // Destroy all operations associated with the descriptor. | |
247 | op_queue<operation> ops; | |
248 | boost::system::error_code ec; | |
249 | for (int i = 0; i < max_ops; ++i) | |
250 | op_queue_[i].cancel_operations(descriptor, ops, ec); | |
251 | } | |
252 | ||
b32b8144 FG |
253 | void dev_poll_reactor::cleanup_descriptor_data( |
254 | dev_poll_reactor::per_descriptor_data&) | |
255 | { | |
256 | } | |
257 | ||
258 | void dev_poll_reactor::run(long usec, op_queue<operation>& ops) | |
7c673cae FG |
259 | { |
260 | boost::asio::detail::mutex::scoped_lock lock(mutex_); | |
261 | ||
262 | // We can return immediately if there's no work to do and the reactor is | |
263 | // not supposed to block. | |
b32b8144 | 264 | if (usec == 0 && op_queue_[read_op].empty() && op_queue_[write_op].empty() |
7c673cae FG |
265 | && op_queue_[except_op].empty() && timer_queues_.all_empty()) |
266 | return; | |
267 | ||
268 | // Write the pending event registration changes to the /dev/poll descriptor. | |
269 | std::size_t events_size = sizeof(::pollfd) * pending_event_changes_.size(); | |
270 | if (events_size > 0) | |
271 | { | |
272 | errno = 0; | |
273 | int result = ::write(dev_poll_fd_, | |
274 | &pending_event_changes_[0], events_size); | |
275 | if (result != static_cast<int>(events_size)) | |
276 | { | |
277 | boost::system::error_code ec = boost::system::error_code( | |
278 | errno, boost::asio::error::get_system_category()); | |
279 | for (std::size_t i = 0; i < pending_event_changes_.size(); ++i) | |
280 | { | |
281 | int descriptor = pending_event_changes_[i].fd; | |
282 | for (int j = 0; j < max_ops; ++j) | |
283 | op_queue_[j].cancel_operations(descriptor, ops, ec); | |
284 | } | |
285 | } | |
286 | pending_event_changes_.clear(); | |
287 | pending_event_change_index_.clear(); | |
288 | } | |
289 | ||
b32b8144 FG |
290 | // Calculate timeout. |
291 | int timeout; | |
292 | if (usec == 0) | |
293 | timeout = 0; | |
294 | else | |
295 | { | |
296 | timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1); | |
297 | timeout = get_timeout(timeout); | |
298 | } | |
7c673cae FG |
299 | lock.unlock(); |
300 | ||
301 | // Block on the /dev/poll descriptor. | |
302 | ::pollfd events[128] = { { 0, 0, 0 } }; | |
303 | ::dvpoll dp = { 0, 0, 0 }; | |
304 | dp.dp_fds = events; | |
305 | dp.dp_nfds = 128; | |
306 | dp.dp_timeout = timeout; | |
307 | int num_events = ::ioctl(dev_poll_fd_, DP_POLL, &dp); | |
308 | ||
309 | lock.lock(); | |
310 | ||
311 | // Dispatch the waiting events. | |
312 | for (int i = 0; i < num_events; ++i) | |
313 | { | |
314 | int descriptor = events[i].fd; | |
315 | if (descriptor == interrupter_.read_descriptor()) | |
316 | { | |
317 | interrupter_.reset(); | |
318 | } | |
319 | else | |
320 | { | |
321 | bool more_reads = false; | |
322 | bool more_writes = false; | |
323 | bool more_except = false; | |
324 | ||
325 | // Exception operations must be processed first to ensure that any | |
326 | // out-of-band data is read before normal data. | |
327 | if (events[i].events & (POLLPRI | POLLERR | POLLHUP)) | |
328 | more_except = | |
329 | op_queue_[except_op].perform_operations(descriptor, ops); | |
330 | else | |
331 | more_except = op_queue_[except_op].has_operation(descriptor); | |
332 | ||
333 | if (events[i].events & (POLLIN | POLLERR | POLLHUP)) | |
334 | more_reads = op_queue_[read_op].perform_operations(descriptor, ops); | |
335 | else | |
336 | more_reads = op_queue_[read_op].has_operation(descriptor); | |
337 | ||
338 | if (events[i].events & (POLLOUT | POLLERR | POLLHUP)) | |
339 | more_writes = op_queue_[write_op].perform_operations(descriptor, ops); | |
340 | else | |
341 | more_writes = op_queue_[write_op].has_operation(descriptor); | |
342 | ||
343 | if ((events[i].events & (POLLERR | POLLHUP)) != 0 | |
344 | && !more_except && !more_reads && !more_writes) | |
345 | { | |
346 | // If we have an event and no operations associated with the | |
347 | // descriptor then we need to delete the descriptor from /dev/poll. | |
348 | // The poll operation can produce POLLHUP or POLLERR events when there | |
349 | // is no operation pending, so if we do not remove the descriptor we | |
350 | // can end up in a tight polling loop. | |
351 | ::pollfd ev = { 0, 0, 0 }; | |
352 | ev.fd = descriptor; | |
353 | ev.events = POLLREMOVE; | |
354 | ev.revents = 0; | |
355 | ::write(dev_poll_fd_, &ev, sizeof(ev)); | |
356 | } | |
357 | else | |
358 | { | |
359 | ::pollfd ev = { 0, 0, 0 }; | |
360 | ev.fd = descriptor; | |
361 | ev.events = POLLERR | POLLHUP; | |
362 | if (more_reads) | |
363 | ev.events |= POLLIN; | |
364 | if (more_writes) | |
365 | ev.events |= POLLOUT; | |
366 | if (more_except) | |
367 | ev.events |= POLLPRI; | |
368 | ev.revents = 0; | |
369 | int result = ::write(dev_poll_fd_, &ev, sizeof(ev)); | |
370 | if (result != sizeof(ev)) | |
371 | { | |
372 | boost::system::error_code ec(errno, | |
373 | boost::asio::error::get_system_category()); | |
374 | for (int j = 0; j < max_ops; ++j) | |
375 | op_queue_[j].cancel_operations(descriptor, ops, ec); | |
376 | } | |
377 | } | |
378 | } | |
379 | } | |
380 | timer_queues_.get_ready_timers(ops); | |
381 | } | |
382 | ||
383 | void dev_poll_reactor::interrupt() | |
384 | { | |
385 | interrupter_.interrupt(); | |
386 | } | |
387 | ||
388 | int dev_poll_reactor::do_dev_poll_create() | |
389 | { | |
390 | int fd = ::open("/dev/poll", O_RDWR); | |
391 | if (fd == -1) | |
392 | { | |
393 | boost::system::error_code ec(errno, | |
394 | boost::asio::error::get_system_category()); | |
395 | boost::asio::detail::throw_error(ec, "/dev/poll"); | |
396 | } | |
397 | return fd; | |
398 | } | |
399 | ||
400 | void dev_poll_reactor::do_add_timer_queue(timer_queue_base& queue) | |
401 | { | |
402 | mutex::scoped_lock lock(mutex_); | |
403 | timer_queues_.insert(&queue); | |
404 | } | |
405 | ||
406 | void dev_poll_reactor::do_remove_timer_queue(timer_queue_base& queue) | |
407 | { | |
408 | mutex::scoped_lock lock(mutex_); | |
409 | timer_queues_.erase(&queue); | |
410 | } | |
411 | ||
b32b8144 | 412 | int dev_poll_reactor::get_timeout(int msec) |
7c673cae FG |
413 | { |
414 | // By default we will wait no longer than 5 minutes. This will ensure that | |
415 | // any changes to the system clock are detected after no longer than this. | |
b32b8144 FG |
416 | const int max_msec = 5 * 60 * 1000; |
417 | return timer_queues_.wait_duration_msec( | |
418 | (msec < 0 || max_msec < msec) ? max_msec : msec); | |
7c673cae FG |
419 | } |
420 | ||
421 | void dev_poll_reactor::cancel_ops_unlocked(socket_type descriptor, | |
422 | const boost::system::error_code& ec) | |
423 | { | |
424 | bool need_interrupt = false; | |
425 | op_queue<operation> ops; | |
426 | for (int i = 0; i < max_ops; ++i) | |
427 | need_interrupt = op_queue_[i].cancel_operations( | |
428 | descriptor, ops, ec) || need_interrupt; | |
b32b8144 | 429 | scheduler_.post_deferred_completions(ops); |
7c673cae FG |
430 | if (need_interrupt) |
431 | interrupter_.interrupt(); | |
432 | } | |
433 | ||
434 | ::pollfd& dev_poll_reactor::add_pending_event_change(int descriptor) | |
435 | { | |
436 | hash_map<int, std::size_t>::iterator iter | |
437 | = pending_event_change_index_.find(descriptor); | |
438 | if (iter == pending_event_change_index_.end()) | |
439 | { | |
440 | std::size_t index = pending_event_changes_.size(); | |
441 | pending_event_changes_.reserve(pending_event_changes_.size() + 1); | |
442 | pending_event_change_index_.insert(std::make_pair(descriptor, index)); | |
443 | pending_event_changes_.push_back(::pollfd()); | |
444 | pending_event_changes_[index].fd = descriptor; | |
445 | pending_event_changes_[index].revents = 0; | |
446 | return pending_event_changes_[index]; | |
447 | } | |
448 | else | |
449 | { | |
450 | return pending_event_changes_[iter->second]; | |
451 | } | |
452 | } | |
453 | ||
454 | } // namespace detail | |
455 | } // namespace asio | |
456 | } // namespace boost | |
457 | ||
458 | #include <boost/asio/detail/pop_options.hpp> | |
459 | ||
460 | #endif // defined(BOOST_ASIO_HAS_DEV_POLL) | |
461 | ||
462 | #endif // BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP |