X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fboost%2Flibs%2Ffiber%2Fexamples%2Fasio%2Fround_robin.hpp;h=b06bb35c4794d63810e3d7b0198f96bcd7008042;hb=b32b81446b3b05102be0267e79203f59329c1d97;hp=0b208ab05824a2824696164c8dbfabfe6ffc82d9;hpb=215dd7151453fae88e6f968c975b6ce309d42dcf;p=ceph.git diff --git a/ceph/src/boost/libs/fiber/examples/asio/round_robin.hpp b/ceph/src/boost/libs/fiber/examples/asio/round_robin.hpp index 0b208ab05..b06bb35c4 100644 --- a/ceph/src/boost/libs/fiber/examples/asio/round_robin.hpp +++ b/ceph/src/boost/libs/fiber/examples/asio/round_robin.hpp @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -34,13 +35,14 @@ namespace asio { class round_robin : public algo::algorithm { private: - typedef scheduler::ready_queue_t rqueue_t; - //[asio_rr_suspend_timer - boost::asio::io_service & io_svc_; + std::shared_ptr< boost::asio::io_service > io_svc_; boost::asio::steady_timer suspend_timer_; //] - rqueue_t rqueue_{}; + boost::fibers::scheduler::ready_queue_type rqueue_{}; + boost::fibers::mutex mtx_{}; + boost::fibers::condition_variable cnd_{}; + std::size_t counter_{ 0 }; public: //[asio_rr_service_top @@ -52,26 +54,6 @@ public: service( boost::asio::io_service & io_svc) : boost::asio::io_service::service( io_svc), work_{ new boost::asio::io_service::work( io_svc) } { - io_svc.post([&io_svc](){ -//] -//[asio_rr_service_lambda - while ( ! io_svc.stopped() ) { - if ( boost::fibers::has_ready_fibers() ) { - // run all pending handlers in round_robin - while ( io_svc.poll() ); - // run pending (ready) fibers - this_fiber::yield(); - } else { - // run one handler inside io_service - // if no handler available, block this thread - if ( ! io_svc.run_one() ) { - break; - } - } - } -//] -//[asio_rr_service_bottom - }); } virtual ~service() {} @@ -86,19 +68,43 @@ public: //] //[asio_rr_ctor - round_robin( boost::asio::io_service & io_svc) : + round_robin( std::shared_ptr< boost::asio::io_service > const& io_svc) : io_svc_( io_svc), - suspend_timer_( io_svc_) { + suspend_timer_( * io_svc_) { // We use add_service() very deliberately. This will throw // service_already_exists if you pass the same io_service instance to // more than one round_robin instance. - boost::asio::add_service( io_svc_, new service( io_svc_)); - } + boost::asio::add_service( * io_svc_, new service( * io_svc_) ); + io_svc_->post([this]() mutable { //] +//[asio_rr_service_lambda + while ( ! io_svc_->stopped() ) { + if ( has_ready_fibers() ) { + // run all pending handlers in round_robin + while ( io_svc_->poll() ); + // block this fiber till all pending (ready) fibers are processed + // == round_robin::suspend_until() has been called + std::unique_lock< boost::fibers::mutex > lk( mtx_); + cnd_.wait( lk); + } else { + // run one handler inside io_service + // if no handler available, block this thread + if ( ! io_svc_->run_one() ) { + break; + } + } + } +//] + }); + } void awakened( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( ! ctx->ready_is_linked() ); ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/ + if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) { + ++counter_; + } } context * pick_next() noexcept { @@ -110,45 +116,45 @@ public: rqueue_.pop_front(); BOOST_ASSERT( nullptr != ctx); BOOST_ASSERT( context::active() != ctx); + if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) { + --counter_; + } } return ctx; } bool has_ready_fibers() const noexcept { - return ! rqueue_.empty(); + return 0 < counter_; } //[asio_rr_suspend_until void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept { // Set a timer so at least one handler will eventually fire, causing - // run_one() to eventually return. Set a timer even if abs_time == - // time_point::max() so the timer can be canceled by our notify() - // method -- which calls the handler. - if ( suspend_timer_.expires_at() != abs_time) { - // Each expires_at(time_point) call cancels any previous pending - // call. We could inadvertently spin like this: - // dispatcher calls suspend_until() with earliest wake time - // suspend_until() sets suspend_timer_ - // lambda loop calls run_one() - // some other asio handler runs before timer expires - // run_one() returns to lambda loop - // lambda loop yields to dispatcher - // dispatcher finds no ready fibers - // dispatcher calls suspend_until() with SAME wake time - // suspend_until() sets suspend_timer_ to same time, canceling - // previous async_wait() - // lambda loop calls run_one() - // asio calls suspend_timer_ handler with operation_aborted - // run_one() returns to lambda loop... etc. etc. - // So only actually set the timer when we're passed a DIFFERENT - // abs_time value. + // run_one() to eventually return. + if ( (std::chrono::steady_clock::time_point::max)() != abs_time) { + // Each expires_at(time_point) call cancels any previous pending + // call. We could inadvertently spin like this: + // dispatcher calls suspend_until() with earliest wake time + // suspend_until() sets suspend_timer_ + // lambda loop calls run_one() + // some other asio handler runs before timer expires + // run_one() returns to lambda loop + // lambda loop yields to dispatcher + // dispatcher finds no ready fibers + // dispatcher calls suspend_until() with SAME wake time + // suspend_until() sets suspend_timer_ to same time, canceling + // previous async_wait() + // lambda loop calls run_one() + // asio calls suspend_timer_ handler with operation_aborted + // run_one() returns to lambda loop... etc. etc. + // So only actually set the timer when we're passed a DIFFERENT + // abs_time value. suspend_timer_.expires_at( abs_time); - // It really doesn't matter what the suspend_timer_ handler does, - // or even whether it's called because the timer ran out or was - // canceled. The whole point is to cause the run_one() call to - // return. So just pass a no-op lambda with proper signature. - suspend_timer_.async_wait([](boost::system::error_code const&){}); + suspend_timer_.async_wait([](boost::system::error_code const&){ + this_fiber::yield(); + }); } + cnd_.notify_one(); } //] @@ -169,6 +175,9 @@ public: // a new expiration time. This will cause us to spin the loop twice -- // once for the operation_aborted handler, once for timer expiration // -- but that shouldn't be a big problem. + suspend_timer_.async_wait([](boost::system::error_code const&){ + this_fiber::yield(); + }); suspend_timer_.expires_at( std::chrono::steady_clock::now() ); } //]