]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/boost/asio/detail/impl/scheduler.ipp
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / boost / boost / asio / detail / impl / scheduler.ipp
1 //
2 // detail/impl/scheduler.ipp
3 // ~~~~~~~~~~~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #ifndef BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP
12 #define BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP
13
14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
15 # pragma once
16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17
18 #include <boost/asio/detail/config.hpp>
19
20 #include <boost/asio/detail/concurrency_hint.hpp>
21 #include <boost/asio/detail/event.hpp>
22 #include <boost/asio/detail/limits.hpp>
23 #include <boost/asio/detail/reactor.hpp>
24 #include <boost/asio/detail/scheduler.hpp>
25 #include <boost/asio/detail/scheduler_thread_info.hpp>
26
27 #include <boost/asio/detail/push_options.hpp>
28
29 namespace boost {
30 namespace asio {
31 namespace detail {
32
// Helper whose destructor runs after the reactor task has executed. An
// instance is placed on the stack around task_->run() so this cleanup
// happens even if the task throws.
struct scheduler::task_cleanup
{
  ~task_cleanup()
  {
    // Fold this thread's privately accumulated work count into the
    // scheduler-wide counter with a single atomic increment.
    if (this_thread_->private_outstanding_work > 0)
    {
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work);
    }
    this_thread_->private_outstanding_work = 0;

    // Enqueue the completed operations and reinsert the task at the end of
    // the operation queue. Note: re-acquires the lock and leaves it held.
    lock_->lock();
    scheduler_->task_interrupted_ = true;
    scheduler_->op_queue_.push(this_thread_->private_op_queue);
    scheduler_->op_queue_.push(&scheduler_->task_operation_);
  }

  scheduler* scheduler_;     // scheduler being serviced
  mutex::scoped_lock* lock_; // lock to re-acquire before touching op_queue_
  thread_info* this_thread_; // per-thread private queue and work counter
};
57
// Helper whose destructor runs after a handler completes. The completed
// handler accounts for one unit of work, hence the +/- 1 adjustments
// below. Placed on the stack so cleanup happens even if the handler
// throws.
struct scheduler::work_cleanup
{
  ~work_cleanup()
  {
    if (this_thread_->private_outstanding_work > 1)
    {
      // Publish any privately counted work beyond the one unit consumed
      // by the handler that just ran.
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work - 1);
    }
    else if (this_thread_->private_outstanding_work < 1)
    {
      // Net outstanding work decreased; let the scheduler react (e.g.
      // stop when no work remains).
      scheduler_->work_finished();
    }
    this_thread_->private_outstanding_work = 0;

#if defined(BOOST_ASIO_HAS_THREADS)
    // Move any operations posted to the thread-private queue onto the
    // shared queue; the lock is only taken when there is something to move.
    if (!this_thread_->private_op_queue.empty())
    {
      lock_->lock();
      scheduler_->op_queue_.push(this_thread_->private_op_queue);
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)
  }

  scheduler* scheduler_;     // scheduler being serviced
  mutex::scoped_lock* lock_; // lock to re-acquire before touching op_queue_
  thread_info* this_thread_; // per-thread private queue and work counter
};
87
// Construct the scheduler service. The concurrency hint selects
// single-threaded optimisations and whether the internal mutex actually
// locks.
scheduler::scheduler(
    boost::asio::execution_context& ctx, int concurrency_hint)
  : boost::asio::detail::execution_context_service_base<scheduler>(ctx),
    // Treat the scheduler as single-threaded when the hint says one
    // thread, or when locking is disabled for the scheduler or reactor.
    one_thread_(concurrency_hint == 1
        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          SCHEDULER, concurrency_hint)
        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          REACTOR_IO, concurrency_hint)),
    mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          SCHEDULER, concurrency_hint)),
    task_(0), // reactor task is created lazily by init_task()
    task_interrupted_(true),
    outstanding_work_(0),
    stopped_(false),
    shutdown_(false),
    concurrency_hint_(concurrency_hint)
{
  BOOST_ASIO_HANDLER_TRACKING_INIT;
}
107
108 void scheduler::shutdown()
109 {
110 mutex::scoped_lock lock(mutex_);
111 shutdown_ = true;
112 lock.unlock();
113
114 // Destroy handler objects.
115 while (!op_queue_.empty())
116 {
117 operation* o = op_queue_.front();
118 op_queue_.pop();
119 if (o != &task_operation_)
120 o->destroy();
121 }
122
123 // Reset to initial state.
124 task_ = 0;
125 }
126
127 void scheduler::init_task()
128 {
129 mutex::scoped_lock lock(mutex_);
130 if (!shutdown_ && !task_)
131 {
132 task_ = &use_service<reactor>(this->context());
133 op_queue_.push(&task_operation_);
134 wake_one_thread_and_unlock(lock);
135 }
136 }
137
138 std::size_t scheduler::run(boost::system::error_code& ec)
139 {
140 ec = boost::system::error_code();
141 if (outstanding_work_ == 0)
142 {
143 stop();
144 return 0;
145 }
146
147 thread_info this_thread;
148 this_thread.private_outstanding_work = 0;
149 thread_call_stack::context ctx(this, this_thread);
150
151 mutex::scoped_lock lock(mutex_);
152
153 std::size_t n = 0;
154 for (; do_run_one(lock, this_thread, ec); lock.lock())
155 if (n != (std::numeric_limits<std::size_t>::max)())
156 ++n;
157 return n;
158 }
159
160 std::size_t scheduler::run_one(boost::system::error_code& ec)
161 {
162 ec = boost::system::error_code();
163 if (outstanding_work_ == 0)
164 {
165 stop();
166 return 0;
167 }
168
169 thread_info this_thread;
170 this_thread.private_outstanding_work = 0;
171 thread_call_stack::context ctx(this, this_thread);
172
173 mutex::scoped_lock lock(mutex_);
174
175 return do_run_one(lock, this_thread, ec);
176 }
177
178 std::size_t scheduler::wait_one(long usec, boost::system::error_code& ec)
179 {
180 ec = boost::system::error_code();
181 if (outstanding_work_ == 0)
182 {
183 stop();
184 return 0;
185 }
186
187 thread_info this_thread;
188 this_thread.private_outstanding_work = 0;
189 thread_call_stack::context ctx(this, this_thread);
190
191 mutex::scoped_lock lock(mutex_);
192
193 return do_wait_one(lock, this_thread, usec, ec);
194 }
195
196 std::size_t scheduler::poll(boost::system::error_code& ec)
197 {
198 ec = boost::system::error_code();
199 if (outstanding_work_ == 0)
200 {
201 stop();
202 return 0;
203 }
204
205 thread_info this_thread;
206 this_thread.private_outstanding_work = 0;
207 thread_call_stack::context ctx(this, this_thread);
208
209 mutex::scoped_lock lock(mutex_);
210
211 #if defined(BOOST_ASIO_HAS_THREADS)
212 // We want to support nested calls to poll() and poll_one(), so any handlers
213 // that are already on a thread-private queue need to be put on to the main
214 // queue now.
215 if (one_thread_)
216 if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
217 op_queue_.push(outer_info->private_op_queue);
218 #endif // defined(BOOST_ASIO_HAS_THREADS)
219
220 std::size_t n = 0;
221 for (; do_poll_one(lock, this_thread, ec); lock.lock())
222 if (n != (std::numeric_limits<std::size_t>::max)())
223 ++n;
224 return n;
225 }
226
227 std::size_t scheduler::poll_one(boost::system::error_code& ec)
228 {
229 ec = boost::system::error_code();
230 if (outstanding_work_ == 0)
231 {
232 stop();
233 return 0;
234 }
235
236 thread_info this_thread;
237 this_thread.private_outstanding_work = 0;
238 thread_call_stack::context ctx(this, this_thread);
239
240 mutex::scoped_lock lock(mutex_);
241
242 #if defined(BOOST_ASIO_HAS_THREADS)
243 // We want to support nested calls to poll() and poll_one(), so any handlers
244 // that are already on a thread-private queue need to be put on to the main
245 // queue now.
246 if (one_thread_)
247 if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
248 op_queue_.push(outer_info->private_op_queue);
249 #endif // defined(BOOST_ASIO_HAS_THREADS)
250
251 return do_poll_one(lock, this_thread, ec);
252 }
253
// Stop all threads running this scheduler. Pending handlers are retained
// until shutdown() destroys them or restart()/run() resumes processing.
void scheduler::stop()
{
  // stop_all_threads() requires the lock to be held.
  mutex::scoped_lock lock(mutex_);
  stop_all_threads(lock);
}
259
// Return whether the scheduler has been stopped; reads stopped_ under
// the lock for a consistent view across threads.
bool scheduler::stopped() const
{
  mutex::scoped_lock lock(mutex_);
  return stopped_;
}
265
// Clear the stopped flag so that subsequent run()/poll() calls can make
// progress again after a stop().
void scheduler::restart()
{
  mutex::scoped_lock lock(mutex_);
  stopped_ = false;
}
271
// Record one unit of work on the calling thread's private counter.
// NOTE(review): assumes the caller is a thread currently running this
// scheduler, so thread_call_stack::contains() is non-null — confirm at
// call sites.
void scheduler::compensating_work_started()
{
  thread_info_base* this_thread = thread_call_stack::contains(this);
  ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
}
277
// Post an operation for which work_started() has not yet been called:
// account for the work and enqueue the op for execution.
void scheduler::post_immediate_completion(
    scheduler::operation* op, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  // Fast path: when the scheduler is effectively single-threaded, or the
  // op continues the current handler, and this thread is already running
  // the scheduler, keep the op and its work count thread-private so no
  // shared lock is needed.
  if (one_thread_ || is_continuation)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  // Slow path: account for the work globally, enqueue the op, and wake a
  // thread to run it. Work is counted before queueing the op.
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
300
// Post an operation whose work has already been counted (no call to
// work_started() here, unlike post_immediate_completion()).
void scheduler::post_deferred_completion(scheduler::operation* op)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  // Fast path: on a single-threaded scheduler, a thread already running
  // the scheduler can queue the op privately without taking the lock.
  if (one_thread_)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#endif // defined(BOOST_ASIO_HAS_THREADS)

  // Slow path: enqueue on the shared queue and wake a thread to run it.
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
318
// Post a whole queue of operations whose work has already been counted.
// Equivalent to post_deferred_completion() for each op, but moves the
// queue in one splice.
void scheduler::post_deferred_completions(
    op_queue<scheduler::operation>& ops)
{
  if (!ops.empty())
  {
#if defined(BOOST_ASIO_HAS_THREADS)
    // Fast path: on a single-threaded scheduler, a thread already running
    // the scheduler can splice the ops into its private queue lock-free.
    if (one_thread_)
    {
      if (thread_info_base* this_thread = thread_call_stack::contains(this))
      {
        static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
        return;
      }
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)

    // Slow path: splice onto the shared queue and wake a thread.
    mutex::scoped_lock lock(mutex_);
    op_queue_.push(ops);
    wake_one_thread_and_unlock(lock);
  }
}
340
// Account for and enqueue an operation, always via the shared queue —
// unlike post_immediate_completion(), there is no thread-private fast
// path here.
void scheduler::do_dispatch(
    scheduler::operation* op)
{
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
349
// Take ownership of operations that will never be run. Splicing them
// into a local queue presumably lets op_queue's destructor reclaim them
// when it goes out of scope — TODO confirm against op_queue's definition.
void scheduler::abandon_operations(
    op_queue<scheduler::operation>& ops)
{
  op_queue<scheduler::operation> ops2;
  ops2.push(ops);
}
356
// Run one handler or the reactor task, blocking on the wakeup event when
// the queue is empty. Called with the lock held. Returns 1 if a handler
// was executed, 0 if the scheduler was stopped.
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        // This thread will run the reactor task. If handlers remain,
        // wake another thread to process them concurrently.
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        // On scope exit (even via exception), re-lock, re-queue the task
        // sentinel and publish this thread's private work/ops.
        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
      }
      else
      {
        // Read the task result before the handler deletes itself.
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result);

        return 1;
      }
    }
    else
    {
      // Queue empty: sleep until another thread signals new work.
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }

  return 0;
}
415
// Run one handler, waiting at most usec microseconds for one to become
// ready. Called with the lock held. Returns 1 if a handler was executed,
// 0 on timeout or stop.
std::size_t scheduler::do_wait_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread, long usec,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == 0)
  {
    // Queue empty: wait up to usec for a signal, then re-check once.
    wakeup_event_.clear(lock);
    wakeup_event_.wait_for_usec(lock, usec);
    usec = 0; // Wait at most once.
    o = op_queue_.front();
  }

  if (o == &task_operation_)
  {
    // The reactor task is at the head: run it to harvest ready ops.
    op_queue_.pop();
    bool more_handlers = (!op_queue_.empty());

    task_interrupted_ = more_handlers;

    if (more_handlers && !one_thread_)
      wakeup_event_.unlock_and_signal_one(lock);
    else
      lock.unlock();

    {
      // Scope the cleanup so the lock is re-acquired and the task
      // sentinel re-queued before we inspect the queue again below.
      task_cleanup on_exit = { this, &lock, &this_thread };
      (void)on_exit;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(more_handlers ? 0 : usec, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      // Task produced nothing: hand the wakeup to another thread, if any.
      if (!one_thread_)
        wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  // Read the task result before the handler deletes itself.
  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);

  return 1;
}
485
// Run one ready handler without blocking. Called with the lock held.
// Returns 1 if a handler was executed, 0 if nothing was ready or the
// scheduler was stopped.
std::size_t scheduler::do_poll_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == &task_operation_)
  {
    // The reactor task is at the head: poll it (timeout 0) to harvest
    // any ready operations.
    op_queue_.pop();
    lock.unlock();

    {
      // Scope the cleanup so the lock is re-acquired and the task
      // sentinel re-queued before we inspect the queue again below.
      task_cleanup c = { this, &lock, &this_thread };
      (void)c;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(0, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      // Task produced nothing: hand the wakeup to another thread, if any.
      wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  // Read the task result before the handler deletes itself.
  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);

  return 1;
}
539
// Mark the scheduler stopped and wake every thread. Called with the lock
// held.
void scheduler::stop_all_threads(
    mutex::scoped_lock& lock)
{
  stopped_ = true;
  wakeup_event_.signal_all(lock);

  // Also interrupt the reactor task, which may be blocked inside run()
  // rather than on the wakeup event.
  if (!task_interrupted_ && task_)
  {
    task_interrupted_ = true;
    task_->interrupt();
  }
}
552
// Wake one thread to process newly queued work. Called with the lock
// held; always returns with the lock released.
void scheduler::wake_one_thread_and_unlock(
    mutex::scoped_lock& lock)
{
  // Prefer to signal a thread blocked on the wakeup event; the call
  // unlocks as a side effect when it signals.
  if (!wakeup_event_.maybe_unlock_and_signal_one(lock))
  {
    // No thread was waiting on the event: interrupt the reactor task so
    // the thread running it returns and picks up the work.
    if (!task_interrupted_ && task_)
    {
      task_interrupted_ = true;
      task_->interrupt();
    }
    lock.unlock();
  }
}
566
567 } // namespace detail
568 } // namespace asio
569 } // namespace boost
570
571 #include <boost/asio/detail/pop_options.hpp>
572
573 #endif // BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP