]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/boost/asio/detail/impl/dev_poll_reactor.ipp
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / boost / boost / asio / detail / impl / dev_poll_reactor.ipp
CommitLineData
7c673cae
FG
1//
2// detail/impl/dev_poll_reactor.ipp
3// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4//
1e59de90 5// Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7c673cae
FG
6//
7// Distributed under the Boost Software License, Version 1.0. (See accompanying
8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9//
10
11#ifndef BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP
12#define BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP
13
14#if defined(_MSC_VER) && (_MSC_VER >= 1200)
15# pragma once
16#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17
18#include <boost/asio/detail/config.hpp>
19
20#if defined(BOOST_ASIO_HAS_DEV_POLL)
21
22#include <boost/asio/detail/dev_poll_reactor.hpp>
23#include <boost/asio/detail/assert.hpp>
1e59de90 24#include <boost/asio/detail/scheduler.hpp>
7c673cae
FG
25#include <boost/asio/detail/throw_error.hpp>
26#include <boost/asio/error.hpp>
27
28#include <boost/asio/detail/push_options.hpp>
29
30namespace boost {
31namespace asio {
32namespace detail {
33
b32b8144
FG
// Construct the reactor: open the /dev/poll driver (do_dev_poll_create()
// throws on failure) and register the interrupter's read descriptor so that
// interrupt() can wake a thread blocked in run().
dev_poll_reactor::dev_poll_reactor(boost::asio::execution_context& ctx)
  : boost::asio::detail::execution_context_service_base<dev_poll_reactor>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    mutex_(),
    dev_poll_fd_(do_dev_poll_create()),
    interrupter_(),
    shutdown_(false)
{
  // Add the interrupter's descriptor to /dev/poll. Registration is done by
  // writing a pollfd record to the driver's descriptor.
  ::pollfd ev = { 0, 0, 0 };
  ev.fd = interrupter_.read_descriptor();
  ev.events = POLLIN | POLLERR;
  ev.revents = 0;
  ::write(dev_poll_fd_, &ev, sizeof(ev));
}
49
// Destroy the reactor. shutdown() abandons all queued operations and timers
// before the /dev/poll descriptor is closed.
dev_poll_reactor::~dev_poll_reactor()
{
  shutdown();
  ::close(dev_poll_fd_);
}
55
// Shut the reactor down: mark it as shut down, then collect every pending
// operation and timer and hand them to the scheduler to be abandoned.
void dev_poll_reactor::shutdown()
{
  boost::asio::detail::mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  // Unlock before draining the queues; once shutdown_ is set, start_op()
  // posts new operations for immediate completion instead of enqueuing.
  lock.unlock();

  op_queue<operation> ops;

  for (int i = 0; i < max_ops; ++i)
    op_queue_[i].get_all_operations(ops);

  timer_queues_.get_all_timers(ops);

  scheduler_.abandon_operations(ops);
}
71
b32b8144
FG
// Recreate reactor state in the child process after a fork. Only the
// fork_child event requires action: the /dev/poll descriptor and the
// interrupter are process-specific and must be rebuilt.
void dev_poll_reactor::notify_fork(
    boost::asio::execution_context::fork_event fork_ev)
{
  if (fork_ev == boost::asio::execution_context::fork_child)
  {
    detail::mutex::scoped_lock lock(mutex_);

    // Replace the inherited /dev/poll descriptor with a fresh one.
    if (dev_poll_fd_ != -1)
      ::close(dev_poll_fd_);
    dev_poll_fd_ = -1;
    dev_poll_fd_ = do_dev_poll_create();

    interrupter_.recreate();

    // Add the interrupter's descriptor to /dev/poll.
    ::pollfd ev = { 0, 0, 0 };
    ev.fd = interrupter_.read_descriptor();
    ev.events = POLLIN | POLLERR;
    ev.revents = 0;
    ::write(dev_poll_fd_, &ev, sizeof(ev));

    // Re-register all descriptors with /dev/poll. The changes will be written
    // to the /dev/poll descriptor the next time the reactor is run.
    for (int i = 0; i < max_ops; ++i)
    {
      reactor_op_queue<socket_type>::iterator iter = op_queue_[i].begin();
      reactor_op_queue<socket_type>::iterator end = op_queue_[i].end();
      for (; iter != end; ++iter)
      {
        // Accumulate the event mask per descriptor across all op queues.
        ::pollfd& pending_ev = add_pending_event_change(iter->first);
        pending_ev.events |= POLLERR | POLLHUP;
        switch (i)
        {
        case read_op: pending_ev.events |= POLLIN; break;
        case write_op: pending_ev.events |= POLLOUT; break;
        case except_op: pending_ev.events |= POLLPRI; break;
        default: break;
        }
      }
    }
    // Wake any thread blocked in run() so it picks up the pending changes.
    interrupter_.interrupt();
  }
}
115
// Initialise the scheduler's task (the reactor itself runs as the
// scheduler's task); simple delegation.
void dev_poll_reactor::init_task()
{
  scheduler_.init_task();
}
120
// Register a descriptor with the reactor. /dev/poll needs no up-front
// registration (descriptors are added lazily in start_op), so this is a
// no-op that always reports success.
int dev_poll_reactor::register_descriptor(socket_type, per_descriptor_data&)
{
  return 0;
}
125
126int dev_poll_reactor::register_internal_descriptor(int op_type,
127 socket_type descriptor, per_descriptor_data&, reactor_op* op)
128{
129 boost::asio::detail::mutex::scoped_lock lock(mutex_);
130
131 op_queue_[op_type].enqueue_operation(descriptor, op);
132 ::pollfd& ev = add_pending_event_change(descriptor);
133 ev.events = POLLERR | POLLHUP;
134 switch (op_type)
135 {
136 case read_op: ev.events |= POLLIN; break;
137 case write_op: ev.events |= POLLOUT; break;
138 case except_op: ev.events |= POLLPRI; break;
139 default: break;
140 }
141 interrupter_.interrupt();
142
143 return 0;
144}
145
// Move descriptor state between per-descriptor data slots. This reactor
// keeps no per-descriptor state, so there is nothing to move.
void dev_poll_reactor::move_descriptor(socket_type,
    dev_poll_reactor::per_descriptor_data&,
    dev_poll_reactor::per_descriptor_data&)
{
}
151
// Start a new asynchronous operation on a descriptor. May complete the
// operation immediately (speculatively, or because the reactor has shut
// down); otherwise enqueues it and batches a /dev/poll event-mask update.
void dev_poll_reactor::start_op(int op_type, socket_type descriptor,
    dev_poll_reactor::per_descriptor_data&, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  boost::asio::detail::mutex::scoped_lock lock(mutex_);

  if (shutdown_)
  {
    // Reactor is shutting down: complete the op through the scheduler
    // rather than queueing it.
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (allow_speculative)
  {
    // Speculative reads are skipped if exception (out-of-band) operations
    // are pending on this descriptor, so OOB data is handled first.
    if (op_type != read_op || !op_queue_[except_op].has_operation(descriptor))
    {
      // Only attempt the operation immediately if none of the same type is
      // already queued, preserving FIFO completion order.
      if (!op_queue_[op_type].has_operation(descriptor))
      {
        if (op->perform())
        {
          // Succeeded without waiting: post completion outside the lock.
          lock.unlock();
          scheduler_.post_immediate_completion(op, is_continuation);
          return;
        }
      }
    }
  }

  bool first = op_queue_[op_type].enqueue_operation(descriptor, op);
  scheduler_.work_started();
  if (first)
  {
    // First operation of this type for the descriptor: rebuild the full
    // interest mask from all three queues and batch it for the next run().
    ::pollfd& ev = add_pending_event_change(descriptor);
    ev.events = POLLERR | POLLHUP;
    if (op_type == read_op
        || op_queue_[read_op].has_operation(descriptor))
      ev.events |= POLLIN;
    if (op_type == write_op
        || op_queue_[write_op].has_operation(descriptor))
      ev.events |= POLLOUT;
    if (op_type == except_op
        || op_queue_[except_op].has_operation(descriptor))
      ev.events |= POLLPRI;
    interrupter_.interrupt();
  }
}
198
// Cancel all operations associated with the descriptor, completing them
// with operation_aborted. Takes the lock, then delegates.
void dev_poll_reactor::cancel_ops(socket_type descriptor,
    dev_poll_reactor::per_descriptor_data&)
{
  boost::asio::detail::mutex::scoped_lock lock(mutex_);
  cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted);
}
205
1e59de90
TL
// Cancel only the operations on the descriptor (in the given op queue) that
// match cancellation_key, completing them with operation_aborted. Supports
// per-operation cancellation.
void dev_poll_reactor::cancel_ops_by_key(socket_type descriptor,
    dev_poll_reactor::per_descriptor_data&,
    int op_type, void* cancellation_key)
{
  boost::asio::detail::mutex::scoped_lock lock(mutex_);
  op_queue<operation> ops;
  bool need_interrupt = op_queue_[op_type].cancel_operations_by_key(
      descriptor, ops, cancellation_key, boost::asio::error::operation_aborted);
  scheduler_.post_deferred_completions(ops);
  // Wake the reactor if the queue reported that its state changed in a way
  // that requires re-evaluating the /dev/poll registrations.
  if (need_interrupt)
    interrupter_.interrupt();
}
218
7c673cae
FG
// Deregister a descriptor: batch its removal from /dev/poll (applied on the
// next run()) and abort all of its outstanding operations.
void dev_poll_reactor::deregister_descriptor(socket_type descriptor,
    dev_poll_reactor::per_descriptor_data&, bool)
{
  boost::asio::detail::mutex::scoped_lock lock(mutex_);

  // Remove the descriptor from /dev/poll.
  ::pollfd& ev = add_pending_event_change(descriptor);
  ev.events = POLLREMOVE;
  interrupter_.interrupt();

  // Cancel any outstanding operations associated with the descriptor.
  cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted);
}
232
233void dev_poll_reactor::deregister_internal_descriptor(
234 socket_type descriptor, dev_poll_reactor::per_descriptor_data&)
235{
236 boost::asio::detail::mutex::scoped_lock lock(mutex_);
237
238 // Remove the descriptor from /dev/poll. Since this function is only called
239 // during a fork, we can apply the change immediately.
240 ::pollfd ev = { 0, 0, 0 };
241 ev.fd = descriptor;
242 ev.events = POLLREMOVE;
243 ev.revents = 0;
244 ::write(dev_poll_fd_, &ev, sizeof(ev));
245
246 // Destroy all operations associated with the descriptor.
247 op_queue<operation> ops;
248 boost::system::error_code ec;
249 for (int i = 0; i < max_ops; ++i)
250 op_queue_[i].cancel_operations(descriptor, ops, ec);
251}
252
b32b8144
FG
// Clean up per-descriptor data. This reactor keeps no per-descriptor state,
// so there is nothing to release.
void dev_poll_reactor::cleanup_descriptor_data(
    dev_poll_reactor::per_descriptor_data&)
{
}
257
// Run the reactor once: flush batched registration changes to /dev/poll,
// block in the DP_POLL ioctl for up to usec microseconds (usec < 0 means
// wait indefinitely, subject to the timer cap), then perform the ready
// operations, appending their completions to ops.
void dev_poll_reactor::run(long usec, op_queue<operation>& ops)
{
  boost::asio::detail::mutex::scoped_lock lock(mutex_);

  // We can return immediately if there's no work to do and the reactor is
  // not supposed to block.
  if (usec == 0 && op_queue_[read_op].empty() && op_queue_[write_op].empty()
      && op_queue_[except_op].empty() && timer_queues_.all_empty())
    return;

  // Write the pending event registration changes to the /dev/poll descriptor.
  std::size_t events_size = sizeof(::pollfd) * pending_event_changes_.size();
  if (events_size > 0)
  {
    errno = 0;
    int result = ::write(dev_poll_fd_,
        &pending_event_changes_[0], events_size);
    if (result != static_cast<int>(events_size))
    {
      // A short or failed write means registration failed: abort every
      // operation on every descriptor that had a pending change, using the
      // errno produced by write().
      boost::system::error_code ec = boost::system::error_code(
          errno, boost::asio::error::get_system_category());
      for (std::size_t i = 0; i < pending_event_changes_.size(); ++i)
      {
        int descriptor = pending_event_changes_[i].fd;
        for (int j = 0; j < max_ops; ++j)
          op_queue_[j].cancel_operations(descriptor, ops, ec);
      }
    }
    pending_event_changes_.clear();
    pending_event_change_index_.clear();
  }

  // Calculate timeout (in milliseconds, rounding microseconds up; -1 means
  // block indefinitely). get_timeout() further caps it by the next timer.
  int timeout;
  if (usec == 0)
    timeout = 0;
  else
  {
    timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
    timeout = get_timeout(timeout);
  }
  lock.unlock();

  // Block on the /dev/poll descriptor.
  ::pollfd events[128] = { { 0, 0, 0 } };
  ::dvpoll dp = { 0, 0, 0 };
  dp.dp_fds = events;
  dp.dp_nfds = 128;
  dp.dp_timeout = timeout;
  int num_events = ::ioctl(dev_poll_fd_, DP_POLL, &dp);

  lock.lock();

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    int descriptor = events[i].fd;
    if (descriptor == interrupter_.read_descriptor())
    {
      // Woken deliberately via interrupt(); just clear the interrupter.
      interrupter_.reset();
    }
    else
    {
      bool more_reads = false;
      bool more_writes = false;
      bool more_except = false;

      // Exception operations must be processed first to ensure that any
      // out-of-band data is read before normal data.
      if (events[i].events & (POLLPRI | POLLERR | POLLHUP))
        more_except =
          op_queue_[except_op].perform_operations(descriptor, ops);
      else
        more_except = op_queue_[except_op].has_operation(descriptor);

      if (events[i].events & (POLLIN | POLLERR | POLLHUP))
        more_reads = op_queue_[read_op].perform_operations(descriptor, ops);
      else
        more_reads = op_queue_[read_op].has_operation(descriptor);

      if (events[i].events & (POLLOUT | POLLERR | POLLHUP))
        more_writes = op_queue_[write_op].perform_operations(descriptor, ops);
      else
        more_writes = op_queue_[write_op].has_operation(descriptor);

      if ((events[i].events & (POLLERR | POLLHUP)) != 0
          && !more_except && !more_reads && !more_writes)
      {
        // If we have an event and no operations associated with the
        // descriptor then we need to delete the descriptor from /dev/poll.
        // The poll operation can produce POLLHUP or POLLERR events when there
        // is no operation pending, so if we do not remove the descriptor we
        // can end up in a tight polling loop.
        ::pollfd ev = { 0, 0, 0 };
        ev.fd = descriptor;
        ev.events = POLLREMOVE;
        ev.revents = 0;
        ::write(dev_poll_fd_, &ev, sizeof(ev));
      }
      else
      {
        // Re-register with a mask reflecting only the operations that
        // remain pending on this descriptor.
        ::pollfd ev = { 0, 0, 0 };
        ev.fd = descriptor;
        ev.events = POLLERR | POLLHUP;
        if (more_reads)
          ev.events |= POLLIN;
        if (more_writes)
          ev.events |= POLLOUT;
        if (more_except)
          ev.events |= POLLPRI;
        ev.revents = 0;
        int result = ::write(dev_poll_fd_, &ev, sizeof(ev));
        if (result != sizeof(ev))
        {
          // Re-registration failed: abort everything queued on this
          // descriptor with the errno from write().
          boost::system::error_code ec(errno,
              boost::asio::error::get_system_category());
          for (int j = 0; j < max_ops; ++j)
            op_queue_[j].cancel_operations(descriptor, ops, ec);
        }
      }
    }
  }
  // Collect any timers that became ready while we were polling.
  timer_queues_.get_ready_timers(ops);
}
382
// Wake any thread that is blocked inside run()'s DP_POLL ioctl, by making
// the interrupter's registered descriptor become readable.
void dev_poll_reactor::interrupt()
{
  interrupter_.interrupt();
}
387
388int dev_poll_reactor::do_dev_poll_create()
389{
390 int fd = ::open("/dev/poll", O_RDWR);
391 if (fd == -1)
392 {
393 boost::system::error_code ec(errno,
394 boost::asio::error::get_system_category());
395 boost::asio::detail::throw_error(ec, "/dev/poll");
396 }
397 return fd;
398}
399
// Add a timer queue to the set consulted when computing the poll timeout
// and when collecting ready timers.
void dev_poll_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}
405
// Remove a previously added timer queue from the reactor's set.
void dev_poll_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}
411
b32b8144 412int dev_poll_reactor::get_timeout(int msec)
7c673cae
FG
413{
414 // By default we will wait no longer than 5 minutes. This will ensure that
415 // any changes to the system clock are detected after no longer than this.
b32b8144
FG
416 const int max_msec = 5 * 60 * 1000;
417 return timer_queues_.wait_duration_msec(
418 (msec < 0 || max_msec < msec) ? max_msec : msec);
7c673cae
FG
419}
420
421void dev_poll_reactor::cancel_ops_unlocked(socket_type descriptor,
422 const boost::system::error_code& ec)
423{
424 bool need_interrupt = false;
425 op_queue<operation> ops;
426 for (int i = 0; i < max_ops; ++i)
427 need_interrupt = op_queue_[i].cancel_operations(
428 descriptor, ops, ec) || need_interrupt;
b32b8144 429 scheduler_.post_deferred_completions(ops);
7c673cae
FG
430 if (need_interrupt)
431 interrupter_.interrupt();
432}
433
// Return a reference to the batched /dev/poll registration change for the
// given descriptor, creating a zeroed entry if none is pending. Entries are
// written to the driver and cleared on the next run(). Caller must hold
// mutex_ (every caller in this file does).
::pollfd& dev_poll_reactor::add_pending_event_change(int descriptor)
{
  hash_map<int, std::size_t>::iterator iter
    = pending_event_change_index_.find(descriptor);
  if (iter == pending_event_change_index_.end())
  {
    std::size_t index = pending_event_changes_.size();
    // Reserve before touching the index map so the push_back below cannot
    // throw, keeping the map and the vector consistent if insert() throws.
    pending_event_changes_.reserve(pending_event_changes_.size() + 1);
    pending_event_change_index_.insert(std::make_pair(descriptor, index));
    pending_event_changes_.push_back(::pollfd());
    pending_event_changes_[index].fd = descriptor;
    pending_event_changes_[index].revents = 0;
    return pending_event_changes_[index];
  }
  else
  {
    return pending_event_changes_[iter->second];
  }
}
453
454} // namespace detail
455} // namespace asio
456} // namespace boost
457
458#include <boost/asio/detail/pop_options.hpp>
459
460#endif // defined(BOOST_ASIO_HAS_DEV_POLL)
461
462#endif // BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP