// Source: git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/asio/example/cpp14/executors/actor.cpp
// import quincy beta 17.1.0
// [ceph.git] / ceph / src / boost / libs / asio / example / cpp14 / executors / actor.cpp
1 #include <boost/asio/any_io_executor.hpp>
2 #include <boost/asio/defer.hpp>
3 #include <boost/asio/post.hpp>
4 #include <boost/asio/strand.hpp>
5 #include <boost/asio/system_executor.hpp>
6 #include <condition_variable>
7 #include <deque>
8 #include <memory>
9 #include <mutex>
10 #include <typeinfo>
11 #include <vector>
12
13 using boost::asio::any_io_executor;
14 using boost::asio::defer;
15 using boost::asio::post;
16 using boost::asio::strand;
17 using boost::asio::system_executor;
18
19 //------------------------------------------------------------------------------
20 // A tiny actor framework
21 // ~~~~~~~~~~~~~~~~~~~~~~
22
class actor;

// Address type naming the sender and the recipient of a message.
using actor_address = actor*;
27
// Common polymorphic root of every registered message handler. It exposes
// only the identity of the message type a handler accepts.
class message_handler_base
{
public:
  virtual ~message_handler_base() = default;

  // Returns the type_info of the message type this handler accepts; used to
  // decide which handlers receive an incoming message.
  virtual const std::type_info& message_id() const = 0;
};
37
38 // Base class for a handler for a specific message type.
39 template <class Message>
40 class message_handler : public message_handler_base
41 {
42 public:
43 // Handle an incoming message.
44 virtual void handle_message(Message msg, actor_address from) = 0;
45 };
46
47 // Concrete message handler for a specific message type.
48 template <class Actor, class Message>
49 class mf_message_handler : public message_handler<Message>
50 {
51 public:
52 // Construct a message handler to invoke the specified member function.
53 mf_message_handler(void (Actor::* mf)(Message, actor_address), Actor* a)
54 : function_(mf), actor_(a)
55 {
56 }
57
58 // Used to determine which message handlers receive an incoming message.
59 virtual const std::type_info& message_id() const
60 {
61 return typeid(Message);
62 }
63
64 // Handle an incoming message.
65 virtual void handle_message(Message msg, actor_address from)
66 {
67 (actor_->*function_)(std::move(msg), from);
68 }
69
70 // Determine whether the message handler represents the specified function.
71 bool is_function(void (Actor::* mf)(Message, actor_address)) const
72 {
73 return mf == function_;
74 }
75
76 private:
77 void (Actor::* function_)(Message, actor_address);
78 Actor* actor_;
79 };
80
81 // Base class for all actors.
82 class actor
83 {
84 public:
85 virtual ~actor()
86 {
87 }
88
89 // Obtain the actor's address for use as a message sender or recipient.
90 actor_address address()
91 {
92 return this;
93 }
94
95 // Send a message from one actor to another.
96 template <class Message>
97 friend void send(Message msg, actor_address from, actor_address to)
98 {
99 // Execute the message handler in the context of the target's executor.
100 post(to->executor_,
101 [=, msg=std::move(msg)]() mutable
102 {
103 to->call_handler(std::move(msg), from);
104 });
105 }
106
107 protected:
108 // Construct the actor to use the specified executor for all message handlers.
109 actor(any_io_executor e)
110 : executor_(std::move(e))
111 {
112 }
113
114 // Register a handler for a specific message type. Duplicates are permitted.
115 template <class Actor, class Message>
116 void register_handler(void (Actor::* mf)(Message, actor_address))
117 {
118 handlers_.push_back(
119 std::make_shared<mf_message_handler<Actor, Message>>(
120 mf, static_cast<Actor*>(this)));
121 }
122
123 // Deregister a handler. Removes only the first matching handler.
124 template <class Actor, class Message>
125 void deregister_handler(void (Actor::* mf)(Message, actor_address))
126 {
127 const std::type_info& id = typeid(Message);
128 for (auto iter = handlers_.begin(); iter != handlers_.end(); ++iter)
129 {
130 if ((*iter)->message_id() == id)
131 {
132 auto mh = static_cast<mf_message_handler<Actor, Message>*>(iter->get());
133 if (mh->is_function(mf))
134 {
135 handlers_.erase(iter);
136 return;
137 }
138 }
139 }
140 }
141
142 // Send a message from within a message handler.
143 template <class Message>
144 void tail_send(Message msg, actor_address to)
145 {
146 // Execute the message handler in the context of the target's executor.
147 defer(to->executor_,
148 [=, msg=std::move(msg), from=this]
149 {
150 to->call_handler(std::move(msg), from);
151 });
152 }
153
154 private:
155 // Find the matching message handlers, if any, and call them.
156 template <class Message>
157 void call_handler(Message msg, actor_address from)
158 {
159 const std::type_info& message_id = typeid(Message);
160 for (auto& h: handlers_)
161 {
162 if (h->message_id() == message_id)
163 {
164 auto mh = static_cast<message_handler<Message>*>(h.get());
165 mh->handle_message(msg, from);
166 }
167 }
168 }
169
170 // All messages associated with a single actor object should be processed
171 // non-concurrently. We use a strand to ensure non-concurrent execution even
172 // if the underlying executor may use multiple threads.
173 strand<any_io_executor> executor_;
174
175 std::vector<std::shared_ptr<message_handler_base>> handlers_;
176 };
177
178 // A concrete actor that allows synchronous message retrieval.
179 template <class Message>
180 class receiver : public actor
181 {
182 public:
183 receiver()
184 : actor(system_executor())
185 {
186 register_handler(&receiver::message_handler);
187 }
188
189 // Block until a message has been received.
190 Message wait()
191 {
192 std::unique_lock<std::mutex> lock(mutex_);
193 condition_.wait(lock, [this]{ return !message_queue_.empty(); });
194 Message msg(std::move(message_queue_.front()));
195 message_queue_.pop_front();
196 return msg;
197 }
198
199 private:
200 // Handle a new message by adding it to the queue and waking a waiter.
201 void message_handler(Message msg, actor_address /* from */)
202 {
203 std::lock_guard<std::mutex> lock(mutex_);
204 message_queue_.push_back(std::move(msg));
205 condition_.notify_one();
206 }
207
208 std::mutex mutex_;
209 std::condition_variable condition_;
210 std::deque<Message> message_queue_;
211 };
212
213 //------------------------------------------------------------------------------
214
215 #include <boost/asio/thread_pool.hpp>
216 #include <iostream>
217
218 using boost::asio::thread_pool;
219
220 class member : public actor
221 {
222 public:
223 explicit member(any_io_executor e)
224 : actor(std::move(e))
225 {
226 register_handler(&member::init_handler);
227 }
228
229 private:
230 void init_handler(actor_address next, actor_address from)
231 {
232 next_ = next;
233 caller_ = from;
234
235 register_handler(&member::token_handler);
236 deregister_handler(&member::init_handler);
237 }
238
239 void token_handler(int token, actor_address /*from*/)
240 {
241 int msg(token);
242 actor_address to(caller_);
243
244 if (token > 0)
245 {
246 msg = token - 1;
247 to = next_;
248 }
249
250 tail_send(msg, to);
251 }
252
253 actor_address next_;
254 actor_address caller_;
255 };
256
257 int main()
258 {
259 const std::size_t num_threads = 16;
260 const int num_hops = 50000000;
261 const std::size_t num_actors = 503;
262 const int token_value = (num_hops + num_actors - 1) / num_actors;
263 const std::size_t actors_per_thread = num_actors / num_threads;
264
265 struct single_thread_pool : thread_pool { single_thread_pool() : thread_pool(1) {} };
266 single_thread_pool pools[num_threads];
267 std::vector<std::shared_ptr<member>> members(num_actors);
268 receiver<int> rcvr;
269
270 // Create the member actors.
271 for (std::size_t i = 0; i < num_actors; ++i)
272 members[i] = std::make_shared<member>(pools[(i / actors_per_thread) % num_threads].get_executor());
273
274 // Initialise the actors by passing each one the address of the next actor in the ring.
275 for (std::size_t i = num_actors, next_i = 0; i > 0; next_i = --i)
276 send(members[next_i]->address(), rcvr.address(), members[i - 1]->address());
277
278 // Send exactly one token to each actor, all with the same initial value, rounding up if required.
279 for (std::size_t i = 0; i < num_actors; ++i)
280 send(token_value, rcvr.address(), members[i]->address());
281
282 // Wait for all signal messages, indicating the tokens have all reached zero.
283 for (std::size_t i = 0; i < num_actors; ++i)
284 rcvr.wait();
285 }