update sources to v12.2.3
ceph/src/boost/libs/fiber/examples/asio/round_robin.hpp
index 0b208ab05824a2824696164c8dbfabfe6ffc82d9..b06bb35c4794d63810e3d7b0198f96bcd7008042 100644 (file)
@@ -8,6 +8,7 @@
 
 #include <chrono>
 #include <cstddef>
+#include <memory>
 #include <mutex>
 #include <queue>
 
@@ -34,13 +35,14 @@ namespace asio {
 
 class round_robin : public algo::algorithm {
 private:
-    typedef scheduler::ready_queue_t rqueue_t;
-
 //[asio_rr_suspend_timer
-    boost::asio::io_service                     &   io_svc_;
+    std::shared_ptr< boost::asio::io_service >      io_svc_;
     boost::asio::steady_timer                       suspend_timer_;
 //]
-    rqueue_t                                        rqueue_{};
+    boost::fibers::scheduler::ready_queue_type      rqueue_{};
+    boost::fibers::mutex                            mtx_{};
+    boost::fibers::condition_variable               cnd_{};
+    std::size_t                                     counter_{ 0 };
 
 public:
 //[asio_rr_service_top
@@ -52,26 +54,6 @@ public:
         service( boost::asio::io_service & io_svc) :
             boost::asio::io_service::service( io_svc),
             work_{ new boost::asio::io_service::work( io_svc) } {
-            io_svc.post([&io_svc](){
-//]
-//[asio_rr_service_lambda
-                while ( ! io_svc.stopped() ) {
-                    if ( boost::fibers::has_ready_fibers() ) {
-                        // run all pending handlers in round_robin
-                        while ( io_svc.poll() );
-                        // run pending (ready) fibers
-                        this_fiber::yield();
-                    } else {
-                        // run one handler inside io_service
-                        // if no handler available, block this thread
-                        if ( ! io_svc.run_one() ) {
-                            break;
-                        }
-                    }
-                }
-//]
-//[asio_rr_service_bottom
-            });
         }
 
         virtual ~service() {}
@@ -86,19 +68,43 @@ public:
 //]
 
 //[asio_rr_ctor
-    round_robin( boost::asio::io_service & io_svc) :
+    round_robin( std::shared_ptr< boost::asio::io_service > const& io_svc) :
         io_svc_( io_svc),
-        suspend_timer_( io_svc_) {
+        suspend_timer_( * io_svc_) {
         // We use add_service() very deliberately. This will throw
         // service_already_exists if you pass the same io_service instance to
         // more than one round_robin instance.
-        boost::asio::add_service( io_svc_, new service( io_svc_));
-    }
+        boost::asio::add_service( * io_svc_, new service( * io_svc_) );
+        io_svc_->post([this]() mutable {
 //]
+//[asio_rr_service_lambda
+                while ( ! io_svc_->stopped() ) {
+                    if ( has_ready_fibers() ) {
+                        // run all pending handlers in round_robin
+                        while ( io_svc_->poll() );
+                        // block this fiber till all pending (ready) fibers are processed
+                        // == round_robin::suspend_until() has been called
+                        std::unique_lock< boost::fibers::mutex > lk( mtx_);
+                        cnd_.wait( lk);
+                    } else {
+                        // run one handler inside io_service
+                        // if no handler available, block this thread
+                        if ( ! io_svc_->run_one() ) {
+                            break;
+                        }
+                    }
+                }
+//]
+            });
+    }
 
     void awakened( context * ctx) noexcept {
         BOOST_ASSERT( nullptr != ctx);
+        BOOST_ASSERT( ! ctx->ready_is_linked() );
         ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/
+        if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
+            ++counter_;
+        }
     }
 
     context * pick_next() noexcept {
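
The polling loop that the old code posted from service's constructor now lives here, posted from round_robin's constructor against the shared io_service. A minimal caller-side sketch of how this example gets wired up (illustrative only; the io_svc name and surrounding program are assumptions, but use_scheduling_algorithm() is the standard Boost.Fiber installation call and forwards its argument to the constructor above):

    std::shared_ptr< boost::asio::io_service > io_svc =
        std::make_shared< boost::asio::io_service >();
    // install this algorithm on the current thread; the shared_ptr is
    // forwarded to round_robin's constructor shown above
    boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_svc);
    // ... spawn fibers that perform asio I/O ...
    io_svc->run();   // drives both asio handlers and the posted fiber loop
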
@@ -110,45 +116,45 @@ public:
             rqueue_.pop_front();
             BOOST_ASSERT( nullptr != ctx);
             BOOST_ASSERT( context::active() != ctx);
+            if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
+                --counter_;
+            }
         }
         return ctx;
     }
 
     bool has_ready_fibers() const noexcept {
-        return ! rqueue_.empty();
+        return 0 < counter_;
     }
 
 //[asio_rr_suspend_until
     void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept {
         // Set a timer so at least one handler will eventually fire, causing
-        // run_one() to eventually return. Set a timer even if abs_time ==
-        // time_point::max() so the timer can be canceled by our notify()
-        // method -- which calls the handler.
-        if ( suspend_timer_.expires_at() != abs_time) {
-            // Each expires_at(time_point) call cancels any previous pending
-            // call. We could inadvertently spin like this:
-            // dispatcher calls suspend_until() with earliest wake time
-            // suspend_until() sets suspend_timer_
-            // lambda loop calls run_one()
-            // some other asio handler runs before timer expires
-            // run_one() returns to lambda loop
-            // lambda loop yields to dispatcher
-            // dispatcher finds no ready fibers
-            // dispatcher calls suspend_until() with SAME wake time
-            // suspend_until() sets suspend_timer_ to same time, canceling
-            // previous async_wait()
-            // lambda loop calls run_one()
-            // asio calls suspend_timer_ handler with operation_aborted
-            // run_one() returns to lambda loop... etc. etc.
-            // So only actually set the timer when we're passed a DIFFERENT
-            // abs_time value.
+        // run_one() to eventually return.
+        if ( (std::chrono::steady_clock::time_point::max)() != abs_time) {
+            // Each expires_at(time_point) call cancels any previous pending
+            // call. We could inadvertently spin like this:
+            // dispatcher calls suspend_until() with earliest wake time
+            // suspend_until() sets suspend_timer_
+            // lambda loop calls run_one()
+            // some other asio handler runs before timer expires
+            // run_one() returns to lambda loop
+            // lambda loop yields to dispatcher
+            // dispatcher finds no ready fibers
+            // dispatcher calls suspend_until() with SAME wake time
+            // suspend_until() sets suspend_timer_ to same time, canceling
+            // previous async_wait()
+            // lambda loop calls run_one()
+            // asio calls suspend_timer_ handler with operation_aborted
+            // run_one() returns to lambda loop... etc. etc.
+            // So only actually set the timer when we're passed a finite
+            // abs_time value, i.e. anything other than time_point::max().
             suspend_timer_.expires_at( abs_time);
-            // It really doesn't matter what the suspend_timer_ handler does,
-            // or even whether it's called because the timer ran out or was
-            // canceled. The whole point is to cause the run_one() call to
-            // return. So just pass a no-op lambda with proper signature.
-            suspend_timer_.async_wait([](boost::system::error_code const&){});
+            suspend_timer_.async_wait([](boost::system::error_code const&){
+                                        this_fiber::yield();
+                                      });
         }
+        cnd_.notify_one();
     }
 //]
 
@@ -169,6 +175,9 @@ public:
         // a new expiration time. This will cause us to spin the loop twice --
         // once for the operation_aborted handler, once for timer expiration
         // -- but that shouldn't be a big problem.
+        suspend_timer_.async_wait([](boost::system::error_code const&){
+                                    this_fiber::yield();
+                                  });
         suspend_timer_.expires_at( std::chrono::steady_clock::now() );
     }
 //]
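
notify() is invoked when a fiber is made ready from outside the scheduler, typically by another thread, and the expires_at() call here cancels the pending async_wait() so a blocked run_one() returns promptly. A hedged sketch of a situation that triggers it (the promise/future usage is illustrative):

    // fulfilling a fibers::promise from a plain thread marks the fiber blocked
    // in get() ready from a foreign thread; the fiber manager then calls
    // notify() so the io_service::run_one() in the main loop stops blocking
    boost::fibers::promise< int > p;
    boost::fibers::future< int > f = p.get_future();
    std::thread t([&p]{ p.set_value( 42); });
    int answer = f.get();   // resumes soon after set_value()
    t.join();
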