]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // |
b32b8144 FG |
2 | // detail/impl/scheduler.ipp |
3 | // ~~~~~~~~~~~~~~~~~~~~~~~~~ | |
7c673cae | 4 | // |
1e59de90 | 5 | // Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com) |
7c673cae FG |
6 | // |
7 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
8 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
9 | // | |
10 | ||
b32b8144 FG |
11 | #ifndef BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP |
12 | #define BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP | |
7c673cae FG |
13 | |
14 | #if defined(_MSC_VER) && (_MSC_VER >= 1200) | |
15 | # pragma once | |
16 | #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) | |
17 | ||
18 | #include <boost/asio/detail/config.hpp> | |
19 | ||
b32b8144 | 20 | #include <boost/asio/detail/concurrency_hint.hpp> |
7c673cae FG |
21 | #include <boost/asio/detail/event.hpp> |
22 | #include <boost/asio/detail/limits.hpp> | |
b32b8144 FG |
23 | #include <boost/asio/detail/scheduler.hpp> |
24 | #include <boost/asio/detail/scheduler_thread_info.hpp> | |
92f5a8d4 | 25 | #include <boost/asio/detail/signal_blocker.hpp> |
7c673cae | 26 | |
1e59de90 TL |
27 | #if defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT) |
28 | # include <boost/asio/detail/io_uring_service.hpp> | |
29 | #else // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT) | |
30 | # include <boost/asio/detail/reactor.hpp> | |
31 | #endif // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT) | |
32 | ||
7c673cae FG |
33 | #include <boost/asio/detail/push_options.hpp> |
34 | ||
35 | namespace boost { | |
36 | namespace asio { | |
37 | namespace detail { | |
38 | ||
// Functor executed by the scheduler's internally owned thread (created when
// the scheduler is constructed with own_thread == true). It simply runs the
// scheduler's event loop, discarding the resulting error code.
class scheduler::thread_function
{
public:
  explicit thread_function(scheduler* s)
    : this_(s)
  {
  }

  void operator()()
  {
    boost::system::error_code ec;
    this_->run(ec);
  }

private:
  scheduler* this_; // Non-owning pointer back to the scheduler.
};
56 | ||
// RAII guard active while the task (reactor / io_uring service) runs outside
// the scheduler lock. On destruction it publishes the thread-private work
// count to the shared counter, re-acquires the lock, flushes the
// thread-private operation queue, and reinserts the task marker so another
// thread can run the task again.
struct scheduler::task_cleanup
{
  ~task_cleanup()
  {
    if (this_thread_->private_outstanding_work > 0)
    {
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work);
    }
    this_thread_->private_outstanding_work = 0;

    // Enqueue the completed operations and reinsert the task at the end of
    // the operation queue.
    lock_->lock();
    scheduler_->task_interrupted_ = true;
    scheduler_->op_queue_.push(this_thread_->private_op_queue);
    scheduler_->op_queue_.push(&scheduler_->task_operation_);
  }

  scheduler* scheduler_;    // The owning scheduler (non-owning pointer).
  mutex::scoped_lock* lock_;   // Lock to re-acquire before touching shared state.
  thread_info* this_thread_;   // This thread's private queue and work count.
};
81 | ||
// RAII guard active while a handler runs outside the scheduler lock. On
// destruction it reconciles the thread-private outstanding work count with
// the shared counter — the just-completed handler itself accounts for one
// unit of work — and flushes any thread-private operations back to the
// shared queue.
struct scheduler::work_cleanup
{
  ~work_cleanup()
  {
    if (this_thread_->private_outstanding_work > 1)
    {
      // More work was produced than consumed: publish the surplus.
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work - 1);
    }
    else if (this_thread_->private_outstanding_work < 1)
    {
      // Net decrease: the completed handler consumed one unit of work.
      scheduler_->work_finished();
    }
    this_thread_->private_outstanding_work = 0;

#if defined(BOOST_ASIO_HAS_THREADS)
    if (!this_thread_->private_op_queue.empty())
    {
      lock_->lock();
      scheduler_->op_queue_.push(this_thread_->private_op_queue);
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)
  }

  scheduler* scheduler_;    // The owning scheduler (non-owning pointer).
  mutex::scoped_lock* lock_;   // Lock to re-acquire before touching shared state.
  thread_info* this_thread_;   // This thread's private queue and work count.
};
111 | ||
// Construct the scheduler. Locking behaviour for the scheduler and reactor
// is derived from the concurrency hint; when own_thread is true a private
// thread (with signals blocked) is spawned to run the event loop.
scheduler::scheduler(boost::asio::execution_context& ctx,
    int concurrency_hint, bool own_thread, get_task_func_type get_task)
  : boost::asio::detail::execution_context_service_base<scheduler>(ctx),
    one_thread_(concurrency_hint == 1
        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          SCHEDULER, concurrency_hint)
        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          REACTOR_IO, concurrency_hint)),
    mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          SCHEDULER, concurrency_hint)),
    task_(0),
    get_task_(get_task),
    task_interrupted_(true),
    outstanding_work_(0),
    stopped_(false),
    shutdown_(false),
    concurrency_hint_(concurrency_hint),
    thread_(0)
{
  BOOST_ASIO_HANDLER_TRACKING_INIT;

  if (own_thread)
  {
    // The internal thread counts as outstanding work so that run() does not
    // return immediately for lack of work.
    ++outstanding_work_;
    boost::asio::detail::signal_blocker sb;
    thread_ = new boost::asio::detail::thread(thread_function(this));
  }
}
140 | ||
// Destructor. If the scheduler owns a thread, stop all threads and join the
// owned thread before destruction completes.
scheduler::~scheduler()
{
  if (thread_)
  {
    mutex::scoped_lock lock(mutex_);
    shutdown_ = true;
    stop_all_threads(lock);
    lock.unlock(); // Must release the lock before joining.
    thread_->join();
    delete thread_;
  }
}
153 | ||
// Service shutdown: stop any internally owned thread, then destroy all
// pending handlers without invoking them, and reset the task pointer.
void scheduler::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  if (thread_)
    stop_all_threads(lock);
  lock.unlock();

  // Join thread to ensure task operation is returned to queue.
  if (thread_)
  {
    thread_->join();
    delete thread_;
    thread_ = 0;
  }

  // Destroy handler objects.
  while (!op_queue_.empty())
  {
    operation* o = op_queue_.front();
    op_queue_.pop();
    if (o != &task_operation_) // The task marker is not a real handler.
      o->destroy();
  }

  // Reset to initial state.
  task_ = 0;
}
182 | ||
// Lazily create the task on first use (via the injected factory) and place
// the task marker on the operation queue so a thread will run it.
void scheduler::init_task()
{
  mutex::scoped_lock lock(mutex_);
  if (!shutdown_ && !task_)
  {
    task_ = get_task_(this->context());
    op_queue_.push(&task_operation_);
    wake_one_thread_and_unlock(lock);
  }
}
193 | ||
// Run the event loop on the calling thread until the scheduler is stopped or
// runs out of work. Returns the number of handlers executed (saturating at
// the maximum representable std::size_t).
std::size_t scheduler::run(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  // Register this thread on the scheduler's call stack so nested calls and
  // dispatch can find the thread-private state.
  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}
215 | ||
// Run at most one handler on the calling thread, blocking until one is
// available or the scheduler is stopped. Returns 1 if a handler ran, else 0.
std::size_t scheduler::run_one(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  // Register this thread on the scheduler's call stack.
  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  return do_run_one(lock, this_thread, ec);
}
233 | ||
// Run at most one handler, waiting no longer than usec microseconds for one
// to become ready. Returns 1 if a handler ran, 0 on timeout or stop.
std::size_t scheduler::wait_one(long usec, boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  // Register this thread on the scheduler's call stack.
  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  return do_wait_one(lock, this_thread, usec, ec);
}
251 | ||
// Run all handlers that are ready to run, without blocking. Returns the
// number of handlers executed (saturating at the maximum std::size_t).
std::size_t scheduler::poll(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

#if defined(BOOST_ASIO_HAS_THREADS)
  // We want to support nested calls to poll() and poll_one(), so any handlers
  // that are already on a thread-private queue need to be put on to the main
  // queue now.
  if (one_thread_)
    if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
      op_queue_.push(outer_info->private_op_queue);
#endif // defined(BOOST_ASIO_HAS_THREADS)

  std::size_t n = 0;
  for (; do_poll_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}
282 | ||
// Run at most one ready handler, without blocking. Returns 1 if a handler
// ran, otherwise 0.
std::size_t scheduler::poll_one(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

#if defined(BOOST_ASIO_HAS_THREADS)
  // We want to support nested calls to poll() and poll_one(), so any handlers
  // that are already on a thread-private queue need to be put on to the main
  // queue now.
  if (one_thread_)
    if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
      op_queue_.push(outer_info->private_op_queue);
#endif // defined(BOOST_ASIO_HAS_THREADS)

  return do_poll_one(lock, this_thread, ec);
}
309 | ||
// Signal all threads running this scheduler to return as soon as possible.
void scheduler::stop()
{
  mutex::scoped_lock lock(mutex_);
  stop_all_threads(lock);
}
315 | ||
// Returns whether the scheduler is currently in the stopped state.
bool scheduler::stopped() const
{
  mutex::scoped_lock lock(mutex_);
  return stopped_;
}
321 | ||
// Clear the stopped flag so subsequent run()/poll() calls may proceed.
void scheduler::restart()
{
  mutex::scoped_lock lock(mutex_);
  stopped_ = false;
}
327 | ||
// Record one unit of thread-private work for the calling thread.
// Precondition: the caller is a thread currently running this scheduler,
// so thread_call_stack::contains(this) is non-null (no check is performed).
void scheduler::compensating_work_started()
{
  thread_info_base* this_thread = thread_call_stack::contains(this);
  ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
}
333 | ||
1e59de90 TL |
334 | bool scheduler::can_dispatch() |
335 | { | |
336 | return thread_call_stack::contains(this) != 0; | |
337 | } | |
338 | ||
// If called from a thread running this scheduler, stash the in-flight
// exception in that thread's info for later rethrow; otherwise do nothing.
void scheduler::capture_current_exception()
{
  if (thread_info_base* this_thread = thread_call_stack::contains(this))
    this_thread->capture_current_exception();
}
344 | ||
// Schedule an operation, counting it as new work. Fast path: if the calling
// thread is already running this scheduler (and either single-threaded mode
// or a continuation permits it), push to the thread-private queue without
// taking the lock.
void scheduler::post_immediate_completion(
    scheduler::operation* op, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  // Slow path: count the work, enqueue on the shared queue, and wake a thread.
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
367 | ||
// Schedule a batch of n operations as new work. Mirrors
// post_immediate_completion but adjusts the work counters in bulk.
void scheduler::post_immediate_completions(std::size_t n,
    op_queue<scheduler::operation>& ops, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      static_cast<thread_info*>(this_thread)->private_outstanding_work
        += static_cast<long>(n);
      static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  // Slow path: count the work, enqueue on the shared queue, and wake a thread.
  increment(outstanding_work_, static_cast<long>(n));
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(ops);
  wake_one_thread_and_unlock(lock);
}
391 | ||
// Schedule an operation whose work has already been counted (so no
// outstanding-work adjustment is made here).
void scheduler::post_deferred_completion(scheduler::operation* op)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_)
  {
    // Single-threaded fast path: no locking needed for the private queue.
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#endif // defined(BOOST_ASIO_HAS_THREADS)

  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
409 | ||
// Schedule a queue of operations whose work has already been counted.
// No-op for an empty queue.
void scheduler::post_deferred_completions(
    op_queue<scheduler::operation>& ops)
{
  if (!ops.empty())
  {
#if defined(BOOST_ASIO_HAS_THREADS)
    if (one_thread_)
    {
      // Single-threaded fast path: no locking needed for the private queue.
      if (thread_info_base* this_thread = thread_call_stack::contains(this))
      {
        static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
        return;
      }
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)

    mutex::scoped_lock lock(mutex_);
    op_queue_.push(ops);
    wake_one_thread_and_unlock(lock);
  }
}
431 | ||
// Enqueue an operation on the shared queue unconditionally (no
// thread-private fast path), counting it as new work.
void scheduler::do_dispatch(
    scheduler::operation* op)
{
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
440 | ||
b32b8144 FG |
441 | void scheduler::abandon_operations( |
442 | op_queue<scheduler::operation>& ops) | |
7c673cae | 443 | { |
b32b8144 | 444 | op_queue<scheduler::operation> ops2; |
7c673cae FG |
445 | ops2.push(ops); |
446 | } | |
447 | ||
// Execute one queue item on the calling thread, blocking while the queue is
// empty. Called with the lock held. Returns 1 if a handler was completed
// (lock released, re-acquired by the caller's loop), or 0 if stopped.
// Running the task marker loops back around rather than counting as a
// handler.
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        task_interrupted_ = more_handlers;

        // If there is pending work, wake another thread to process it while
        // this thread runs the task.
        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        // Guard re-acquires the lock and reinserts the task on scope exit.
        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
      }
      else
      {
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result);
        this_thread.rethrow_pending_exception();

        return 1;
      }
    }
    else
    {
      // Nothing to do: sleep until another thread signals new work.
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }

  return 0;
}
507 | ||
// Like do_run_one, but waits at most usec microseconds (and at most once)
// for work to become available. Called with the lock held. Returns 1 if a
// handler was completed, 0 on timeout or stop.
std::size_t scheduler::do_wait_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread, long usec,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == 0)
  {
    wakeup_event_.clear(lock);
    wakeup_event_.wait_for_usec(lock, usec);
    usec = 0; // Wait at most once.
    o = op_queue_.front();
  }

  if (o == &task_operation_)
  {
    op_queue_.pop();
    bool more_handlers = (!op_queue_.empty());

    task_interrupted_ = more_handlers;

    // If there is pending work, wake another thread to process it while this
    // thread runs the task.
    if (more_handlers && !one_thread_)
      wakeup_event_.unlock_and_signal_one(lock);
    else
      lock.unlock();

    {
      // Guard re-acquires the lock and reinserts the task on scope exit.
      task_cleanup on_exit = { this, &lock, &this_thread };
      (void)on_exit;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(more_handlers ? 0 : usec, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      // Only the task marker remains: nothing for this thread to complete,
      // but another thread may still need waking.
      if (!one_thread_)
        wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0; // Timed out with no work available.

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);
  this_thread.rethrow_pending_exception();

  return 1;
}
578 | ||
// Like do_run_one, but never blocks: the task runs in polling mode and an
// empty queue returns immediately. Called with the lock held. Returns 1 if
// a handler was completed, otherwise 0.
std::size_t scheduler::do_poll_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == &task_operation_)
  {
    op_queue_.pop();
    lock.unlock();

    {
      // Guard re-acquires the lock and reinserts the task on scope exit.
      task_cleanup c = { this, &lock, &this_thread };
      (void)c;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(0, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      // Only the task marker remains: nothing for this thread to complete,
      // but another thread may still need waking.
      wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0; // No ready handlers.

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);
  this_thread.rethrow_pending_exception();

  return 1;
}
633 | ||
b32b8144 | 634 | void scheduler::stop_all_threads( |
7c673cae FG |
635 | mutex::scoped_lock& lock) |
636 | { | |
637 | stopped_ = true; | |
638 | wakeup_event_.signal_all(lock); | |
639 | ||
640 | if (!task_interrupted_ && task_) | |
641 | { | |
642 | task_interrupted_ = true; | |
643 | task_->interrupt(); | |
644 | } | |
645 | } | |
646 | ||
b32b8144 | 647 | void scheduler::wake_one_thread_and_unlock( |
7c673cae FG |
648 | mutex::scoped_lock& lock) |
649 | { | |
650 | if (!wakeup_event_.maybe_unlock_and_signal_one(lock)) | |
651 | { | |
652 | if (!task_interrupted_ && task_) | |
653 | { | |
654 | task_interrupted_ = true; | |
655 | task_->interrupt(); | |
656 | } | |
657 | lock.unlock(); | |
658 | } | |
659 | } | |
660 | ||
// Default task factory: the io_uring service when it is configured as the
// default backend, otherwise the reactor service for the given context.
scheduler_task* scheduler::get_default_task(boost::asio::execution_context& ctx)
{
#if defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  return &use_service<io_uring_service>(ctx);
#else // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  return &use_service<reactor>(ctx);
#endif // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
}
669 | ||
7c673cae FG |
670 | } // namespace detail |
671 | } // namespace asio | |
672 | } // namespace boost | |
673 | ||
674 | #include <boost/asio/detail/pop_options.hpp> | |
675 | ||
b32b8144 | 676 | #endif // BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP |