]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | #include <boost/asio/ts/executor.hpp> |
2 | #include <condition_variable> | |
3 | #include <deque> | |
4 | #include <memory> | |
5 | #include <mutex> | |
6 | #include <typeinfo> | |
7 | #include <vector> | |
8 | ||
9 | using boost::asio::defer; | |
10 | using boost::asio::executor; | |
11 | using boost::asio::post; | |
12 | using boost::asio::strand; | |
13 | using boost::asio::system_executor; | |
14 | ||
15 | //------------------------------------------------------------------------------ | |
16 | // A tiny actor framework | |
17 | // ~~~~~~~~~~~~~~~~~~~~~~ | |
18 | ||
19 | class actor; | |
20 | ||
21 | // Used to identify the sender and recipient of messages. | |
22 | typedef actor* actor_address; | |
23 | ||
24 | // Base class for all registered message handlers. | |
25 | class message_handler_base | |
26 | { | |
27 | public: | |
28 | virtual ~message_handler_base() {} | |
29 | ||
30 | // Used to determine which message handlers receive an incoming message. | |
31 | virtual const std::type_info& message_id() const = 0; | |
32 | }; | |
33 | ||
34 | // Base class for a handler for a specific message type. | |
35 | template <class Message> | |
36 | class message_handler : public message_handler_base | |
37 | { | |
38 | public: | |
39 | // Handle an incoming message. | |
40 | virtual void handle_message(Message msg, actor_address from) = 0; | |
41 | }; | |
42 | ||
43 | // Concrete message handler for a specific message type. | |
44 | template <class Actor, class Message> | |
45 | class mf_message_handler : public message_handler<Message> | |
46 | { | |
47 | public: | |
48 | // Construct a message handler to invoke the specified member function. | |
49 | mf_message_handler(void (Actor::* mf)(Message, actor_address), Actor* a) | |
50 | : function_(mf), actor_(a) | |
51 | { | |
52 | } | |
53 | ||
54 | // Used to determine which message handlers receive an incoming message. | |
55 | virtual const std::type_info& message_id() const | |
56 | { | |
57 | return typeid(Message); | |
58 | } | |
59 | ||
60 | // Handle an incoming message. | |
61 | virtual void handle_message(Message msg, actor_address from) | |
62 | { | |
63 | (actor_->*function_)(std::move(msg), from); | |
64 | } | |
65 | ||
66 | // Determine whether the message handler represents the specified function. | |
67 | bool is_function(void (Actor::* mf)(Message, actor_address)) const | |
68 | { | |
69 | return mf == function_; | |
70 | } | |
71 | ||
72 | private: | |
73 | void (Actor::* function_)(Message, actor_address); | |
74 | Actor* actor_; | |
75 | }; | |
76 | ||
77 | // Base class for all actors. | |
78 | class actor | |
79 | { | |
80 | public: | |
81 | virtual ~actor() | |
82 | { | |
83 | } | |
84 | ||
85 | // Obtain the actor's address for use as a message sender or recipient. | |
86 | actor_address address() | |
87 | { | |
88 | return this; | |
89 | } | |
90 | ||
91 | // Send a message from one actor to another. | |
92 | template <class Message> | |
93 | friend void send(Message msg, actor_address from, actor_address to) | |
94 | { | |
95 | // Execute the message handler in the context of the target's executor. | |
96 | post(to->executor_, | |
97 | [=, msg=std::move(msg)] | |
98 | { | |
99 | to->call_handler(std::move(msg), from); | |
100 | }); | |
101 | } | |
102 | ||
103 | protected: | |
104 | // Construct the actor to use the specified executor for all message handlers. | |
105 | actor(executor e) | |
106 | : executor_(std::move(e)) | |
107 | { | |
108 | } | |
109 | ||
110 | // Register a handler for a specific message type. Duplicates are permitted. | |
111 | template <class Actor, class Message> | |
112 | void register_handler(void (Actor::* mf)(Message, actor_address)) | |
113 | { | |
114 | handlers_.push_back( | |
115 | std::make_shared<mf_message_handler<Actor, Message>>( | |
116 | mf, static_cast<Actor*>(this))); | |
117 | } | |
118 | ||
119 | // Deregister a handler. Removes only the first matching handler. | |
120 | template <class Actor, class Message> | |
121 | void deregister_handler(void (Actor::* mf)(Message, actor_address)) | |
122 | { | |
123 | const std::type_info& id = typeid(Message); | |
124 | for (auto iter = handlers_.begin(); iter != handlers_.end(); ++iter) | |
125 | { | |
126 | if ((*iter)->message_id() == id) | |
127 | { | |
128 | auto mh = static_cast<mf_message_handler<Actor, Message>*>(iter->get()); | |
129 | if (mh->is_function(mf)) | |
130 | { | |
131 | handlers_.erase(iter); | |
132 | return; | |
133 | } | |
134 | } | |
135 | } | |
136 | } | |
137 | ||
138 | // Send a message from within a message handler. | |
139 | template <class Message> | |
140 | void tail_send(Message msg, actor_address to) | |
141 | { | |
142 | // Execute the message handler in the context of the target's executor. | |
143 | defer(to->executor_, | |
144 | [=, msg=std::move(msg), from=this] | |
145 | { | |
146 | to->call_handler(std::move(msg), from); | |
147 | }); | |
148 | } | |
149 | ||
150 | private: | |
151 | // Find the matching message handlers, if any, and call them. | |
152 | template <class Message> | |
153 | void call_handler(Message msg, actor_address from) | |
154 | { | |
155 | const std::type_info& message_id = typeid(Message); | |
156 | for (auto& h: handlers_) | |
157 | { | |
158 | if (h->message_id() == message_id) | |
159 | { | |
160 | auto mh = static_cast<message_handler<Message>*>(h.get()); | |
161 | mh->handle_message(msg, from); | |
162 | } | |
163 | } | |
164 | } | |
165 | ||
166 | // All messages associated with a single actor object should be processed | |
167 | // non-concurrently. We use a strand to ensure non-concurrent execution even | |
168 | // if the underlying executor may use multiple threads. | |
169 | strand<executor> executor_; | |
170 | ||
171 | std::vector<std::shared_ptr<message_handler_base>> handlers_; | |
172 | }; | |
173 | ||
174 | // A concrete actor that allows synchronous message retrieval. | |
175 | template <class Message> | |
176 | class receiver : public actor | |
177 | { | |
178 | public: | |
179 | receiver() | |
180 | : actor(system_executor()) | |
181 | { | |
182 | register_handler(&receiver::message_handler); | |
183 | } | |
184 | ||
185 | // Block until a message has been received. | |
186 | Message wait() | |
187 | { | |
188 | std::unique_lock<std::mutex> lock(mutex_); | |
189 | condition_.wait(lock, [this]{ return !message_queue_.empty(); }); | |
190 | Message msg(std::move(message_queue_.front())); | |
191 | message_queue_.pop_front(); | |
192 | return msg; | |
193 | } | |
194 | ||
195 | private: | |
196 | // Handle a new message by adding it to the queue and waking a waiter. | |
197 | void message_handler(Message msg, actor_address /* from */) | |
198 | { | |
199 | std::lock_guard<std::mutex> lock(mutex_); | |
200 | message_queue_.push_back(std::move(msg)); | |
201 | condition_.notify_one(); | |
202 | } | |
203 | ||
204 | std::mutex mutex_; | |
205 | std::condition_variable condition_; | |
206 | std::deque<Message> message_queue_; | |
207 | }; | |
208 | ||
209 | //------------------------------------------------------------------------------ | |
210 | ||
211 | #include <boost/asio/thread_pool.hpp> | |
212 | #include <iostream> | |
213 | ||
214 | using boost::asio::thread_pool; | |
215 | ||
216 | class member : public actor | |
217 | { | |
218 | public: | |
219 | explicit member(executor e) | |
220 | : actor(std::move(e)) | |
221 | { | |
222 | register_handler(&member::init_handler); | |
223 | } | |
224 | ||
225 | private: | |
226 | void init_handler(actor_address next, actor_address from) | |
227 | { | |
228 | next_ = next; | |
229 | caller_ = from; | |
230 | ||
231 | register_handler(&member::token_handler); | |
232 | deregister_handler(&member::init_handler); | |
233 | } | |
234 | ||
235 | void token_handler(int token, actor_address /*from*/) | |
236 | { | |
237 | int msg(token); | |
238 | actor_address to(caller_); | |
239 | ||
240 | if (token > 0) | |
241 | { | |
242 | msg = token - 1; | |
243 | to = next_; | |
244 | } | |
245 | ||
246 | tail_send(msg, to); | |
247 | } | |
248 | ||
249 | actor_address next_; | |
250 | actor_address caller_; | |
251 | }; | |
252 | ||
253 | int main() | |
254 | { | |
255 | const std::size_t num_threads = 16; | |
256 | const int num_hops = 50000000; | |
257 | const std::size_t num_actors = 503; | |
258 | const int token_value = (num_hops + num_actors - 1) / num_actors; | |
259 | const std::size_t actors_per_thread = num_actors / num_threads; | |
260 | ||
261 | struct single_thread_pool : thread_pool { single_thread_pool() : thread_pool(1) {} }; | |
262 | single_thread_pool pools[num_threads]; | |
263 | std::vector<std::shared_ptr<member>> members(num_actors); | |
264 | receiver<int> rcvr; | |
265 | ||
266 | // Create the member actors. | |
267 | for (std::size_t i = 0; i < num_actors; ++i) | |
268 | members[i] = std::make_shared<member>(pools[(i / actors_per_thread) % num_threads].get_executor()); | |
269 | ||
270 | // Initialise the actors by passing each one the address of the next actor in the ring. | |
271 | for (std::size_t i = num_actors, next_i = 0; i > 0; next_i = --i) | |
272 | send(members[next_i]->address(), rcvr.address(), members[i - 1]->address()); | |
273 | ||
274 | // Send exactly one token to each actor, all with the same initial value, rounding up if required. | |
275 | for (std::size_t i = 0; i < num_actors; ++i) | |
276 | send(token_value, rcvr.address(), members[i]->address()); | |
277 | ||
278 | // Wait for all signal messages, indicating the tokens have all reached zero. | |
279 | for (std::size_t i = 0; i < num_actors; ++i) | |
280 | rcvr.wait(); | |
281 | } |