]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/boost/fiber/unbuffered_channel.hpp
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / boost / boost / fiber / unbuffered_channel.hpp
CommitLineData
b32b8144
FG
1
2// Copyright Oliver Kowalke 2016.
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
8#define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
9
10#include <atomic>
11#include <chrono>
12#include <cstddef>
13#include <cstdint>
14#include <memory>
15#include <vector>
16
17#include <boost/config.hpp>
18
19#include <boost/fiber/channel_op_status.hpp>
20#include <boost/fiber/context.hpp>
21#include <boost/fiber/detail/config.hpp>
22#include <boost/fiber/detail/convert.hpp>
23#include <boost/fiber/detail/spinlock.hpp>
24#include <boost/fiber/exceptions.hpp>
25
26#ifdef BOOST_HAS_ABI_HEADERS
27# include BOOST_ABI_PREFIX
28#endif
29
30namespace boost {
31namespace fibers {
32
33template< typename T >
34class unbuffered_channel {
35public:
11fdf7f2 36 typedef typename std::remove_reference< T >::type value_type;
b32b8144
FG
37
38private:
39 typedef context::wait_queue_t wait_queue_type;
40
41 struct slot {
42 value_type value;
43 context * ctx;
44
45 slot( value_type const& value_, context * ctx_) :
46 value{ value_ },
47 ctx{ ctx_ } {
48 }
49
50 slot( value_type && value_, context * ctx_) :
51 value{ std::move( value_) },
52 ctx{ ctx_ } {
53 }
54 };
55
56 // shared cacheline
11fdf7f2 57 std::atomic< slot * > slot_{ nullptr };
b32b8144 58 // shared cacheline
11fdf7f2
TL
59 std::atomic_bool closed_{ false };
60 mutable detail::spinlock splk_producers_{};
61 wait_queue_type waiting_producers_{};
62 mutable detail::spinlock splk_consumers_{};
63 wait_queue_type waiting_consumers_{};
64 char pad_[cacheline_length];
b32b8144
FG
65
66 bool is_empty_() {
67 return nullptr == slot_.load( std::memory_order_acquire);
68 }
69
70 bool try_push_( slot * own_slot) {
71 for (;;) {
72 slot * s = slot_.load( std::memory_order_acquire);
73 if ( nullptr == s) {
74 if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
75 continue;
76 }
77 return true;
78 } else {
79 return false;
80 }
81 }
82 }
83
84 slot * try_pop_() {
85 slot * nil_slot = nullptr;
86 for (;;) {
87 slot * s = slot_.load( std::memory_order_acquire);
88 if ( nullptr != s) {
89 if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
90 continue;}
91 }
92 return s;
93 }
94 }
95
96public:
97 unbuffered_channel() {
98 }
99
100 ~unbuffered_channel() {
101 close();
102 }
103
104 unbuffered_channel( unbuffered_channel const&) = delete;
105 unbuffered_channel & operator=( unbuffered_channel const&) = delete;
106
107 bool is_closed() const noexcept {
108 return closed_.load( std::memory_order_acquire);
109 }
110
111 void close() noexcept {
112 context * active_ctx = context::active();
113 // notify all waiting producers
114 closed_.store( true, std::memory_order_release);
115 detail::spinlock_lock lk1{ splk_producers_ };
116 while ( ! waiting_producers_.empty() ) {
117 context * producer_ctx = & waiting_producers_.front();
118 waiting_producers_.pop_front();
119 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
120 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
b32b8144
FG
121 // notify context
122 active_ctx->schedule( producer_ctx);
123 } else if ( static_cast< std::intptr_t >( 0) == expected) {
124 // no timed-wait op.
125 // notify context
126 active_ctx->schedule( producer_ctx);
b32b8144
FG
127 }
128 }
129 // notify all waiting consumers
130 detail::spinlock_lock lk2{ splk_consumers_ };
131 while ( ! waiting_consumers_.empty() ) {
132 context * consumer_ctx = & waiting_consumers_.front();
133 waiting_consumers_.pop_front();
134 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
135 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
b32b8144
FG
136 // notify context
137 active_ctx->schedule( consumer_ctx);
138 } else if ( static_cast< std::intptr_t >( 0) == expected) {
139 // no timed-wait op.
140 // notify context
141 active_ctx->schedule( consumer_ctx);
b32b8144
FG
142 }
143 }
144 }
145
146 channel_op_status push( value_type const& value) {
147 context * active_ctx = context::active();
148 slot s{ value, active_ctx };
149 for (;;) {
150 if ( BOOST_UNLIKELY( is_closed() ) ) {
151 return channel_op_status::closed;
152 }
153 if ( try_push_( & s) ) {
154 detail::spinlock_lock lk{ splk_consumers_ };
155 // notify one waiting consumer
156 while ( ! waiting_consumers_.empty() ) {
157 context * consumer_ctx = & waiting_consumers_.front();
158 waiting_consumers_.pop_front();
159 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
160 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
b32b8144
FG
161 // notify context
162 active_ctx->schedule( consumer_ctx);
163 break;
164 } else if ( static_cast< std::intptr_t >( 0) == expected) {
165 // no timed-wait op.
166 // notify context
167 active_ctx->schedule( consumer_ctx);
168 break;
b32b8144
FG
169 }
170 }
171 // suspend till value has been consumed
172 active_ctx->suspend( lk);
173 // resumed, value has been consumed
174 return channel_op_status::success;
175 } else {
176 detail::spinlock_lock lk{ splk_producers_ };
177 if ( BOOST_UNLIKELY( is_closed() ) ) {
178 return channel_op_status::closed;
179 }
180 if ( is_empty_() ) {
181 continue;
182 }
183 active_ctx->wait_link( waiting_producers_);
184 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
185 // suspend this producer
186 active_ctx->suspend( lk);
187 // resumed, slot mabye free
188 }
189 }
190 }
191
192 channel_op_status push( value_type && value) {
193 context * active_ctx = context::active();
194 slot s{ std::move( value), active_ctx };
195 for (;;) {
196 if ( BOOST_UNLIKELY( is_closed() ) ) {
197 return channel_op_status::closed;
198 }
199 if ( try_push_( & s) ) {
200 detail::spinlock_lock lk{ splk_consumers_ };
201 // notify one waiting consumer
202 while ( ! waiting_consumers_.empty() ) {
203 context * consumer_ctx = & waiting_consumers_.front();
204 waiting_consumers_.pop_front();
205 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
206 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
b32b8144
FG
207 // notify context
208 active_ctx->schedule( consumer_ctx);
209 break;
210 } else if ( static_cast< std::intptr_t >( 0) == expected) {
211 // no timed-wait op.
212 // notify context
213 active_ctx->schedule( consumer_ctx);
214 break;
b32b8144
FG
215 }
216 }
217 // suspend till value has been consumed
218 active_ctx->suspend( lk);
219 // resumed, value has been consumed
220 return channel_op_status::success;
221 } else {
222 detail::spinlock_lock lk{ splk_producers_ };
223 if ( BOOST_UNLIKELY( is_closed() ) ) {
224 return channel_op_status::closed;
225 }
226 if ( is_empty_() ) {
227 continue;
228 }
229 active_ctx->wait_link( waiting_producers_);
230 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
231 // suspend this producer
232 active_ctx->suspend( lk);
233 // resumed, slot mabye free
234 }
235 }
236 }
237
238 template< typename Rep, typename Period >
239 channel_op_status push_wait_for( value_type const& value,
240 std::chrono::duration< Rep, Period > const& timeout_duration) {
241 return push_wait_until( value,
242 std::chrono::steady_clock::now() + timeout_duration);
243 }
244
245 template< typename Rep, typename Period >
246 channel_op_status push_wait_for( value_type && value,
247 std::chrono::duration< Rep, Period > const& timeout_duration) {
248 return push_wait_until( std::forward< value_type >( value),
249 std::chrono::steady_clock::now() + timeout_duration);
250 }
251
252 template< typename Clock, typename Duration >
253 channel_op_status push_wait_until( value_type const& value,
254 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
255 context * active_ctx = context::active();
256 slot s{ value, active_ctx };
257 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
258 for (;;) {
259 if ( BOOST_UNLIKELY( is_closed() ) ) {
260 return channel_op_status::closed;
261 }
262 if ( try_push_( & s) ) {
263 detail::spinlock_lock lk{ splk_consumers_ };
264 // notify one waiting consumer
265 while ( ! waiting_consumers_.empty() ) {
266 context * consumer_ctx = & waiting_consumers_.front();
267 waiting_consumers_.pop_front();
268 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
269 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
b32b8144
FG
270 // notify context
271 active_ctx->schedule( consumer_ctx);
272 break;
273 } else if ( static_cast< std::intptr_t >( 0) == expected) {
274 // no timed-wait op.
275 // notify context
276 active_ctx->schedule( consumer_ctx);
277 break;
b32b8144
FG
278 }
279 }
280 // suspend this producer
b32b8144
FG
281 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
282 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
283 // clear slot
284 slot * nil_slot = nullptr, * own_slot = & s;
285 slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
286 // resumed, value has not been consumed
287 return channel_op_status::timeout;
288 }
289 // resumed, value has been consumed
290 return channel_op_status::success;
291 } else {
292 detail::spinlock_lock lk{ splk_producers_ };
293 if ( BOOST_UNLIKELY( is_closed() ) ) {
294 return channel_op_status::closed;
295 }
296 if ( is_empty_() ) {
297 continue;
298 }
299 active_ctx->wait_link( waiting_producers_);
b32b8144
FG
300 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
301 // suspend this producer
302 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
303 // relock local lk
304 lk.lock();
305 // remove from waiting-queue
306 waiting_producers_.remove( * active_ctx);
307 return channel_op_status::timeout;
308 }
309 // resumed, slot maybe free
310 }
311 }
312 }
313
314 template< typename Clock, typename Duration >
315 channel_op_status push_wait_until( value_type && value,
316 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
317 context * active_ctx = context::active();
318 slot s{ std::move( value), active_ctx };
319 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
320 for (;;) {
321 if ( BOOST_UNLIKELY( is_closed() ) ) {
322 return channel_op_status::closed;
323 }
324 if ( try_push_( & s) ) {
325 detail::spinlock_lock lk{ splk_consumers_ };
326 // notify one waiting consumer
327 while ( ! waiting_consumers_.empty() ) {
328 context * consumer_ctx = & waiting_consumers_.front();
329 waiting_consumers_.pop_front();
330 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
331 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
b32b8144
FG
332 // notify context
333 active_ctx->schedule( consumer_ctx);
334 break;
335 } else if ( static_cast< std::intptr_t >( 0) == expected) {
336 // no timed-wait op.
337 // notify context
338 active_ctx->schedule( consumer_ctx);
339 break;
b32b8144
FG
340 }
341 }
342 // suspend this producer
b32b8144
FG
343 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
344 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
345 // clear slot
346 slot * nil_slot = nullptr, * own_slot = & s;
347 slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
348 // resumed, value has not been consumed
349 return channel_op_status::timeout;
350 }
351 // resumed, value has been consumed
352 return channel_op_status::success;
353 } else {
354 detail::spinlock_lock lk{ splk_producers_ };
355 if ( BOOST_UNLIKELY( is_closed() ) ) {
356 return channel_op_status::closed;
357 }
358 if ( is_empty_() ) {
359 continue;
360 }
361 active_ctx->wait_link( waiting_producers_);
b32b8144
FG
362 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
363 // suspend this producer
364 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
365 // relock local lk
366 lk.lock();
367 // remove from waiting-queue
368 waiting_producers_.remove( * active_ctx);
369 return channel_op_status::timeout;
370 }
371 // resumed, slot maybe free
372 }
373 }
374 }
375
376 channel_op_status pop( value_type & value) {
377 context * active_ctx = context::active();
378 slot * s = nullptr;
379 for (;;) {
380 if ( nullptr != ( s = try_pop_() ) ) {
381 {
382 detail::spinlock_lock lk{ splk_producers_ };
383 // notify one waiting producer
384 while ( ! waiting_producers_.empty() ) {
385 context * producer_ctx = & waiting_producers_.front();
386 waiting_producers_.pop_front();
387 lk.unlock();
388 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
389 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
b32b8144
FG
390 // notify context
391 active_ctx->schedule( producer_ctx);
392 break;
393 } else if ( static_cast< std::intptr_t >( 0) == expected) {
394 // no timed-wait op.
395 // notify context
396 active_ctx->schedule( producer_ctx);
397 break;
b32b8144
FG
398 }
399 }
400 }
401 value = std::move( s->value);
402 // notify context
403 active_ctx->schedule( s->ctx);
404 return channel_op_status::success;
405 } else {
406 detail::spinlock_lock lk{ splk_consumers_ };
407 if ( BOOST_UNLIKELY( is_closed() ) ) {
408 return channel_op_status::closed;
409 }
410 if ( ! is_empty_() ) {
411 continue;
412 }
413 active_ctx->wait_link( waiting_consumers_);
414 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
415 // suspend this consumer
416 active_ctx->suspend( lk);
417 // resumed, slot mabye set
418 }
419 }
420 }
421
422 value_type value_pop() {
423 context * active_ctx = context::active();
424 slot * s = nullptr;
425 for (;;) {
426 if ( nullptr != ( s = try_pop_() ) ) {
427 {
428 detail::spinlock_lock lk{ splk_producers_ };
429 // notify one waiting producer
430 while ( ! waiting_producers_.empty() ) {
431 context * producer_ctx = & waiting_producers_.front();
432 waiting_producers_.pop_front();
433 lk.unlock();
434 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
435 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
b32b8144
FG
436 // notify context
437 active_ctx->schedule( producer_ctx);
438 break;
439 } else if ( static_cast< std::intptr_t >( 0) == expected) {
440 // no timed-wait op.
441 // notify context
442 active_ctx->schedule( producer_ctx);
443 break;
b32b8144
FG
444 }
445 }
446 }
447 // consume value
448 value_type value = std::move( s->value);
449 // notify context
450 active_ctx->schedule( s->ctx);
451 return std::move( value);
452 } else {
453 detail::spinlock_lock lk{ splk_consumers_ };
454 if ( BOOST_UNLIKELY( is_closed() ) ) {
455 throw fiber_error{
456 std::make_error_code( std::errc::operation_not_permitted),
457 "boost fiber: channel is closed" };
458 }
459 if ( ! is_empty_() ) {
460 continue;
461 }
462 active_ctx->wait_link( waiting_consumers_);
463 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
464 // suspend this consumer
465 active_ctx->suspend( lk);
466 // resumed, slot mabye set
467 }
468 }
469 }
470
471 template< typename Rep, typename Period >
472 channel_op_status pop_wait_for( value_type & value,
473 std::chrono::duration< Rep, Period > const& timeout_duration) {
474 return pop_wait_until( value,
475 std::chrono::steady_clock::now() + timeout_duration);
476 }
477
478 template< typename Clock, typename Duration >
479 channel_op_status pop_wait_until( value_type & value,
480 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
481 context * active_ctx = context::active();
482 slot * s = nullptr;
483 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
484 for (;;) {
485 if ( nullptr != ( s = try_pop_() ) ) {
486 {
487 detail::spinlock_lock lk{ splk_producers_ };
488 // notify one waiting producer
489 while ( ! waiting_producers_.empty() ) {
490 context * producer_ctx = & waiting_producers_.front();
491 waiting_producers_.pop_front();
492 lk.unlock();
493 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
494 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
b32b8144
FG
495 // notify context
496 active_ctx->schedule( producer_ctx);
497 break;
498 } else if ( static_cast< std::intptr_t >( 0) == expected) {
499 // no timed-wait op.
500 // notify context
501 active_ctx->schedule( producer_ctx);
502 break;
b32b8144
FG
503 }
504 }
505 }
506 // consume value
507 value = std::move( s->value);
508 // notify context
509 active_ctx->schedule( s->ctx);
510 return channel_op_status::success;
511 } else {
512 detail::spinlock_lock lk{ splk_consumers_ };
513 if ( BOOST_UNLIKELY( is_closed() ) ) {
514 return channel_op_status::closed;
515 }
516 if ( ! is_empty_() ) {
517 continue;
518 }
519 active_ctx->wait_link( waiting_consumers_);
b32b8144
FG
520 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
521 // suspend this consumer
522 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
523 // relock local lk
524 lk.lock();
525 // remove from waiting-queue
526 waiting_consumers_.remove( * active_ctx);
527 return channel_op_status::timeout;
528 }
529 }
530 }
531 }
532
11fdf7f2 533 class iterator {
b32b8144
FG
534 private:
535 typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
536
537 unbuffered_channel * chan_{ nullptr };
538 storage_type storage_;
539
540 void increment_() {
541 BOOST_ASSERT( nullptr != chan_);
542 try {
543 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
544 } catch ( fiber_error const&) {
545 chan_ = nullptr;
546 }
547 }
548
549 public:
11fdf7f2
TL
550 typedef std::input_iterator_tag iterator_category;
551 typedef std::ptrdiff_t difference_type;
552 typedef value_type * pointer;
553 typedef value_type & reference;
554
555 typedef pointer pointer_t;
556 typedef reference reference_t;
b32b8144
FG
557
558 iterator() noexcept = default;
559
560 explicit iterator( unbuffered_channel< T > * chan) noexcept :
561 chan_{ chan } {
562 increment_();
563 }
564
565 iterator( iterator const& other) noexcept :
566 chan_{ other.chan_ } {
567 }
568
569 iterator & operator=( iterator const& other) noexcept {
570 if ( this == & other) return * this;
571 chan_ = other.chan_;
572 return * this;
573 }
574
575 bool operator==( iterator const& other) const noexcept {
576 return other.chan_ == chan_;
577 }
578
579 bool operator!=( iterator const& other) const noexcept {
580 return other.chan_ != chan_;
581 }
582
583 iterator & operator++() {
584 increment_();
585 return * this;
586 }
587
588 iterator operator++( int) = delete;
589
590 reference_t operator*() noexcept {
591 return * reinterpret_cast< value_type * >( std::addressof( storage_) );
592 }
593
594 pointer_t operator->() noexcept {
595 return reinterpret_cast< value_type * >( std::addressof( storage_) );
596 }
597 };
598
599 friend class iterator;
600};
601
602template< typename T >
603typename unbuffered_channel< T >::iterator
604begin( unbuffered_channel< T > & chan) {
605 return typename unbuffered_channel< T >::iterator( & chan);
606}
607
608template< typename T >
609typename unbuffered_channel< T >::iterator
610end( unbuffered_channel< T > &) {
611 return typename unbuffered_channel< T >::iterator();
612}
613
614}}
615
616#ifdef BOOST_HAS_ABI_HEADERS
617# include BOOST_ABI_SUFFIX
618#endif
619
620#endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H