]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/fiber/examples/asio/round_robin.hpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / fiber / examples / asio / round_robin.hpp
CommitLineData
7c673cae
FG
1// Copyright Oliver Kowalke 2013.
2// Distributed under the Boost Software License, Version 1.0.
3// (See accompanying file LICENSE_1_0.txt or copy at
4// http://www.boost.org/LICENSE_1_0.txt)
5
6#ifndef BOOST_FIBERS_ASIO_ROUND_ROBIN_H
7#define BOOST_FIBERS_ASIO_ROUND_ROBIN_H
8
9#include <chrono>
10#include <cstddef>
b32b8144 11#include <memory>
7c673cae
FG
12#include <mutex>
13#include <queue>
14
15#include <boost/asio.hpp>
16#include <boost/assert.hpp>
17#include <boost/asio/steady_timer.hpp>
18#include <boost/config.hpp>
19
20#include <boost/fiber/condition_variable.hpp>
21#include <boost/fiber/context.hpp>
22#include <boost/fiber/mutex.hpp>
23#include <boost/fiber/operations.hpp>
24#include <boost/fiber/scheduler.hpp>
25
26#include "yield.hpp"
27
28#ifdef BOOST_HAS_ABI_HEADERS
29# include BOOST_ABI_PREFIX
30#endif
31
32namespace boost {
33namespace fibers {
34namespace asio {
35
36class round_robin : public algo::algorithm {
37private:
7c673cae 38//[asio_rr_suspend_timer
b32b8144 39 std::shared_ptr< boost::asio::io_service > io_svc_;
7c673cae
FG
40 boost::asio::steady_timer suspend_timer_;
41//]
b32b8144
FG
42 boost::fibers::scheduler::ready_queue_type rqueue_{};
43 boost::fibers::mutex mtx_{};
44 boost::fibers::condition_variable cnd_{};
45 std::size_t counter_{ 0 };
7c673cae
FG
46
47public:
48//[asio_rr_service_top
49 struct service : public boost::asio::io_service::service {
50 static boost::asio::io_service::id id;
51
52 std::unique_ptr< boost::asio::io_service::work > work_;
53
54 service( boost::asio::io_service & io_svc) :
55 boost::asio::io_service::service( io_svc),
56 work_{ new boost::asio::io_service::work( io_svc) } {
7c673cae
FG
57 }
58
59 virtual ~service() {}
60
61 service( service const&) = delete;
62 service & operator=( service const&) = delete;
63
64 void shutdown_service() override final {
65 work_.reset();
66 }
67 };
68//]
69
70//[asio_rr_ctor
b32b8144 71 round_robin( std::shared_ptr< boost::asio::io_service > const& io_svc) :
7c673cae 72 io_svc_( io_svc),
b32b8144 73 suspend_timer_( * io_svc_) {
7c673cae
FG
74 // We use add_service() very deliberately. This will throw
75 // service_already_exists if you pass the same io_service instance to
76 // more than one round_robin instance.
b32b8144
FG
77 boost::asio::add_service( * io_svc_, new service( * io_svc_) );
78 io_svc_->post([this]() mutable {
7c673cae 79//]
b32b8144
FG
80//[asio_rr_service_lambda
81 while ( ! io_svc_->stopped() ) {
82 if ( has_ready_fibers() ) {
83 // run all pending handlers in round_robin
84 while ( io_svc_->poll() );
85 // block this fiber till all pending (ready) fibers are processed
86 // == round_robin::suspend_until() has been called
87 std::unique_lock< boost::fibers::mutex > lk( mtx_);
88 cnd_.wait( lk);
89 } else {
90 // run one handler inside io_service
91 // if no handler available, block this thread
92 if ( ! io_svc_->run_one() ) {
93 break;
94 }
95 }
96 }
97//]
98 });
99 }
7c673cae
FG
100
101 void awakened( context * ctx) noexcept {
102 BOOST_ASSERT( nullptr != ctx);
b32b8144 103 BOOST_ASSERT( ! ctx->ready_is_linked() );
7c673cae 104 ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/
b32b8144
FG
105 if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
106 ++counter_;
107 }
7c673cae
FG
108 }
109
110 context * pick_next() noexcept {
111 context * ctx( nullptr);
112 if ( ! rqueue_.empty() ) { /*<
113 pop an item from the ready queue
114 >*/
115 ctx = & rqueue_.front();
116 rqueue_.pop_front();
117 BOOST_ASSERT( nullptr != ctx);
118 BOOST_ASSERT( context::active() != ctx);
b32b8144
FG
119 if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
120 --counter_;
121 }
7c673cae
FG
122 }
123 return ctx;
124 }
125
126 bool has_ready_fibers() const noexcept {
b32b8144 127 return 0 < counter_;
7c673cae
FG
128 }
129
130//[asio_rr_suspend_until
131 void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept {
132 // Set a timer so at least one handler will eventually fire, causing
b32b8144
FG
133 // run_one() to eventually return.
134 if ( (std::chrono::steady_clock::time_point::max)() != abs_time) {
135 // Each expires_at(time_point) call cancels any previous pending
136 // call. We could inadvertently spin like this:
137 // dispatcher calls suspend_until() with earliest wake time
138 // suspend_until() sets suspend_timer_
139 // lambda loop calls run_one()
140 // some other asio handler runs before timer expires
141 // run_one() returns to lambda loop
142 // lambda loop yields to dispatcher
143 // dispatcher finds no ready fibers
144 // dispatcher calls suspend_until() with SAME wake time
145 // suspend_until() sets suspend_timer_ to same time, canceling
146 // previous async_wait()
147 // lambda loop calls run_one()
148 // asio calls suspend_timer_ handler with operation_aborted
149 // run_one() returns to lambda loop... etc. etc.
150 // So only actually set the timer when we're passed a DIFFERENT
151 // abs_time value.
7c673cae 152 suspend_timer_.expires_at( abs_time);
b32b8144
FG
153 suspend_timer_.async_wait([](boost::system::error_code const&){
154 this_fiber::yield();
155 });
7c673cae 156 }
b32b8144 157 cnd_.notify_one();
7c673cae
FG
158 }
159//]
160
161//[asio_rr_notify
162 void notify() noexcept {
163 // Something has happened that should wake one or more fibers BEFORE
164 // suspend_timer_ expires. Reset the timer to cause it to fire
165 // immediately, causing the run_one() call to return. In theory we
166 // could use cancel() because we don't care whether suspend_timer_'s
167 // handler is called with operation_aborted or success. However --
168 // cancel() doesn't change the expiration time, and we use
169 // suspend_timer_'s expiration time to decide whether it's already
170 // set. If suspend_until() set some specific wake time, then notify()
171 // canceled it, then suspend_until() was called again with the same
172 // wake time, it would match suspend_timer_'s expiration time and we'd
173 // refrain from setting the timer. So instead of simply calling
174 // cancel(), reset the timer, which cancels the pending sleep AND sets
175 // a new expiration time. This will cause us to spin the loop twice --
176 // once for the operation_aborted handler, once for timer expiration
177 // -- but that shouldn't be a big problem.
b32b8144
FG
178 suspend_timer_.async_wait([](boost::system::error_code const&){
179 this_fiber::yield();
180 });
7c673cae
FG
181 suspend_timer_.expires_at( std::chrono::steady_clock::now() );
182 }
183//]
184};
185
186boost::asio::io_service::id round_robin::service::id;
187
188}}}
189
190#ifdef BOOST_HAS_ABI_HEADERS
191# include BOOST_ABI_SUFFIX
192#endif
193
194#endif // BOOST_FIBERS_ASIO_ROUND_ROBIN_H