//          Copyright Oliver Kowalke 2016.
// Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//          http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
#define BOOST_FIBERS_BUFFERED_CHANNEL_H

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <type_traits>

#include <boost/config.hpp>

#include <boost/fiber/channel_op_status.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/exceptions.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif

namespace boost {
namespace fibers {

template< typename T >
class buffered_channel {
public:
    using value_type = typename std::remove_reference<T>::type;

private:
    using wait_queue_type = context::wait_queue_t;
    using slot_type = value_type;

    mutable detail::spinlock    splk_{};
    wait_queue_type             waiting_producers_{};
    wait_queue_type             waiting_consumers_{};
    slot_type                *  slots_;
    std::size_t                 pidx_{ 0 };
    std::size_t                 cidx_{ 0 };
    std::size_t                 capacity_;
    bool                        closed_{ false };

    bool is_full_() const noexcept {
        return cidx_ == ((pidx_ + 1) % capacity_);
    }

    bool is_empty_() const noexcept {
        return cidx_ == pidx_;
    }

    bool is_closed_() const noexcept {
        return closed_;
    }

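    // Note: pidx_ and cidx_ form a ring buffer that always leaves one slot unused,
    // so that is_full_() (cidx_ == pidx_ + 1 mod capacity_) and is_empty_()
    // (cidx_ == pidx_) remain distinguishable. A channel constructed with capacity N
    // therefore buffers at most N - 1 values; the constructor additionally requires
    // N to be a power of two and at least 2.
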
public:
    explicit buffered_channel( std::size_t capacity) :
            capacity_{ capacity } {
        if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
            throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
                               "boost fiber: buffer capacity is invalid" };
        }
        slots_ = new slot_type[capacity_];
    }

    ~buffered_channel() {
        close();
        delete [] slots_;
    }

    buffered_channel( buffered_channel const&) = delete;
    buffered_channel & operator=( buffered_channel const&) = delete;

    bool is_closed() const noexcept {
        detail::spinlock_lock lk{ splk_ };
        return is_closed_();
    }

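    // Editorial note on the wake-up handshake used throughout this file: a fiber
    // blocking in plain push()/pop() stores 0 in its context's twstatus, while a
    // fiber blocking in a *_wait_until() variant stores `this`. A notifier tries to
    // CAS twstatus from `this` to -1; if that succeeds, or if the observed value is
    // 0 (no timed-wait in progress), the waiting context is scheduled. Any other
    // observed value is taken to mean the timed waiter no longer needs this wake-up
    // (its wait was already resolved, e.g. by timeout), so it is skipped.
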
    void close() noexcept {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( ! closed_) {
            closed_ = true;
            // notify all waiting producers
            while ( ! waiting_producers_.empty() ) {
                context * producer_ctx = & waiting_producers_.front();
                waiting_producers_.pop_front();
                auto expected = reinterpret_cast< std::intptr_t >( this);
                if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    // notify context
                    active_ctx->schedule( producer_ctx);
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( producer_ctx);
                }
            }
            // notify all waiting consumers
            while ( ! waiting_consumers_.empty() ) {
                context * consumer_ctx = & waiting_consumers_.front();
                waiting_consumers_.pop_front();
                auto expected = reinterpret_cast< std::intptr_t >( this);
                if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                }
            }
        }
    }

    channel_op_status try_push( value_type const& value) {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( BOOST_UNLIKELY( is_closed_() ) ) {
            return channel_op_status::closed;
        }
        if ( is_full_() ) {
            return channel_op_status::full;
        }
        slots_[pidx_] = value;
        pidx_ = (pidx_ + 1) % capacity_;
        // notify one waiting consumer
        while ( ! waiting_consumers_.empty() ) {
            context * consumer_ctx = & waiting_consumers_.front();
            waiting_consumers_.pop_front();
            auto expected = reinterpret_cast< std::intptr_t >( this);
            if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                lk.unlock();
                // notify context
                active_ctx->schedule( consumer_ctx);
                break;
            }
            if ( static_cast< std::intptr_t >( 0) == expected) {
                lk.unlock();
                // no timed-wait op.
                // notify context
                active_ctx->schedule( consumer_ctx);
                break;
            }
        }
        return channel_op_status::success;
    }

    channel_op_status try_push( value_type && value) {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( BOOST_UNLIKELY( is_closed_() ) ) {
            return channel_op_status::closed;
        }
        if ( is_full_() ) {
            return channel_op_status::full;
        }
        slots_[pidx_] = std::move( value);
        pidx_ = (pidx_ + 1) % capacity_;
        // notify one waiting consumer
        while ( ! waiting_consumers_.empty() ) {
            context * consumer_ctx = & waiting_consumers_.front();
            waiting_consumers_.pop_front();
            auto expected = reinterpret_cast< std::intptr_t >( this);
            if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                lk.unlock();
                // notify context
                active_ctx->schedule( consumer_ctx);
                break;
            }
            if ( static_cast< std::intptr_t >( 0) == expected) {
                lk.unlock();
                // no timed-wait op.
                // notify context
                active_ctx->schedule( consumer_ctx);
                break;
            }
        }
        return channel_op_status::success;
    }

    channel_op_status push( value_type const& value) {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            }
            if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this producer
                active_ctx->suspend( lk);
            } else {
                slots_[pidx_] = value;
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    channel_op_status push( value_type && value) {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            }
            if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this producer
                active_ctx->suspend( lk);
            } else {
                slots_[pidx_] = std::move( value);
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    template< typename Rep, typename Period >
    channel_op_status push_wait_for( value_type const& value,
                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
        return push_wait_until( value,
                                std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Rep, typename Period >
    channel_op_status push_wait_for( value_type && value,
                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
        return push_wait_until( std::forward< value_type >( value),
                                std::chrono::steady_clock::now() + timeout_duration);
    }

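    // The timed push/pop variants below register `this` in twstatus before suspending
    // (see the note above close()). If wait_until() reports a timeout, the spinlock
    // is no longer held, so the code re-locks it and unlinks the context from the
    // waiting queue before returning channel_op_status::timeout.
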
    template< typename Clock, typename Duration >
    channel_op_status push_wait_until( value_type const& value,
                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            }
            if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                // suspend this producer
                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                    // relock local lk
                    lk.lock();
                    // remove from waiting-queue
                    waiting_producers_.remove( * active_ctx);
                    return channel_op_status::timeout;
                }
            } else {
                slots_[pidx_] = value;
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    template< typename Clock, typename Duration >
    channel_op_status push_wait_until( value_type && value,
                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            }
            if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                // suspend this producer
                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                    // relock local lk
                    lk.lock();
                    // remove from waiting-queue
                    waiting_producers_.remove( * active_ctx);
                    return channel_op_status::timeout;
                }
            } else {
                slots_[pidx_] = std::move( value);
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    channel_op_status try_pop( value_type & value) {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( is_empty_() ) {
            return is_closed_()
                ? channel_op_status::closed
                : channel_op_status::empty;
        }
        value = std::move( slots_[cidx_]);
        cidx_ = (cidx_ + 1) % capacity_;
        // notify one waiting producer
        while ( ! waiting_producers_.empty() ) {
            context * producer_ctx = & waiting_producers_.front();
            waiting_producers_.pop_front();
            auto expected = reinterpret_cast< std::intptr_t >( this);
            if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                lk.unlock();
                // notify context
                active_ctx->schedule( producer_ctx);
                break;
            }
            if ( static_cast< std::intptr_t >( 0) == expected) {
                lk.unlock();
                // no timed-wait op.
                // notify context
                active_ctx->schedule( producer_ctx);
                break;
            }
        }
        return channel_op_status::success;
    }

    channel_op_status pop( value_type & value) {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( is_empty_() ) {
                if ( BOOST_UNLIKELY( is_closed_() ) ) {
                    return channel_op_status::closed;
                }
                active_ctx->wait_link( waiting_consumers_);
                active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this consumer
                active_ctx->suspend( lk);
            } else {
                value = std::move( slots_[cidx_]);
                cidx_ = (cidx_ + 1) % capacity_;
                // notify one waiting producer
                while ( ! waiting_producers_.empty() ) {
                    context * producer_ctx = & waiting_producers_.front();
                    waiting_producers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    value_type value_pop() {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( is_empty_() ) {
                if ( BOOST_UNLIKELY( is_closed_() ) ) {
                    throw fiber_error{
                        std::make_error_code( std::errc::operation_not_permitted),
                        "boost fiber: channel is closed" };
                }
                active_ctx->wait_link( waiting_consumers_);
                active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this consumer
                active_ctx->suspend( lk);
            } else {
                value_type value = std::move( slots_[cidx_]);
                cidx_ = (cidx_ + 1) % capacity_;
                // notify one waiting producer
                while ( ! waiting_producers_.empty() ) {
                    context * producer_ctx = & waiting_producers_.front();
                    waiting_producers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                }
                return value;
            }
        }
    }

    template< typename Rep, typename Period >
    channel_op_status pop_wait_for( value_type & value,
                                    std::chrono::duration< Rep, Period > const& timeout_duration) {
        return pop_wait_until( value,
                               std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Clock, typename Duration >
    channel_op_status pop_wait_until( value_type & value,
                                      std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( is_empty_() ) {
                if ( BOOST_UNLIKELY( is_closed_() ) ) {
                    return channel_op_status::closed;
                }
                active_ctx->wait_link( waiting_consumers_);
                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                // suspend this consumer
                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                    // relock local lk
                    lk.lock();
                    // remove from waiting-queue
                    waiting_consumers_.remove( * active_ctx);
                    return channel_op_status::timeout;
                }
            } else {
                value = std::move( slots_[cidx_]);
                cidx_ = (cidx_ + 1) % capacity_;
                // notify one waiting producer
                while ( ! waiting_producers_.empty() ) {
                    context * producer_ctx = & waiting_producers_.front();
                    waiting_producers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }

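    // The iterator below is a single-pass input iterator: constructing it from a
    // channel pops the first value, operator++ pops the next, and once value_pop()
    // throws because the channel is closed and drained, chan_ is reset to nullptr so
    // the iterator compares equal to the default-constructed end() iterator.
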
    class iterator {
    private:
        typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;

        buffered_channel * chan_{ nullptr };
        storage_type storage_;

        void increment_() {
            BOOST_ASSERT( nullptr != chan_);
            try {
                ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
            } catch ( fiber_error const&) {
                chan_ = nullptr;
            }
        }

    public:
        using iterator_category = std::input_iterator_tag;
        using difference_type = std::ptrdiff_t;
        using pointer = value_type *;
        using reference = value_type &;

        using pointer_t = pointer;
        using reference_t = reference;

        iterator() noexcept = default;

        explicit iterator( buffered_channel< T > * chan) noexcept :
                chan_{ chan } {
            increment_();
        }

        iterator( iterator const& other) noexcept :
                chan_{ other.chan_ } {
        }

        iterator & operator=( iterator const& other) noexcept {
            if ( BOOST_LIKELY( this != & other) ) {
                chan_ = other.chan_;
            }
            return * this;
        }

        bool operator==( iterator const& other) const noexcept {
            return other.chan_ == chan_;
        }

        bool operator!=( iterator const& other) const noexcept {
            return other.chan_ != chan_;
        }

        iterator & operator++() {
            reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
            increment_();
            return * this;
        }

        const iterator operator++( int) = delete;

        reference_t operator*() noexcept {
            return * reinterpret_cast< value_type * >( std::addressof( storage_) );
        }

        pointer_t operator->() noexcept {
            return reinterpret_cast< value_type * >( std::addressof( storage_) );
        }
    };

    friend class iterator;
};

template< typename T >
typename buffered_channel< T >::iterator
begin( buffered_channel< T > & chan) {
    return typename buffered_channel< T >::iterator( & chan);
}

template< typename T >
typename buffered_channel< T >::iterator
end( buffered_channel< T > &) {
    return typename buffered_channel< T >::iterator();
}

}}

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif

#endif // BOOST_FIBERS_BUFFERED_CHANNEL_H
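
// Usage sketch (editorial addition, not part of the original Boost sources):
// a minimal producer/consumer round trip showing push(), close() and the
// range-based iteration enabled by begin()/end() above. boost::fibers::fiber
// comes from <boost/fiber/all.hpp>; the capacity and values are arbitrary.
#if 0
#include <boost/fiber/all.hpp>
#include <iostream>

int main() {
    boost::fibers::buffered_channel< int > chan{ 8 };   // capacity must be a power of two >= 2
    boost::fibers::fiber producer{ [&chan]{
        for ( int i = 0; i < 5; ++i) {
            chan.push( i);          // suspends the producer fiber while the ring is full
        }
        chan.close();               // lets the consuming range-for below terminate
    } };
    for ( int v : chan) {           // pops values until the channel is closed and drained
        std::cout << v << "\n";
    }
    producer.join();
    return 0;
}
#endif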