#include <chrono>
#include <cstddef>
+#include <memory>
#include <mutex>
#include <queue>
class round_robin : public algo::algorithm {
private:
- typedef scheduler::ready_queue_t rqueue_t;
-
//[asio_rr_suspend_timer
- boost::asio::io_service & io_svc_;
+ std::shared_ptr< boost::asio::io_service > io_svc_;
boost::asio::steady_timer suspend_timer_;
//]
- rqueue_t rqueue_{};
+ boost::fibers::scheduler::ready_queue_type rqueue_{};
+ boost::fibers::mutex mtx_{};
+ boost::fibers::condition_variable cnd_{};
+ std::size_t counter_{ 0 };
public:
//[asio_rr_service_top
service( boost::asio::io_service & io_svc) :
boost::asio::io_service::service( io_svc),
work_{ new boost::asio::io_service::work( io_svc) } {
- io_svc.post([&io_svc](){
-//]
-//[asio_rr_service_lambda
- while ( ! io_svc.stopped() ) {
- if ( boost::fibers::has_ready_fibers() ) {
- // run all pending handlers in round_robin
- while ( io_svc.poll() );
- // run pending (ready) fibers
- this_fiber::yield();
- } else {
- // run one handler inside io_service
- // if no handler available, block this thread
- if ( ! io_svc.run_one() ) {
- break;
- }
- }
- }
-//]
-//[asio_rr_service_bottom
- });
}
virtual ~service() {}
//]
//[asio_rr_ctor
- round_robin( boost::asio::io_service & io_svc) :
+ round_robin( std::shared_ptr< boost::asio::io_service > const& io_svc) :
io_svc_( io_svc),
- suspend_timer_( io_svc_) {
+ suspend_timer_( * io_svc_) {
// We use add_service() very deliberately. This will throw
// service_already_exists if you pass the same io_service instance to
// more than one round_robin instance.
- boost::asio::add_service( io_svc_, new service( io_svc_));
- }
+ boost::asio::add_service( * io_svc_, new service( * io_svc_) );
+ io_svc_->post([this]() mutable {
//]
+//[asio_rr_service_lambda
+ while ( ! io_svc_->stopped() ) {
+ if ( has_ready_fibers() ) {
+ // run all pending handlers in round_robin
+ while ( io_svc_->poll() );
+ // block this fiber till all pending (ready) fibers are processed
+ // == round_robin::suspend_until() has been called
+ std::unique_lock< boost::fibers::mutex > lk( mtx_);
+ cnd_.wait( lk);
+ } else {
+ // run one handler inside io_service
+ // if no handler available, block this thread
+ if ( ! io_svc_->run_one() ) {
+ break;
+ }
+ }
+ }
+//]
+ });
+ }
void awakened( context * ctx) noexcept {
BOOST_ASSERT( nullptr != ctx);
+ BOOST_ASSERT( ! ctx->ready_is_linked() );
ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/
+ if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
+ ++counter_;
+ }
}
context * pick_next() noexcept {
rqueue_.pop_front();
BOOST_ASSERT( nullptr != ctx);
BOOST_ASSERT( context::active() != ctx);
+ if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
+ --counter_;
+ }
}
return ctx;
}
bool has_ready_fibers() const noexcept {
- return ! rqueue_.empty();
+ return 0 < counter_;
}
//[asio_rr_suspend_until
void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept {
// Set a timer so at least one handler will eventually fire, causing
- // run_one() to eventually return. Set a timer even if abs_time ==
- // time_point::max() so the timer can be canceled by our notify()
- // method -- which calls the handler.
- if ( suspend_timer_.expires_at() != abs_time) {
- // Each expires_at(time_point) call cancels any previous pending
- // call. We could inadvertently spin like this:
- // dispatcher calls suspend_until() with earliest wake time
- // suspend_until() sets suspend_timer_
- // lambda loop calls run_one()
- // some other asio handler runs before timer expires
- // run_one() returns to lambda loop
- // lambda loop yields to dispatcher
- // dispatcher finds no ready fibers
- // dispatcher calls suspend_until() with SAME wake time
- // suspend_until() sets suspend_timer_ to same time, canceling
- // previous async_wait()
- // lambda loop calls run_one()
- // asio calls suspend_timer_ handler with operation_aborted
- // run_one() returns to lambda loop... etc. etc.
- // So only actually set the timer when we're passed a DIFFERENT
- // abs_time value.
+ // run_one() to eventually return.
+ if ( (std::chrono::steady_clock::time_point::max)() != abs_time) {
+ // Each expires_at(time_point) call cancels any previous pending
+ // call. We could inadvertently spin like this:
+ // dispatcher calls suspend_until() with earliest wake time
+ // suspend_until() sets suspend_timer_
+ // lambda loop calls run_one()
+ // some other asio handler runs before timer expires
+ // run_one() returns to lambda loop
+ // lambda loop yields to dispatcher
+ // dispatcher finds no ready fibers
+ // dispatcher calls suspend_until() with SAME wake time
+ // suspend_until() sets suspend_timer_ to same time, canceling
+ // previous async_wait()
+ // lambda loop calls run_one()
+ // asio calls suspend_timer_ handler with operation_aborted
+ // run_one() returns to lambda loop... etc. etc.
+ // So only actually set the timer when we're passed a DIFFERENT
+ // abs_time value.
suspend_timer_.expires_at( abs_time);
- // It really doesn't matter what the suspend_timer_ handler does,
- // or even whether it's called because the timer ran out or was
- // canceled. The whole point is to cause the run_one() call to
- // return. So just pass a no-op lambda with proper signature.
- suspend_timer_.async_wait([](boost::system::error_code const&){});
+ suspend_timer_.async_wait([](boost::system::error_code const&){
+ this_fiber::yield();
+ });
}
+ cnd_.notify_one();
}
//]
// a new expiration time. This will cause us to spin the loop twice --
// once for the operation_aborted handler, once for timer expiration
// -- but that shouldn't be a big problem.
+ suspend_timer_.async_wait([](boost::system::error_code const&){
+ this_fiber::yield();
+ });
suspend_timer_.expires_at( std::chrono::steady_clock::now() );
}
//]