5 // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
17 #include <boost/bind/bind.hpp>
18 #include <boost/shared_ptr.hpp>
19 #include <boost/enable_shared_from_this.hpp>
20 #include <boost/asio/buffer.hpp>
21 #include <boost/asio/io_context.hpp>
22 #include <boost/asio/ip/tcp.hpp>
23 #include <boost/asio/ip/udp.hpp>
24 #include <boost/asio/read_until.hpp>
25 #include <boost/asio/steady_timer.hpp>
26 #include <boost/asio/write.hpp>
28 using boost::asio::steady_timer
;
29 using boost::asio::ip::tcp
;
30 using boost::asio::ip::udp
;
32 //----------------------------------------------------------------------
37 virtual ~subscriber() {}
38 virtual void deliver(const std::string
& msg
) = 0;
41 typedef boost::shared_ptr
<subscriber
> subscriber_ptr
;
43 //----------------------------------------------------------------------
48 void join(subscriber_ptr subscriber
)
50 subscribers_
.insert(subscriber
);
53 void leave(subscriber_ptr subscriber
)
55 subscribers_
.erase(subscriber
);
58 void deliver(const std::string
& msg
)
60 std::for_each(subscribers_
.begin(), subscribers_
.end(),
61 boost::bind(&subscriber::deliver
,
62 boost::placeholders::_1
, boost::ref(msg
)));
66 std::set
<subscriber_ptr
> subscribers_
;
69 //----------------------------------------------------------------------
72 // This class manages socket timeouts by applying the concept of a deadline.
73 // Some asynchronous operations are given deadlines by which they must complete.
74 // Deadlines are enforced by two "actors" that persist for the lifetime of the
75 // session object, one for input and one for output:
77 // +----------------+ +----------------+
79 // | check_deadline |<---+ | check_deadline |<---+
80 // | | | async_wait() | | | async_wait()
81 // +----------------+ | on input +----------------+ | on output
82 // | | deadline | | deadline
83 // +---------+ +---------+
85 // If either deadline actor determines that the corresponding deadline has
86 // expired, the socket is closed and any outstanding operations are cancelled.
88 // The input actor reads messages from the socket, where messages are delimited
89 // by the newline character:
93 // | start_read |<---+
97 // async_- | +-------------+
99 // until() +--->| handle_read |
103 // The deadline for receiving a complete message is 30 seconds. If a non-empty
104 // message is received, it is delivered to all subscribers. If a heartbeat (a
105 // message that consists of a single newline character) is received, a heartbeat
106 // is enqueued for the client, provided there are no other messages waiting to
109 // The output actor is responsible for sending messages to the client:
112 // | |<---------------------+
113 // | await_output | |
115 // +--------------+ | |
116 // | | | async_wait() |
119 // +-------------+ +--------------+
120 // | | async_write() | |
121 // | start_write |-------------->| handle_write |
123 // +-------------+ +--------------+
125 // The output actor first waits for an output message to be enqueued. It does
126 // this by using a steady_timer as an asynchronous condition variable. The
127 // steady_timer will be signalled whenever the output queue is non-empty.
129 // Once a message is available, it is sent to the client. The deadline for
130 // sending a complete message is 30 seconds. After the message is successfully
131 // sent, the output actor again waits for the output queue to become non-empty.
135 public boost::enable_shared_from_this
<tcp_session
>
138 tcp_session(boost::asio::io_context
& io_context
, channel
& ch
)
141 input_deadline_(io_context
),
142 non_empty_output_queue_(io_context
),
143 output_deadline_(io_context
)
145 input_deadline_
.expires_at(steady_timer::time_point::max());
146 output_deadline_
.expires_at(steady_timer::time_point::max());
148 // The non_empty_output_queue_ steady_timer is set to the maximum time
149 // point whenever the output queue is empty. This ensures that the output
150 // actor stays asleep until a message is put into the queue.
151 non_empty_output_queue_
.expires_at(steady_timer::time_point::max());
154 tcp::socket
& socket()
159 // Called by the server object to initiate the four actors.
162 channel_
.join(shared_from_this());
166 input_deadline_
.async_wait(
167 boost::bind(&tcp_session::check_deadline
,
168 shared_from_this(), &input_deadline_
));
172 output_deadline_
.async_wait(
173 boost::bind(&tcp_session::check_deadline
,
174 shared_from_this(), &output_deadline_
));
180 channel_
.leave(shared_from_this());
182 boost::system::error_code ignored_ec
;
183 socket_
.close(ignored_ec
);
184 input_deadline_
.cancel();
185 non_empty_output_queue_
.cancel();
186 output_deadline_
.cancel();
191 return !socket_
.is_open();
194 void deliver(const std::string
& msg
)
196 output_queue_
.push_back(msg
+ "\n");
198 // Signal that the output queue contains messages. Modifying the expiry
199 // will wake the output actor, if it is waiting on the timer.
200 non_empty_output_queue_
.expires_at(steady_timer::time_point::min());
205 // Set a deadline for the read operation.
206 input_deadline_
.expires_after(boost::asio::chrono::seconds(30));
208 // Start an asynchronous operation to read a newline-delimited message.
209 boost::asio::async_read_until(socket_
,
210 boost::asio::dynamic_buffer(input_buffer_
), '\n',
211 boost::bind(&tcp_session::handle_read
, shared_from_this(),
212 boost::placeholders::_1
, boost::placeholders::_2
));
215 void handle_read(const boost::system::error_code
& ec
, std::size_t n
)
222 // Extract the newline-delimited message from the buffer.
223 std::string
msg(input_buffer_
.substr(0, n
- 1));
224 input_buffer_
.erase(0, n
);
228 channel_
.deliver(msg
);
232 // We received a heartbeat message from the client. If there's nothing
233 // else being sent or ready to be sent, send a heartbeat right back.
234 if (output_queue_
.empty())
236 output_queue_
.push_back("\n");
238 // Signal that the output queue contains messages. Modifying the
239 // expiry will wake the output actor, if it is waiting on the timer.
240 non_empty_output_queue_
.expires_at(steady_timer::time_point::min());
257 if (output_queue_
.empty())
259 // There are no messages that are ready to be sent. The actor goes to
260 // sleep by waiting on the non_empty_output_queue_ timer. When a new
261 // message is added, the timer will be modified and the actor will wake.
262 non_empty_output_queue_
.expires_at(steady_timer::time_point::max());
263 non_empty_output_queue_
.async_wait(
264 boost::bind(&tcp_session::await_output
, shared_from_this()));
274 // Set a deadline for the write operation.
275 output_deadline_
.expires_after(boost::asio::chrono::seconds(30));
277 // Start an asynchronous operation to send a message.
278 boost::asio::async_write(socket_
,
279 boost::asio::buffer(output_queue_
.front()),
280 boost::bind(&tcp_session::handle_write
,
281 shared_from_this(), boost::placeholders::_1
));
284 void handle_write(const boost::system::error_code
& ec
)
291 output_queue_
.pop_front();
301 void check_deadline(steady_timer
* deadline
)
306 // Check whether the deadline has passed. We compare the deadline against
307 // the current time since a new asynchronous operation may have moved the
308 // deadline before this actor had a chance to run.
309 if (deadline
->expiry() <= steady_timer::clock_type::now())
311 // The deadline has passed. Stop the session. The other actors will
312 // terminate as soon as possible.
317 // Put the actor back to sleep.
318 deadline
->async_wait(
319 boost::bind(&tcp_session::check_deadline
,
320 shared_from_this(), deadline
));
326 std::string input_buffer_
;
327 steady_timer input_deadline_
;
328 std::deque
<std::string
> output_queue_
;
329 steady_timer non_empty_output_queue_
;
330 steady_timer output_deadline_
;
333 typedef boost::shared_ptr
<tcp_session
> tcp_session_ptr
;
335 //----------------------------------------------------------------------
337 class udp_broadcaster
341 udp_broadcaster(boost::asio::io_context
& io_context
,
342 const udp::endpoint
& broadcast_endpoint
)
343 : socket_(io_context
)
345 socket_
.connect(broadcast_endpoint
);
346 socket_
.set_option(udp::socket::broadcast(true));
350 void deliver(const std::string
& msg
)
352 boost::system::error_code ignored_ec
;
353 socket_
.send(boost::asio::buffer(msg
), 0, ignored_ec
);
359 //----------------------------------------------------------------------
364 server(boost::asio::io_context
& io_context
,
365 const tcp::endpoint
& listen_endpoint
,
366 const udp::endpoint
& broadcast_endpoint
)
367 : io_context_(io_context
),
368 acceptor_(io_context
, listen_endpoint
)
370 subscriber_ptr
bc(new udp_broadcaster(io_context_
, broadcast_endpoint
));
378 tcp_session_ptr
new_session(new tcp_session(io_context_
, channel_
));
380 acceptor_
.async_accept(new_session
->socket(),
381 boost::bind(&server::handle_accept
,
382 this, new_session
, boost::placeholders::_1
));
385 void handle_accept(tcp_session_ptr session
,
386 const boost::system::error_code
& ec
)
397 boost::asio::io_context
& io_context_
;
398 tcp::acceptor acceptor_
;
402 //----------------------------------------------------------------------
404 int main(int argc
, char* argv
[])
408 using namespace std
; // For atoi.
412 std::cerr
<< "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
416 boost::asio::io_context io_context
;
418 tcp::endpoint
listen_endpoint(tcp::v4(), atoi(argv
[1]));
420 udp::endpoint
broadcast_endpoint(
421 boost::asio::ip::make_address(argv
[2]), atoi(argv
[3]));
423 server
s(io_context
, listen_endpoint
, broadcast_endpoint
);
427 catch (std::exception
& e
)
429 std::cerr
<< "Exception: " << e
.what() << "\n";