//
// detail/impl/epoll_reactor.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP
#define BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#if defined(BOOST_ASIO_HAS_EPOLL)

#include <cstddef>
#include <sys/epoll.h>
#include <boost/asio/detail/epoll_reactor.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>

#if defined(BOOST_ASIO_HAS_TIMERFD)
# include <sys/timerfd.h>
#endif // defined(BOOST_ASIO_HAS_TIMERFD)

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace detail {

b32b8144 FG |
38 | epoll_reactor::epoll_reactor(boost::asio::execution_context& ctx) |
39 | : execution_context_service_base<epoll_reactor>(ctx), | |
40 | scheduler_(use_service<scheduler>(ctx)), | |
41 | mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING( | |
42 | REACTOR_REGISTRATION, scheduler_.concurrency_hint())), | |
7c673cae FG |
43 | interrupter_(), |
44 | epoll_fd_(do_epoll_create()), | |
45 | timer_fd_(do_timerfd_create()), | |
b32b8144 FG |
46 | shutdown_(false), |
47 | registered_descriptors_mutex_(mutex_.enabled()) | |
7c673cae FG |
48 | { |
49 | // Add the interrupter's descriptor to epoll. | |
50 | epoll_event ev = { 0, { 0 } }; | |
51 | ev.events = EPOLLIN | EPOLLERR | EPOLLET; | |
52 | ev.data.ptr = &interrupter_; | |
53 | epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev); | |
54 | interrupter_.interrupt(); | |
55 | ||
56 | // Add the timer descriptor to epoll. | |
57 | if (timer_fd_ != -1) | |
58 | { | |
59 | ev.events = EPOLLIN | EPOLLERR; | |
60 | ev.data.ptr = &timer_fd_; | |
61 | epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev); | |
62 | } | |
63 | } | |
64 | ||
65 | epoll_reactor::~epoll_reactor() | |
66 | { | |
67 | if (epoll_fd_ != -1) | |
68 | close(epoll_fd_); | |
69 | if (timer_fd_ != -1) | |
70 | close(timer_fd_); | |
71 | } | |
72 | ||
b32b8144 | 73 | void epoll_reactor::shutdown() |
7c673cae FG |
74 | { |
75 | mutex::scoped_lock lock(mutex_); | |
76 | shutdown_ = true; | |
77 | lock.unlock(); | |
78 | ||
79 | op_queue<operation> ops; | |
80 | ||
81 | while (descriptor_state* state = registered_descriptors_.first()) | |
82 | { | |
83 | for (int i = 0; i < max_ops; ++i) | |
84 | ops.push(state->op_queue_[i]); | |
85 | state->shutdown_ = true; | |
86 | registered_descriptors_.free(state); | |
87 | } | |
88 | ||
89 | timer_queues_.get_all_timers(ops); | |
90 | ||
b32b8144 | 91 | scheduler_.abandon_operations(ops); |
7c673cae FG |
92 | } |
93 | ||
b32b8144 FG |
94 | void epoll_reactor::notify_fork( |
95 | boost::asio::execution_context::fork_event fork_ev) | |
7c673cae | 96 | { |
b32b8144 | 97 | if (fork_ev == boost::asio::execution_context::fork_child) |
7c673cae FG |
98 | { |
99 | if (epoll_fd_ != -1) | |
100 | ::close(epoll_fd_); | |
101 | epoll_fd_ = -1; | |
102 | epoll_fd_ = do_epoll_create(); | |
103 | ||
104 | if (timer_fd_ != -1) | |
105 | ::close(timer_fd_); | |
106 | timer_fd_ = -1; | |
107 | timer_fd_ = do_timerfd_create(); | |
108 | ||
109 | interrupter_.recreate(); | |
110 | ||
111 | // Add the interrupter's descriptor to epoll. | |
112 | epoll_event ev = { 0, { 0 } }; | |
113 | ev.events = EPOLLIN | EPOLLERR | EPOLLET; | |
114 | ev.data.ptr = &interrupter_; | |
115 | epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev); | |
116 | interrupter_.interrupt(); | |
117 | ||
118 | // Add the timer descriptor to epoll. | |
119 | if (timer_fd_ != -1) | |
120 | { | |
121 | ev.events = EPOLLIN | EPOLLERR; | |
122 | ev.data.ptr = &timer_fd_; | |
123 | epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev); | |
124 | } | |
125 | ||
126 | update_timeout(); | |
127 | ||
128 | // Re-register all descriptors with epoll. | |
129 | mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_); | |
130 | for (descriptor_state* state = registered_descriptors_.first(); | |
131 | state != 0; state = state->next_) | |
132 | { | |
133 | ev.events = state->registered_events_; | |
134 | ev.data.ptr = state; | |
135 | int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, state->descriptor_, &ev); | |
136 | if (result != 0) | |
137 | { | |
138 | boost::system::error_code ec(errno, | |
139 | boost::asio::error::get_system_category()); | |
140 | boost::asio::detail::throw_error(ec, "epoll re-registration"); | |
141 | } | |
142 | } | |
143 | } | |
144 | } | |
145 | ||
146 | void epoll_reactor::init_task() | |
147 | { | |
b32b8144 | 148 | scheduler_.init_task(); |
7c673cae FG |
149 | } |
150 | ||
151 | int epoll_reactor::register_descriptor(socket_type descriptor, | |
152 | epoll_reactor::per_descriptor_data& descriptor_data) | |
153 | { | |
154 | descriptor_data = allocate_descriptor_state(); | |
155 | ||
b32b8144 FG |
156 | BOOST_ASIO_HANDLER_REACTOR_REGISTRATION(( |
157 | context(), static_cast<uintmax_t>(descriptor), | |
158 | reinterpret_cast<uintmax_t>(descriptor_data))); | |
159 | ||
7c673cae FG |
160 | { |
161 | mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); | |
162 | ||
163 | descriptor_data->reactor_ = this; | |
164 | descriptor_data->descriptor_ = descriptor; | |
165 | descriptor_data->shutdown_ = false; | |
b32b8144 FG |
166 | for (int i = 0; i < max_ops; ++i) |
167 | descriptor_data->try_speculative_[i] = true; | |
7c673cae FG |
168 | } |
169 | ||
170 | epoll_event ev = { 0, { 0 } }; | |
171 | ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET; | |
172 | descriptor_data->registered_events_ = ev.events; | |
173 | ev.data.ptr = descriptor_data; | |
174 | int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev); | |
175 | if (result != 0) | |
b32b8144 FG |
176 | { |
177 | if (errno == EPERM) | |
178 | { | |
179 | // This file descriptor type is not supported by epoll. However, if it is | |
180 | // a regular file then operations on it will not block. We will allow | |
181 | // this descriptor to be used and fail later if an operation on it would | |
182 | // otherwise require a trip through the reactor. | |
183 | descriptor_data->registered_events_ = 0; | |
184 | return 0; | |
185 | } | |
7c673cae | 186 | return errno; |
b32b8144 | 187 | } |
7c673cae FG |
188 | |
189 | return 0; | |
190 | } | |
191 | ||
192 | int epoll_reactor::register_internal_descriptor( | |
193 | int op_type, socket_type descriptor, | |
194 | epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op) | |
195 | { | |
196 | descriptor_data = allocate_descriptor_state(); | |
197 | ||
b32b8144 FG |
198 | BOOST_ASIO_HANDLER_REACTOR_REGISTRATION(( |
199 | context(), static_cast<uintmax_t>(descriptor), | |
200 | reinterpret_cast<uintmax_t>(descriptor_data))); | |
201 | ||
7c673cae FG |
202 | { |
203 | mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); | |
204 | ||
205 | descriptor_data->reactor_ = this; | |
206 | descriptor_data->descriptor_ = descriptor; | |
207 | descriptor_data->shutdown_ = false; | |
208 | descriptor_data->op_queue_[op_type].push(op); | |
b32b8144 FG |
209 | for (int i = 0; i < max_ops; ++i) |
210 | descriptor_data->try_speculative_[i] = true; | |
7c673cae FG |
211 | } |
212 | ||
213 | epoll_event ev = { 0, { 0 } }; | |
214 | ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET; | |
215 | descriptor_data->registered_events_ = ev.events; | |
216 | ev.data.ptr = descriptor_data; | |
217 | int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev); | |
218 | if (result != 0) | |
219 | return errno; | |
220 | ||
221 | return 0; | |
222 | } | |
223 | ||
224 | void epoll_reactor::move_descriptor(socket_type, | |
225 | epoll_reactor::per_descriptor_data& target_descriptor_data, | |
226 | epoll_reactor::per_descriptor_data& source_descriptor_data) | |
227 | { | |
228 | target_descriptor_data = source_descriptor_data; | |
229 | source_descriptor_data = 0; | |
230 | } | |
231 | ||
232 | void epoll_reactor::start_op(int op_type, socket_type descriptor, | |
233 | epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op, | |
234 | bool is_continuation, bool allow_speculative) | |
235 | { | |
236 | if (!descriptor_data) | |
237 | { | |
238 | op->ec_ = boost::asio::error::bad_descriptor; | |
239 | post_immediate_completion(op, is_continuation); | |
240 | return; | |
241 | } | |
242 | ||
243 | mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); | |
244 | ||
245 | if (descriptor_data->shutdown_) | |
246 | { | |
247 | post_immediate_completion(op, is_continuation); | |
248 | return; | |
249 | } | |
250 | ||
251 | if (descriptor_data->op_queue_[op_type].empty()) | |
252 | { | |
253 | if (allow_speculative | |
254 | && (op_type != read_op | |
255 | || descriptor_data->op_queue_[except_op].empty())) | |
256 | { | |
b32b8144 | 257 | if (descriptor_data->try_speculative_[op_type]) |
7c673cae | 258 | { |
b32b8144 FG |
259 | if (reactor_op::status status = op->perform()) |
260 | { | |
261 | if (status == reactor_op::done_and_exhausted) | |
262 | if (descriptor_data->registered_events_ != 0) | |
263 | descriptor_data->try_speculative_[op_type] = false; | |
264 | descriptor_lock.unlock(); | |
265 | scheduler_.post_immediate_completion(op, is_continuation); | |
266 | return; | |
267 | } | |
268 | } | |
269 | ||
270 | if (descriptor_data->registered_events_ == 0) | |
271 | { | |
272 | op->ec_ = boost::asio::error::operation_not_supported; | |
273 | scheduler_.post_immediate_completion(op, is_continuation); | |
7c673cae FG |
274 | return; |
275 | } | |
276 | ||
277 | if (op_type == write_op) | |
278 | { | |
279 | if ((descriptor_data->registered_events_ & EPOLLOUT) == 0) | |
280 | { | |
281 | epoll_event ev = { 0, { 0 } }; | |
282 | ev.events = descriptor_data->registered_events_ | EPOLLOUT; | |
283 | ev.data.ptr = descriptor_data; | |
284 | if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0) | |
285 | { | |
286 | descriptor_data->registered_events_ |= ev.events; | |
287 | } | |
288 | else | |
289 | { | |
290 | op->ec_ = boost::system::error_code(errno, | |
291 | boost::asio::error::get_system_category()); | |
b32b8144 | 292 | scheduler_.post_immediate_completion(op, is_continuation); |
7c673cae FG |
293 | return; |
294 | } | |
295 | } | |
296 | } | |
297 | } | |
b32b8144 FG |
298 | else if (descriptor_data->registered_events_ == 0) |
299 | { | |
300 | op->ec_ = boost::asio::error::operation_not_supported; | |
301 | scheduler_.post_immediate_completion(op, is_continuation); | |
302 | return; | |
303 | } | |
7c673cae FG |
304 | else |
305 | { | |
306 | if (op_type == write_op) | |
307 | { | |
308 | descriptor_data->registered_events_ |= EPOLLOUT; | |
309 | } | |
310 | ||
311 | epoll_event ev = { 0, { 0 } }; | |
312 | ev.events = descriptor_data->registered_events_; | |
313 | ev.data.ptr = descriptor_data; | |
314 | epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev); | |
315 | } | |
316 | } | |
317 | ||
318 | descriptor_data->op_queue_[op_type].push(op); | |
b32b8144 | 319 | scheduler_.work_started(); |
7c673cae FG |
320 | } |
321 | ||
322 | void epoll_reactor::cancel_ops(socket_type, | |
323 | epoll_reactor::per_descriptor_data& descriptor_data) | |
324 | { | |
325 | if (!descriptor_data) | |
326 | return; | |
327 | ||
328 | mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); | |
329 | ||
330 | op_queue<operation> ops; | |
331 | for (int i = 0; i < max_ops; ++i) | |
332 | { | |
333 | while (reactor_op* op = descriptor_data->op_queue_[i].front()) | |
334 | { | |
335 | op->ec_ = boost::asio::error::operation_aborted; | |
336 | descriptor_data->op_queue_[i].pop(); | |
337 | ops.push(op); | |
338 | } | |
339 | } | |
340 | ||
341 | descriptor_lock.unlock(); | |
342 | ||
b32b8144 | 343 | scheduler_.post_deferred_completions(ops); |
7c673cae FG |
344 | } |
345 | ||
346 | void epoll_reactor::deregister_descriptor(socket_type descriptor, | |
347 | epoll_reactor::per_descriptor_data& descriptor_data, bool closing) | |
348 | { | |
349 | if (!descriptor_data) | |
350 | return; | |
351 | ||
352 | mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); | |
353 | ||
354 | if (!descriptor_data->shutdown_) | |
355 | { | |
356 | if (closing) | |
357 | { | |
358 | // The descriptor will be automatically removed from the epoll set when | |
359 | // it is closed. | |
360 | } | |
b32b8144 | 361 | else if (descriptor_data->registered_events_ != 0) |
7c673cae FG |
362 | { |
363 | epoll_event ev = { 0, { 0 } }; | |
364 | epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev); | |
365 | } | |
366 | ||
367 | op_queue<operation> ops; | |
368 | for (int i = 0; i < max_ops; ++i) | |
369 | { | |
370 | while (reactor_op* op = descriptor_data->op_queue_[i].front()) | |
371 | { | |
372 | op->ec_ = boost::asio::error::operation_aborted; | |
373 | descriptor_data->op_queue_[i].pop(); | |
374 | ops.push(op); | |
375 | } | |
376 | } | |
377 | ||
378 | descriptor_data->descriptor_ = -1; | |
379 | descriptor_data->shutdown_ = true; | |
380 | ||
381 | descriptor_lock.unlock(); | |
382 | ||
b32b8144 FG |
383 | BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION(( |
384 | context(), static_cast<uintmax_t>(descriptor), | |
385 | reinterpret_cast<uintmax_t>(descriptor_data))); | |
7c673cae | 386 | |
b32b8144 FG |
387 | scheduler_.post_deferred_completions(ops); |
388 | ||
389 | // Leave descriptor_data set so that it will be freed by the subsequent | |
390 | // call to cleanup_descriptor_data. | |
391 | } | |
392 | else | |
393 | { | |
394 | // We are shutting down, so prevent cleanup_descriptor_data from freeing | |
395 | // the descriptor_data object and let the destructor free it instead. | |
396 | descriptor_data = 0; | |
7c673cae FG |
397 | } |
398 | } | |
399 | ||
400 | void epoll_reactor::deregister_internal_descriptor(socket_type descriptor, | |
401 | epoll_reactor::per_descriptor_data& descriptor_data) | |
402 | { | |
403 | if (!descriptor_data) | |
404 | return; | |
405 | ||
406 | mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); | |
407 | ||
408 | if (!descriptor_data->shutdown_) | |
409 | { | |
410 | epoll_event ev = { 0, { 0 } }; | |
411 | epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev); | |
412 | ||
413 | op_queue<operation> ops; | |
414 | for (int i = 0; i < max_ops; ++i) | |
415 | ops.push(descriptor_data->op_queue_[i]); | |
416 | ||
417 | descriptor_data->descriptor_ = -1; | |
418 | descriptor_data->shutdown_ = true; | |
419 | ||
420 | descriptor_lock.unlock(); | |
421 | ||
b32b8144 FG |
422 | BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION(( |
423 | context(), static_cast<uintmax_t>(descriptor), | |
424 | reinterpret_cast<uintmax_t>(descriptor_data))); | |
425 | ||
426 | // Leave descriptor_data set so that it will be freed by the subsequent | |
427 | // call to cleanup_descriptor_data. | |
428 | } | |
429 | else | |
430 | { | |
431 | // We are shutting down, so prevent cleanup_descriptor_data from freeing | |
432 | // the descriptor_data object and let the destructor free it instead. | |
433 | descriptor_data = 0; | |
434 | } | |
435 | } | |
436 | ||
437 | void epoll_reactor::cleanup_descriptor_data( | |
438 | per_descriptor_data& descriptor_data) | |
439 | { | |
440 | if (descriptor_data) | |
441 | { | |
7c673cae FG |
442 | free_descriptor_state(descriptor_data); |
443 | descriptor_data = 0; | |
444 | } | |
445 | } | |
446 | ||
b32b8144 | 447 | void epoll_reactor::run(long usec, op_queue<operation>& ops) |
7c673cae | 448 | { |
b32b8144 FG |
449 | // This code relies on the fact that the scheduler queues the reactor task |
450 | // behind all descriptor operations generated by this function. This means, | |
451 | // that by the time we reach this point, any previously returned descriptor | |
452 | // operations have already been dequeued. Therefore it is now safe for us to | |
453 | // reuse and return them for the scheduler to queue again. | |
7c673cae | 454 | |
b32b8144 | 455 | // Calculate timeout. Check the timer queues only if timerfd is not in use. |
7c673cae | 456 | int timeout; |
b32b8144 FG |
457 | if (usec == 0) |
458 | timeout = 0; | |
7c673cae FG |
459 | else |
460 | { | |
b32b8144 FG |
461 | timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1); |
462 | if (timer_fd_ == -1) | |
463 | { | |
464 | mutex::scoped_lock lock(mutex_); | |
465 | timeout = get_timeout(timeout); | |
466 | } | |
7c673cae FG |
467 | } |
468 | ||
469 | // Block on the epoll descriptor. | |
470 | epoll_event events[128]; | |
471 | int num_events = epoll_wait(epoll_fd_, events, 128, timeout); | |
472 | ||
b32b8144 FG |
473 | #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING) |
474 | // Trace the waiting events. | |
475 | for (int i = 0; i < num_events; ++i) | |
476 | { | |
477 | void* ptr = events[i].data.ptr; | |
478 | if (ptr == &interrupter_) | |
479 | { | |
480 | // Ignore. | |
481 | } | |
482 | # if defined(BOOST_ASIO_HAS_TIMERFD) | |
483 | else if (ptr == &timer_fd_) | |
484 | { | |
485 | // Ignore. | |
486 | } | |
487 | # endif // defined(BOOST_ASIO_HAS_TIMERFD) | |
488 | else | |
489 | { | |
490 | unsigned event_mask = 0; | |
491 | if ((events[i].events & EPOLLIN) != 0) | |
492 | event_mask |= BOOST_ASIO_HANDLER_REACTOR_READ_EVENT; | |
493 | if ((events[i].events & EPOLLOUT)) | |
494 | event_mask |= BOOST_ASIO_HANDLER_REACTOR_WRITE_EVENT; | |
495 | if ((events[i].events & (EPOLLERR | EPOLLHUP)) != 0) | |
496 | event_mask |= BOOST_ASIO_HANDLER_REACTOR_ERROR_EVENT; | |
497 | BOOST_ASIO_HANDLER_REACTOR_EVENTS((context(), | |
498 | reinterpret_cast<uintmax_t>(ptr), event_mask)); | |
499 | } | |
500 | } | |
501 | #endif // defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING) | |
502 | ||
7c673cae FG |
503 | #if defined(BOOST_ASIO_HAS_TIMERFD) |
504 | bool check_timers = (timer_fd_ == -1); | |
505 | #else // defined(BOOST_ASIO_HAS_TIMERFD) | |
506 | bool check_timers = true; | |
507 | #endif // defined(BOOST_ASIO_HAS_TIMERFD) | |
508 | ||
509 | // Dispatch the waiting events. | |
510 | for (int i = 0; i < num_events; ++i) | |
511 | { | |
512 | void* ptr = events[i].data.ptr; | |
513 | if (ptr == &interrupter_) | |
514 | { | |
515 | // No need to reset the interrupter since we're leaving the descriptor | |
516 | // in a ready-to-read state and relying on edge-triggered notifications | |
517 | // to make it so that we only get woken up when the descriptor's epoll | |
518 | // registration is updated. | |
519 | ||
520 | #if defined(BOOST_ASIO_HAS_TIMERFD) | |
521 | if (timer_fd_ == -1) | |
522 | check_timers = true; | |
523 | #else // defined(BOOST_ASIO_HAS_TIMERFD) | |
524 | check_timers = true; | |
525 | #endif // defined(BOOST_ASIO_HAS_TIMERFD) | |
526 | } | |
527 | #if defined(BOOST_ASIO_HAS_TIMERFD) | |
528 | else if (ptr == &timer_fd_) | |
529 | { | |
530 | check_timers = true; | |
531 | } | |
532 | #endif // defined(BOOST_ASIO_HAS_TIMERFD) | |
533 | else | |
534 | { | |
535 | // The descriptor operation doesn't count as work in and of itself, so we | |
b32b8144 | 536 | // don't call work_started() here. This still allows the scheduler to |
7c673cae FG |
537 | // stop if the only remaining operations are descriptor operations. |
538 | descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr); | |
b32b8144 FG |
539 | if (!ops.is_enqueued(descriptor_data)) |
540 | { | |
541 | descriptor_data->set_ready_events(events[i].events); | |
542 | ops.push(descriptor_data); | |
543 | } | |
544 | else | |
545 | { | |
546 | descriptor_data->add_ready_events(events[i].events); | |
547 | } | |
7c673cae FG |
548 | } |
549 | } | |
550 | ||
551 | if (check_timers) | |
552 | { | |
553 | mutex::scoped_lock common_lock(mutex_); | |
554 | timer_queues_.get_ready_timers(ops); | |
555 | ||
556 | #if defined(BOOST_ASIO_HAS_TIMERFD) | |
557 | if (timer_fd_ != -1) | |
558 | { | |
559 | itimerspec new_timeout; | |
560 | itimerspec old_timeout; | |
561 | int flags = get_timeout(new_timeout); | |
562 | timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout); | |
563 | } | |
564 | #endif // defined(BOOST_ASIO_HAS_TIMERFD) | |
565 | } | |
566 | } | |
567 | ||
568 | void epoll_reactor::interrupt() | |
569 | { | |
570 | epoll_event ev = { 0, { 0 } }; | |
571 | ev.events = EPOLLIN | EPOLLERR | EPOLLET; | |
572 | ev.data.ptr = &interrupter_; | |
573 | epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev); | |
574 | } | |
575 | ||
576 | int epoll_reactor::do_epoll_create() | |
577 | { | |
578 | #if defined(EPOLL_CLOEXEC) | |
579 | int fd = epoll_create1(EPOLL_CLOEXEC); | |
580 | #else // defined(EPOLL_CLOEXEC) | |
581 | int fd = -1; | |
582 | errno = EINVAL; | |
583 | #endif // defined(EPOLL_CLOEXEC) | |
584 | ||
585 | if (fd == -1 && (errno == EINVAL || errno == ENOSYS)) | |
586 | { | |
587 | fd = epoll_create(epoll_size); | |
588 | if (fd != -1) | |
589 | ::fcntl(fd, F_SETFD, FD_CLOEXEC); | |
590 | } | |
591 | ||
592 | if (fd == -1) | |
593 | { | |
594 | boost::system::error_code ec(errno, | |
595 | boost::asio::error::get_system_category()); | |
596 | boost::asio::detail::throw_error(ec, "epoll"); | |
597 | } | |
598 | ||
599 | return fd; | |
600 | } | |
601 | ||
602 | int epoll_reactor::do_timerfd_create() | |
603 | { | |
604 | #if defined(BOOST_ASIO_HAS_TIMERFD) | |
605 | # if defined(TFD_CLOEXEC) | |
606 | int fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC); | |
607 | # else // defined(TFD_CLOEXEC) | |
608 | int fd = -1; | |
609 | errno = EINVAL; | |
610 | # endif // defined(TFD_CLOEXEC) | |
611 | ||
612 | if (fd == -1 && errno == EINVAL) | |
613 | { | |
614 | fd = timerfd_create(CLOCK_MONOTONIC, 0); | |
615 | if (fd != -1) | |
616 | ::fcntl(fd, F_SETFD, FD_CLOEXEC); | |
617 | } | |
618 | ||
619 | return fd; | |
620 | #else // defined(BOOST_ASIO_HAS_TIMERFD) | |
621 | return -1; | |
622 | #endif // defined(BOOST_ASIO_HAS_TIMERFD) | |
623 | } | |
624 | ||
625 | epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state() | |
626 | { | |
627 | mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_); | |
b32b8144 FG |
628 | return registered_descriptors_.alloc(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING( |
629 | REACTOR_IO, scheduler_.concurrency_hint())); | |
7c673cae FG |
630 | } |
631 | ||
632 | void epoll_reactor::free_descriptor_state(epoll_reactor::descriptor_state* s) | |
633 | { | |
634 | mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_); | |
635 | registered_descriptors_.free(s); | |
636 | } | |
637 | ||
638 | void epoll_reactor::do_add_timer_queue(timer_queue_base& queue) | |
639 | { | |
640 | mutex::scoped_lock lock(mutex_); | |
641 | timer_queues_.insert(&queue); | |
642 | } | |
643 | ||
644 | void epoll_reactor::do_remove_timer_queue(timer_queue_base& queue) | |
645 | { | |
646 | mutex::scoped_lock lock(mutex_); | |
647 | timer_queues_.erase(&queue); | |
648 | } | |
649 | ||
650 | void epoll_reactor::update_timeout() | |
651 | { | |
652 | #if defined(BOOST_ASIO_HAS_TIMERFD) | |
653 | if (timer_fd_ != -1) | |
654 | { | |
655 | itimerspec new_timeout; | |
656 | itimerspec old_timeout; | |
657 | int flags = get_timeout(new_timeout); | |
658 | timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout); | |
659 | return; | |
660 | } | |
661 | #endif // defined(BOOST_ASIO_HAS_TIMERFD) | |
662 | interrupt(); | |
663 | } | |
664 | ||
b32b8144 | 665 | int epoll_reactor::get_timeout(int msec) |
7c673cae FG |
666 | { |
667 | // By default we will wait no longer than 5 minutes. This will ensure that | |
668 | // any changes to the system clock are detected after no longer than this. | |
b32b8144 FG |
669 | const int max_msec = 5 * 60 * 1000; |
670 | return timer_queues_.wait_duration_msec( | |
671 | (msec < 0 || max_msec < msec) ? max_msec : msec); | |
7c673cae FG |
672 | } |
673 | ||
#if defined(BOOST_ASIO_HAS_TIMERFD)
// Compute the timerfd expiry for the nearest timer (capped at 5 minutes),
// as a one-shot itimerspec. A zero wait duration is encoded as 1ns in the
// distant past (TFD_TIMER_ABSTIME with it_value ~0) so the timerfd fires
// immediately; returns the flags to pass to timerfd_settime.
int epoll_reactor::get_timeout(itimerspec& ts)
{
  ts.it_interval.tv_sec = 0;
  ts.it_interval.tv_nsec = 0;

  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  ts.it_value.tv_sec = usec / 1000000;
  ts.it_value.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;

  return usec ? 0 : TFD_TIMER_ABSTIME;
}
#endif // defined(BOOST_ASIO_HAS_TIMERFD)

688 | struct epoll_reactor::perform_io_cleanup_on_block_exit | |
689 | { | |
690 | explicit perform_io_cleanup_on_block_exit(epoll_reactor* r) | |
691 | : reactor_(r), first_op_(0) | |
692 | { | |
693 | } | |
694 | ||
695 | ~perform_io_cleanup_on_block_exit() | |
696 | { | |
697 | if (first_op_) | |
698 | { | |
699 | // Post the remaining completed operations for invocation. | |
700 | if (!ops_.empty()) | |
b32b8144 | 701 | reactor_->scheduler_.post_deferred_completions(ops_); |
7c673cae FG |
702 | |
703 | // A user-initiated operation has completed, but there's no need to | |
704 | // explicitly call work_finished() here. Instead, we'll take advantage of | |
b32b8144 | 705 | // the fact that the scheduler will call work_finished() once we return. |
7c673cae FG |
706 | } |
707 | else | |
708 | { | |
709 | // No user-initiated operations have completed, so we need to compensate | |
b32b8144 FG |
710 | // for the work_finished() call that the scheduler will make once this |
711 | // operation returns. | |
712 | reactor_->scheduler_.compensating_work_started(); | |
7c673cae FG |
713 | } |
714 | } | |
715 | ||
716 | epoll_reactor* reactor_; | |
717 | op_queue<operation> ops_; | |
718 | operation* first_op_; | |
719 | }; | |
720 | ||
b32b8144 FG |
721 | epoll_reactor::descriptor_state::descriptor_state(bool locking) |
722 | : operation(&epoll_reactor::descriptor_state::do_complete), | |
723 | mutex_(locking) | |
7c673cae FG |
724 | { |
725 | } | |
726 | ||
727 | operation* epoll_reactor::descriptor_state::perform_io(uint32_t events) | |
728 | { | |
729 | mutex_.lock(); | |
730 | perform_io_cleanup_on_block_exit io_cleanup(reactor_); | |
731 | mutex::scoped_lock descriptor_lock(mutex_, mutex::scoped_lock::adopt_lock); | |
732 | ||
733 | // Exception operations must be processed first to ensure that any | |
734 | // out-of-band data is read before normal data. | |
735 | static const int flag[max_ops] = { EPOLLIN, EPOLLOUT, EPOLLPRI }; | |
736 | for (int j = max_ops - 1; j >= 0; --j) | |
737 | { | |
738 | if (events & (flag[j] | EPOLLERR | EPOLLHUP)) | |
739 | { | |
b32b8144 | 740 | try_speculative_[j] = true; |
7c673cae FG |
741 | while (reactor_op* op = op_queue_[j].front()) |
742 | { | |
b32b8144 | 743 | if (reactor_op::status status = op->perform()) |
7c673cae FG |
744 | { |
745 | op_queue_[j].pop(); | |
746 | io_cleanup.ops_.push(op); | |
b32b8144 FG |
747 | if (status == reactor_op::done_and_exhausted) |
748 | { | |
749 | try_speculative_[j] = false; | |
750 | break; | |
751 | } | |
7c673cae FG |
752 | } |
753 | else | |
754 | break; | |
755 | } | |
756 | } | |
757 | } | |
758 | ||
759 | // The first operation will be returned for completion now. The others will | |
760 | // be posted for later by the io_cleanup object's destructor. | |
761 | io_cleanup.first_op_ = io_cleanup.ops_.front(); | |
762 | io_cleanup.ops_.pop(); | |
763 | return io_cleanup.first_op_; | |
764 | } | |
765 | ||
766 | void epoll_reactor::descriptor_state::do_complete( | |
b32b8144 | 767 | void* owner, operation* base, |
7c673cae FG |
768 | const boost::system::error_code& ec, std::size_t bytes_transferred) |
769 | { | |
770 | if (owner) | |
771 | { | |
772 | descriptor_state* descriptor_data = static_cast<descriptor_state*>(base); | |
773 | uint32_t events = static_cast<uint32_t>(bytes_transferred); | |
774 | if (operation* op = descriptor_data->perform_io(events)) | |
775 | { | |
b32b8144 | 776 | op->complete(owner, ec, 0); |
7c673cae FG |
777 | } |
778 | } | |
779 | } | |
780 | ||
} // namespace detail
} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#endif // defined(BOOST_ASIO_HAS_EPOLL)

#endif // BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP