]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/boost/asio/detail/impl/scheduler.ipp
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / boost / boost / asio / detail / impl / scheduler.ipp
CommitLineData
7c673cae 1//
b32b8144
FG
2// detail/impl/scheduler.ipp
3// ~~~~~~~~~~~~~~~~~~~~~~~~~
7c673cae 4//
1e59de90 5// Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7c673cae
FG
6//
7// Distributed under the Boost Software License, Version 1.0. (See accompanying
8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9//
10
b32b8144
FG
11#ifndef BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP
12#define BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP
7c673cae
FG
13
14#if defined(_MSC_VER) && (_MSC_VER >= 1200)
15# pragma once
16#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17
18#include <boost/asio/detail/config.hpp>
19
b32b8144 20#include <boost/asio/detail/concurrency_hint.hpp>
7c673cae
FG
21#include <boost/asio/detail/event.hpp>
22#include <boost/asio/detail/limits.hpp>
b32b8144
FG
23#include <boost/asio/detail/scheduler.hpp>
24#include <boost/asio/detail/scheduler_thread_info.hpp>
92f5a8d4 25#include <boost/asio/detail/signal_blocker.hpp>
7c673cae 26
1e59de90
TL
27#if defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
28# include <boost/asio/detail/io_uring_service.hpp>
29#else // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
30# include <boost/asio/detail/reactor.hpp>
31#endif // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
32
7c673cae
FG
33#include <boost/asio/detail/push_options.hpp>
34
35namespace boost {
36namespace asio {
37namespace detail {
38
92f5a8d4
TL
39class scheduler::thread_function
40{
41public:
42 explicit thread_function(scheduler* s)
43 : this_(s)
44 {
45 }
46
47 void operator()()
48 {
49 boost::system::error_code ec;
50 this_->run(ec);
51 }
52
53private:
54 scheduler* this_;
55};
56
b32b8144 57struct scheduler::task_cleanup
7c673cae
FG
58{
59 ~task_cleanup()
60 {
61 if (this_thread_->private_outstanding_work > 0)
62 {
63 boost::asio::detail::increment(
b32b8144 64 scheduler_->outstanding_work_,
7c673cae
FG
65 this_thread_->private_outstanding_work);
66 }
67 this_thread_->private_outstanding_work = 0;
68
69 // Enqueue the completed operations and reinsert the task at the end of
70 // the operation queue.
71 lock_->lock();
b32b8144
FG
72 scheduler_->task_interrupted_ = true;
73 scheduler_->op_queue_.push(this_thread_->private_op_queue);
74 scheduler_->op_queue_.push(&scheduler_->task_operation_);
7c673cae
FG
75 }
76
b32b8144 77 scheduler* scheduler_;
7c673cae
FG
78 mutex::scoped_lock* lock_;
79 thread_info* this_thread_;
80};
81
b32b8144 82struct scheduler::work_cleanup
7c673cae
FG
83{
84 ~work_cleanup()
85 {
86 if (this_thread_->private_outstanding_work > 1)
87 {
88 boost::asio::detail::increment(
b32b8144 89 scheduler_->outstanding_work_,
7c673cae
FG
90 this_thread_->private_outstanding_work - 1);
91 }
92 else if (this_thread_->private_outstanding_work < 1)
93 {
b32b8144 94 scheduler_->work_finished();
7c673cae
FG
95 }
96 this_thread_->private_outstanding_work = 0;
97
98#if defined(BOOST_ASIO_HAS_THREADS)
99 if (!this_thread_->private_op_queue.empty())
100 {
101 lock_->lock();
b32b8144 102 scheduler_->op_queue_.push(this_thread_->private_op_queue);
7c673cae
FG
103 }
104#endif // defined(BOOST_ASIO_HAS_THREADS)
105 }
106
b32b8144 107 scheduler* scheduler_;
7c673cae
FG
108 mutex::scoped_lock* lock_;
109 thread_info* this_thread_;
110};
111
92f5a8d4 112scheduler::scheduler(boost::asio::execution_context& ctx,
1e59de90 113 int concurrency_hint, bool own_thread, get_task_func_type get_task)
b32b8144
FG
114 : boost::asio::detail::execution_context_service_base<scheduler>(ctx),
115 one_thread_(concurrency_hint == 1
116 || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
117 SCHEDULER, concurrency_hint)
118 || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
119 REACTOR_IO, concurrency_hint)),
120 mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
121 SCHEDULER, concurrency_hint)),
7c673cae 122 task_(0),
1e59de90 123 get_task_(get_task),
7c673cae
FG
124 task_interrupted_(true),
125 outstanding_work_(0),
126 stopped_(false),
b32b8144 127 shutdown_(false),
92f5a8d4
TL
128 concurrency_hint_(concurrency_hint),
129 thread_(0)
7c673cae
FG
130{
131 BOOST_ASIO_HANDLER_TRACKING_INIT;
92f5a8d4
TL
132
133 if (own_thread)
134 {
135 ++outstanding_work_;
136 boost::asio::detail::signal_blocker sb;
137 thread_ = new boost::asio::detail::thread(thread_function(this));
138 }
139}
140
141scheduler::~scheduler()
142{
143 if (thread_)
144 {
20effc67
TL
145 mutex::scoped_lock lock(mutex_);
146 shutdown_ = true;
147 stop_all_threads(lock);
148 lock.unlock();
92f5a8d4
TL
149 thread_->join();
150 delete thread_;
151 }
7c673cae
FG
152}
153
b32b8144 154void scheduler::shutdown()
7c673cae
FG
155{
156 mutex::scoped_lock lock(mutex_);
157 shutdown_ = true;
92f5a8d4
TL
158 if (thread_)
159 stop_all_threads(lock);
7c673cae
FG
160 lock.unlock();
161
92f5a8d4
TL
162 // Join thread to ensure task operation is returned to queue.
163 if (thread_)
164 {
165 thread_->join();
166 delete thread_;
167 thread_ = 0;
168 }
169
7c673cae
FG
170 // Destroy handler objects.
171 while (!op_queue_.empty())
172 {
173 operation* o = op_queue_.front();
174 op_queue_.pop();
175 if (o != &task_operation_)
176 o->destroy();
177 }
178
179 // Reset to initial state.
180 task_ = 0;
181}
182
b32b8144 183void scheduler::init_task()
7c673cae
FG
184{
185 mutex::scoped_lock lock(mutex_);
186 if (!shutdown_ && !task_)
187 {
1e59de90 188 task_ = get_task_(this->context());
7c673cae
FG
189 op_queue_.push(&task_operation_);
190 wake_one_thread_and_unlock(lock);
191 }
192}
193
b32b8144 194std::size_t scheduler::run(boost::system::error_code& ec)
7c673cae
FG
195{
196 ec = boost::system::error_code();
197 if (outstanding_work_ == 0)
198 {
199 stop();
200 return 0;
201 }
202
203 thread_info this_thread;
204 this_thread.private_outstanding_work = 0;
205 thread_call_stack::context ctx(this, this_thread);
206
207 mutex::scoped_lock lock(mutex_);
208
209 std::size_t n = 0;
210 for (; do_run_one(lock, this_thread, ec); lock.lock())
211 if (n != (std::numeric_limits<std::size_t>::max)())
212 ++n;
213 return n;
214}
215
b32b8144 216std::size_t scheduler::run_one(boost::system::error_code& ec)
7c673cae
FG
217{
218 ec = boost::system::error_code();
219 if (outstanding_work_ == 0)
220 {
221 stop();
222 return 0;
223 }
224
225 thread_info this_thread;
226 this_thread.private_outstanding_work = 0;
227 thread_call_stack::context ctx(this, this_thread);
228
229 mutex::scoped_lock lock(mutex_);
230
231 return do_run_one(lock, this_thread, ec);
232}
233
b32b8144
FG
234std::size_t scheduler::wait_one(long usec, boost::system::error_code& ec)
235{
236 ec = boost::system::error_code();
237 if (outstanding_work_ == 0)
238 {
239 stop();
240 return 0;
241 }
242
243 thread_info this_thread;
244 this_thread.private_outstanding_work = 0;
245 thread_call_stack::context ctx(this, this_thread);
246
247 mutex::scoped_lock lock(mutex_);
248
249 return do_wait_one(lock, this_thread, usec, ec);
250}
251
252std::size_t scheduler::poll(boost::system::error_code& ec)
7c673cae
FG
253{
254 ec = boost::system::error_code();
255 if (outstanding_work_ == 0)
256 {
257 stop();
258 return 0;
259 }
260
261 thread_info this_thread;
262 this_thread.private_outstanding_work = 0;
263 thread_call_stack::context ctx(this, this_thread);
264
265 mutex::scoped_lock lock(mutex_);
266
267#if defined(BOOST_ASIO_HAS_THREADS)
268 // We want to support nested calls to poll() and poll_one(), so any handlers
269 // that are already on a thread-private queue need to be put on to the main
270 // queue now.
271 if (one_thread_)
b32b8144
FG
272 if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
273 op_queue_.push(outer_info->private_op_queue);
7c673cae
FG
274#endif // defined(BOOST_ASIO_HAS_THREADS)
275
276 std::size_t n = 0;
277 for (; do_poll_one(lock, this_thread, ec); lock.lock())
278 if (n != (std::numeric_limits<std::size_t>::max)())
279 ++n;
280 return n;
281}
282
b32b8144 283std::size_t scheduler::poll_one(boost::system::error_code& ec)
7c673cae
FG
284{
285 ec = boost::system::error_code();
286 if (outstanding_work_ == 0)
287 {
288 stop();
289 return 0;
290 }
291
292 thread_info this_thread;
293 this_thread.private_outstanding_work = 0;
294 thread_call_stack::context ctx(this, this_thread);
295
296 mutex::scoped_lock lock(mutex_);
297
298#if defined(BOOST_ASIO_HAS_THREADS)
299 // We want to support nested calls to poll() and poll_one(), so any handlers
300 // that are already on a thread-private queue need to be put on to the main
301 // queue now.
302 if (one_thread_)
b32b8144
FG
303 if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
304 op_queue_.push(outer_info->private_op_queue);
7c673cae
FG
305#endif // defined(BOOST_ASIO_HAS_THREADS)
306
307 return do_poll_one(lock, this_thread, ec);
308}
309
b32b8144 310void scheduler::stop()
7c673cae
FG
311{
312 mutex::scoped_lock lock(mutex_);
313 stop_all_threads(lock);
314}
315
b32b8144 316bool scheduler::stopped() const
7c673cae
FG
317{
318 mutex::scoped_lock lock(mutex_);
319 return stopped_;
320}
321
b32b8144 322void scheduler::restart()
7c673cae
FG
323{
324 mutex::scoped_lock lock(mutex_);
325 stopped_ = false;
326}
327
b32b8144
FG
328void scheduler::compensating_work_started()
329{
330 thread_info_base* this_thread = thread_call_stack::contains(this);
331 ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
332}
333
1e59de90
TL
334bool scheduler::can_dispatch()
335{
336 return thread_call_stack::contains(this) != 0;
337}
338
20effc67
TL
339void scheduler::capture_current_exception()
340{
341 if (thread_info_base* this_thread = thread_call_stack::contains(this))
342 this_thread->capture_current_exception();
343}
344
b32b8144
FG
345void scheduler::post_immediate_completion(
346 scheduler::operation* op, bool is_continuation)
7c673cae
FG
347{
348#if defined(BOOST_ASIO_HAS_THREADS)
349 if (one_thread_ || is_continuation)
350 {
b32b8144 351 if (thread_info_base* this_thread = thread_call_stack::contains(this))
7c673cae 352 {
b32b8144
FG
353 ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
354 static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
7c673cae
FG
355 return;
356 }
357 }
358#else // defined(BOOST_ASIO_HAS_THREADS)
359 (void)is_continuation;
360#endif // defined(BOOST_ASIO_HAS_THREADS)
361
362 work_started();
363 mutex::scoped_lock lock(mutex_);
364 op_queue_.push(op);
365 wake_one_thread_and_unlock(lock);
366}
367
20effc67
TL
368void scheduler::post_immediate_completions(std::size_t n,
369 op_queue<scheduler::operation>& ops, bool is_continuation)
370{
371#if defined(BOOST_ASIO_HAS_THREADS)
372 if (one_thread_ || is_continuation)
373 {
374 if (thread_info_base* this_thread = thread_call_stack::contains(this))
375 {
376 static_cast<thread_info*>(this_thread)->private_outstanding_work
377 += static_cast<long>(n);
378 static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
379 return;
380 }
381 }
382#else // defined(BOOST_ASIO_HAS_THREADS)
383 (void)is_continuation;
384#endif // defined(BOOST_ASIO_HAS_THREADS)
385
386 increment(outstanding_work_, static_cast<long>(n));
387 mutex::scoped_lock lock(mutex_);
388 op_queue_.push(ops);
389 wake_one_thread_and_unlock(lock);
390}
391
b32b8144 392void scheduler::post_deferred_completion(scheduler::operation* op)
7c673cae
FG
393{
394#if defined(BOOST_ASIO_HAS_THREADS)
395 if (one_thread_)
396 {
b32b8144 397 if (thread_info_base* this_thread = thread_call_stack::contains(this))
7c673cae 398 {
b32b8144 399 static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
7c673cae
FG
400 return;
401 }
402 }
403#endif // defined(BOOST_ASIO_HAS_THREADS)
404
405 mutex::scoped_lock lock(mutex_);
406 op_queue_.push(op);
407 wake_one_thread_and_unlock(lock);
408}
409
b32b8144
FG
410void scheduler::post_deferred_completions(
411 op_queue<scheduler::operation>& ops)
7c673cae
FG
412{
413 if (!ops.empty())
414 {
415#if defined(BOOST_ASIO_HAS_THREADS)
416 if (one_thread_)
417 {
b32b8144 418 if (thread_info_base* this_thread = thread_call_stack::contains(this))
7c673cae 419 {
b32b8144 420 static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
7c673cae
FG
421 return;
422 }
423 }
424#endif // defined(BOOST_ASIO_HAS_THREADS)
425
426 mutex::scoped_lock lock(mutex_);
427 op_queue_.push(ops);
428 wake_one_thread_and_unlock(lock);
429 }
430}
431
b32b8144
FG
432void scheduler::do_dispatch(
433 scheduler::operation* op)
7c673cae
FG
434{
435 work_started();
436 mutex::scoped_lock lock(mutex_);
437 op_queue_.push(op);
438 wake_one_thread_and_unlock(lock);
439}
440
b32b8144
FG
441void scheduler::abandon_operations(
442 op_queue<scheduler::operation>& ops)
7c673cae 443{
b32b8144 444 op_queue<scheduler::operation> ops2;
7c673cae
FG
445 ops2.push(ops);
446}
447
b32b8144
FG
448std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
449 scheduler::thread_info& this_thread,
7c673cae
FG
450 const boost::system::error_code& ec)
451{
452 while (!stopped_)
453 {
454 if (!op_queue_.empty())
455 {
456 // Prepare to execute first handler from queue.
457 operation* o = op_queue_.front();
458 op_queue_.pop();
459 bool more_handlers = (!op_queue_.empty());
460
461 if (o == &task_operation_)
462 {
463 task_interrupted_ = more_handlers;
464
465 if (more_handlers && !one_thread_)
466 wakeup_event_.unlock_and_signal_one(lock);
467 else
468 lock.unlock();
469
470 task_cleanup on_exit = { this, &lock, &this_thread };
471 (void)on_exit;
472
473 // Run the task. May throw an exception. Only block if the operation
474 // queue is empty and we're not polling, otherwise we want to return
475 // as soon as possible.
b32b8144 476 task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
7c673cae
FG
477 }
478 else
479 {
480 std::size_t task_result = o->task_result_;
481
482 if (more_handlers && !one_thread_)
483 wake_one_thread_and_unlock(lock);
484 else
485 lock.unlock();
486
487 // Ensure the count of outstanding work is decremented on block exit.
488 work_cleanup on_exit = { this, &lock, &this_thread };
489 (void)on_exit;
490
491 // Complete the operation. May throw an exception. Deletes the object.
b32b8144 492 o->complete(this, ec, task_result);
20effc67 493 this_thread.rethrow_pending_exception();
7c673cae
FG
494
495 return 1;
496 }
497 }
498 else
499 {
500 wakeup_event_.clear(lock);
501 wakeup_event_.wait(lock);
502 }
503 }
504
505 return 0;
506}
507
b32b8144
FG
508std::size_t scheduler::do_wait_one(mutex::scoped_lock& lock,
509 scheduler::thread_info& this_thread, long usec,
510 const boost::system::error_code& ec)
511{
512 if (stopped_)
513 return 0;
514
515 operation* o = op_queue_.front();
516 if (o == 0)
517 {
518 wakeup_event_.clear(lock);
519 wakeup_event_.wait_for_usec(lock, usec);
520 usec = 0; // Wait at most once.
521 o = op_queue_.front();
522 }
523
524 if (o == &task_operation_)
525 {
526 op_queue_.pop();
527 bool more_handlers = (!op_queue_.empty());
528
529 task_interrupted_ = more_handlers;
530
531 if (more_handlers && !one_thread_)
532 wakeup_event_.unlock_and_signal_one(lock);
533 else
534 lock.unlock();
535
536 {
537 task_cleanup on_exit = { this, &lock, &this_thread };
538 (void)on_exit;
539
540 // Run the task. May throw an exception. Only block if the operation
541 // queue is empty and we're not polling, otherwise we want to return
542 // as soon as possible.
543 task_->run(more_handlers ? 0 : usec, this_thread.private_op_queue);
544 }
545
546 o = op_queue_.front();
547 if (o == &task_operation_)
548 {
549 if (!one_thread_)
550 wakeup_event_.maybe_unlock_and_signal_one(lock);
551 return 0;
552 }
553 }
554
555 if (o == 0)
556 return 0;
557
558 op_queue_.pop();
559 bool more_handlers = (!op_queue_.empty());
560
561 std::size_t task_result = o->task_result_;
562
563 if (more_handlers && !one_thread_)
564 wake_one_thread_and_unlock(lock);
565 else
566 lock.unlock();
567
568 // Ensure the count of outstanding work is decremented on block exit.
569 work_cleanup on_exit = { this, &lock, &this_thread };
570 (void)on_exit;
571
572 // Complete the operation. May throw an exception. Deletes the object.
573 o->complete(this, ec, task_result);
20effc67 574 this_thread.rethrow_pending_exception();
b32b8144
FG
575
576 return 1;
577}
578
579std::size_t scheduler::do_poll_one(mutex::scoped_lock& lock,
580 scheduler::thread_info& this_thread,
7c673cae
FG
581 const boost::system::error_code& ec)
582{
583 if (stopped_)
584 return 0;
585
586 operation* o = op_queue_.front();
587 if (o == &task_operation_)
588 {
589 op_queue_.pop();
590 lock.unlock();
591
592 {
593 task_cleanup c = { this, &lock, &this_thread };
594 (void)c;
595
596 // Run the task. May throw an exception. Only block if the operation
597 // queue is empty and we're not polling, otherwise we want to return
598 // as soon as possible.
b32b8144 599 task_->run(0, this_thread.private_op_queue);
7c673cae
FG
600 }
601
602 o = op_queue_.front();
603 if (o == &task_operation_)
604 {
605 wakeup_event_.maybe_unlock_and_signal_one(lock);
606 return 0;
607 }
608 }
609
610 if (o == 0)
611 return 0;
612
613 op_queue_.pop();
614 bool more_handlers = (!op_queue_.empty());
615
616 std::size_t task_result = o->task_result_;
617
618 if (more_handlers && !one_thread_)
619 wake_one_thread_and_unlock(lock);
620 else
621 lock.unlock();
622
623 // Ensure the count of outstanding work is decremented on block exit.
624 work_cleanup on_exit = { this, &lock, &this_thread };
625 (void)on_exit;
626
627 // Complete the operation. May throw an exception. Deletes the object.
b32b8144 628 o->complete(this, ec, task_result);
20effc67 629 this_thread.rethrow_pending_exception();
7c673cae
FG
630
631 return 1;
632}
633
b32b8144 634void scheduler::stop_all_threads(
7c673cae
FG
635 mutex::scoped_lock& lock)
636{
637 stopped_ = true;
638 wakeup_event_.signal_all(lock);
639
640 if (!task_interrupted_ && task_)
641 {
642 task_interrupted_ = true;
643 task_->interrupt();
644 }
645}
646
b32b8144 647void scheduler::wake_one_thread_and_unlock(
7c673cae
FG
648 mutex::scoped_lock& lock)
649{
650 if (!wakeup_event_.maybe_unlock_and_signal_one(lock))
651 {
652 if (!task_interrupted_ && task_)
653 {
654 task_interrupted_ = true;
655 task_->interrupt();
656 }
657 lock.unlock();
658 }
659}
660
1e59de90
TL
661scheduler_task* scheduler::get_default_task(boost::asio::execution_context& ctx)
662{
663#if defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
664 return &use_service<io_uring_service>(ctx);
665#else // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
666 return &use_service<reactor>(ctx);
667#endif // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
668}
669
7c673cae
FG
670} // namespace detail
671} // namespace asio
672} // namespace boost
673
674#include <boost/asio/detail/pop_options.hpp>
675
b32b8144 676#endif // BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP