]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright Oliver Kowalke 2013. |
2 | // Distributed under the Boost Software License, Version 1.0. | |
3 | // (See accompanying file LICENSE_1_0.txt or copy at | |
4 | // http://www.boost.org/LICENSE_1_0.txt) | |
5 | ||
6 | #ifndef BOOST_FIBERS_ASIO_ROUND_ROBIN_H | |
7 | #define BOOST_FIBERS_ASIO_ROUND_ROBIN_H | |
8 | ||
9 | #include <chrono> | |
10 | #include <cstddef> | |
b32b8144 | 11 | #include <memory> |
7c673cae FG |
12 | #include <mutex> |
13 | #include <queue> | |
14 | ||
15 | #include <boost/asio.hpp> | |
16 | #include <boost/assert.hpp> | |
17 | #include <boost/asio/steady_timer.hpp> | |
18 | #include <boost/config.hpp> | |
19 | ||
20 | #include <boost/fiber/condition_variable.hpp> | |
21 | #include <boost/fiber/context.hpp> | |
22 | #include <boost/fiber/mutex.hpp> | |
23 | #include <boost/fiber/operations.hpp> | |
24 | #include <boost/fiber/scheduler.hpp> | |
25 | ||
26 | #include "yield.hpp" | |
27 | ||
28 | #ifdef BOOST_HAS_ABI_HEADERS | |
29 | # include BOOST_ABI_PREFIX | |
30 | #endif | |
31 | ||
32 | namespace boost { | |
33 | namespace fibers { | |
34 | namespace asio { | |
35 | ||
36 | class round_robin : public algo::algorithm { | |
37 | private: | |
7c673cae | 38 | //[asio_rr_suspend_timer |
b32b8144 | 39 | std::shared_ptr< boost::asio::io_service > io_svc_; |
7c673cae FG |
40 | boost::asio::steady_timer suspend_timer_; |
41 | //] | |
b32b8144 FG |
42 | boost::fibers::scheduler::ready_queue_type rqueue_{}; |
43 | boost::fibers::mutex mtx_{}; | |
44 | boost::fibers::condition_variable cnd_{}; | |
45 | std::size_t counter_{ 0 }; | |
7c673cae FG |
46 | |
47 | public: | |
48 | //[asio_rr_service_top | |
49 | struct service : public boost::asio::io_service::service { | |
50 | static boost::asio::io_service::id id; | |
51 | ||
52 | std::unique_ptr< boost::asio::io_service::work > work_; | |
53 | ||
54 | service( boost::asio::io_service & io_svc) : | |
55 | boost::asio::io_service::service( io_svc), | |
56 | work_{ new boost::asio::io_service::work( io_svc) } { | |
7c673cae FG |
57 | } |
58 | ||
59 | virtual ~service() {} | |
60 | ||
61 | service( service const&) = delete; | |
62 | service & operator=( service const&) = delete; | |
63 | ||
64 | void shutdown_service() override final { | |
65 | work_.reset(); | |
66 | } | |
67 | }; | |
68 | //] | |
69 | ||
70 | //[asio_rr_ctor | |
b32b8144 | 71 | round_robin( std::shared_ptr< boost::asio::io_service > const& io_svc) : |
7c673cae | 72 | io_svc_( io_svc), |
b32b8144 | 73 | suspend_timer_( * io_svc_) { |
7c673cae FG |
74 | // We use add_service() very deliberately. This will throw |
75 | // service_already_exists if you pass the same io_service instance to | |
76 | // more than one round_robin instance. | |
b32b8144 FG |
77 | boost::asio::add_service( * io_svc_, new service( * io_svc_) ); |
78 | io_svc_->post([this]() mutable { | |
7c673cae | 79 | //] |
b32b8144 FG |
80 | //[asio_rr_service_lambda |
81 | while ( ! io_svc_->stopped() ) { | |
82 | if ( has_ready_fibers() ) { | |
83 | // run all pending handlers in round_robin | |
84 | while ( io_svc_->poll() ); | |
85 | // block this fiber till all pending (ready) fibers are processed | |
86 | // == round_robin::suspend_until() has been called | |
87 | std::unique_lock< boost::fibers::mutex > lk( mtx_); | |
88 | cnd_.wait( lk); | |
89 | } else { | |
90 | // run one handler inside io_service | |
91 | // if no handler available, block this thread | |
92 | if ( ! io_svc_->run_one() ) { | |
93 | break; | |
94 | } | |
95 | } | |
96 | } | |
97 | //] | |
98 | }); | |
99 | } | |
7c673cae FG |
100 | |
101 | void awakened( context * ctx) noexcept { | |
102 | BOOST_ASSERT( nullptr != ctx); | |
b32b8144 | 103 | BOOST_ASSERT( ! ctx->ready_is_linked() ); |
7c673cae | 104 | ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/ |
b32b8144 FG |
105 | if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) { |
106 | ++counter_; | |
107 | } | |
7c673cae FG |
108 | } |
109 | ||
110 | context * pick_next() noexcept { | |
111 | context * ctx( nullptr); | |
112 | if ( ! rqueue_.empty() ) { /*< | |
113 | pop an item from the ready queue | |
114 | >*/ | |
115 | ctx = & rqueue_.front(); | |
116 | rqueue_.pop_front(); | |
117 | BOOST_ASSERT( nullptr != ctx); | |
118 | BOOST_ASSERT( context::active() != ctx); | |
b32b8144 FG |
119 | if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) { |
120 | --counter_; | |
121 | } | |
7c673cae FG |
122 | } |
123 | return ctx; | |
124 | } | |
125 | ||
126 | bool has_ready_fibers() const noexcept { | |
b32b8144 | 127 | return 0 < counter_; |
7c673cae FG |
128 | } |
129 | ||
130 | //[asio_rr_suspend_until | |
131 | void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept { | |
132 | // Set a timer so at least one handler will eventually fire, causing | |
b32b8144 FG |
133 | // run_one() to eventually return. |
134 | if ( (std::chrono::steady_clock::time_point::max)() != abs_time) { | |
135 | // Each expires_at(time_point) call cancels any previous pending | |
136 | // call. We could inadvertently spin like this: | |
137 | // dispatcher calls suspend_until() with earliest wake time | |
138 | // suspend_until() sets suspend_timer_ | |
139 | // lambda loop calls run_one() | |
140 | // some other asio handler runs before timer expires | |
141 | // run_one() returns to lambda loop | |
142 | // lambda loop yields to dispatcher | |
143 | // dispatcher finds no ready fibers | |
144 | // dispatcher calls suspend_until() with SAME wake time | |
145 | // suspend_until() sets suspend_timer_ to same time, canceling | |
146 | // previous async_wait() | |
147 | // lambda loop calls run_one() | |
148 | // asio calls suspend_timer_ handler with operation_aborted | |
149 | // run_one() returns to lambda loop... etc. etc. | |
150 | // So only actually set the timer when we're passed a DIFFERENT | |
151 | // abs_time value. | |
7c673cae | 152 | suspend_timer_.expires_at( abs_time); |
b32b8144 FG |
153 | suspend_timer_.async_wait([](boost::system::error_code const&){ |
154 | this_fiber::yield(); | |
155 | }); | |
7c673cae | 156 | } |
b32b8144 | 157 | cnd_.notify_one(); |
7c673cae FG |
158 | } |
159 | //] | |
160 | ||
161 | //[asio_rr_notify | |
162 | void notify() noexcept { | |
163 | // Something has happened that should wake one or more fibers BEFORE | |
164 | // suspend_timer_ expires. Reset the timer to cause it to fire | |
165 | // immediately, causing the run_one() call to return. In theory we | |
166 | // could use cancel() because we don't care whether suspend_timer_'s | |
167 | // handler is called with operation_aborted or success. However -- | |
168 | // cancel() doesn't change the expiration time, and we use | |
169 | // suspend_timer_'s expiration time to decide whether it's already | |
170 | // set. If suspend_until() set some specific wake time, then notify() | |
171 | // canceled it, then suspend_until() was called again with the same | |
172 | // wake time, it would match suspend_timer_'s expiration time and we'd | |
173 | // refrain from setting the timer. So instead of simply calling | |
174 | // cancel(), reset the timer, which cancels the pending sleep AND sets | |
175 | // a new expiration time. This will cause us to spin the loop twice -- | |
176 | // once for the operation_aborted handler, once for timer expiration | |
177 | // -- but that shouldn't be a big problem. | |
b32b8144 FG |
178 | suspend_timer_.async_wait([](boost::system::error_code const&){ |
179 | this_fiber::yield(); | |
180 | }); | |
7c673cae FG |
181 | suspend_timer_.expires_at( std::chrono::steady_clock::now() ); |
182 | } | |
183 | //] | |
184 | }; | |
185 | ||
186 | boost::asio::io_service::id round_robin::service::id; | |
187 | ||
188 | }}} | |
189 | ||
190 | #ifdef BOOST_HAS_ABI_HEADERS | |
191 | # include BOOST_ABI_SUFFIX | |
192 | #endif | |
193 | ||
194 | #endif // BOOST_FIBERS_ASIO_ROUND_ROBIN_H |