]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/asio/example/cpp14/executors/actor.cpp
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / boost / libs / asio / example / cpp14 / executors / actor.cpp
CommitLineData
f67539c2
TL
1#include <boost/asio/ts/executor.hpp>
2#include <condition_variable>
3#include <deque>
4#include <memory>
5#include <mutex>
6#include <typeinfo>
7#include <vector>
8
9using boost::asio::defer;
10using boost::asio::executor;
11using boost::asio::post;
12using boost::asio::strand;
13using boost::asio::system_executor;
14
15//------------------------------------------------------------------------------
16// A tiny actor framework
17// ~~~~~~~~~~~~~~~~~~~~~~
18
class actor;

// Actors are identified by pointer. The same type names both the sender and
// the recipient of a message.
using actor_address = actor*;
23
// Type-erased root of every registered message handler. It lets handlers for
// different message types be stored together and matched at run time.
class message_handler_base
{
public:
  virtual ~message_handler_base() = default;

  // Identifies the message type this handler accepts. Compared against the
  // type of an incoming message to decide whether the handler receives it.
  virtual const std::type_info& message_id() const = 0;
};
33
34// Base class for a handler for a specific message type.
35template <class Message>
36class message_handler : public message_handler_base
37{
38public:
39 // Handle an incoming message.
40 virtual void handle_message(Message msg, actor_address from) = 0;
41};
42
43// Concrete message handler for a specific message type.
44template <class Actor, class Message>
45class mf_message_handler : public message_handler<Message>
46{
47public:
48 // Construct a message handler to invoke the specified member function.
49 mf_message_handler(void (Actor::* mf)(Message, actor_address), Actor* a)
50 : function_(mf), actor_(a)
51 {
52 }
53
54 // Used to determine which message handlers receive an incoming message.
55 virtual const std::type_info& message_id() const
56 {
57 return typeid(Message);
58 }
59
60 // Handle an incoming message.
61 virtual void handle_message(Message msg, actor_address from)
62 {
63 (actor_->*function_)(std::move(msg), from);
64 }
65
66 // Determine whether the message handler represents the specified function.
67 bool is_function(void (Actor::* mf)(Message, actor_address)) const
68 {
69 return mf == function_;
70 }
71
72private:
73 void (Actor::* function_)(Message, actor_address);
74 Actor* actor_;
75};
76
77// Base class for all actors.
78class actor
79{
80public:
81 virtual ~actor()
82 {
83 }
84
85 // Obtain the actor's address for use as a message sender or recipient.
86 actor_address address()
87 {
88 return this;
89 }
90
91 // Send a message from one actor to another.
92 template <class Message>
93 friend void send(Message msg, actor_address from, actor_address to)
94 {
95 // Execute the message handler in the context of the target's executor.
96 post(to->executor_,
97 [=, msg=std::move(msg)]
98 {
99 to->call_handler(std::move(msg), from);
100 });
101 }
102
103protected:
104 // Construct the actor to use the specified executor for all message handlers.
105 actor(executor e)
106 : executor_(std::move(e))
107 {
108 }
109
110 // Register a handler for a specific message type. Duplicates are permitted.
111 template <class Actor, class Message>
112 void register_handler(void (Actor::* mf)(Message, actor_address))
113 {
114 handlers_.push_back(
115 std::make_shared<mf_message_handler<Actor, Message>>(
116 mf, static_cast<Actor*>(this)));
117 }
118
119 // Deregister a handler. Removes only the first matching handler.
120 template <class Actor, class Message>
121 void deregister_handler(void (Actor::* mf)(Message, actor_address))
122 {
123 const std::type_info& id = typeid(Message);
124 for (auto iter = handlers_.begin(); iter != handlers_.end(); ++iter)
125 {
126 if ((*iter)->message_id() == id)
127 {
128 auto mh = static_cast<mf_message_handler<Actor, Message>*>(iter->get());
129 if (mh->is_function(mf))
130 {
131 handlers_.erase(iter);
132 return;
133 }
134 }
135 }
136 }
137
138 // Send a message from within a message handler.
139 template <class Message>
140 void tail_send(Message msg, actor_address to)
141 {
142 // Execute the message handler in the context of the target's executor.
143 defer(to->executor_,
144 [=, msg=std::move(msg), from=this]
145 {
146 to->call_handler(std::move(msg), from);
147 });
148 }
149
150private:
151 // Find the matching message handlers, if any, and call them.
152 template <class Message>
153 void call_handler(Message msg, actor_address from)
154 {
155 const std::type_info& message_id = typeid(Message);
156 for (auto& h: handlers_)
157 {
158 if (h->message_id() == message_id)
159 {
160 auto mh = static_cast<message_handler<Message>*>(h.get());
161 mh->handle_message(msg, from);
162 }
163 }
164 }
165
166 // All messages associated with a single actor object should be processed
167 // non-concurrently. We use a strand to ensure non-concurrent execution even
168 // if the underlying executor may use multiple threads.
169 strand<executor> executor_;
170
171 std::vector<std::shared_ptr<message_handler_base>> handlers_;
172};
173
174// A concrete actor that allows synchronous message retrieval.
175template <class Message>
176class receiver : public actor
177{
178public:
179 receiver()
180 : actor(system_executor())
181 {
182 register_handler(&receiver::message_handler);
183 }
184
185 // Block until a message has been received.
186 Message wait()
187 {
188 std::unique_lock<std::mutex> lock(mutex_);
189 condition_.wait(lock, [this]{ return !message_queue_.empty(); });
190 Message msg(std::move(message_queue_.front()));
191 message_queue_.pop_front();
192 return msg;
193 }
194
195private:
196 // Handle a new message by adding it to the queue and waking a waiter.
197 void message_handler(Message msg, actor_address /* from */)
198 {
199 std::lock_guard<std::mutex> lock(mutex_);
200 message_queue_.push_back(std::move(msg));
201 condition_.notify_one();
202 }
203
204 std::mutex mutex_;
205 std::condition_variable condition_;
206 std::deque<Message> message_queue_;
207};
208
209//------------------------------------------------------------------------------
210
211#include <boost/asio/thread_pool.hpp>
212#include <iostream>
213
214using boost::asio::thread_pool;
215
216class member : public actor
217{
218public:
219 explicit member(executor e)
220 : actor(std::move(e))
221 {
222 register_handler(&member::init_handler);
223 }
224
225private:
226 void init_handler(actor_address next, actor_address from)
227 {
228 next_ = next;
229 caller_ = from;
230
231 register_handler(&member::token_handler);
232 deregister_handler(&member::init_handler);
233 }
234
235 void token_handler(int token, actor_address /*from*/)
236 {
237 int msg(token);
238 actor_address to(caller_);
239
240 if (token > 0)
241 {
242 msg = token - 1;
243 to = next_;
244 }
245
246 tail_send(msg, to);
247 }
248
249 actor_address next_;
250 actor_address caller_;
251};
252
253int main()
254{
255 const std::size_t num_threads = 16;
256 const int num_hops = 50000000;
257 const std::size_t num_actors = 503;
258 const int token_value = (num_hops + num_actors - 1) / num_actors;
259 const std::size_t actors_per_thread = num_actors / num_threads;
260
261 struct single_thread_pool : thread_pool { single_thread_pool() : thread_pool(1) {} };
262 single_thread_pool pools[num_threads];
263 std::vector<std::shared_ptr<member>> members(num_actors);
264 receiver<int> rcvr;
265
266 // Create the member actors.
267 for (std::size_t i = 0; i < num_actors; ++i)
268 members[i] = std::make_shared<member>(pools[(i / actors_per_thread) % num_threads].get_executor());
269
270 // Initialise the actors by passing each one the address of the next actor in the ring.
271 for (std::size_t i = num_actors, next_i = 0; i > 0; next_i = --i)
272 send(members[next_i]->address(), rcvr.address(), members[i - 1]->address());
273
274 // Send exactly one token to each actor, all with the same initial value, rounding up if required.
275 for (std::size_t i = 0; i < num_actors; ++i)
276 send(token_value, rcvr.address(), members[i]->address());
277
278 // Wait for all signal messages, indicating the tokens have all reached zero.
279 for (std::size_t i = 0; i < num_actors; ++i)
280 rcvr.wait();
281}