]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // |
2 | // detail/impl/task_io_service.ipp | |
3 | // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
4 | // | |
5 | // Copyright (c) 2003-2016 Christopher M. Kohlhoff (chris at kohlhoff dot com) | |
6 | // | |
7 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
8 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
9 | // | |
10 | ||
11 | #ifndef BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP | |
12 | #define BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP | |
13 | ||
14 | #if defined(_MSC_VER) && (_MSC_VER >= 1200) | |
15 | # pragma once | |
16 | #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) | |
17 | ||
18 | #include <boost/asio/detail/config.hpp> | |
19 | ||
20 | #if !defined(BOOST_ASIO_HAS_IOCP) | |
21 | ||
22 | #include <boost/asio/detail/event.hpp> | |
23 | #include <boost/asio/detail/limits.hpp> | |
24 | #include <boost/asio/detail/reactor.hpp> | |
25 | #include <boost/asio/detail/task_io_service.hpp> | |
26 | #include <boost/asio/detail/task_io_service_thread_info.hpp> | |
27 | ||
28 | #include <boost/asio/detail/push_options.hpp> | |
29 | ||
30 | namespace boost { | |
31 | namespace asio { | |
32 | namespace detail { | |
33 | ||
34 | struct task_io_service::task_cleanup | |
35 | { | |
36 | ~task_cleanup() | |
37 | { | |
38 | if (this_thread_->private_outstanding_work > 0) | |
39 | { | |
40 | boost::asio::detail::increment( | |
41 | task_io_service_->outstanding_work_, | |
42 | this_thread_->private_outstanding_work); | |
43 | } | |
44 | this_thread_->private_outstanding_work = 0; | |
45 | ||
46 | // Enqueue the completed operations and reinsert the task at the end of | |
47 | // the operation queue. | |
48 | lock_->lock(); | |
49 | task_io_service_->task_interrupted_ = true; | |
50 | task_io_service_->op_queue_.push(this_thread_->private_op_queue); | |
51 | task_io_service_->op_queue_.push(&task_io_service_->task_operation_); | |
52 | } | |
53 | ||
54 | task_io_service* task_io_service_; | |
55 | mutex::scoped_lock* lock_; | |
56 | thread_info* this_thread_; | |
57 | }; | |
58 | ||
59 | struct task_io_service::work_cleanup | |
60 | { | |
61 | ~work_cleanup() | |
62 | { | |
63 | if (this_thread_->private_outstanding_work > 1) | |
64 | { | |
65 | boost::asio::detail::increment( | |
66 | task_io_service_->outstanding_work_, | |
67 | this_thread_->private_outstanding_work - 1); | |
68 | } | |
69 | else if (this_thread_->private_outstanding_work < 1) | |
70 | { | |
71 | task_io_service_->work_finished(); | |
72 | } | |
73 | this_thread_->private_outstanding_work = 0; | |
74 | ||
75 | #if defined(BOOST_ASIO_HAS_THREADS) | |
76 | if (!this_thread_->private_op_queue.empty()) | |
77 | { | |
78 | lock_->lock(); | |
79 | task_io_service_->op_queue_.push(this_thread_->private_op_queue); | |
80 | } | |
81 | #endif // defined(BOOST_ASIO_HAS_THREADS) | |
82 | } | |
83 | ||
84 | task_io_service* task_io_service_; | |
85 | mutex::scoped_lock* lock_; | |
86 | thread_info* this_thread_; | |
87 | }; | |
88 | ||
89 | task_io_service::task_io_service( | |
90 | boost::asio::io_service& io_service, std::size_t concurrency_hint) | |
91 | : boost::asio::detail::service_base<task_io_service>(io_service), | |
92 | one_thread_(concurrency_hint == 1), | |
93 | mutex_(), | |
94 | task_(0), | |
95 | task_interrupted_(true), | |
96 | outstanding_work_(0), | |
97 | stopped_(false), | |
98 | shutdown_(false) | |
99 | { | |
100 | BOOST_ASIO_HANDLER_TRACKING_INIT; | |
101 | } | |
102 | ||
103 | void task_io_service::shutdown_service() | |
104 | { | |
105 | mutex::scoped_lock lock(mutex_); | |
106 | shutdown_ = true; | |
107 | lock.unlock(); | |
108 | ||
109 | // Destroy handler objects. | |
110 | while (!op_queue_.empty()) | |
111 | { | |
112 | operation* o = op_queue_.front(); | |
113 | op_queue_.pop(); | |
114 | if (o != &task_operation_) | |
115 | o->destroy(); | |
116 | } | |
117 | ||
118 | // Reset to initial state. | |
119 | task_ = 0; | |
120 | } | |
121 | ||
122 | void task_io_service::init_task() | |
123 | { | |
124 | mutex::scoped_lock lock(mutex_); | |
125 | if (!shutdown_ && !task_) | |
126 | { | |
127 | task_ = &use_service<reactor>(this->get_io_service()); | |
128 | op_queue_.push(&task_operation_); | |
129 | wake_one_thread_and_unlock(lock); | |
130 | } | |
131 | } | |
132 | ||
133 | std::size_t task_io_service::run(boost::system::error_code& ec) | |
134 | { | |
135 | ec = boost::system::error_code(); | |
136 | if (outstanding_work_ == 0) | |
137 | { | |
138 | stop(); | |
139 | return 0; | |
140 | } | |
141 | ||
142 | thread_info this_thread; | |
143 | this_thread.private_outstanding_work = 0; | |
144 | thread_call_stack::context ctx(this, this_thread); | |
145 | ||
146 | mutex::scoped_lock lock(mutex_); | |
147 | ||
148 | std::size_t n = 0; | |
149 | for (; do_run_one(lock, this_thread, ec); lock.lock()) | |
150 | if (n != (std::numeric_limits<std::size_t>::max)()) | |
151 | ++n; | |
152 | return n; | |
153 | } | |
154 | ||
155 | std::size_t task_io_service::run_one(boost::system::error_code& ec) | |
156 | { | |
157 | ec = boost::system::error_code(); | |
158 | if (outstanding_work_ == 0) | |
159 | { | |
160 | stop(); | |
161 | return 0; | |
162 | } | |
163 | ||
164 | thread_info this_thread; | |
165 | this_thread.private_outstanding_work = 0; | |
166 | thread_call_stack::context ctx(this, this_thread); | |
167 | ||
168 | mutex::scoped_lock lock(mutex_); | |
169 | ||
170 | return do_run_one(lock, this_thread, ec); | |
171 | } | |
172 | ||
173 | std::size_t task_io_service::poll(boost::system::error_code& ec) | |
174 | { | |
175 | ec = boost::system::error_code(); | |
176 | if (outstanding_work_ == 0) | |
177 | { | |
178 | stop(); | |
179 | return 0; | |
180 | } | |
181 | ||
182 | thread_info this_thread; | |
183 | this_thread.private_outstanding_work = 0; | |
184 | thread_call_stack::context ctx(this, this_thread); | |
185 | ||
186 | mutex::scoped_lock lock(mutex_); | |
187 | ||
188 | #if defined(BOOST_ASIO_HAS_THREADS) | |
189 | // We want to support nested calls to poll() and poll_one(), so any handlers | |
190 | // that are already on a thread-private queue need to be put on to the main | |
191 | // queue now. | |
192 | if (one_thread_) | |
193 | if (thread_info* outer_thread_info = ctx.next_by_key()) | |
194 | op_queue_.push(outer_thread_info->private_op_queue); | |
195 | #endif // defined(BOOST_ASIO_HAS_THREADS) | |
196 | ||
197 | std::size_t n = 0; | |
198 | for (; do_poll_one(lock, this_thread, ec); lock.lock()) | |
199 | if (n != (std::numeric_limits<std::size_t>::max)()) | |
200 | ++n; | |
201 | return n; | |
202 | } | |
203 | ||
204 | std::size_t task_io_service::poll_one(boost::system::error_code& ec) | |
205 | { | |
206 | ec = boost::system::error_code(); | |
207 | if (outstanding_work_ == 0) | |
208 | { | |
209 | stop(); | |
210 | return 0; | |
211 | } | |
212 | ||
213 | thread_info this_thread; | |
214 | this_thread.private_outstanding_work = 0; | |
215 | thread_call_stack::context ctx(this, this_thread); | |
216 | ||
217 | mutex::scoped_lock lock(mutex_); | |
218 | ||
219 | #if defined(BOOST_ASIO_HAS_THREADS) | |
220 | // We want to support nested calls to poll() and poll_one(), so any handlers | |
221 | // that are already on a thread-private queue need to be put on to the main | |
222 | // queue now. | |
223 | if (one_thread_) | |
224 | if (thread_info* outer_thread_info = ctx.next_by_key()) | |
225 | op_queue_.push(outer_thread_info->private_op_queue); | |
226 | #endif // defined(BOOST_ASIO_HAS_THREADS) | |
227 | ||
228 | return do_poll_one(lock, this_thread, ec); | |
229 | } | |
230 | ||
231 | void task_io_service::stop() | |
232 | { | |
233 | mutex::scoped_lock lock(mutex_); | |
234 | stop_all_threads(lock); | |
235 | } | |
236 | ||
237 | bool task_io_service::stopped() const | |
238 | { | |
239 | mutex::scoped_lock lock(mutex_); | |
240 | return stopped_; | |
241 | } | |
242 | ||
243 | void task_io_service::reset() | |
244 | { | |
245 | mutex::scoped_lock lock(mutex_); | |
246 | stopped_ = false; | |
247 | } | |
248 | ||
249 | void task_io_service::post_immediate_completion( | |
250 | task_io_service::operation* op, bool is_continuation) | |
251 | { | |
252 | #if defined(BOOST_ASIO_HAS_THREADS) | |
253 | if (one_thread_ || is_continuation) | |
254 | { | |
255 | if (thread_info* this_thread = thread_call_stack::contains(this)) | |
256 | { | |
257 | ++this_thread->private_outstanding_work; | |
258 | this_thread->private_op_queue.push(op); | |
259 | return; | |
260 | } | |
261 | } | |
262 | #else // defined(BOOST_ASIO_HAS_THREADS) | |
263 | (void)is_continuation; | |
264 | #endif // defined(BOOST_ASIO_HAS_THREADS) | |
265 | ||
266 | work_started(); | |
267 | mutex::scoped_lock lock(mutex_); | |
268 | op_queue_.push(op); | |
269 | wake_one_thread_and_unlock(lock); | |
270 | } | |
271 | ||
272 | void task_io_service::post_deferred_completion(task_io_service::operation* op) | |
273 | { | |
274 | #if defined(BOOST_ASIO_HAS_THREADS) | |
275 | if (one_thread_) | |
276 | { | |
277 | if (thread_info* this_thread = thread_call_stack::contains(this)) | |
278 | { | |
279 | this_thread->private_op_queue.push(op); | |
280 | return; | |
281 | } | |
282 | } | |
283 | #endif // defined(BOOST_ASIO_HAS_THREADS) | |
284 | ||
285 | mutex::scoped_lock lock(mutex_); | |
286 | op_queue_.push(op); | |
287 | wake_one_thread_and_unlock(lock); | |
288 | } | |
289 | ||
290 | void task_io_service::post_deferred_completions( | |
291 | op_queue<task_io_service::operation>& ops) | |
292 | { | |
293 | if (!ops.empty()) | |
294 | { | |
295 | #if defined(BOOST_ASIO_HAS_THREADS) | |
296 | if (one_thread_) | |
297 | { | |
298 | if (thread_info* this_thread = thread_call_stack::contains(this)) | |
299 | { | |
300 | this_thread->private_op_queue.push(ops); | |
301 | return; | |
302 | } | |
303 | } | |
304 | #endif // defined(BOOST_ASIO_HAS_THREADS) | |
305 | ||
306 | mutex::scoped_lock lock(mutex_); | |
307 | op_queue_.push(ops); | |
308 | wake_one_thread_and_unlock(lock); | |
309 | } | |
310 | } | |
311 | ||
312 | void task_io_service::do_dispatch( | |
313 | task_io_service::operation* op) | |
314 | { | |
315 | work_started(); | |
316 | mutex::scoped_lock lock(mutex_); | |
317 | op_queue_.push(op); | |
318 | wake_one_thread_and_unlock(lock); | |
319 | } | |
320 | ||
321 | void task_io_service::abandon_operations( | |
322 | op_queue<task_io_service::operation>& ops) | |
323 | { | |
324 | op_queue<task_io_service::operation> ops2; | |
325 | ops2.push(ops); | |
326 | } | |
327 | ||
328 | std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock, | |
329 | task_io_service::thread_info& this_thread, | |
330 | const boost::system::error_code& ec) | |
331 | { | |
332 | while (!stopped_) | |
333 | { | |
334 | if (!op_queue_.empty()) | |
335 | { | |
336 | // Prepare to execute first handler from queue. | |
337 | operation* o = op_queue_.front(); | |
338 | op_queue_.pop(); | |
339 | bool more_handlers = (!op_queue_.empty()); | |
340 | ||
341 | if (o == &task_operation_) | |
342 | { | |
343 | task_interrupted_ = more_handlers; | |
344 | ||
345 | if (more_handlers && !one_thread_) | |
346 | wakeup_event_.unlock_and_signal_one(lock); | |
347 | else | |
348 | lock.unlock(); | |
349 | ||
350 | task_cleanup on_exit = { this, &lock, &this_thread }; | |
351 | (void)on_exit; | |
352 | ||
353 | // Run the task. May throw an exception. Only block if the operation | |
354 | // queue is empty and we're not polling, otherwise we want to return | |
355 | // as soon as possible. | |
356 | task_->run(!more_handlers, this_thread.private_op_queue); | |
357 | } | |
358 | else | |
359 | { | |
360 | std::size_t task_result = o->task_result_; | |
361 | ||
362 | if (more_handlers && !one_thread_) | |
363 | wake_one_thread_and_unlock(lock); | |
364 | else | |
365 | lock.unlock(); | |
366 | ||
367 | // Ensure the count of outstanding work is decremented on block exit. | |
368 | work_cleanup on_exit = { this, &lock, &this_thread }; | |
369 | (void)on_exit; | |
370 | ||
371 | // Complete the operation. May throw an exception. Deletes the object. | |
372 | o->complete(*this, ec, task_result); | |
373 | ||
374 | return 1; | |
375 | } | |
376 | } | |
377 | else | |
378 | { | |
379 | wakeup_event_.clear(lock); | |
380 | wakeup_event_.wait(lock); | |
381 | } | |
382 | } | |
383 | ||
384 | return 0; | |
385 | } | |
386 | ||
387 | std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock, | |
388 | task_io_service::thread_info& this_thread, | |
389 | const boost::system::error_code& ec) | |
390 | { | |
391 | if (stopped_) | |
392 | return 0; | |
393 | ||
394 | operation* o = op_queue_.front(); | |
395 | if (o == &task_operation_) | |
396 | { | |
397 | op_queue_.pop(); | |
398 | lock.unlock(); | |
399 | ||
400 | { | |
401 | task_cleanup c = { this, &lock, &this_thread }; | |
402 | (void)c; | |
403 | ||
404 | // Run the task. May throw an exception. Only block if the operation | |
405 | // queue is empty and we're not polling, otherwise we want to return | |
406 | // as soon as possible. | |
407 | task_->run(false, this_thread.private_op_queue); | |
408 | } | |
409 | ||
410 | o = op_queue_.front(); | |
411 | if (o == &task_operation_) | |
412 | { | |
413 | wakeup_event_.maybe_unlock_and_signal_one(lock); | |
414 | return 0; | |
415 | } | |
416 | } | |
417 | ||
418 | if (o == 0) | |
419 | return 0; | |
420 | ||
421 | op_queue_.pop(); | |
422 | bool more_handlers = (!op_queue_.empty()); | |
423 | ||
424 | std::size_t task_result = o->task_result_; | |
425 | ||
426 | if (more_handlers && !one_thread_) | |
427 | wake_one_thread_and_unlock(lock); | |
428 | else | |
429 | lock.unlock(); | |
430 | ||
431 | // Ensure the count of outstanding work is decremented on block exit. | |
432 | work_cleanup on_exit = { this, &lock, &this_thread }; | |
433 | (void)on_exit; | |
434 | ||
435 | // Complete the operation. May throw an exception. Deletes the object. | |
436 | o->complete(*this, ec, task_result); | |
437 | ||
438 | return 1; | |
439 | } | |
440 | ||
441 | void task_io_service::stop_all_threads( | |
442 | mutex::scoped_lock& lock) | |
443 | { | |
444 | stopped_ = true; | |
445 | wakeup_event_.signal_all(lock); | |
446 | ||
447 | if (!task_interrupted_ && task_) | |
448 | { | |
449 | task_interrupted_ = true; | |
450 | task_->interrupt(); | |
451 | } | |
452 | } | |
453 | ||
454 | void task_io_service::wake_one_thread_and_unlock( | |
455 | mutex::scoped_lock& lock) | |
456 | { | |
457 | if (!wakeup_event_.maybe_unlock_and_signal_one(lock)) | |
458 | { | |
459 | if (!task_interrupted_ && task_) | |
460 | { | |
461 | task_interrupted_ = true; | |
462 | task_->interrupt(); | |
463 | } | |
464 | lock.unlock(); | |
465 | } | |
466 | } | |
467 | ||
468 | } // namespace detail | |
469 | } // namespace asio | |
470 | } // namespace boost | |
471 | ||
472 | #include <boost/asio/detail/pop_options.hpp> | |
473 | ||
474 | #endif // !defined(BOOST_ASIO_HAS_IOCP) | |
475 | ||
476 | #endif // BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP |