1 #include <boost/asio/any_io_executor.hpp>
2 #include <boost/asio/defer.hpp>
3 #include <boost/asio/post.hpp>
4 #include <boost/asio/strand.hpp>
5 #include <boost/asio/system_executor.hpp>
6 #include <condition_variable>
13 using boost::asio::any_io_executor
;
14 using boost::asio::defer
;
15 using boost::asio::post
;
16 using boost::asio::strand
;
17 using boost::asio::system_executor
;
19 //------------------------------------------------------------------------------
20 // A tiny actor framework
21 // ~~~~~~~~~~~~~~~~~~~~~~
25 // Used to identify the sender and recipient of messages.
26 typedef actor
* actor_address
;
// Base class for all registered message handlers. It is the type-erased
// interface: the only thing it exposes is which message type a handler
// accepts. The destructor is virtual so handlers can be destroyed through a
// pointer to this base.
class message_handler_base
{
public:
  virtual ~message_handler_base() = default;

  // Used to determine which message handlers receive an incoming message.
  // Returns the std::type_info of the message type the handler accepts.
  virtual const std::type_info& message_id() const = 0;
};
38 // Base class for a handler for a specific message type.
39 template <class Message
>
40 class message_handler
: public message_handler_base
43 // Handle an incoming message.
44 virtual void handle_message(Message msg
, actor_address from
) = 0;
47 // Concrete message handler for a specific message type.
48 template <class Actor
, class Message
>
49 class mf_message_handler
: public message_handler
<Message
>
52 // Construct a message handler to invoke the specified member function.
53 mf_message_handler(void (Actor::* mf
)(Message
, actor_address
), Actor
* a
)
54 : function_(mf
), actor_(a
)
58 // Used to determine which message handlers receive an incoming message.
59 virtual const std::type_info
& message_id() const
61 return typeid(Message
);
64 // Handle an incoming message.
65 virtual void handle_message(Message msg
, actor_address from
)
67 (actor_
->*function_
)(std::move(msg
), from
);
70 // Determine whether the message handler represents the specified function.
71 bool is_function(void (Actor::* mf
)(Message
, actor_address
)) const
73 return mf
== function_
;
77 void (Actor::* function_
)(Message
, actor_address
);
81 // Base class for all actors.
89 // Obtain the actor's address for use as a message sender or recipient.
90 actor_address
address()
95 // Send a message from one actor to another.
96 template <class Message
>
97 friend void send(Message msg
, actor_address from
, actor_address to
)
99 // Execute the message handler in the context of the target's executor.
101 [=, msg
=std::move(msg
)]() mutable
103 to
->call_handler(std::move(msg
), from
);
108 // Construct the actor to use the specified executor for all message handlers.
109 actor(any_io_executor e
)
110 : executor_(std::move(e
))
114 // Register a handler for a specific message type. Duplicates are permitted.
115 template <class Actor
, class Message
>
116 void register_handler(void (Actor::* mf
)(Message
, actor_address
))
119 std::make_shared
<mf_message_handler
<Actor
, Message
>>(
120 mf
, static_cast<Actor
*>(this)));
123 // Deregister a handler. Removes only the first matching handler.
124 template <class Actor
, class Message
>
125 void deregister_handler(void (Actor::* mf
)(Message
, actor_address
))
127 const std::type_info
& id
= typeid(Message
);
128 for (auto iter
= handlers_
.begin(); iter
!= handlers_
.end(); ++iter
)
130 if ((*iter
)->message_id() == id
)
132 auto mh
= static_cast<mf_message_handler
<Actor
, Message
>*>(iter
->get());
133 if (mh
->is_function(mf
))
135 handlers_
.erase(iter
);
142 // Send a message from within a message handler.
143 template <class Message
>
144 void tail_send(Message msg
, actor_address to
)
146 // Execute the message handler in the context of the target's executor.
148 [=, msg
=std::move(msg
), from
=this]
150 to
->call_handler(std::move(msg
), from
);
155 // Find the matching message handlers, if any, and call them.
156 template <class Message
>
157 void call_handler(Message msg
, actor_address from
)
159 const std::type_info
& message_id
= typeid(Message
);
160 for (auto& h
: handlers_
)
162 if (h
->message_id() == message_id
)
164 auto mh
= static_cast<message_handler
<Message
>*>(h
.get());
165 mh
->handle_message(msg
, from
);
170 // All messages associated with a single actor object should be processed
171 // non-concurrently. We use a strand to ensure non-concurrent execution even
172 // if the underlying executor may use multiple threads.
173 strand
<any_io_executor
> executor_
;
175 std::vector
<std::shared_ptr
<message_handler_base
>> handlers_
;
178 // A concrete actor that allows synchronous message retrieval.
// Messages of type Message delivered to this actor are appended to an
// internal queue by message_handler(); a reader blocks until the queue is
// non-empty and then takes the oldest entry.
179 template <class Message
>
180 class receiver
: public actor
// Constructor initialisation: this actor's handlers run on the system
// executor, and the queueing handler below is registered immediately.
// NOTE(review): the constructor's declarator line is not visible in this
// listing.
184 : actor(system_executor())
186 register_handler(&receiver::message_handler
);
189 // Block until a message has been received.
// NOTE(review): the signature and return statement of this function are not
// visible in this listing.
// The wait predicate re-checks the queue, so a wakeup with an empty queue
// goes back to sleep. The front message is moved out and popped after the
// wait returns, i.e. with the lock re-acquired.
192 std::unique_lock
<std::mutex
> lock(mutex_
);
193 condition_
.wait(lock
, [this]{ return !message_queue_
.empty(); });
194 Message
msg(std::move(message_queue_
.front()));
195 message_queue_
.pop_front();
200 // Handle a new message by adding it to the queue and waking a waiter.
// The sender address is ignored (the parameter is unnamed). The queue is
// modified under mutex_, the same mutex the blocking reader locks.
201 void message_handler(Message msg
, actor_address
/* from */)
203 std::lock_guard
<std::mutex
> lock(mutex_
);
204 message_queue_
.push_back(std::move(msg
));
205 condition_
.notify_one();
// NOTE(review): mutex_ is used above but its declaration is not visible in
// this listing.
// Notified once for every message that is queued.
209 std::condition_variable condition_
;
// Messages received but not yet retrieved; pushed at the back, taken from
// the front.
210 std::deque
<Message
> message_queue_
;
213 //------------------------------------------------------------------------------
215 #include <boost/asio/thread_pool.hpp>
218 using boost::asio::thread_pool
;
// Actor type used for the ring set up further down in this file. A freshly
// constructed member has a single handler, init_handler, which accepts an
// actor_address message. Handling that message registers token_handler (for
// int messages) and deregisters init_handler.
220 class member
: public actor
// Runs this actor's handlers on executor e and installs init_handler as the
// initial (and only) handler.
223 explicit member(any_io_executor e
)
224 : actor(std::move(e
))
226 register_handler(&member::init_handler
);
// Initialisation message handler: `next` is the message payload and `from`
// the sender. After it runs, int messages are handled by token_handler and
// further actor_address messages no longer reach init_handler.
// Both calls below modify the actor's handler list while this handler is
// itself being dispatched from that list.
// NOTE(review): presumably `next` and `from` are stored here for later use;
// those statements are not visible in this listing.
230 void init_handler(actor_address next
, actor_address from
)
235 register_handler(&member::token_handler
);
236 deregister_handler(&member::init_handler
);
// Handler for int token messages; the sender address is ignored (the
// parameter is unnamed).
// NOTE(review): only the initialisation of the local destination `to` from
// caller_ is visible here; the remainder of the body (how the token is
// updated and forwarded) is not visible in this listing.
239 void token_handler(int token
, actor_address
/*from*/)
242 actor_address
to(caller_
);
// Address read by token_handler as a message destination.
// NOTE(review): presumably the sender of the initialisation message; the
// assignment is not visible in this listing.
254 actor_address caller_
;
259 const std::size_t num_threads
= 16;
260 const int num_hops
= 50000000;
261 const std::size_t num_actors
= 503;
262 const int token_value
= (num_hops
+ num_actors
- 1) / num_actors
;
263 const std::size_t actors_per_thread
= num_actors
/ num_threads
;
265 struct single_thread_pool
: thread_pool
{ single_thread_pool() : thread_pool(1) {} };
266 single_thread_pool pools
[num_threads
];
267 std::vector
<std::shared_ptr
<member
>> members(num_actors
);
270 // Create the member actors.
271 for (std::size_t i
= 0; i
< num_actors
; ++i
)
272 members
[i
] = std::make_shared
<member
>(pools
[(i
/ actors_per_thread
) % num_threads
].get_executor());
274 // Initialise the actors by passing each one the address of the next actor in the ring.
275 for (std::size_t i
= num_actors
, next_i
= 0; i
> 0; next_i
= --i
)
276 send(members
[next_i
]->address(), rcvr
.address(), members
[i
- 1]->address());
278 // Send exactly one token to each actor, all with the same initial value, rounding up if required.
279 for (std::size_t i
= 0; i
< num_actors
; ++i
)
280 send(token_value
, rcvr
.address(), members
[i
]->address());
282 // Wait for all signal messages, indicating the tokens have all reached zero.
283 for (std::size_t i
= 0; i
< num_actors
; ++i
)