]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/boost/boost/fiber/unbuffered_channel.hpp
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / boost / boost / fiber / unbuffered_channel.hpp
index 2236d9e22fcf2457da1784a16aef06f094d20656..160bcea24f43b028886ee71d891b0b35be4882c2 100644 (file)
@@ -20,6 +20,9 @@
 #include <boost/fiber/context.hpp>
 #include <boost/fiber/detail/config.hpp>
 #include <boost/fiber/detail/convert.hpp>
+#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
+#include <boost/fiber/detail/exchange.hpp>
+#endif
 #include <boost/fiber/detail/spinlock.hpp>
 #include <boost/fiber/exceptions.hpp>
 
@@ -110,35 +113,43 @@ public:
 
     void close() noexcept {
         context * active_ctx = context::active();
-        // notify all waiting producers
-        closed_.store( true, std::memory_order_release);
-        detail::spinlock_lock lk1{ splk_producers_ };
-        while ( ! waiting_producers_.empty() ) {
-            context * producer_ctx = & waiting_producers_.front();
-            waiting_producers_.pop_front();
-            std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
-            if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
-                // notify context
-                active_ctx->schedule( producer_ctx);
-            } else if ( static_cast< std::intptr_t >( 0) == expected) {
-                // no timed-wait op.
+        // set flag
+        if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
+            // notify current waiting  
+            slot * s = slot_.load( std::memory_order_acquire);
+            if ( nullptr != s) {
                 // notify context
-                active_ctx->schedule( producer_ctx);
+                active_ctx->schedule( s->ctx);
             }
-        }
-        // notify all waiting consumers
-        detail::spinlock_lock lk2{ splk_consumers_ };
-        while ( ! waiting_consumers_.empty() ) {
-            context * consumer_ctx = & waiting_consumers_.front();
-            waiting_consumers_.pop_front();
-            std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
-            if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
-                // notify context
-                active_ctx->schedule( consumer_ctx);
-            } else if ( static_cast< std::intptr_t >( 0) == expected) {
-                // no timed-wait op.
-                // notify context
-                active_ctx->schedule( consumer_ctx);
+            // notify all waiting producers
+            detail::spinlock_lock lk1{ splk_producers_ };
+            while ( ! waiting_producers_.empty() ) {
+                context * producer_ctx = & waiting_producers_.front();
+                waiting_producers_.pop_front();
+                std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+                if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+                    // notify context
+                    active_ctx->schedule( producer_ctx);
+                } else if ( static_cast< std::intptr_t >( 0) == expected) {
+                    // no timed-wait op.
+                    // notify context
+                    active_ctx->schedule( producer_ctx);
+                }
+            }
+            // notify all waiting consumers
+            detail::spinlock_lock lk2{ splk_consumers_ };
+            while ( ! waiting_consumers_.empty() ) {
+                context * consumer_ctx = & waiting_consumers_.front();
+                waiting_consumers_.pop_front();
+                std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+                if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+                    // notify context
+                    active_ctx->schedule( consumer_ctx);
+                } else if ( static_cast< std::intptr_t >( 0) == expected) {
+                    // no timed-wait op.
+                    // notify context
+                    active_ctx->schedule( consumer_ctx);
+                }
             }
         }
     }
@@ -170,8 +181,14 @@ public:
                 }
                 // suspend till value has been consumed
                 active_ctx->suspend( lk);
-                // resumed, value has been consumed
-                return channel_op_status::success;
+                // resumed
+                if ( nullptr == s.ctx) {
+                    // value has been consumed
+                    return channel_op_status::success;
+                } else {
+                    // channel was closed before value was consumed
+                    return channel_op_status::closed;
+                }
             } else {
                 detail::spinlock_lock lk{ splk_producers_ };
                 if ( BOOST_UNLIKELY( is_closed() ) ) {
@@ -216,8 +233,14 @@ public:
                 }
                 // suspend till value has been consumed
                 active_ctx->suspend( lk);
-                // resumed, value has been consumed
-                return channel_op_status::success;
+                // resumed
+                if ( nullptr == s.ctx) {
+                    // value has been consumed
+                    return channel_op_status::success;
+                } else {
+                    // channel was closed before value was consumed
+                    return channel_op_status::closed;
+                }
             } else {
                 detail::spinlock_lock lk{ splk_producers_ };
                 if ( BOOST_UNLIKELY( is_closed() ) ) {
@@ -286,8 +309,14 @@ public:
                     // resumed, value has not been consumed
                     return channel_op_status::timeout;
                 }
-                // resumed, value has been consumed
-                return channel_op_status::success;
+                // resumed
+                if ( nullptr == s.ctx) {
+                    // value has been consumed
+                    return channel_op_status::success;
+                } else {
+                    // channel was closed before value was consumed
+                    return channel_op_status::closed;
+                }
             } else {
                 detail::spinlock_lock lk{ splk_producers_ };
                 if ( BOOST_UNLIKELY( is_closed() ) ) {
@@ -348,8 +377,14 @@ public:
                     // resumed, value has not been consumed
                     return channel_op_status::timeout;
                 }
-                // resumed, value has been consumed
-                return channel_op_status::success;
+                // resumed
+                if ( nullptr == s.ctx) {
+                    // value has been consumed
+                    return channel_op_status::success;
+                } else {
+                    // channel was closed before value was consumed
+                    return channel_op_status::closed;
+                }
             } else {
                 detail::spinlock_lock lk{ splk_producers_ };
                 if ( BOOST_UNLIKELY( is_closed() ) ) {
@@ -384,13 +419,14 @@ public:
                     while ( ! waiting_producers_.empty() ) {
                         context * producer_ctx = & waiting_producers_.front();
                         waiting_producers_.pop_front();
-                        lk.unlock();
                         std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                         if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+                            lk.unlock();
                             // notify context
                             active_ctx->schedule( producer_ctx);
                             break;
                         } else if ( static_cast< std::intptr_t >( 0) == expected) {
+                            lk.unlock();
                             // no timed-wait op.
                             // notify context
                             active_ctx->schedule( producer_ctx);
@@ -400,7 +436,11 @@ public:
                 }
                 value = std::move( s->value);
                 // notify context
-                active_ctx->schedule( s->ctx);
+#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
+                active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
+#else
+                active_ctx->schedule( std::exchange( s->ctx, nullptr) );
+#endif
                 return channel_op_status::success;
             } else {
                 detail::spinlock_lock lk{ splk_consumers_ };
@@ -430,13 +470,14 @@ public:
                     while ( ! waiting_producers_.empty() ) {
                         context * producer_ctx = & waiting_producers_.front();
                         waiting_producers_.pop_front();
-                        lk.unlock();
                         std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                         if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+                            lk.unlock();
                             // notify context
                             active_ctx->schedule( producer_ctx);
                             break;
                         } else if ( static_cast< std::intptr_t >( 0) == expected) {
+                            lk.unlock();
                             // no timed-wait op.
                             // notify context
                             active_ctx->schedule( producer_ctx);
@@ -447,7 +488,11 @@ public:
                 // consume value
                 value_type value = std::move( s->value);
                 // notify context
-                active_ctx->schedule( s->ctx);
+#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
+                active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
+#else
+                active_ctx->schedule( std::exchange( s->ctx, nullptr) );
+#endif
                 return std::move( value);
             } else {
                 detail::spinlock_lock lk{ splk_consumers_ };
@@ -489,13 +534,14 @@ public:
                     while ( ! waiting_producers_.empty() ) {
                         context * producer_ctx = & waiting_producers_.front();
                         waiting_producers_.pop_front();
-                        lk.unlock();
                         std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                         if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+                            lk.unlock();
                             // notify context
                             active_ctx->schedule( producer_ctx);
                             break;
                         } else if ( static_cast< std::intptr_t >( 0) == expected) {
+                            lk.unlock();
                             // no timed-wait op.
                             // notify context
                             active_ctx->schedule( producer_ctx);
@@ -506,7 +552,11 @@ public:
                 // consume value
                 value = std::move( s->value);
                 // notify context
-                active_ctx->schedule( s->ctx);
+#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
+                active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
+#else
+                active_ctx->schedule( std::exchange( s->ctx, nullptr) );
+#endif
                 return channel_op_status::success;
             } else {
                 detail::spinlock_lock lk{ splk_consumers_ };
@@ -581,6 +631,7 @@ public:
         }
 
         iterator & operator++() {
+            reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
             increment_();
             return * this;
         }