]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/boost/asio/detail/impl/win_iocp_io_context.ipp
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / boost / boost / asio / detail / impl / win_iocp_io_context.ipp
CommitLineData
7c673cae 1//
b32b8144 2// detail/impl/win_iocp_io_context.ipp
7c673cae
FG
3// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4//
f67539c2 5// Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7c673cae
FG
6//
7// Distributed under the Boost Software License, Version 1.0. (See accompanying
8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9//
10
b32b8144
FG
11#ifndef BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
12#define BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
7c673cae
FG
13
14#if defined(_MSC_VER) && (_MSC_VER >= 1200)
15# pragma once
16#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17
18#include <boost/asio/detail/config.hpp>
19
20#if defined(BOOST_ASIO_HAS_IOCP)
21
22#include <boost/asio/error.hpp>
7c673cae
FG
23#include <boost/asio/detail/cstdint.hpp>
24#include <boost/asio/detail/handler_alloc_helpers.hpp>
25#include <boost/asio/detail/handler_invoke_helpers.hpp>
26#include <boost/asio/detail/limits.hpp>
92f5a8d4 27#include <boost/asio/detail/thread.hpp>
7c673cae 28#include <boost/asio/detail/throw_error.hpp>
b32b8144 29#include <boost/asio/detail/win_iocp_io_context.hpp>
7c673cae
FG
30
31#include <boost/asio/detail/push_options.hpp>
32
33namespace boost {
34namespace asio {
35namespace detail {
36
92f5a8d4
TL
37struct win_iocp_io_context::thread_function
38{
39 explicit thread_function(win_iocp_io_context* s)
40 : this_(s)
41 {
42 }
43
44 void operator()()
45 {
46 boost::system::error_code ec;
47 this_->run(ec);
48 }
49
50 win_iocp_io_context* this_;
51};
52
b32b8144 53struct win_iocp_io_context::work_finished_on_block_exit
7c673cae
FG
54{
55 ~work_finished_on_block_exit()
56 {
b32b8144 57 io_context_->work_finished();
7c673cae
FG
58 }
59
b32b8144 60 win_iocp_io_context* io_context_;
7c673cae
FG
61};
62
b32b8144 63struct win_iocp_io_context::timer_thread_function
7c673cae
FG
64{
65 void operator()()
66 {
b32b8144 67 while (::InterlockedExchangeAdd(&io_context_->shutdown_, 0) == 0)
7c673cae 68 {
b32b8144 69 if (::WaitForSingleObject(io_context_->waitable_timer_.handle,
7c673cae
FG
70 INFINITE) == WAIT_OBJECT_0)
71 {
b32b8144
FG
72 ::InterlockedExchange(&io_context_->dispatch_required_, 1);
73 ::PostQueuedCompletionStatus(io_context_->iocp_.handle,
7c673cae
FG
74 0, wake_for_dispatch, 0);
75 }
76 }
77 }
78
b32b8144 79 win_iocp_io_context* io_context_;
7c673cae
FG
80};
81
b32b8144 82win_iocp_io_context::win_iocp_io_context(
92f5a8d4 83 boost::asio::execution_context& ctx, int concurrency_hint, bool own_thread)
b32b8144 84 : execution_context_service_base<win_iocp_io_context>(ctx),
7c673cae
FG
85 iocp_(),
86 outstanding_work_(0),
87 stopped_(0),
88 stop_event_posted_(0),
89 shutdown_(0),
90 gqcs_timeout_(get_gqcs_timeout()),
b32b8144
FG
91 dispatch_required_(0),
92 concurrency_hint_(concurrency_hint)
7c673cae
FG
93{
94 BOOST_ASIO_HANDLER_TRACKING_INIT;
95
96 iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
b32b8144 97 static_cast<DWORD>(concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
7c673cae
FG
98 if (!iocp_.handle)
99 {
100 DWORD last_error = ::GetLastError();
101 boost::system::error_code ec(last_error,
102 boost::asio::error::get_system_category());
103 boost::asio::detail::throw_error(ec, "iocp");
104 }
92f5a8d4
TL
105
106 if (own_thread)
107 {
108 ::InterlockedIncrement(&outstanding_work_);
109 thread_.reset(new boost::asio::detail::thread(thread_function(this)));
110 }
111}
112
113win_iocp_io_context::~win_iocp_io_context()
114{
115 if (thread_.get())
116 {
117 thread_->join();
118 thread_.reset();
119 }
7c673cae
FG
120}
121
b32b8144 122void win_iocp_io_context::shutdown()
7c673cae
FG
123{
124 ::InterlockedExchange(&shutdown_, 1);
125
126 if (timer_thread_.get())
127 {
128 LARGE_INTEGER timeout;
129 timeout.QuadPart = 1;
130 ::SetWaitableTimer(waitable_timer_.handle, &timeout, 1, 0, 0, FALSE);
131 }
132
92f5a8d4
TL
133 if (thread_.get())
134 {
f67539c2 135 stop();
92f5a8d4
TL
136 thread_->join();
137 thread_.reset();
138 ::InterlockedDecrement(&outstanding_work_);
139 }
140
7c673cae
FG
141 while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
142 {
143 op_queue<win_iocp_operation> ops;
144 timer_queues_.get_all_timers(ops);
145 ops.push(completed_ops_);
146 if (!ops.empty())
147 {
148 while (win_iocp_operation* op = ops.front())
149 {
150 ops.pop();
151 ::InterlockedDecrement(&outstanding_work_);
152 op->destroy();
153 }
154 }
155 else
156 {
157 DWORD bytes_transferred = 0;
158 dword_ptr_t completion_key = 0;
159 LPOVERLAPPED overlapped = 0;
160 ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
161 &completion_key, &overlapped, gqcs_timeout_);
162 if (overlapped)
163 {
164 ::InterlockedDecrement(&outstanding_work_);
165 static_cast<win_iocp_operation*>(overlapped)->destroy();
166 }
167 }
168 }
169
170 if (timer_thread_.get())
171 timer_thread_->join();
172}
173
b32b8144 174boost::system::error_code win_iocp_io_context::register_handle(
7c673cae
FG
175 HANDLE handle, boost::system::error_code& ec)
176{
177 if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) == 0)
178 {
179 DWORD last_error = ::GetLastError();
180 ec = boost::system::error_code(last_error,
181 boost::asio::error::get_system_category());
182 }
183 else
184 {
185 ec = boost::system::error_code();
186 }
187 return ec;
188}
189
b32b8144 190size_t win_iocp_io_context::run(boost::system::error_code& ec)
7c673cae
FG
191{
192 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
193 {
194 stop();
195 ec = boost::system::error_code();
196 return 0;
197 }
198
199 win_iocp_thread_info this_thread;
200 thread_call_stack::context ctx(this, this_thread);
201
202 size_t n = 0;
b32b8144 203 while (do_one(INFINITE, ec))
7c673cae
FG
204 if (n != (std::numeric_limits<size_t>::max)())
205 ++n;
206 return n;
207}
208
b32b8144 209size_t win_iocp_io_context::run_one(boost::system::error_code& ec)
7c673cae
FG
210{
211 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
212 {
213 stop();
214 ec = boost::system::error_code();
215 return 0;
216 }
217
218 win_iocp_thread_info this_thread;
219 thread_call_stack::context ctx(this, this_thread);
220
b32b8144 221 return do_one(INFINITE, ec);
7c673cae
FG
222}
223
b32b8144
FG
224size_t win_iocp_io_context::wait_one(long usec, boost::system::error_code& ec)
225{
226 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
227 {
228 stop();
229 ec = boost::system::error_code();
230 return 0;
231 }
232
233 win_iocp_thread_info this_thread;
234 thread_call_stack::context ctx(this, this_thread);
235
236 return do_one(usec < 0 ? INFINITE : ((usec - 1) / 1000 + 1), ec);
237}
238
239size_t win_iocp_io_context::poll(boost::system::error_code& ec)
7c673cae
FG
240{
241 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
242 {
243 stop();
244 ec = boost::system::error_code();
245 return 0;
246 }
247
248 win_iocp_thread_info this_thread;
249 thread_call_stack::context ctx(this, this_thread);
250
251 size_t n = 0;
b32b8144 252 while (do_one(0, ec))
7c673cae
FG
253 if (n != (std::numeric_limits<size_t>::max)())
254 ++n;
255 return n;
256}
257
b32b8144 258size_t win_iocp_io_context::poll_one(boost::system::error_code& ec)
7c673cae
FG
259{
260 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
261 {
262 stop();
263 ec = boost::system::error_code();
264 return 0;
265 }
266
267 win_iocp_thread_info this_thread;
268 thread_call_stack::context ctx(this, this_thread);
269
b32b8144 270 return do_one(0, ec);
7c673cae
FG
271}
272
b32b8144 273void win_iocp_io_context::stop()
7c673cae
FG
274{
275 if (::InterlockedExchange(&stopped_, 1) == 0)
276 {
277 if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
278 {
279 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
280 {
281 DWORD last_error = ::GetLastError();
282 boost::system::error_code ec(last_error,
283 boost::asio::error::get_system_category());
284 boost::asio::detail::throw_error(ec, "pqcs");
285 }
286 }
287 }
288}
289
b32b8144 290void win_iocp_io_context::post_deferred_completion(win_iocp_operation* op)
7c673cae
FG
291{
292 // Flag the operation as ready.
293 op->ready_ = 1;
294
295 // Enqueue the operation on the I/O completion port.
296 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
297 {
298 // Out of resources. Put on completed queue instead.
299 mutex::scoped_lock lock(dispatch_mutex_);
300 completed_ops_.push(op);
301 ::InterlockedExchange(&dispatch_required_, 1);
302 }
303}
304
b32b8144 305void win_iocp_io_context::post_deferred_completions(
7c673cae
FG
306 op_queue<win_iocp_operation>& ops)
307{
308 while (win_iocp_operation* op = ops.front())
309 {
310 ops.pop();
311
312 // Flag the operation as ready.
313 op->ready_ = 1;
314
315 // Enqueue the operation on the I/O completion port.
316 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
317 {
318 // Out of resources. Put on completed queue instead.
319 mutex::scoped_lock lock(dispatch_mutex_);
320 completed_ops_.push(op);
321 completed_ops_.push(ops);
322 ::InterlockedExchange(&dispatch_required_, 1);
323 }
324 }
325}
326
b32b8144 327void win_iocp_io_context::abandon_operations(
7c673cae
FG
328 op_queue<win_iocp_operation>& ops)
329{
330 while (win_iocp_operation* op = ops.front())
331 {
332 ops.pop();
333 ::InterlockedDecrement(&outstanding_work_);
334 op->destroy();
335 }
336}
337
b32b8144 338void win_iocp_io_context::on_pending(win_iocp_operation* op)
7c673cae
FG
339{
340 if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
341 {
342 // Enqueue the operation on the I/O completion port.
343 if (!::PostQueuedCompletionStatus(iocp_.handle,
344 0, overlapped_contains_result, op))
345 {
346 // Out of resources. Put on completed queue instead.
347 mutex::scoped_lock lock(dispatch_mutex_);
348 completed_ops_.push(op);
349 ::InterlockedExchange(&dispatch_required_, 1);
350 }
351 }
352}
353
b32b8144 354void win_iocp_io_context::on_completion(win_iocp_operation* op,
7c673cae
FG
355 DWORD last_error, DWORD bytes_transferred)
356{
357 // Flag that the operation is ready for invocation.
358 op->ready_ = 1;
359
360 // Store results in the OVERLAPPED structure.
361 op->Internal = reinterpret_cast<ulong_ptr_t>(
362 &boost::asio::error::get_system_category());
363 op->Offset = last_error;
364 op->OffsetHigh = bytes_transferred;
365
366 // Enqueue the operation on the I/O completion port.
367 if (!::PostQueuedCompletionStatus(iocp_.handle,
368 0, overlapped_contains_result, op))
369 {
370 // Out of resources. Put on completed queue instead.
371 mutex::scoped_lock lock(dispatch_mutex_);
372 completed_ops_.push(op);
373 ::InterlockedExchange(&dispatch_required_, 1);
374 }
375}
376
b32b8144 377void win_iocp_io_context::on_completion(win_iocp_operation* op,
7c673cae
FG
378 const boost::system::error_code& ec, DWORD bytes_transferred)
379{
380 // Flag that the operation is ready for invocation.
381 op->ready_ = 1;
382
383 // Store results in the OVERLAPPED structure.
384 op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category());
385 op->Offset = ec.value();
386 op->OffsetHigh = bytes_transferred;
387
388 // Enqueue the operation on the I/O completion port.
389 if (!::PostQueuedCompletionStatus(iocp_.handle,
390 0, overlapped_contains_result, op))
391 {
392 // Out of resources. Put on completed queue instead.
393 mutex::scoped_lock lock(dispatch_mutex_);
394 completed_ops_.push(op);
395 ::InterlockedExchange(&dispatch_required_, 1);
396 }
397}
398
b32b8144 399size_t win_iocp_io_context::do_one(DWORD msec, boost::system::error_code& ec)
7c673cae
FG
400{
401 for (;;)
402 {
403 // Try to acquire responsibility for dispatching timers and completed ops.
404 if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
405 {
406 mutex::scoped_lock lock(dispatch_mutex_);
407
408 // Dispatch pending timers and operations.
409 op_queue<win_iocp_operation> ops;
410 ops.push(completed_ops_);
411 timer_queues_.get_ready_timers(ops);
412 post_deferred_completions(ops);
413 update_timeout();
414 }
415
416 // Get the next operation from the queue.
417 DWORD bytes_transferred = 0;
418 dword_ptr_t completion_key = 0;
419 LPOVERLAPPED overlapped = 0;
420 ::SetLastError(0);
b32b8144
FG
421 BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle,
422 &bytes_transferred, &completion_key, &overlapped,
423 msec < gqcs_timeout_ ? msec : gqcs_timeout_);
7c673cae
FG
424 DWORD last_error = ::GetLastError();
425
426 if (overlapped)
427 {
428 win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped);
429 boost::system::error_code result_ec(last_error,
430 boost::asio::error::get_system_category());
431
432 // We may have been passed the last_error and bytes_transferred in the
433 // OVERLAPPED structure itself.
434 if (completion_key == overlapped_contains_result)
435 {
436 result_ec = boost::system::error_code(static_cast<int>(op->Offset),
437 *reinterpret_cast<boost::system::error_category*>(op->Internal));
438 bytes_transferred = op->OffsetHigh;
439 }
440
441 // Otherwise ensure any result has been saved into the OVERLAPPED
442 // structure.
443 else
444 {
445 op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());
446 op->Offset = result_ec.value();
447 op->OffsetHigh = bytes_transferred;
448 }
449
450 // Dispatch the operation only if ready. The operation may not be ready
451 // if the initiating function (e.g. a call to WSARecv) has not yet
452 // returned. This is because the initiating function still wants access
453 // to the operation's OVERLAPPED structure.
454 if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
455 {
456 // Ensure the count of outstanding work is decremented on block exit.
457 work_finished_on_block_exit on_exit = { this };
458 (void)on_exit;
459
b32b8144 460 op->complete(this, result_ec, bytes_transferred);
7c673cae
FG
461 ec = boost::system::error_code();
462 return 1;
463 }
464 }
465 else if (!ok)
466 {
467 if (last_error != WAIT_TIMEOUT)
468 {
469 ec = boost::system::error_code(last_error,
470 boost::asio::error::get_system_category());
471 return 0;
472 }
473
b32b8144
FG
474 // If we're waiting indefinitely we need to keep going until we get a
475 // real handler.
476 if (msec == INFINITE)
7c673cae
FG
477 continue;
478
479 ec = boost::system::error_code();
480 return 0;
481 }
482 else if (completion_key == wake_for_dispatch)
483 {
484 // We have been woken up to try to acquire responsibility for dispatching
485 // timers and completed operations.
486 }
487 else
488 {
489 // Indicate that there is no longer an in-flight stop event.
490 ::InterlockedExchange(&stop_event_posted_, 0);
491
492 // The stopped_ flag is always checked to ensure that any leftover
493 // stop events from a previous run invocation are ignored.
494 if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
495 {
496 // Wake up next thread that is blocked on GetQueuedCompletionStatus.
497 if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
498 {
499 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
500 {
501 last_error = ::GetLastError();
502 ec = boost::system::error_code(last_error,
503 boost::asio::error::get_system_category());
504 return 0;
505 }
506 }
507
508 ec = boost::system::error_code();
509 return 0;
510 }
511 }
512 }
513}
514
b32b8144 515DWORD win_iocp_io_context::get_gqcs_timeout()
7c673cae
FG
516{
517 OSVERSIONINFOEX osvi;
518 ZeroMemory(&osvi, sizeof(osvi));
519 osvi.dwOSVersionInfoSize = sizeof(osvi);
520 osvi.dwMajorVersion = 6ul;
521
522 const uint64_t condition_mask = ::VerSetConditionMask(
523 0, VER_MAJORVERSION, VER_GREATER_EQUAL);
524
525 if (!!::VerifyVersionInfo(&osvi, VER_MAJORVERSION, condition_mask))
526 return INFINITE;
527
528 return default_gqcs_timeout;
529}
530
b32b8144 531void win_iocp_io_context::do_add_timer_queue(timer_queue_base& queue)
7c673cae
FG
532{
533 mutex::scoped_lock lock(dispatch_mutex_);
534
535 timer_queues_.insert(&queue);
536
537 if (!waitable_timer_.handle)
538 {
539 waitable_timer_.handle = ::CreateWaitableTimer(0, FALSE, 0);
540 if (waitable_timer_.handle == 0)
541 {
542 DWORD last_error = ::GetLastError();
543 boost::system::error_code ec(last_error,
544 boost::asio::error::get_system_category());
545 boost::asio::detail::throw_error(ec, "timer");
546 }
547
548 LARGE_INTEGER timeout;
549 timeout.QuadPart = -max_timeout_usec;
550 timeout.QuadPart *= 10;
551 ::SetWaitableTimer(waitable_timer_.handle,
552 &timeout, max_timeout_msec, 0, 0, FALSE);
553 }
554
555 if (!timer_thread_.get())
556 {
557 timer_thread_function thread_function = { this };
558 timer_thread_.reset(new thread(thread_function, 65536));
559 }
560}
561
b32b8144 562void win_iocp_io_context::do_remove_timer_queue(timer_queue_base& queue)
7c673cae
FG
563{
564 mutex::scoped_lock lock(dispatch_mutex_);
565
566 timer_queues_.erase(&queue);
567}
568
b32b8144 569void win_iocp_io_context::update_timeout()
7c673cae
FG
570{
571 if (timer_thread_.get())
572 {
573 // There's no point updating the waitable timer if the new timeout period
574 // exceeds the maximum timeout. In that case, we might as well wait for the
575 // existing period of the timer to expire.
576 long timeout_usec = timer_queues_.wait_duration_usec(max_timeout_usec);
577 if (timeout_usec < max_timeout_usec)
578 {
579 LARGE_INTEGER timeout;
580 timeout.QuadPart = -timeout_usec;
581 timeout.QuadPart *= 10;
582 ::SetWaitableTimer(waitable_timer_.handle,
583 &timeout, max_timeout_msec, 0, 0, FALSE);
584 }
585 }
586}
587
588} // namespace detail
589} // namespace asio
590} // namespace boost
591
592#include <boost/asio/detail/pop_options.hpp>
593
594#endif // defined(BOOST_ASIO_HAS_IOCP)
595
b32b8144 596#endif // BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP