]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/boost/fiber/buffered_channel.hpp
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / boost / boost / fiber / buffered_channel.hpp
CommitLineData
b32b8144
FG
1
2// Copyright Oliver Kowalke 2016.
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6//
7
8#ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
9#define BOOST_FIBERS_BUFFERED_CHANNEL_H
10
11#include <atomic>
12#include <chrono>
13#include <cstddef>
14#include <cstdint>
15#include <memory>
16#include <type_traits>
17
18#include <boost/config.hpp>
19
20#include <boost/fiber/channel_op_status.hpp>
21#include <boost/fiber/context.hpp>
22#include <boost/fiber/detail/config.hpp>
23#include <boost/fiber/detail/convert.hpp>
24#include <boost/fiber/detail/spinlock.hpp>
25#include <boost/fiber/exceptions.hpp>
26
27#ifdef BOOST_HAS_ABI_HEADERS
28# include BOOST_ABI_PREFIX
29#endif
30
31namespace boost {
32namespace fibers {
33
34template< typename T >
35class buffered_channel {
36public:
11fdf7f2 37 typedef typename std::remove_reference< T >::type value_type;
b32b8144
FG
38
39private:
40 typedef context::wait_queue_t wait_queue_type;
11fdf7f2 41 typedef value_type slot_type;
b32b8144
FG
42
43 mutable detail::spinlock splk_{};
44 wait_queue_type waiting_producers_{};
45 wait_queue_type waiting_consumers_{};
46 slot_type * slots_;
47 std::size_t pidx_{ 0 };
48 std::size_t cidx_{ 0 };
49 std::size_t capacity_;
50 bool closed_{ false };
51
52 bool is_full_() const noexcept {
53 return cidx_ == ((pidx_ + 1) % capacity_);
54 }
55
56 bool is_empty_() const noexcept {
57 return cidx_ == pidx_;
58 }
59
60 bool is_closed_() const noexcept {
61 return closed_;
62 }
63
64public:
65 explicit buffered_channel( std::size_t capacity) :
66 capacity_{ capacity } {
67 if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
68 throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
69 "boost fiber: buffer capacity is invalid" };
70 }
71 slots_ = new slot_type[capacity_];
72 }
73
74 ~buffered_channel() {
75 close();
76 delete [] slots_;
77 }
78
79 buffered_channel( buffered_channel const&) = delete;
80 buffered_channel & operator=( buffered_channel const&) = delete;
81
82 bool is_closed() const noexcept {
83 detail::spinlock_lock lk{ splk_ };
84 return is_closed_();
85 }
86
87 void close() noexcept {
88 context * active_ctx = context::active();
89 detail::spinlock_lock lk{ splk_ };
92f5a8d4
TL
90 if ( ! closed_) {
91 closed_ = true;
92 // notify all waiting producers
93 while ( ! waiting_producers_.empty() ) {
94 context * producer_ctx = & waiting_producers_.front();
95 waiting_producers_.pop_front();
96 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
97 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
98 // notify context
99 active_ctx->schedule( producer_ctx);
100 } else if ( static_cast< std::intptr_t >( 0) == expected) {
101 // no timed-wait op.
102 // notify context
103 active_ctx->schedule( producer_ctx);
104 }
b32b8144 105 }
92f5a8d4
TL
106 // notify all waiting consumers
107 while ( ! waiting_consumers_.empty() ) {
108 context * consumer_ctx = & waiting_consumers_.front();
109 waiting_consumers_.pop_front();
110 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
111 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
112 // notify context
113 active_ctx->schedule( consumer_ctx);
114 } else if ( static_cast< std::intptr_t >( 0) == expected) {
115 // no timed-wait op.
116 // notify context
117 active_ctx->schedule( consumer_ctx);
118 }
b32b8144
FG
119 }
120 }
121 }
122
123 channel_op_status try_push( value_type const& value) {
124 context * active_ctx = context::active();
125 detail::spinlock_lock lk{ splk_ };
126 if ( BOOST_UNLIKELY( is_closed_() ) ) {
127 return channel_op_status::closed;
128 } else if ( is_full_() ) {
129 return channel_op_status::full;
130 } else {
131 slots_[pidx_] = value;
132 pidx_ = (pidx_ + 1) % capacity_;
133 // notify one waiting consumer
134 while ( ! waiting_consumers_.empty() ) {
135 context * consumer_ctx = & waiting_consumers_.front();
136 waiting_consumers_.pop_front();
137 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
138 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
92f5a8d4 139 lk.unlock();
b32b8144
FG
140 // notify context
141 active_ctx->schedule( consumer_ctx);
142 break;
143 } else if ( static_cast< std::intptr_t >( 0) == expected) {
92f5a8d4 144 lk.unlock();
b32b8144
FG
145 // no timed-wait op.
146 // notify context
147 active_ctx->schedule( consumer_ctx);
148 break;
b32b8144
FG
149 }
150 }
151 return channel_op_status::success;
152 }
153 }
154
155 channel_op_status try_push( value_type && value) {
156 context * active_ctx = context::active();
157 detail::spinlock_lock lk{ splk_ };
158 if ( BOOST_UNLIKELY( is_closed_() ) ) {
159 return channel_op_status::closed;
160 } else if ( is_full_() ) {
161 return channel_op_status::full;
162 } else {
163 slots_[pidx_] = std::move( value);
164 pidx_ = (pidx_ + 1) % capacity_;
165 // notify one waiting consumer
166 while ( ! waiting_consumers_.empty() ) {
167 context * consumer_ctx = & waiting_consumers_.front();
168 waiting_consumers_.pop_front();
b32b8144
FG
169 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
170 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
92f5a8d4 171 lk.unlock();
b32b8144
FG
172 // notify context
173 active_ctx->schedule( consumer_ctx);
174 break;
175 } else if ( static_cast< std::intptr_t >( 0) == expected) {
92f5a8d4 176 lk.unlock();
b32b8144
FG
177 // no timed-wait op.
178 // notify context
179 active_ctx->schedule( consumer_ctx);
180 break;
b32b8144
FG
181 }
182 }
183 return channel_op_status::success;
184 }
185 }
186
187 channel_op_status push( value_type const& value) {
188 context * active_ctx = context::active();
189 for (;;) {
190 detail::spinlock_lock lk{ splk_ };
191 if ( BOOST_UNLIKELY( is_closed_() ) ) {
192 return channel_op_status::closed;
193 } else if ( is_full_() ) {
194 active_ctx->wait_link( waiting_producers_);
195 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
196 // suspend this producer
197 active_ctx->suspend( lk);
198 } else {
199 slots_[pidx_] = value;
200 pidx_ = (pidx_ + 1) % capacity_;
201 // notify one waiting consumer
202 while ( ! waiting_consumers_.empty() ) {
203 context * consumer_ctx = & waiting_consumers_.front();
204 waiting_consumers_.pop_front();
b32b8144
FG
205 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
206 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
92f5a8d4 207 lk.unlock();
b32b8144
FG
208 // notify context
209 active_ctx->schedule( consumer_ctx);
210 break;
211 } else if ( static_cast< std::intptr_t >( 0) == expected) {
92f5a8d4 212 lk.unlock();
b32b8144
FG
213 // no timed-wait op.
214 // notify context
215 active_ctx->schedule( consumer_ctx);
216 break;
b32b8144
FG
217 }
218 }
219 return channel_op_status::success;
220 }
221 }
222 }
223
224 channel_op_status push( value_type && value) {
225 context * active_ctx = context::active();
226 for (;;) {
227 detail::spinlock_lock lk{ splk_ };
228 if ( BOOST_UNLIKELY( is_closed_() ) ) {
229 return channel_op_status::closed;
230 } else if ( is_full_() ) {
231 active_ctx->wait_link( waiting_producers_);
232 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
233 // suspend this producer
234 active_ctx->suspend( lk);
235 } else {
236 slots_[pidx_] = std::move( value);
237 pidx_ = (pidx_ + 1) % capacity_;
238 // notify one waiting consumer
239 while ( ! waiting_consumers_.empty() ) {
240 context * consumer_ctx = & waiting_consumers_.front();
241 waiting_consumers_.pop_front();
b32b8144
FG
242 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
243 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
92f5a8d4 244 lk.unlock();
b32b8144
FG
245 // notify context
246 active_ctx->schedule( consumer_ctx);
247 break;
248 } else if ( static_cast< std::intptr_t >( 0) == expected) {
92f5a8d4 249 lk.unlock();
b32b8144
FG
250 // no timed-wait op.
251 // notify context
252 active_ctx->schedule( consumer_ctx);
253 break;
b32b8144
FG
254 }
255 }
256 return channel_op_status::success;
257 }
258 }
259 }
260
261 template< typename Rep, typename Period >
262 channel_op_status push_wait_for( value_type const& value,
263 std::chrono::duration< Rep, Period > const& timeout_duration) {
264 return push_wait_until( value,
265 std::chrono::steady_clock::now() + timeout_duration);
266 }
267
268 template< typename Rep, typename Period >
269 channel_op_status push_wait_for( value_type && value,
270 std::chrono::duration< Rep, Period > const& timeout_duration) {
271 return push_wait_until( std::forward< value_type >( value),
272 std::chrono::steady_clock::now() + timeout_duration);
273 }
274
275 template< typename Clock, typename Duration >
276 channel_op_status push_wait_until( value_type const& value,
277 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
278 context * active_ctx = context::active();
279 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
280 for (;;) {
281 detail::spinlock_lock lk{ splk_ };
282 if ( BOOST_UNLIKELY( is_closed_() ) ) {
283 return channel_op_status::closed;
284 } else if ( is_full_() ) {
285 active_ctx->wait_link( waiting_producers_);
b32b8144
FG
286 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
287 // suspend this producer
288 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
289 // relock local lk
290 lk.lock();
291 // remove from waiting-queue
292 waiting_producers_.remove( * active_ctx);
293 return channel_op_status::timeout;
294 }
295 } else {
296 slots_[pidx_] = value;
297 pidx_ = (pidx_ + 1) % capacity_;
298 // notify one waiting consumer
299 while ( ! waiting_consumers_.empty() ) {
300 context * consumer_ctx = & waiting_consumers_.front();
301 waiting_consumers_.pop_front();
b32b8144
FG
302 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
303 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
92f5a8d4 304 lk.unlock();
b32b8144
FG
305 // notify context
306 active_ctx->schedule( consumer_ctx);
307 break;
308 } else if ( static_cast< std::intptr_t >( 0) == expected) {
92f5a8d4 309 lk.unlock();
b32b8144
FG
310 // no timed-wait op.
311 // notify context
312 active_ctx->schedule( consumer_ctx);
313 break;
b32b8144
FG
314 }
315 }
316 return channel_op_status::success;
317 }
318 }
319 }
320
321 template< typename Clock, typename Duration >
322 channel_op_status push_wait_until( value_type && value,
323 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
324 context * active_ctx = context::active();
325 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
326 for (;;) {
327 detail::spinlock_lock lk{ splk_ };
328 if ( BOOST_UNLIKELY( is_closed_() ) ) {
329 return channel_op_status::closed;
330 } else if ( is_full_() ) {
331 active_ctx->wait_link( waiting_producers_);
b32b8144
FG
332 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
333 // suspend this producer
334 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
335 // relock local lk
336 lk.lock();
337 // remove from waiting-queue
338 waiting_producers_.remove( * active_ctx);
339 return channel_op_status::timeout;
340 }
341 } else {
342 slots_[pidx_] = std::move( value);
343 pidx_ = (pidx_ + 1) % capacity_;
344 // notify one waiting consumer
345 while ( ! waiting_consumers_.empty() ) {
346 context * consumer_ctx = & waiting_consumers_.front();
347 waiting_consumers_.pop_front();
b32b8144
FG
348 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
349 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
92f5a8d4 350 lk.unlock();
b32b8144
FG
351 // notify context
352 active_ctx->schedule( consumer_ctx);
353 break;
354 } else if ( static_cast< std::intptr_t >( 0) == expected) {
92f5a8d4 355 lk.unlock();
b32b8144
FG
356 // no timed-wait op.
357 // notify context
358 active_ctx->schedule( consumer_ctx);
359 break;
b32b8144
FG
360 }
361 }
362 return channel_op_status::success;
363 }
364 }
365 }
366
367 channel_op_status try_pop( value_type & value) {
368 context * active_ctx = context::active();
369 detail::spinlock_lock lk{ splk_ };
370 if ( is_empty_() ) {
371 return is_closed_()
372 ? channel_op_status::closed
373 : channel_op_status::empty;
374 } else {
375 value = std::move( slots_[cidx_]);
376 cidx_ = (cidx_ + 1) % capacity_;
377 // notify one waiting producer
378 while ( ! waiting_producers_.empty() ) {
379 context * producer_ctx = & waiting_producers_.front();
380 waiting_producers_.pop_front();
b32b8144
FG
381 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
382 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
92f5a8d4 383 lk.unlock();
b32b8144
FG
384 // notify context
385 active_ctx->schedule( producer_ctx);
386 break;
387 } else if ( static_cast< std::intptr_t >( 0) == expected) {
92f5a8d4 388 lk.unlock();
b32b8144
FG
389 // no timed-wait op.
390 // notify context
391 active_ctx->schedule( producer_ctx);
392 break;
b32b8144
FG
393 }
394 }
395 return channel_op_status::success;
396 }
397 }
398
399 channel_op_status pop( value_type & value) {
400 context * active_ctx = context::active();
401 for (;;) {
402 detail::spinlock_lock lk{ splk_ };
403 if ( is_empty_() ) {
404 if ( BOOST_UNLIKELY( is_closed_() ) ) {
405 return channel_op_status::closed;
406 } else {
407 active_ctx->wait_link( waiting_consumers_);
408 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
409 // suspend this consumer
410 active_ctx->suspend( lk);
411 }
412 } else {
413 value = std::move( slots_[cidx_]);
414 cidx_ = (cidx_ + 1) % capacity_;
415 // notify one waiting producer
416 while ( ! waiting_producers_.empty() ) {
417 context * producer_ctx = & waiting_producers_.front();
418 waiting_producers_.pop_front();
b32b8144
FG
419 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
420 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
92f5a8d4 421 lk.unlock();
b32b8144
FG
422 // notify context
423 active_ctx->schedule( producer_ctx);
424 break;
425 } else if ( static_cast< std::intptr_t >( 0) == expected) {
92f5a8d4 426 lk.unlock();
b32b8144
FG
427 // no timed-wait op.
428 // notify context
429 active_ctx->schedule( producer_ctx);
430 break;
b32b8144
FG
431 }
432 }
433 return channel_op_status::success;
434 }
435 }
436 }
437
438 value_type value_pop() {
439 context * active_ctx = context::active();
440 for (;;) {
441 detail::spinlock_lock lk{ splk_ };
442 if ( is_empty_() ) {
443 if ( BOOST_UNLIKELY( is_closed_() ) ) {
444 throw fiber_error{
445 std::make_error_code( std::errc::operation_not_permitted),
446 "boost fiber: channel is closed" };
447 } else {
448 active_ctx->wait_link( waiting_consumers_);
449 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
450 // suspend this consumer
451 active_ctx->suspend( lk);
452 }
453 } else {
454 value_type value = std::move( slots_[cidx_]);
455 cidx_ = (cidx_ + 1) % capacity_;
456 // notify one waiting producer
457 while ( ! waiting_producers_.empty() ) {
458 context * producer_ctx = & waiting_producers_.front();
459 waiting_producers_.pop_front();
b32b8144
FG
460 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
461 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
92f5a8d4 462 lk.unlock();
b32b8144
FG
463 // notify context
464 active_ctx->schedule( producer_ctx);
465 break;
466 } else if ( static_cast< std::intptr_t >( 0) == expected) {
92f5a8d4 467 lk.unlock();
b32b8144
FG
468 // no timed-wait op.
469 // notify context
470 active_ctx->schedule( producer_ctx);
471 break;
b32b8144
FG
472 }
473 }
474 return std::move( value);
475 }
476 }
477 }
478
479 template< typename Rep, typename Period >
480 channel_op_status pop_wait_for( value_type & value,
481 std::chrono::duration< Rep, Period > const& timeout_duration) {
482 return pop_wait_until( value,
483 std::chrono::steady_clock::now() + timeout_duration);
484 }
485
486 template< typename Clock, typename Duration >
487 channel_op_status pop_wait_until( value_type & value,
488 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
489 context * active_ctx = context::active();
490 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
491 for (;;) {
492 detail::spinlock_lock lk{ splk_ };
493 if ( is_empty_() ) {
494 if ( BOOST_UNLIKELY( is_closed_() ) ) {
495 return channel_op_status::closed;
496 } else {
497 active_ctx->wait_link( waiting_consumers_);
b32b8144
FG
498 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
499 // suspend this consumer
500 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
501 // relock local lk
502 lk.lock();
503 // remove from waiting-queue
504 waiting_consumers_.remove( * active_ctx);
505 return channel_op_status::timeout;
506 }
507 }
508 } else {
509 value = std::move( slots_[cidx_]);
510 cidx_ = (cidx_ + 1) % capacity_;
511 // notify one waiting producer
512 while ( ! waiting_producers_.empty() ) {
513 context * producer_ctx = & waiting_producers_.front();
514 waiting_producers_.pop_front();
b32b8144
FG
515 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
516 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
92f5a8d4 517 lk.unlock();
b32b8144
FG
518 // notify context
519 active_ctx->schedule( producer_ctx);
520 break;
521 } else if ( static_cast< std::intptr_t >( 0) == expected) {
92f5a8d4 522 lk.unlock();
b32b8144
FG
523 // no timed-wait op.
524 // notify context
525 active_ctx->schedule( producer_ctx);
526 break;
b32b8144
FG
527 }
528 }
529 return channel_op_status::success;
530 }
531 }
532 }
533
11fdf7f2 534 class iterator {
b32b8144
FG
535 private:
536 typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
537
11fdf7f2
TL
538 buffered_channel * chan_{ nullptr };
539 storage_type storage_;
b32b8144
FG
540
541 void increment_() {
542 BOOST_ASSERT( nullptr != chan_);
543 try {
544 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
545 } catch ( fiber_error const&) {
546 chan_ = nullptr;
547 }
548 }
549
550 public:
11fdf7f2
TL
551 typedef std::input_iterator_tag iterator_category;
552 typedef std::ptrdiff_t difference_type;
553 typedef value_type * pointer;
554 typedef value_type & reference;
555
556 typedef pointer pointer_t;
557 typedef reference reference_t;
b32b8144
FG
558
559 iterator() noexcept = default;
560
561 explicit iterator( buffered_channel< T > * chan) noexcept :
562 chan_{ chan } {
563 increment_();
564 }
565
566 iterator( iterator const& other) noexcept :
567 chan_{ other.chan_ } {
568 }
569
570 iterator & operator=( iterator const& other) noexcept {
571 if ( BOOST_LIKELY( this != & other) ) {
572 chan_ = other.chan_;
573 }
574 return * this;
575 }
576
577 bool operator==( iterator const& other) const noexcept {
578 return other.chan_ == chan_;
579 }
580
581 bool operator!=( iterator const& other) const noexcept {
582 return other.chan_ != chan_;
583 }
584
585 iterator & operator++() {
92f5a8d4 586 reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
b32b8144
FG
587 increment_();
588 return * this;
589 }
590
591 iterator operator++( int) = delete;
592
593 reference_t operator*() noexcept {
594 return * reinterpret_cast< value_type * >( std::addressof( storage_) );
595 }
596
597 pointer_t operator->() noexcept {
598 return reinterpret_cast< value_type * >( std::addressof( storage_) );
599 }
600 };
601
602 friend class iterator;
603};
604
605template< typename T >
606typename buffered_channel< T >::iterator
607begin( buffered_channel< T > & chan) {
608 return typename buffered_channel< T >::iterator( & chan);
609}
610
611template< typename T >
612typename buffered_channel< T >::iterator
613end( buffered_channel< T > &) {
614 return typename buffered_channel< T >::iterator();
615}
616
617}}
618
619#ifdef BOOST_HAS_ABI_HEADERS
620# include BOOST_ABI_SUFFIX
621#endif
622
623#endif // BOOST_FIBERS_BUFFERED_CHANNEL_H