//
// detail/impl/scheduler.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP
#define BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#include <boost/asio/detail/concurrency_hint.hpp>
#include <boost/asio/detail/event.hpp>
#include <boost/asio/detail/limits.hpp>
#include <boost/asio/detail/reactor.hpp>
#include <boost/asio/detail/scheduler.hpp>
#include <boost/asio/detail/scheduler_thread_info.hpp>

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace detail {

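// RAII helper used while the reactor task runs. On destruction it adds any
// privately accumulated work count to the scheduler's shared counter, then
// reacquires the lock, enqueues the operations the reactor produced on the
// thread-private queue, and reinserts the task sentinel at the back of the
// main operation queue.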
struct scheduler::task_cleanup
{
  ~task_cleanup()
  {
    if (this_thread_->private_outstanding_work > 0)
    {
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work);
    }
    this_thread_->private_outstanding_work = 0;

    // Enqueue the completed operations and reinsert the task at the end of
    // the operation queue.
    lock_->lock();
    scheduler_->task_interrupted_ = true;
    scheduler_->op_queue_.push(this_thread_->private_op_queue);
    scheduler_->op_queue_.push(&scheduler_->task_operation_);
  }

  scheduler* scheduler_;
  mutex::scoped_lock* lock_;
  thread_info* this_thread_;
};

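// RAII helper used while a handler runs. On destruction it reconciles the
// thread-private work count against the one unit of work consumed by the
// completed handler (calling work_finished() if no compensating work was
// started), and flushes any thread-private operations back onto the main
// queue.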
struct scheduler::work_cleanup
{
  ~work_cleanup()
  {
    if (this_thread_->private_outstanding_work > 1)
    {
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work - 1);
    }
    else if (this_thread_->private_outstanding_work < 1)
    {
      scheduler_->work_finished();
    }
    this_thread_->private_outstanding_work = 0;

#if defined(BOOST_ASIO_HAS_THREADS)
    if (!this_thread_->private_op_queue.empty())
    {
      lock_->lock();
      scheduler_->op_queue_.push(this_thread_->private_op_queue);
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)
  }

  scheduler* scheduler_;
  mutex::scoped_lock* lock_;
  thread_info* this_thread_;
};

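// The concurrency hint determines how defensively the scheduler must lock:
// a hint of 1, or a hint that disables locking for the scheduler or for
// reactor I/O, sets one_thread_ and enables the thread-private fast paths
// used below.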
scheduler::scheduler(
    boost::asio::execution_context& ctx, int concurrency_hint)
  : boost::asio::detail::execution_context_service_base<scheduler>(ctx),
    one_thread_(concurrency_hint == 1
        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          SCHEDULER, concurrency_hint)
        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          REACTOR_IO, concurrency_hint)),
    mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          SCHEDULER, concurrency_hint)),
    task_(0),
    task_interrupted_(true),
    outstanding_work_(0),
    stopped_(false),
    shutdown_(false),
    concurrency_hint_(concurrency_hint)
{
  BOOST_ASIO_HANDLER_TRACKING_INIT;
}

void scheduler::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  // Destroy handler objects.
  while (!op_queue_.empty())
  {
    operation* o = op_queue_.front();
    op_queue_.pop();
    if (o != &task_operation_)
      o->destroy();
  }

  // Reset to initial state.
  task_ = 0;
}

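// Lazily attaches the reactor to this scheduler. The reactor service is
// obtained on first use rather than at construction, and the task sentinel
// is queued so that a running thread will pick the reactor up.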
void scheduler::init_task()
{
  mutex::scoped_lock lock(mutex_);
  if (!shutdown_ && !task_)
  {
    task_ = &use_service<reactor>(this->context());
    op_queue_.push(&task_operation_);
    wake_one_thread_and_unlock(lock);
  }
}

std::size_t scheduler::run(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}

std::size_t scheduler::run_one(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  return do_run_one(lock, this_thread, ec);
}

std::size_t scheduler::wait_one(long usec, boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  return do_wait_one(lock, this_thread, usec, ec);
}

std::size_t scheduler::poll(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

#if defined(BOOST_ASIO_HAS_THREADS)
  // We want to support nested calls to poll() and poll_one(), so any handlers
  // that are already on a thread-private queue need to be put on to the main
  // queue now.
  if (one_thread_)
    if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
      op_queue_.push(outer_info->private_op_queue);
#endif // defined(BOOST_ASIO_HAS_THREADS)

  std::size_t n = 0;
  for (; do_poll_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}

std::size_t scheduler::poll_one(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

#if defined(BOOST_ASIO_HAS_THREADS)
  // We want to support nested calls to poll() and poll_one(), so any handlers
  // that are already on a thread-private queue need to be put on to the main
  // queue now.
  if (one_thread_)
    if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
      op_queue_.push(outer_info->private_op_queue);
#endif // defined(BOOST_ASIO_HAS_THREADS)

  return do_poll_one(lock, this_thread, ec);
}

void scheduler::stop()
{
  mutex::scoped_lock lock(mutex_);
  stop_all_threads(lock);
}

bool scheduler::stopped() const
{
  mutex::scoped_lock lock(mutex_);
  return stopped_;
}

void scheduler::restart()
{
  mutex::scoped_lock lock(mutex_);
  stopped_ = false;
}

void scheduler::compensating_work_started()
{
  thread_info_base* this_thread = thread_call_stack::contains(this);
  ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
}

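// Posts an operation that is immediately counted as outstanding work. When
// only one thread can be running handlers (or the operation continues the
// current handler), the operation goes onto the calling thread's private
// queue, avoiding the mutex and the wakeup signal.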
void scheduler::post_immediate_completion(
    scheduler::operation* op, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}

void scheduler::post_deferred_completion(scheduler::operation* op)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#endif // defined(BOOST_ASIO_HAS_THREADS)

  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}

void scheduler::post_deferred_completions(
    op_queue<scheduler::operation>& ops)
{
  if (!ops.empty())
  {
#if defined(BOOST_ASIO_HAS_THREADS)
    if (one_thread_)
    {
      if (thread_info_base* this_thread = thread_call_stack::contains(this))
      {
        static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
        return;
      }
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)

    mutex::scoped_lock lock(mutex_);
    op_queue_.push(ops);
    wake_one_thread_and_unlock(lock);
  }
}

void scheduler::do_dispatch(
    scheduler::operation* op)
{
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}

void scheduler::abandon_operations(
    op_queue<scheduler::operation>& ops)
{
  op_queue<scheduler::operation> ops2;
  ops2.push(ops);
}

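// The core of run() and run_one(): executes at most one handler. When the
// front of the queue is the task sentinel the reactor task is run instead,
// blocking indefinitely if no other handlers are pending; otherwise the
// handler is completed with the lock released. Waits on the wakeup event
// whenever the queue is empty.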
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
      }
      else
      {
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result);

        return 1;
      }
    }
    else
    {
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }

  return 0;
}

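// Like do_run_one(), but gives up after waiting at most usec microseconds
// for a handler to become available. The timeout is consumed by the first
// wait, so at most one timed wait is performed per call.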
std::size_t scheduler::do_wait_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread, long usec,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == 0)
  {
    wakeup_event_.clear(lock);
    wakeup_event_.wait_for_usec(lock, usec);
    usec = 0; // Wait at most once.
    o = op_queue_.front();
  }

  if (o == &task_operation_)
  {
    op_queue_.pop();
    bool more_handlers = (!op_queue_.empty());

    task_interrupted_ = more_handlers;

    if (more_handlers && !one_thread_)
      wakeup_event_.unlock_and_signal_one(lock);
    else
      lock.unlock();

    {
      task_cleanup on_exit = { this, &lock, &this_thread };
      (void)on_exit;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(more_handlers ? 0 : usec, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      if (!one_thread_)
        wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);

  return 1;
}

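// Like do_run_one(), but never blocks: the reactor task is run with a zero
// timeout and there is no wait on the wakeup event when the queue is empty.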
std::size_t scheduler::do_poll_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == &task_operation_)
  {
    op_queue_.pop();
    lock.unlock();

    {
      task_cleanup c = { this, &lock, &this_thread };
      (void)c;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(0, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);

  return 1;
}

void scheduler::stop_all_threads(
    mutex::scoped_lock& lock)
{
  stopped_ = true;
  wakeup_event_.signal_all(lock);

  if (!task_interrupted_ && task_)
  {
    task_interrupted_ = true;
    task_->interrupt();
  }
}

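// Wakes at most one thread: preferably one blocked on the wakeup event; if
// none is waiting, the reactor task is interrupted instead so the thread
// running it can return and pick up the queued work. The lock is always
// released before returning.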
void scheduler::wake_one_thread_and_unlock(
    mutex::scoped_lock& lock)
{
  if (!wakeup_event_.maybe_unlock_and_signal_one(lock))
  {
    if (!task_interrupted_ && task_)
    {
      task_interrupted_ = true;
      task_->interrupt();
    }
    lock.unlock();
  }
}

} // namespace detail
} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#endif // BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP