1 // Copyright Oliver Kowalke 2013.
2 // Distributed under the Boost Software License, Version 1.0.
3 // (See accompanying file LICENSE_1_0.txt or copy at
4 // http://www.boost.org/LICENSE_1_0.txt)
6 #ifndef BOOST_FIBERS_ASIO_ROUND_ROBIN_H
7 #define BOOST_FIBERS_ASIO_ROUND_ROBIN_H
15 #include <boost/asio.hpp>
16 #include <boost/assert.hpp>
17 #include <boost/asio/steady_timer.hpp>
18 #include <boost/config.hpp>
20 #include <boost/fiber/condition_variable.hpp>
21 #include <boost/fiber/context.hpp>
22 #include <boost/fiber/mutex.hpp>
23 #include <boost/fiber/operations.hpp>
24 #include <boost/fiber/scheduler.hpp>
28 #ifdef BOOST_HAS_ABI_HEADERS
29 # include BOOST_ABI_PREFIX
// Fiber scheduling algorithm (derives from algo::algorithm) that is tied to a
// boost::asio::io_service, which it holds through a shared_ptr.
36 class round_robin : public algo::algorithm {
38 //[asio_rr_suspend_timer
// The io_service this scheduler works with (dereferenced throughout the class).
39 std::shared_ptr< boost::asio::io_service > io_svc_;
// Timer armed by suspend_until() and re-set by notify().
40 boost::asio::steady_timer suspend_timer_;
// Ready queue: awakened() links contexts into it, pick_next() reads its front.
42 boost::fibers::scheduler::ready_queue_type rqueue_{};
// Fiber mutex; locked by the loop that the constructor posts to the io_service.
43 boost::fibers::mutex mtx_{};
// NOTE(review): no use of cnd_ is visible in this excerpt -- confirm against
// the full file how it pairs with mtx_.
44 boost::fibers::condition_variable cnd_{};
// NOTE(review): starts at 0; the code that updates it is not visible in this
// excerpt, so its exact meaning should be confirmed.
45 std::size_t counter_{ 0 };
// asio service type that round_robin's constructor registers on the
// io_service via add_service(). It owns an io_service::work object created
// from the io_service passed to its constructor. Copying is disabled.
48 //[asio_rr_service_top
49 struct service : public boost::asio::io_service::service {
// Service identifier; defined out-of-class further down in this file.
50 static boost::asio::io_service::id id;
// Work object owned by this service for as long as work_ is set.
52 std::unique_ptr< boost::asio::io_service::work > work_;
// Forwards io_svc to the base service and allocates the work object on it.
// NOTE(review): the constructor body is not visible in this excerpt.
54 service( boost::asio::io_service & io_svc) :
55 boost::asio::io_service::service( io_svc),
56 work_{ new boost::asio::io_service::work( io_svc) } {
// Non-copyable.
61 service( service const&) = delete;
62 service & operator=( service const&) = delete;
// Override of the base-class shutdown hook.
// NOTE(review): its body is not visible in this excerpt -- presumably it
// releases work_; confirm against the full file.
64 void shutdown_service() override final {
// Constructor. Binds suspend_timer_ to the io_service, registers a `service`
// instance on it, and posts a handler that loops until the io_service is
// stopped.
// NOTE(review): the initializer for io_svc_ is not visible in this excerpt;
// the body dereferences io_svc_, so it must be initialized before
// suspend_timer_ -- confirm against the full file.
71 round_robin( std::shared_ptr< boost::asio::io_service > const& io_svc) :
73 suspend_timer_( * io_svc_) {
74 // We use add_service() very deliberately. This will throw
75 // service_already_exists if you pass the same io_service instance to
76 // more than one round_robin instance.
77 boost::asio::add_service( * io_svc_, new service( * io_svc_) );
// The posted handler captures `this`, so it uses this object's members when
// the io_service eventually runs it.
78 io_svc_->post([this]() mutable {
80 //[asio_rr_service_lambda
// Keep going until the io_service has been stopped.
81 while ( ! io_svc_->stopped() ) {
// Ready fibers exist: drain handlers without blocking, then take mtx_.
82 if ( has_ready_fibers() ) {
83 // run all pending handlers in round_robin
// poll() is repeated until it reports that no handler was run.
84 while ( io_svc_->poll() );
85 // block this fiber till all pending (ready) fibers are processed
86 // == round_robin::suspend_until() has been called
// NOTE(review): the statement that waits under this lock is not visible in
// this excerpt.
87 std::unique_lock< boost::fibers::mutex > lk( mtx_);
90 // run one handler inside io_service
91 // if no handler available, block this thread
// A zero return from run_one() is handled by a branch whose body is not
// visible in this excerpt.
92 if ( ! io_svc_->run_one() ) {
// Receives a context that has become ready and links it into rqueue_.
// Preconditions (asserted): ctx is non-null and is not already linked into a
// ready queue.
101 void awakened( context * ctx) noexcept {
102 BOOST_ASSERT( nullptr != ctx);
103 BOOST_ASSERT( ! ctx->ready_is_linked() );
104 ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/
// Contexts other than the dispatcher context get extra handling.
// NOTE(review): that branch body is not visible in this excerpt -- presumably
// it maintains counter_; confirm against the full file.
105 if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
// Selects the next context to run. The result pointer starts out as nullptr;
// when rqueue_ is not empty it is set to the address of the queue's front
// element, which is asserted to be non-null and different from the currently
// active context. Non-dispatcher contexts then get extra handling.
// NOTE(review): several lines of this function (the pop from the queue, the
// body of the dispatcher check, the return statement) are not visible in this
// excerpt, and the callout comment opened on the `if` line has no visible
// terminator here -- confirm all of this against the full file.
110 context * pick_next() noexcept {
111 context * ctx( nullptr);
112 if ( ! rqueue_.empty() ) { /*<
113 pop an item from the ready queue
115 ctx = & rqueue_.front();
117 BOOST_ASSERT( nullptr != ctx);
118 BOOST_ASSERT( context::active() != ctx);
119 if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
// Const, non-throwing query. The loop posted by the constructor uses its
// result to choose between poll() and run_one().
// NOTE(review): the body is not visible in this excerpt, so the exact
// condition it evaluates should be confirmed against the full file.
126 bool has_ready_fibers() const noexcept {
// Called with the absolute time at which the scheduler wants to be woken.
// When abs_time is not time_point::max(), suspend_timer_ is set to expire at
// abs_time and an asynchronous wait is started on it.
130 //[asio_rr_suspend_until
131 void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept {
132 // Set a timer so at least one handler will eventually fire, causing
133 // run_one() to eventually return.
// time_point::max() is treated as "no deadline": the timer is left untouched.
134 if ( (std::chrono::steady_clock::time_point::max)() != abs_time) {
135 // Each expires_at(time_point) call cancels any previous pending
136 // call. We could inadvertently spin like this:
137 // dispatcher calls suspend_until() with earliest wake time
138 // suspend_until() sets suspend_timer_
139 // lambda loop calls run_one()
140 // some other asio handler runs before timer expires
141 // run_one() returns to lambda loop
142 // lambda loop yields to dispatcher
143 // dispatcher finds no ready fibers
144 // dispatcher calls suspend_until() with SAME wake time
145 // suspend_until() sets suspend_timer_ to same time, canceling
146 // previous async_wait()
147 // lambda loop calls run_one()
148 // asio calls suspend_timer_ handler with operation_aborted
149 // run_one() returns to lambda loop... etc. etc.
150 // So only actually set the timer when we're passed a DIFFERENT
// NOTE(review): the sentence above is cut off in this excerpt (it evidently
// refers to a different wake time). No comparison against the timer's current
// expiry is visible before the expires_at() call below -- confirm whether the
// guard described by the comment actually exists in the full file.
152 suspend_timer_.expires_at( abs_time);
// The handler ignores the error code it receives; its body is not visible in
// this excerpt.
153 suspend_timer_.async_wait([](boost::system::error_code const&){
// Wakes the scheduler ahead of the deadline set by suspend_until(): starts an
// asynchronous wait on suspend_timer_ and moves the timer's expiry to the
// current steady_clock time.
162 void notify() noexcept {
163 // Something has happened that should wake one or more fibers BEFORE
164 // suspend_timer_ expires. Reset the timer to cause it to fire
165 // immediately, causing the run_one() call to return. In theory we
166 // could use cancel() because we don't care whether suspend_timer_'s
167 // handler is called with operation_aborted or success. However --
168 // cancel() doesn't change the expiration time, and we use
169 // suspend_timer_'s expiration time to decide whether it's already
170 // set. If suspend_until() set some specific wake time, then notify()
171 // canceled it, then suspend_until() was called again with the same
172 // wake time, it would match suspend_timer_'s expiration time and we'd
173 // refrain from setting the timer. So instead of simply calling
174 // cancel(), reset the timer, which cancels the pending sleep AND sets
175 // a new expiration time. This will cause us to spin the loop twice --
176 // once for the operation_aborted handler, once for timer expiration
177 // -- but that shouldn't be a big problem.
// NOTE(review): in the visible order the wait is started BEFORE expires_at().
// Since setting a new expiry cancels pending waits (as the comment above
// says), the wait started here would itself be cancelled -- confirm that this
// ordering is intended. The handler body is not visible in this excerpt.
178 suspend_timer_.async_wait([](boost::system::error_code const&){
181 suspend_timer_.expires_at( std::chrono::steady_clock::now() );
// Out-of-class definition of the static service identifier declared inside
// round_robin::service.
// NOTE(review): this is a non-inline variable definition in a header; if the
// header is included from more than one translation unit this may produce
// duplicate-symbol errors -- confirm how the header is meant to be used.
186 boost::asio::io_service::id round_robin::service::id;
190 #ifdef BOOST_HAS_ABI_HEADERS
191 # include BOOST_ABI_SUFFIX
194 #endif // BOOST_FIBERS_ASIO_ROUND_ROBIN_H