]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // |
b32b8144 FG |
2 | // detail/impl/scheduler.ipp |
3 | // ~~~~~~~~~~~~~~~~~~~~~~~~~ | |
7c673cae | 4 | // |
11fdf7f2 | 5 | // Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com) |
7c673cae FG |
6 | // |
7 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
8 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
9 | // | |
10 | ||
b32b8144 FG |
11 | #ifndef BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP |
12 | #define BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP | |
7c673cae FG |
13 | |
14 | #if defined(_MSC_VER) && (_MSC_VER >= 1200) | |
15 | # pragma once | |
16 | #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) | |
17 | ||
18 | #include <boost/asio/detail/config.hpp> | |
19 | ||
b32b8144 | 20 | #include <boost/asio/detail/concurrency_hint.hpp> |
7c673cae FG |
21 | #include <boost/asio/detail/event.hpp> |
22 | #include <boost/asio/detail/limits.hpp> | |
23 | #include <boost/asio/detail/reactor.hpp> | |
b32b8144 FG |
24 | #include <boost/asio/detail/scheduler.hpp> |
25 | #include <boost/asio/detail/scheduler_thread_info.hpp> | |
7c673cae FG |
26 | |
27 | #include <boost/asio/detail/push_options.hpp> | |
28 | ||
29 | namespace boost { | |
30 | namespace asio { | |
31 | namespace detail { | |
32 | ||
b32b8144 | 33 | struct scheduler::task_cleanup |
7c673cae FG |
34 | { |
35 | ~task_cleanup() | |
36 | { | |
37 | if (this_thread_->private_outstanding_work > 0) | |
38 | { | |
39 | boost::asio::detail::increment( | |
b32b8144 | 40 | scheduler_->outstanding_work_, |
7c673cae FG |
41 | this_thread_->private_outstanding_work); |
42 | } | |
43 | this_thread_->private_outstanding_work = 0; | |
44 | ||
45 | // Enqueue the completed operations and reinsert the task at the end of | |
46 | // the operation queue. | |
47 | lock_->lock(); | |
b32b8144 FG |
48 | scheduler_->task_interrupted_ = true; |
49 | scheduler_->op_queue_.push(this_thread_->private_op_queue); | |
50 | scheduler_->op_queue_.push(&scheduler_->task_operation_); | |
7c673cae FG |
51 | } |
52 | ||
b32b8144 | 53 | scheduler* scheduler_; |
7c673cae FG |
54 | mutex::scoped_lock* lock_; |
55 | thread_info* this_thread_; | |
56 | }; | |
57 | ||
b32b8144 | 58 | struct scheduler::work_cleanup |
7c673cae FG |
59 | { |
60 | ~work_cleanup() | |
61 | { | |
62 | if (this_thread_->private_outstanding_work > 1) | |
63 | { | |
64 | boost::asio::detail::increment( | |
b32b8144 | 65 | scheduler_->outstanding_work_, |
7c673cae FG |
66 | this_thread_->private_outstanding_work - 1); |
67 | } | |
68 | else if (this_thread_->private_outstanding_work < 1) | |
69 | { | |
b32b8144 | 70 | scheduler_->work_finished(); |
7c673cae FG |
71 | } |
72 | this_thread_->private_outstanding_work = 0; | |
73 | ||
74 | #if defined(BOOST_ASIO_HAS_THREADS) | |
75 | if (!this_thread_->private_op_queue.empty()) | |
76 | { | |
77 | lock_->lock(); | |
b32b8144 | 78 | scheduler_->op_queue_.push(this_thread_->private_op_queue); |
7c673cae FG |
79 | } |
80 | #endif // defined(BOOST_ASIO_HAS_THREADS) | |
81 | } | |
82 | ||
b32b8144 | 83 | scheduler* scheduler_; |
7c673cae FG |
84 | mutex::scoped_lock* lock_; |
85 | thread_info* this_thread_; | |
86 | }; | |
87 | ||
b32b8144 FG |
88 | scheduler::scheduler( |
89 | boost::asio::execution_context& ctx, int concurrency_hint) | |
90 | : boost::asio::detail::execution_context_service_base<scheduler>(ctx), | |
91 | one_thread_(concurrency_hint == 1 | |
92 | || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING( | |
93 | SCHEDULER, concurrency_hint) | |
94 | || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING( | |
95 | REACTOR_IO, concurrency_hint)), | |
96 | mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING( | |
97 | SCHEDULER, concurrency_hint)), | |
7c673cae FG |
98 | task_(0), |
99 | task_interrupted_(true), | |
100 | outstanding_work_(0), | |
101 | stopped_(false), | |
b32b8144 FG |
102 | shutdown_(false), |
103 | concurrency_hint_(concurrency_hint) | |
7c673cae FG |
104 | { |
105 | BOOST_ASIO_HANDLER_TRACKING_INIT; | |
106 | } | |
107 | ||
b32b8144 | 108 | void scheduler::shutdown() |
7c673cae FG |
109 | { |
110 | mutex::scoped_lock lock(mutex_); | |
111 | shutdown_ = true; | |
112 | lock.unlock(); | |
113 | ||
114 | // Destroy handler objects. | |
115 | while (!op_queue_.empty()) | |
116 | { | |
117 | operation* o = op_queue_.front(); | |
118 | op_queue_.pop(); | |
119 | if (o != &task_operation_) | |
120 | o->destroy(); | |
121 | } | |
122 | ||
123 | // Reset to initial state. | |
124 | task_ = 0; | |
125 | } | |
126 | ||
b32b8144 | 127 | void scheduler::init_task() |
7c673cae FG |
128 | { |
129 | mutex::scoped_lock lock(mutex_); | |
130 | if (!shutdown_ && !task_) | |
131 | { | |
b32b8144 | 132 | task_ = &use_service<reactor>(this->context()); |
7c673cae FG |
133 | op_queue_.push(&task_operation_); |
134 | wake_one_thread_and_unlock(lock); | |
135 | } | |
136 | } | |
137 | ||
b32b8144 | 138 | std::size_t scheduler::run(boost::system::error_code& ec) |
7c673cae FG |
139 | { |
140 | ec = boost::system::error_code(); | |
141 | if (outstanding_work_ == 0) | |
142 | { | |
143 | stop(); | |
144 | return 0; | |
145 | } | |
146 | ||
147 | thread_info this_thread; | |
148 | this_thread.private_outstanding_work = 0; | |
149 | thread_call_stack::context ctx(this, this_thread); | |
150 | ||
151 | mutex::scoped_lock lock(mutex_); | |
152 | ||
153 | std::size_t n = 0; | |
154 | for (; do_run_one(lock, this_thread, ec); lock.lock()) | |
155 | if (n != (std::numeric_limits<std::size_t>::max)()) | |
156 | ++n; | |
157 | return n; | |
158 | } | |
159 | ||
b32b8144 | 160 | std::size_t scheduler::run_one(boost::system::error_code& ec) |
7c673cae FG |
161 | { |
162 | ec = boost::system::error_code(); | |
163 | if (outstanding_work_ == 0) | |
164 | { | |
165 | stop(); | |
166 | return 0; | |
167 | } | |
168 | ||
169 | thread_info this_thread; | |
170 | this_thread.private_outstanding_work = 0; | |
171 | thread_call_stack::context ctx(this, this_thread); | |
172 | ||
173 | mutex::scoped_lock lock(mutex_); | |
174 | ||
175 | return do_run_one(lock, this_thread, ec); | |
176 | } | |
177 | ||
b32b8144 FG |
178 | std::size_t scheduler::wait_one(long usec, boost::system::error_code& ec) |
179 | { | |
180 | ec = boost::system::error_code(); | |
181 | if (outstanding_work_ == 0) | |
182 | { | |
183 | stop(); | |
184 | return 0; | |
185 | } | |
186 | ||
187 | thread_info this_thread; | |
188 | this_thread.private_outstanding_work = 0; | |
189 | thread_call_stack::context ctx(this, this_thread); | |
190 | ||
191 | mutex::scoped_lock lock(mutex_); | |
192 | ||
193 | return do_wait_one(lock, this_thread, usec, ec); | |
194 | } | |
195 | ||
196 | std::size_t scheduler::poll(boost::system::error_code& ec) | |
7c673cae FG |
197 | { |
198 | ec = boost::system::error_code(); | |
199 | if (outstanding_work_ == 0) | |
200 | { | |
201 | stop(); | |
202 | return 0; | |
203 | } | |
204 | ||
205 | thread_info this_thread; | |
206 | this_thread.private_outstanding_work = 0; | |
207 | thread_call_stack::context ctx(this, this_thread); | |
208 | ||
209 | mutex::scoped_lock lock(mutex_); | |
210 | ||
211 | #if defined(BOOST_ASIO_HAS_THREADS) | |
212 | // We want to support nested calls to poll() and poll_one(), so any handlers | |
213 | // that are already on a thread-private queue need to be put on to the main | |
214 | // queue now. | |
215 | if (one_thread_) | |
b32b8144 FG |
216 | if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key())) |
217 | op_queue_.push(outer_info->private_op_queue); | |
7c673cae FG |
218 | #endif // defined(BOOST_ASIO_HAS_THREADS) |
219 | ||
220 | std::size_t n = 0; | |
221 | for (; do_poll_one(lock, this_thread, ec); lock.lock()) | |
222 | if (n != (std::numeric_limits<std::size_t>::max)()) | |
223 | ++n; | |
224 | return n; | |
225 | } | |
226 | ||
b32b8144 | 227 | std::size_t scheduler::poll_one(boost::system::error_code& ec) |
7c673cae FG |
228 | { |
229 | ec = boost::system::error_code(); | |
230 | if (outstanding_work_ == 0) | |
231 | { | |
232 | stop(); | |
233 | return 0; | |
234 | } | |
235 | ||
236 | thread_info this_thread; | |
237 | this_thread.private_outstanding_work = 0; | |
238 | thread_call_stack::context ctx(this, this_thread); | |
239 | ||
240 | mutex::scoped_lock lock(mutex_); | |
241 | ||
242 | #if defined(BOOST_ASIO_HAS_THREADS) | |
243 | // We want to support nested calls to poll() and poll_one(), so any handlers | |
244 | // that are already on a thread-private queue need to be put on to the main | |
245 | // queue now. | |
246 | if (one_thread_) | |
b32b8144 FG |
247 | if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key())) |
248 | op_queue_.push(outer_info->private_op_queue); | |
7c673cae FG |
249 | #endif // defined(BOOST_ASIO_HAS_THREADS) |
250 | ||
251 | return do_poll_one(lock, this_thread, ec); | |
252 | } | |
253 | ||
b32b8144 | 254 | void scheduler::stop() |
7c673cae FG |
255 | { |
256 | mutex::scoped_lock lock(mutex_); | |
257 | stop_all_threads(lock); | |
258 | } | |
259 | ||
b32b8144 | 260 | bool scheduler::stopped() const |
7c673cae FG |
261 | { |
262 | mutex::scoped_lock lock(mutex_); | |
263 | return stopped_; | |
264 | } | |
265 | ||
b32b8144 | 266 | void scheduler::restart() |
7c673cae FG |
267 | { |
268 | mutex::scoped_lock lock(mutex_); | |
269 | stopped_ = false; | |
270 | } | |
271 | ||
b32b8144 FG |
272 | void scheduler::compensating_work_started() |
273 | { | |
274 | thread_info_base* this_thread = thread_call_stack::contains(this); | |
275 | ++static_cast<thread_info*>(this_thread)->private_outstanding_work; | |
276 | } | |
277 | ||
278 | void scheduler::post_immediate_completion( | |
279 | scheduler::operation* op, bool is_continuation) | |
7c673cae FG |
280 | { |
281 | #if defined(BOOST_ASIO_HAS_THREADS) | |
282 | if (one_thread_ || is_continuation) | |
283 | { | |
b32b8144 | 284 | if (thread_info_base* this_thread = thread_call_stack::contains(this)) |
7c673cae | 285 | { |
b32b8144 FG |
286 | ++static_cast<thread_info*>(this_thread)->private_outstanding_work; |
287 | static_cast<thread_info*>(this_thread)->private_op_queue.push(op); | |
7c673cae FG |
288 | return; |
289 | } | |
290 | } | |
291 | #else // defined(BOOST_ASIO_HAS_THREADS) | |
292 | (void)is_continuation; | |
293 | #endif // defined(BOOST_ASIO_HAS_THREADS) | |
294 | ||
295 | work_started(); | |
296 | mutex::scoped_lock lock(mutex_); | |
297 | op_queue_.push(op); | |
298 | wake_one_thread_and_unlock(lock); | |
299 | } | |
300 | ||
b32b8144 | 301 | void scheduler::post_deferred_completion(scheduler::operation* op) |
7c673cae FG |
302 | { |
303 | #if defined(BOOST_ASIO_HAS_THREADS) | |
304 | if (one_thread_) | |
305 | { | |
b32b8144 | 306 | if (thread_info_base* this_thread = thread_call_stack::contains(this)) |
7c673cae | 307 | { |
b32b8144 | 308 | static_cast<thread_info*>(this_thread)->private_op_queue.push(op); |
7c673cae FG |
309 | return; |
310 | } | |
311 | } | |
312 | #endif // defined(BOOST_ASIO_HAS_THREADS) | |
313 | ||
314 | mutex::scoped_lock lock(mutex_); | |
315 | op_queue_.push(op); | |
316 | wake_one_thread_and_unlock(lock); | |
317 | } | |
318 | ||
b32b8144 FG |
319 | void scheduler::post_deferred_completions( |
320 | op_queue<scheduler::operation>& ops) | |
7c673cae FG |
321 | { |
322 | if (!ops.empty()) | |
323 | { | |
324 | #if defined(BOOST_ASIO_HAS_THREADS) | |
325 | if (one_thread_) | |
326 | { | |
b32b8144 | 327 | if (thread_info_base* this_thread = thread_call_stack::contains(this)) |
7c673cae | 328 | { |
b32b8144 | 329 | static_cast<thread_info*>(this_thread)->private_op_queue.push(ops); |
7c673cae FG |
330 | return; |
331 | } | |
332 | } | |
333 | #endif // defined(BOOST_ASIO_HAS_THREADS) | |
334 | ||
335 | mutex::scoped_lock lock(mutex_); | |
336 | op_queue_.push(ops); | |
337 | wake_one_thread_and_unlock(lock); | |
338 | } | |
339 | } | |
340 | ||
b32b8144 FG |
341 | void scheduler::do_dispatch( |
342 | scheduler::operation* op) | |
7c673cae FG |
343 | { |
344 | work_started(); | |
345 | mutex::scoped_lock lock(mutex_); | |
346 | op_queue_.push(op); | |
347 | wake_one_thread_and_unlock(lock); | |
348 | } | |
349 | ||
b32b8144 FG |
350 | void scheduler::abandon_operations( |
351 | op_queue<scheduler::operation>& ops) | |
7c673cae | 352 | { |
b32b8144 | 353 | op_queue<scheduler::operation> ops2; |
7c673cae FG |
354 | ops2.push(ops); |
355 | } | |
356 | ||
b32b8144 FG |
357 | std::size_t scheduler::do_run_one(mutex::scoped_lock& lock, |
358 | scheduler::thread_info& this_thread, | |
7c673cae FG |
359 | const boost::system::error_code& ec) |
360 | { | |
361 | while (!stopped_) | |
362 | { | |
363 | if (!op_queue_.empty()) | |
364 | { | |
365 | // Prepare to execute first handler from queue. | |
366 | operation* o = op_queue_.front(); | |
367 | op_queue_.pop(); | |
368 | bool more_handlers = (!op_queue_.empty()); | |
369 | ||
370 | if (o == &task_operation_) | |
371 | { | |
372 | task_interrupted_ = more_handlers; | |
373 | ||
374 | if (more_handlers && !one_thread_) | |
375 | wakeup_event_.unlock_and_signal_one(lock); | |
376 | else | |
377 | lock.unlock(); | |
378 | ||
379 | task_cleanup on_exit = { this, &lock, &this_thread }; | |
380 | (void)on_exit; | |
381 | ||
382 | // Run the task. May throw an exception. Only block if the operation | |
383 | // queue is empty and we're not polling, otherwise we want to return | |
384 | // as soon as possible. | |
b32b8144 | 385 | task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue); |
7c673cae FG |
386 | } |
387 | else | |
388 | { | |
389 | std::size_t task_result = o->task_result_; | |
390 | ||
391 | if (more_handlers && !one_thread_) | |
392 | wake_one_thread_and_unlock(lock); | |
393 | else | |
394 | lock.unlock(); | |
395 | ||
396 | // Ensure the count of outstanding work is decremented on block exit. | |
397 | work_cleanup on_exit = { this, &lock, &this_thread }; | |
398 | (void)on_exit; | |
399 | ||
400 | // Complete the operation. May throw an exception. Deletes the object. | |
b32b8144 | 401 | o->complete(this, ec, task_result); |
7c673cae FG |
402 | |
403 | return 1; | |
404 | } | |
405 | } | |
406 | else | |
407 | { | |
408 | wakeup_event_.clear(lock); | |
409 | wakeup_event_.wait(lock); | |
410 | } | |
411 | } | |
412 | ||
413 | return 0; | |
414 | } | |
415 | ||
b32b8144 FG |
416 | std::size_t scheduler::do_wait_one(mutex::scoped_lock& lock, |
417 | scheduler::thread_info& this_thread, long usec, | |
418 | const boost::system::error_code& ec) | |
419 | { | |
420 | if (stopped_) | |
421 | return 0; | |
422 | ||
423 | operation* o = op_queue_.front(); | |
424 | if (o == 0) | |
425 | { | |
426 | wakeup_event_.clear(lock); | |
427 | wakeup_event_.wait_for_usec(lock, usec); | |
428 | usec = 0; // Wait at most once. | |
429 | o = op_queue_.front(); | |
430 | } | |
431 | ||
432 | if (o == &task_operation_) | |
433 | { | |
434 | op_queue_.pop(); | |
435 | bool more_handlers = (!op_queue_.empty()); | |
436 | ||
437 | task_interrupted_ = more_handlers; | |
438 | ||
439 | if (more_handlers && !one_thread_) | |
440 | wakeup_event_.unlock_and_signal_one(lock); | |
441 | else | |
442 | lock.unlock(); | |
443 | ||
444 | { | |
445 | task_cleanup on_exit = { this, &lock, &this_thread }; | |
446 | (void)on_exit; | |
447 | ||
448 | // Run the task. May throw an exception. Only block if the operation | |
449 | // queue is empty and we're not polling, otherwise we want to return | |
450 | // as soon as possible. | |
451 | task_->run(more_handlers ? 0 : usec, this_thread.private_op_queue); | |
452 | } | |
453 | ||
454 | o = op_queue_.front(); | |
455 | if (o == &task_operation_) | |
456 | { | |
457 | if (!one_thread_) | |
458 | wakeup_event_.maybe_unlock_and_signal_one(lock); | |
459 | return 0; | |
460 | } | |
461 | } | |
462 | ||
463 | if (o == 0) | |
464 | return 0; | |
465 | ||
466 | op_queue_.pop(); | |
467 | bool more_handlers = (!op_queue_.empty()); | |
468 | ||
469 | std::size_t task_result = o->task_result_; | |
470 | ||
471 | if (more_handlers && !one_thread_) | |
472 | wake_one_thread_and_unlock(lock); | |
473 | else | |
474 | lock.unlock(); | |
475 | ||
476 | // Ensure the count of outstanding work is decremented on block exit. | |
477 | work_cleanup on_exit = { this, &lock, &this_thread }; | |
478 | (void)on_exit; | |
479 | ||
480 | // Complete the operation. May throw an exception. Deletes the object. | |
481 | o->complete(this, ec, task_result); | |
482 | ||
483 | return 1; | |
484 | } | |
485 | ||
486 | std::size_t scheduler::do_poll_one(mutex::scoped_lock& lock, | |
487 | scheduler::thread_info& this_thread, | |
7c673cae FG |
488 | const boost::system::error_code& ec) |
489 | { | |
490 | if (stopped_) | |
491 | return 0; | |
492 | ||
493 | operation* o = op_queue_.front(); | |
494 | if (o == &task_operation_) | |
495 | { | |
496 | op_queue_.pop(); | |
497 | lock.unlock(); | |
498 | ||
499 | { | |
500 | task_cleanup c = { this, &lock, &this_thread }; | |
501 | (void)c; | |
502 | ||
503 | // Run the task. May throw an exception. Only block if the operation | |
504 | // queue is empty and we're not polling, otherwise we want to return | |
505 | // as soon as possible. | |
b32b8144 | 506 | task_->run(0, this_thread.private_op_queue); |
7c673cae FG |
507 | } |
508 | ||
509 | o = op_queue_.front(); | |
510 | if (o == &task_operation_) | |
511 | { | |
512 | wakeup_event_.maybe_unlock_and_signal_one(lock); | |
513 | return 0; | |
514 | } | |
515 | } | |
516 | ||
517 | if (o == 0) | |
518 | return 0; | |
519 | ||
520 | op_queue_.pop(); | |
521 | bool more_handlers = (!op_queue_.empty()); | |
522 | ||
523 | std::size_t task_result = o->task_result_; | |
524 | ||
525 | if (more_handlers && !one_thread_) | |
526 | wake_one_thread_and_unlock(lock); | |
527 | else | |
528 | lock.unlock(); | |
529 | ||
530 | // Ensure the count of outstanding work is decremented on block exit. | |
531 | work_cleanup on_exit = { this, &lock, &this_thread }; | |
532 | (void)on_exit; | |
533 | ||
534 | // Complete the operation. May throw an exception. Deletes the object. | |
b32b8144 | 535 | o->complete(this, ec, task_result); |
7c673cae FG |
536 | |
537 | return 1; | |
538 | } | |
539 | ||
b32b8144 | 540 | void scheduler::stop_all_threads( |
7c673cae FG |
541 | mutex::scoped_lock& lock) |
542 | { | |
543 | stopped_ = true; | |
544 | wakeup_event_.signal_all(lock); | |
545 | ||
546 | if (!task_interrupted_ && task_) | |
547 | { | |
548 | task_interrupted_ = true; | |
549 | task_->interrupt(); | |
550 | } | |
551 | } | |
552 | ||
b32b8144 | 553 | void scheduler::wake_one_thread_and_unlock( |
7c673cae FG |
554 | mutex::scoped_lock& lock) |
555 | { | |
556 | if (!wakeup_event_.maybe_unlock_and_signal_one(lock)) | |
557 | { | |
558 | if (!task_interrupted_ && task_) | |
559 | { | |
560 | task_interrupted_ = true; | |
561 | task_->interrupt(); | |
562 | } | |
563 | lock.unlock(); | |
564 | } | |
565 | } | |
566 | ||
567 | } // namespace detail | |
568 | } // namespace asio | |
569 | } // namespace boost | |
570 | ||
571 | #include <boost/asio/detail/pop_options.hpp> | |
572 | ||
b32b8144 | 573 | #endif // BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP |