]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/asio/include/boost/asio/detail/impl/win_iocp_io_service.ipp
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / boost / libs / asio / include / boost / asio / detail / impl / win_iocp_io_service.ipp
1 //
2 // detail/impl/win_iocp_io_service.ipp
3 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2016 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #ifndef BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_SERVICE_IPP
12 #define BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_SERVICE_IPP
13
14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
15 # pragma once
16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17
18 #include <boost/asio/detail/config.hpp>
19
20 #if defined(BOOST_ASIO_HAS_IOCP)
21
22 #include <boost/asio/error.hpp>
23 #include <boost/asio/io_service.hpp>
24 #include <boost/asio/detail/cstdint.hpp>
25 #include <boost/asio/detail/handler_alloc_helpers.hpp>
26 #include <boost/asio/detail/handler_invoke_helpers.hpp>
27 #include <boost/asio/detail/limits.hpp>
28 #include <boost/asio/detail/throw_error.hpp>
29 #include <boost/asio/detail/win_iocp_io_service.hpp>
30
31 #include <boost/asio/detail/push_options.hpp>
32
33 namespace boost {
34 namespace asio {
35 namespace detail {
36
37 struct win_iocp_io_service::work_finished_on_block_exit
38 {
39 ~work_finished_on_block_exit()
40 {
41 io_service_->work_finished();
42 }
43
44 win_iocp_io_service* io_service_;
45 };
46
47 struct win_iocp_io_service::timer_thread_function
48 {
49 void operator()()
50 {
51 while (::InterlockedExchangeAdd(&io_service_->shutdown_, 0) == 0)
52 {
53 if (::WaitForSingleObject(io_service_->waitable_timer_.handle,
54 INFINITE) == WAIT_OBJECT_0)
55 {
56 ::InterlockedExchange(&io_service_->dispatch_required_, 1);
57 ::PostQueuedCompletionStatus(io_service_->iocp_.handle,
58 0, wake_for_dispatch, 0);
59 }
60 }
61 }
62
63 win_iocp_io_service* io_service_;
64 };
65
66 win_iocp_io_service::win_iocp_io_service(
67 boost::asio::io_service& io_service, size_t concurrency_hint)
68 : boost::asio::detail::service_base<win_iocp_io_service>(io_service),
69 iocp_(),
70 outstanding_work_(0),
71 stopped_(0),
72 stop_event_posted_(0),
73 shutdown_(0),
74 gqcs_timeout_(get_gqcs_timeout()),
75 dispatch_required_(0)
76 {
77 BOOST_ASIO_HANDLER_TRACKING_INIT;
78
79 iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
80 static_cast<DWORD>(concurrency_hint < DWORD(~0)
81 ? concurrency_hint : DWORD(~0)));
82 if (!iocp_.handle)
83 {
84 DWORD last_error = ::GetLastError();
85 boost::system::error_code ec(last_error,
86 boost::asio::error::get_system_category());
87 boost::asio::detail::throw_error(ec, "iocp");
88 }
89 }
90
91 void win_iocp_io_service::shutdown_service()
92 {
93 ::InterlockedExchange(&shutdown_, 1);
94
95 if (timer_thread_.get())
96 {
97 LARGE_INTEGER timeout;
98 timeout.QuadPart = 1;
99 ::SetWaitableTimer(waitable_timer_.handle, &timeout, 1, 0, 0, FALSE);
100 }
101
102 while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
103 {
104 op_queue<win_iocp_operation> ops;
105 timer_queues_.get_all_timers(ops);
106 ops.push(completed_ops_);
107 if (!ops.empty())
108 {
109 while (win_iocp_operation* op = ops.front())
110 {
111 ops.pop();
112 ::InterlockedDecrement(&outstanding_work_);
113 op->destroy();
114 }
115 }
116 else
117 {
118 DWORD bytes_transferred = 0;
119 dword_ptr_t completion_key = 0;
120 LPOVERLAPPED overlapped = 0;
121 ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
122 &completion_key, &overlapped, gqcs_timeout_);
123 if (overlapped)
124 {
125 ::InterlockedDecrement(&outstanding_work_);
126 static_cast<win_iocp_operation*>(overlapped)->destroy();
127 }
128 }
129 }
130
131 if (timer_thread_.get())
132 timer_thread_->join();
133 }
134
135 boost::system::error_code win_iocp_io_service::register_handle(
136 HANDLE handle, boost::system::error_code& ec)
137 {
138 if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) == 0)
139 {
140 DWORD last_error = ::GetLastError();
141 ec = boost::system::error_code(last_error,
142 boost::asio::error::get_system_category());
143 }
144 else
145 {
146 ec = boost::system::error_code();
147 }
148 return ec;
149 }
150
151 size_t win_iocp_io_service::run(boost::system::error_code& ec)
152 {
153 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
154 {
155 stop();
156 ec = boost::system::error_code();
157 return 0;
158 }
159
160 win_iocp_thread_info this_thread;
161 thread_call_stack::context ctx(this, this_thread);
162
163 size_t n = 0;
164 while (do_one(true, ec))
165 if (n != (std::numeric_limits<size_t>::max)())
166 ++n;
167 return n;
168 }
169
170 size_t win_iocp_io_service::run_one(boost::system::error_code& ec)
171 {
172 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
173 {
174 stop();
175 ec = boost::system::error_code();
176 return 0;
177 }
178
179 win_iocp_thread_info this_thread;
180 thread_call_stack::context ctx(this, this_thread);
181
182 return do_one(true, ec);
183 }
184
185 size_t win_iocp_io_service::poll(boost::system::error_code& ec)
186 {
187 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
188 {
189 stop();
190 ec = boost::system::error_code();
191 return 0;
192 }
193
194 win_iocp_thread_info this_thread;
195 thread_call_stack::context ctx(this, this_thread);
196
197 size_t n = 0;
198 while (do_one(false, ec))
199 if (n != (std::numeric_limits<size_t>::max)())
200 ++n;
201 return n;
202 }
203
204 size_t win_iocp_io_service::poll_one(boost::system::error_code& ec)
205 {
206 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
207 {
208 stop();
209 ec = boost::system::error_code();
210 return 0;
211 }
212
213 win_iocp_thread_info this_thread;
214 thread_call_stack::context ctx(this, this_thread);
215
216 return do_one(false, ec);
217 }
218
219 void win_iocp_io_service::stop()
220 {
221 if (::InterlockedExchange(&stopped_, 1) == 0)
222 {
223 if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
224 {
225 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
226 {
227 DWORD last_error = ::GetLastError();
228 boost::system::error_code ec(last_error,
229 boost::asio::error::get_system_category());
230 boost::asio::detail::throw_error(ec, "pqcs");
231 }
232 }
233 }
234 }
235
236 void win_iocp_io_service::post_deferred_completion(win_iocp_operation* op)
237 {
238 // Flag the operation as ready.
239 op->ready_ = 1;
240
241 // Enqueue the operation on the I/O completion port.
242 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
243 {
244 // Out of resources. Put on completed queue instead.
245 mutex::scoped_lock lock(dispatch_mutex_);
246 completed_ops_.push(op);
247 ::InterlockedExchange(&dispatch_required_, 1);
248 }
249 }
250
251 void win_iocp_io_service::post_deferred_completions(
252 op_queue<win_iocp_operation>& ops)
253 {
254 while (win_iocp_operation* op = ops.front())
255 {
256 ops.pop();
257
258 // Flag the operation as ready.
259 op->ready_ = 1;
260
261 // Enqueue the operation on the I/O completion port.
262 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
263 {
264 // Out of resources. Put on completed queue instead.
265 mutex::scoped_lock lock(dispatch_mutex_);
266 completed_ops_.push(op);
267 completed_ops_.push(ops);
268 ::InterlockedExchange(&dispatch_required_, 1);
269 }
270 }
271 }
272
273 void win_iocp_io_service::abandon_operations(
274 op_queue<win_iocp_operation>& ops)
275 {
276 while (win_iocp_operation* op = ops.front())
277 {
278 ops.pop();
279 ::InterlockedDecrement(&outstanding_work_);
280 op->destroy();
281 }
282 }
283
284 void win_iocp_io_service::on_pending(win_iocp_operation* op)
285 {
286 if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
287 {
288 // Enqueue the operation on the I/O completion port.
289 if (!::PostQueuedCompletionStatus(iocp_.handle,
290 0, overlapped_contains_result, op))
291 {
292 // Out of resources. Put on completed queue instead.
293 mutex::scoped_lock lock(dispatch_mutex_);
294 completed_ops_.push(op);
295 ::InterlockedExchange(&dispatch_required_, 1);
296 }
297 }
298 }
299
300 void win_iocp_io_service::on_completion(win_iocp_operation* op,
301 DWORD last_error, DWORD bytes_transferred)
302 {
303 // Flag that the operation is ready for invocation.
304 op->ready_ = 1;
305
306 // Store results in the OVERLAPPED structure.
307 op->Internal = reinterpret_cast<ulong_ptr_t>(
308 &boost::asio::error::get_system_category());
309 op->Offset = last_error;
310 op->OffsetHigh = bytes_transferred;
311
312 // Enqueue the operation on the I/O completion port.
313 if (!::PostQueuedCompletionStatus(iocp_.handle,
314 0, overlapped_contains_result, op))
315 {
316 // Out of resources. Put on completed queue instead.
317 mutex::scoped_lock lock(dispatch_mutex_);
318 completed_ops_.push(op);
319 ::InterlockedExchange(&dispatch_required_, 1);
320 }
321 }
322
323 void win_iocp_io_service::on_completion(win_iocp_operation* op,
324 const boost::system::error_code& ec, DWORD bytes_transferred)
325 {
326 // Flag that the operation is ready for invocation.
327 op->ready_ = 1;
328
329 // Store results in the OVERLAPPED structure.
330 op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category());
331 op->Offset = ec.value();
332 op->OffsetHigh = bytes_transferred;
333
334 // Enqueue the operation on the I/O completion port.
335 if (!::PostQueuedCompletionStatus(iocp_.handle,
336 0, overlapped_contains_result, op))
337 {
338 // Out of resources. Put on completed queue instead.
339 mutex::scoped_lock lock(dispatch_mutex_);
340 completed_ops_.push(op);
341 ::InterlockedExchange(&dispatch_required_, 1);
342 }
343 }
344
345 size_t win_iocp_io_service::do_one(bool block, boost::system::error_code& ec)
346 {
347 for (;;)
348 {
349 // Try to acquire responsibility for dispatching timers and completed ops.
350 if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
351 {
352 mutex::scoped_lock lock(dispatch_mutex_);
353
354 // Dispatch pending timers and operations.
355 op_queue<win_iocp_operation> ops;
356 ops.push(completed_ops_);
357 timer_queues_.get_ready_timers(ops);
358 post_deferred_completions(ops);
359 update_timeout();
360 }
361
362 // Get the next operation from the queue.
363 DWORD bytes_transferred = 0;
364 dword_ptr_t completion_key = 0;
365 LPOVERLAPPED overlapped = 0;
366 ::SetLastError(0);
367 BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
368 &completion_key, &overlapped, block ? gqcs_timeout_ : 0);
369 DWORD last_error = ::GetLastError();
370
371 if (overlapped)
372 {
373 win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped);
374 boost::system::error_code result_ec(last_error,
375 boost::asio::error::get_system_category());
376
377 // We may have been passed the last_error and bytes_transferred in the
378 // OVERLAPPED structure itself.
379 if (completion_key == overlapped_contains_result)
380 {
381 result_ec = boost::system::error_code(static_cast<int>(op->Offset),
382 *reinterpret_cast<boost::system::error_category*>(op->Internal));
383 bytes_transferred = op->OffsetHigh;
384 }
385
386 // Otherwise ensure any result has been saved into the OVERLAPPED
387 // structure.
388 else
389 {
390 op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());
391 op->Offset = result_ec.value();
392 op->OffsetHigh = bytes_transferred;
393 }
394
395 // Dispatch the operation only if ready. The operation may not be ready
396 // if the initiating function (e.g. a call to WSARecv) has not yet
397 // returned. This is because the initiating function still wants access
398 // to the operation's OVERLAPPED structure.
399 if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
400 {
401 // Ensure the count of outstanding work is decremented on block exit.
402 work_finished_on_block_exit on_exit = { this };
403 (void)on_exit;
404
405 op->complete(*this, result_ec, bytes_transferred);
406 ec = boost::system::error_code();
407 return 1;
408 }
409 }
410 else if (!ok)
411 {
412 if (last_error != WAIT_TIMEOUT)
413 {
414 ec = boost::system::error_code(last_error,
415 boost::asio::error::get_system_category());
416 return 0;
417 }
418
419 // If we're not polling we need to keep going until we get a real handler.
420 if (block)
421 continue;
422
423 ec = boost::system::error_code();
424 return 0;
425 }
426 else if (completion_key == wake_for_dispatch)
427 {
428 // We have been woken up to try to acquire responsibility for dispatching
429 // timers and completed operations.
430 }
431 else
432 {
433 // Indicate that there is no longer an in-flight stop event.
434 ::InterlockedExchange(&stop_event_posted_, 0);
435
436 // The stopped_ flag is always checked to ensure that any leftover
437 // stop events from a previous run invocation are ignored.
438 if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
439 {
440 // Wake up next thread that is blocked on GetQueuedCompletionStatus.
441 if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
442 {
443 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
444 {
445 last_error = ::GetLastError();
446 ec = boost::system::error_code(last_error,
447 boost::asio::error::get_system_category());
448 return 0;
449 }
450 }
451
452 ec = boost::system::error_code();
453 return 0;
454 }
455 }
456 }
457 }
458
459 DWORD win_iocp_io_service::get_gqcs_timeout()
460 {
461 OSVERSIONINFOEX osvi;
462 ZeroMemory(&osvi, sizeof(osvi));
463 osvi.dwOSVersionInfoSize = sizeof(osvi);
464 osvi.dwMajorVersion = 6ul;
465
466 const uint64_t condition_mask = ::VerSetConditionMask(
467 0, VER_MAJORVERSION, VER_GREATER_EQUAL);
468
469 if (!!::VerifyVersionInfo(&osvi, VER_MAJORVERSION, condition_mask))
470 return INFINITE;
471
472 return default_gqcs_timeout;
473 }
474
475 void win_iocp_io_service::do_add_timer_queue(timer_queue_base& queue)
476 {
477 mutex::scoped_lock lock(dispatch_mutex_);
478
479 timer_queues_.insert(&queue);
480
481 if (!waitable_timer_.handle)
482 {
483 waitable_timer_.handle = ::CreateWaitableTimer(0, FALSE, 0);
484 if (waitable_timer_.handle == 0)
485 {
486 DWORD last_error = ::GetLastError();
487 boost::system::error_code ec(last_error,
488 boost::asio::error::get_system_category());
489 boost::asio::detail::throw_error(ec, "timer");
490 }
491
492 LARGE_INTEGER timeout;
493 timeout.QuadPart = -max_timeout_usec;
494 timeout.QuadPart *= 10;
495 ::SetWaitableTimer(waitable_timer_.handle,
496 &timeout, max_timeout_msec, 0, 0, FALSE);
497 }
498
499 if (!timer_thread_.get())
500 {
501 timer_thread_function thread_function = { this };
502 timer_thread_.reset(new thread(thread_function, 65536));
503 }
504 }
505
506 void win_iocp_io_service::do_remove_timer_queue(timer_queue_base& queue)
507 {
508 mutex::scoped_lock lock(dispatch_mutex_);
509
510 timer_queues_.erase(&queue);
511 }
512
513 void win_iocp_io_service::update_timeout()
514 {
515 if (timer_thread_.get())
516 {
517 // There's no point updating the waitable timer if the new timeout period
518 // exceeds the maximum timeout. In that case, we might as well wait for the
519 // existing period of the timer to expire.
520 long timeout_usec = timer_queues_.wait_duration_usec(max_timeout_usec);
521 if (timeout_usec < max_timeout_usec)
522 {
523 LARGE_INTEGER timeout;
524 timeout.QuadPart = -timeout_usec;
525 timeout.QuadPart *= 10;
526 ::SetWaitableTimer(waitable_timer_.handle,
527 &timeout, max_timeout_msec, 0, 0, FALSE);
528 }
529 }
530 }
531
532 } // namespace detail
533 } // namespace asio
534 } // namespace boost
535
536 #include <boost/asio/detail/pop_options.hpp>
537
538 #endif // defined(BOOST_ASIO_HAS_IOCP)
539
540 #endif // BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_SERVICE_IPP