]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/boost/boost/fiber/unbuffered_channel.hpp
import quincy beta 17.1.0
[ceph.git] / ceph / src / boost / boost / fiber / unbuffered_channel.hpp
index 40d17e4e6d0148bb5d775f82ea7b79a229916269..91aa5f12d0c3639146b3fcb78d70d67fbc6d6652 100644 (file)
@@ -25,6 +25,7 @@
 #endif
 #include <boost/fiber/detail/spinlock.hpp>
 #include <boost/fiber/exceptions.hpp>
+#include <boost/fiber/waker.hpp>
 
 #ifdef BOOST_HAS_ABI_HEADERS
 #  include BOOST_ABI_PREFIX
@@ -39,20 +40,18 @@ public:
     using value_type = typename std::remove_reference<T>::type;
 
 private:
-    using wait_queue_type = context::wait_queue_t;
-
     struct slot {
         value_type  value;
-        context *   ctx;
+        waker       w;
 
-        slot( value_type const& value_, context * ctx_) :
+        slot( value_type const& value_, waker && w) :
             value{ value_ },
-            ctx{ ctx_ } {
+            w{ std::move(w) } {
         }
 
-        slot( value_type && value_, context * ctx_) :
+        slot( value_type && value_, waker && w) :
             value{ std::move( value_) },
-            ctx{ ctx_ } {
+            w{ std::move(w) } {
         }
     };
 
@@ -61,9 +60,9 @@ private:
     // shared cacheline
     std::atomic_bool            closed_{ false };
     mutable detail::spinlock    splk_producers_{};
-    wait_queue_type             waiting_producers_{};
+    wait_queue                  waiting_producers_{};
     mutable detail::spinlock    splk_consumers_{};
-    wait_queue_type             waiting_consumers_{};
+    wait_queue                  waiting_consumers_{};
     char                        pad_[cacheline_length];
 
     bool is_empty_() {
@@ -110,83 +109,41 @@ public:
     }
 
     void close() noexcept {
-        context * active_ctx = context::active();
         // set flag
         if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
             // notify current waiting  
             slot * s = slot_.load( std::memory_order_acquire);
             if ( nullptr != s) {
                 // notify context
-                active_ctx->schedule( s->ctx);
+                s->w.wake();
             }
-            // notify all waiting producers
             detail::spinlock_lock lk1{ splk_producers_ };
-            while ( ! waiting_producers_.empty() ) {
-                context * producer_ctx = & waiting_producers_.front();
-                waiting_producers_.pop_front();
-                auto expected = reinterpret_cast< std::intptr_t >( this);
-                if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
-                    // notify context
-                    active_ctx->schedule( producer_ctx);
-                } else if ( static_cast< std::intptr_t >( 0) == expected) {
-                    // no timed-wait op.
-                    // notify context
-                    active_ctx->schedule( producer_ctx);
-                }
-            }
-            // notify all waiting consumers
+            waiting_producers_.notify_all();
+
             detail::spinlock_lock lk2{ splk_consumers_ };
-            while ( ! waiting_consumers_.empty() ) {
-                context * consumer_ctx = & waiting_consumers_.front();
-                waiting_consumers_.pop_front();
-                auto expected = reinterpret_cast< std::intptr_t >( this);
-                if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
-                    // notify context
-                    active_ctx->schedule( consumer_ctx);
-                } else if ( static_cast< std::intptr_t >( 0) == expected) {
-                    // no timed-wait op.
-                    // notify context
-                    active_ctx->schedule( consumer_ctx);
-                }
-            }
+            waiting_consumers_.notify_all();
         }
     }
 
     channel_op_status push( value_type const& value) {
         context * active_ctx = context::active();
-        slot s{ value, active_ctx };
+        slot s{ value, active_ctx->create_waker() };
         for (;;) {
             if ( BOOST_UNLIKELY( is_closed() ) ) {
                 return channel_op_status::closed;
             }
             if ( try_push_( & s) ) {
                 detail::spinlock_lock lk{ splk_consumers_ };
-                // notify one waiting consumer
-                while ( ! waiting_consumers_.empty() ) {
-                    context * consumer_ctx = & waiting_consumers_.front();
-                    waiting_consumers_.pop_front();
-                    auto expected = reinterpret_cast< std::intptr_t >( this);
-                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
-                        // notify context
-                        active_ctx->schedule( consumer_ctx);
-                        break;
-                    }
-                    if ( static_cast< std::intptr_t >( 0) == expected) {
-                        // no timed-wait op.
-                        // notify context
-                        active_ctx->schedule( consumer_ctx);
-                        break;
-                    }
-                }
+                waiting_consumers_.notify_one();
                 // suspend till value has been consumed
                 active_ctx->suspend( lk);
                 // resumed
-                if ( nullptr == s.ctx) {
-                    // value has been consumed
-                    return channel_op_status::success;
+                if ( BOOST_UNLIKELY( is_closed() ) ) {
+                    // channel was closed before value was consumed
+                    return channel_op_status::closed;
                 }
-                // channel was closed before value was consumed
-                return channel_op_status::closed;
+                // value has been consumed
+                return channel_op_status::success;
             }
             detail::spinlock_lock lk{ splk_producers_ };
             if ( BOOST_UNLIKELY( is_closed() ) ) {
@@ -195,48 +152,31 @@ public:
             if ( is_empty_() ) {
                 continue;
             }
-            active_ctx->wait_link( waiting_producers_);
-            active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
-            // suspend this producer
-            active_ctx->suspend( lk);
+
+            waiting_producers_.suspend_and_wait( lk, active_ctx);
             // resumed, slot mabye free
         }
     }
 
     channel_op_status push( value_type && value) {
         context * active_ctx = context::active();
-        slot s{ std::move( value), active_ctx };
+        slot s{ std::move( value), active_ctx->create_waker() };
         for (;;) {
             if ( BOOST_UNLIKELY( is_closed() ) ) {
                 return channel_op_status::closed;
             }
             if ( try_push_( & s) ) {
                 detail::spinlock_lock lk{ splk_consumers_ };
-                // notify one waiting consumer
-                while ( ! waiting_consumers_.empty() ) {
-                    context * consumer_ctx = & waiting_consumers_.front();
-                    waiting_consumers_.pop_front();
-                    auto expected = reinterpret_cast< std::intptr_t >( this);
-                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
-                        // notify context
-                        active_ctx->schedule( consumer_ctx);
-                        break;
-                    } if ( static_cast< std::intptr_t >( 0) == expected) {
-                        // no timed-wait op.
-                        // notify context
-                        active_ctx->schedule( consumer_ctx);
-                        break;
-                    }
-                }
+                waiting_consumers_.notify_one();
                 // suspend till value has been consumed
                 active_ctx->suspend( lk);
                 // resumed
-                if ( nullptr == s.ctx) {
-                    // value has been consumed
-                    return channel_op_status::success;
+                if ( BOOST_UNLIKELY( is_closed() ) ) {
+                    // channel was closed before value was consumed
+                    return channel_op_status::closed;
                 }
-                // channel was closed before value was consumed
-                return channel_op_status::closed;
+                // value has been consumed
+                return channel_op_status::success;
             }
             detail::spinlock_lock lk{ splk_producers_ };
             if ( BOOST_UNLIKELY( is_closed() ) ) {
@@ -245,10 +185,7 @@ public:
             if ( is_empty_() ) {
                 continue;
             }
-            active_ctx->wait_link( waiting_producers_);
-            active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
-            // suspend this producer
-            active_ctx->suspend( lk);
+            waiting_producers_.suspend_and_wait( lk, active_ctx);
             // resumed, slot mabye free
         }
     }
@@ -271,7 +208,7 @@ public:
     channel_op_status push_wait_until( value_type const& value,
                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
         context * active_ctx = context::active();
-        slot s{ value, active_ctx };
+        slot s{ value, active_ctx->create_waker() };
         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
         for (;;) {
             if ( BOOST_UNLIKELY( is_closed() ) ) {
@@ -279,26 +216,9 @@ public:
             }
             if ( try_push_( & s) ) {
                 detail::spinlock_lock lk{ splk_consumers_ };
-                // notify one waiting consumer
-                while ( ! waiting_consumers_.empty() ) {
-                    context * consumer_ctx = & waiting_consumers_.front();
-                    waiting_consumers_.pop_front();
-                    auto expected = reinterpret_cast< std::intptr_t >( this);
-                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
-                        // notify context
-                        active_ctx->schedule( consumer_ctx);
-                        break;
-                    }
-                    if ( static_cast< std::intptr_t >( 0) == expected) {
-                        // no timed-wait op.
-                        // notify context
-                        active_ctx->schedule( consumer_ctx);
-                        break;
-                    }
-                }
+                waiting_consumers_.notify_one();
                 // suspend this producer
-                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
-                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
+                if ( ! active_ctx->wait_until(timeout_time, lk, waker(s.w))) {
                     // clear slot
                     slot * nil_slot = nullptr, * own_slot = & s;
                     slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
@@ -306,12 +226,12 @@ public:
                     return channel_op_status::timeout;
                 }
                 // resumed
-                if ( nullptr == s.ctx) {
-                    // value has been consumed
-                    return channel_op_status::success;
+                if ( BOOST_UNLIKELY( is_closed() ) ) {
+                    // channel was closed before value was consumed
+                    return channel_op_status::closed;
                 }
-                // channel was closed before value was consumed
-                return channel_op_status::closed;
+                // value has been consumed
+                return channel_op_status::success;
             }
             detail::spinlock_lock lk{ splk_producers_ };
             if ( BOOST_UNLIKELY( is_closed() ) ) {
@@ -320,14 +240,9 @@ public:
             if ( is_empty_() ) {
                 continue;
             }
-            active_ctx->wait_link( waiting_producers_);
-            active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
-            // suspend this producer
-            if ( ! active_ctx->wait_until( timeout_time, lk) ) {
-                // relock local lk
-                lk.lock();
-                // remove from waiting-queue
-                waiting_producers_.remove( * active_ctx);
+
+            if (! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time))
+            {
                 return channel_op_status::timeout;
             }
             // resumed, slot maybe free
@@ -338,7 +253,7 @@ public:
     channel_op_status push_wait_until( value_type && value,
                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
         context * active_ctx = context::active();
-        slot s{ std::move( value), active_ctx };
+        slot s{ std::move( value), active_ctx->create_waker() };
         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
         for (;;) {
             if ( BOOST_UNLIKELY( is_closed() ) ) {
@@ -346,25 +261,9 @@ public:
             }
             if ( try_push_( & s) ) {
                 detail::spinlock_lock lk{ splk_consumers_ };
-                // notify one waiting consumer
-                while ( ! waiting_consumers_.empty() ) {
-                    context * consumer_ctx = & waiting_consumers_.front();
-                    waiting_consumers_.pop_front();
-                    auto expected = reinterpret_cast< std::intptr_t >( this);
-                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
-                        // notify context
-                        active_ctx->schedule( consumer_ctx);
-                        break;
-                    } if ( static_cast< std::intptr_t >( 0) == expected) {
-                        // no timed-wait op.
-                        // notify context
-                        active_ctx->schedule( consumer_ctx);
-                        break;
-                    }
-                }
+                waiting_consumers_.notify_one();
                 // suspend this producer
-                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
-                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
+                if ( ! active_ctx->wait_until(timeout_time, lk, waker(s.w))) {
                     // clear slot
                     slot * nil_slot = nullptr, * own_slot = & s;
                     slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
@@ -372,12 +271,12 @@ public:
                     return channel_op_status::timeout;
                 }
                 // resumed
-                if ( nullptr == s.ctx) {
-                    // value has been consumed
-                    return channel_op_status::success;
+                if ( BOOST_UNLIKELY( is_closed() ) ) {
+                    // channel was closed before value was consumed
+                    return channel_op_status::closed;
                 }
-                // channel was closed before value was consumed
-                return channel_op_status::closed;
+                // value has been consumed
+                return channel_op_status::success;
             }
             detail::spinlock_lock lk{ splk_producers_ };
             if ( BOOST_UNLIKELY( is_closed() ) ) {
@@ -386,14 +285,8 @@ public:
             if ( is_empty_() ) {
                 continue;
             }
-            active_ctx->wait_link( waiting_producers_);
-            active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
-            // suspend this producer
-            if ( ! active_ctx->wait_until( timeout_time, lk) ) {
-                // relock local lk
-                lk.lock();
-                // remove from waiting-queue
-                waiting_producers_.remove( * active_ctx);
+            if (! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time))
+            {
                 return channel_op_status::timeout;
             }
             // resumed, slot maybe free
@@ -407,32 +300,11 @@ public:
             if ( nullptr != ( s = try_pop_() ) ) {
                 {
                     detail::spinlock_lock lk{ splk_producers_ };
-                    // notify one waiting producer
-                    while ( ! waiting_producers_.empty() ) {
-                        context * producer_ctx = & waiting_producers_.front();
-                        waiting_producers_.pop_front();
-                        auto expected = reinterpret_cast< std::intptr_t >( this);
-                        if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
-                            lk.unlock();
-                            // notify context
-                            active_ctx->schedule( producer_ctx);
-                            break;
-                        } if ( static_cast< std::intptr_t >( 0) == expected) {
-                            lk.unlock();
-                            // no timed-wait op.
-                            // notify context
-                            active_ctx->schedule( producer_ctx);
-                            break;
-                        }
-                    }
+                    waiting_producers_.notify_one();
                 }
                 value = std::move( s->value);
                 // notify context
-#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
-                active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
-#else
-                active_ctx->schedule( std::exchange( s->ctx, nullptr) );
-#endif
+                s->w.wake();
                 return channel_op_status::success;
             }
             detail::spinlock_lock lk{ splk_consumers_ };
@@ -442,10 +314,7 @@ public:
             if ( ! is_empty_() ) {
                 continue;
             }
-            active_ctx->wait_link( waiting_consumers_);
-            active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
-            // suspend this consumer
-            active_ctx->suspend( lk);
+            waiting_consumers_.suspend_and_wait( lk, active_ctx);
             // resumed, slot mabye set
         }
     }
@@ -457,33 +326,12 @@ public:
             if ( nullptr != ( s = try_pop_() ) ) {
                 {
                     detail::spinlock_lock lk{ splk_producers_ };
-                    // notify one waiting producer
-                    while ( ! waiting_producers_.empty() ) {
-                        context * producer_ctx = & waiting_producers_.front();
-                        waiting_producers_.pop_front();
-                        auto expected = reinterpret_cast< std::intptr_t >( this);
-                        if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
-                            lk.unlock();
-                            // notify context
-                            active_ctx->schedule( producer_ctx);
-                            break;
-                        } if ( static_cast< std::intptr_t >( 0) == expected) {
-                            lk.unlock();
-                            // no timed-wait op.
-                            // notify context
-                            active_ctx->schedule( producer_ctx);
-                            break;
-                        }
-                    }
+                    waiting_producers_.notify_one();
                 }
                 // consume value
                 value_type value = std::move( s->value);
                 // notify context
-#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
-                active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
-#else
-                active_ctx->schedule( std::exchange( s->ctx, nullptr) );
-#endif
+                s->w.wake();
                 return std::move( value);
             }
             detail::spinlock_lock lk{ splk_consumers_ };
@@ -495,10 +343,7 @@ public:
             if ( ! is_empty_() ) {
                 continue;
             }
-            active_ctx->wait_link( waiting_consumers_);
-            active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
-            // suspend this consumer
-            active_ctx->suspend( lk);
+            waiting_consumers_.suspend_and_wait( lk, active_ctx);
             // resumed, slot mabye set
         }
     }
@@ -520,34 +365,12 @@ public:
             if ( nullptr != ( s = try_pop_() ) ) {
                 {
                     detail::spinlock_lock lk{ splk_producers_ };
-                    // notify one waiting producer
-                    while ( ! waiting_producers_.empty() ) {
-                        context * producer_ctx = & waiting_producers_.front();
-                        waiting_producers_.pop_front();
-                        auto expected = reinterpret_cast< std::intptr_t >( this);
-                        if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
-                            lk.unlock();
-                            // notify context
-                            active_ctx->schedule( producer_ctx);
-                            break;
-                        }
-                        if ( static_cast< std::intptr_t >( 0) == expected) {
-                            lk.unlock();
-                            // no timed-wait op.
-                            // notify context
-                            active_ctx->schedule( producer_ctx);
-                            break;
-                        }
-                    }
+                    waiting_producers_.notify_one();
                 }
                 // consume value
                 value = std::move( s->value);
                 // notify context
-#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
-                active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
-#else
-                active_ctx->schedule( std::exchange( s->ctx, nullptr) );
-#endif
+                s->w.wake();
                 return channel_op_status::success;
             }
             detail::spinlock_lock lk{ splk_consumers_ };
@@ -557,14 +380,7 @@ public:
             if ( ! is_empty_() ) {
                 continue;
             }
-            active_ctx->wait_link( waiting_consumers_);
-            active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
-            // suspend this consumer
-            if ( ! active_ctx->wait_until( timeout_time, lk) ) {
-                // relock local lk
-                lk.lock();
-                // remove from waiting-queue
-                waiting_consumers_.remove( * active_ctx);
+            if ( ! waiting_consumers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
                 return channel_op_status::timeout;
             }
         }
@@ -577,9 +393,12 @@ public:
         unbuffered_channel  *   chan_{ nullptr };
         storage_type            storage_;
 
-        void increment_() {
+        void increment_( bool initial = false) {
             BOOST_ASSERT( nullptr != chan_);
             try {
+                if ( ! initial) {
+                    reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
+                }
                 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
             } catch ( fiber_error const&) {
                 chan_ = nullptr;
@@ -599,7 +418,7 @@ public:
 
         explicit iterator( unbuffered_channel< T > * chan) noexcept :
             chan_{ chan } {
-            increment_();
+            increment_( true);
         }
 
         iterator( iterator const& other) noexcept :