5 // Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
16 #include <boost/bind.hpp>
17 #include <boost/shared_ptr.hpp>
18 #include <boost/enable_shared_from_this.hpp>
19 #include <boost/asio/deadline_timer.hpp>
20 #include <boost/asio/io_context.hpp>
21 #include <boost/asio/ip/tcp.hpp>
22 #include <boost/asio/ip/udp.hpp>
23 #include <boost/asio/read_until.hpp>
24 #include <boost/asio/streambuf.hpp>
25 #include <boost/asio/write.hpp>
27 using boost::asio::deadline_timer
;
28 using boost::asio::ip::tcp
;
29 using boost::asio::ip::udp
;
31 //----------------------------------------------------------------------
36 virtual ~subscriber() {}
37 virtual void deliver(const std::string
& msg
) = 0;
40 typedef boost::shared_ptr
<subscriber
> subscriber_ptr
;
42 //----------------------------------------------------------------------
47 void join(subscriber_ptr subscriber
)
49 subscribers_
.insert(subscriber
);
52 void leave(subscriber_ptr subscriber
)
54 subscribers_
.erase(subscriber
);
57 void deliver(const std::string
& msg
)
59 std::for_each(subscribers_
.begin(), subscribers_
.end(),
60 boost::bind(&subscriber::deliver
, _1
, boost::ref(msg
)));
64 std::set
<subscriber_ptr
> subscribers_
;
67 //----------------------------------------------------------------------
70 // This class manages socket timeouts by applying the concept of a deadline.
71 // Some asynchronous operations are given deadlines by which they must complete.
72 // Deadlines are enforced by two "actors" that persist for the lifetime of the
73 // session object, one for input and one for output:
75 // +----------------+ +----------------+
77 // | check_deadline |<---+ | check_deadline |<---+
78 // | | | async_wait() | | | async_wait()
79 // +----------------+ | on input +----------------+ | on output
80 // | | deadline | | deadline
81 // +---------+ +---------+
83 // If either deadline actor determines that the corresponding deadline has
84 // expired, the socket is closed and any outstanding operations are cancelled.
86 // The input actor reads messages from the socket, where messages are delimited
87 // by the newline character:
91 // | start_read |<---+
95 // async_- | +-------------+
97 // until() +--->| handle_read |
101 // The deadline for receiving a complete message is 30 seconds. If a non-empty
102 // message is received, it is delivered to all subscribers. If a heartbeat (a
103 // message that consists of a single newline character) is received, a heartbeat
104 // is enqueued for the client, provided there are no other messages waiting to
107 // The output actor is responsible for sending messages to the client:
110 // | |<---------------------+
111 // | await_output | |
113 // +--------------+ | |
114 // | | | async_wait() |
117 // +-------------+ +--------------+
118 // | | async_write() | |
119 // | start_write |-------------->| handle_write |
121 // +-------------+ +--------------+
123 // The output actor first waits for an output message to be enqueued. It does
124 // this by using a deadline_timer as an asynchronous condition variable. The
125 // deadline_timer will be signalled whenever the output queue is non-empty.
127 // Once a message is available, it is sent to the client. The deadline for
128 // sending a complete message is 30 seconds. After the message is successfully
129 // sent, the output actor again waits for the output queue to become non-empty.
133 public boost::enable_shared_from_this
<tcp_session
>
136 tcp_session(boost::asio::io_context
& io_context
, channel
& ch
)
139 input_deadline_(io_context
),
140 non_empty_output_queue_(io_context
),
141 output_deadline_(io_context
)
143 input_deadline_
.expires_at(boost::posix_time::pos_infin
);
144 output_deadline_
.expires_at(boost::posix_time::pos_infin
);
146 // The non_empty_output_queue_ deadline_timer is set to pos_infin whenever
147 // the output queue is empty. This ensures that the output actor stays
148 // asleep until a message is put into the queue.
149 non_empty_output_queue_
.expires_at(boost::posix_time::pos_infin
);
152 tcp::socket
& socket()
157 // Called by the server object to initiate the four actors.
160 channel_
.join(shared_from_this());
164 input_deadline_
.async_wait(
165 boost::bind(&tcp_session::check_deadline
,
166 shared_from_this(), &input_deadline_
));
170 output_deadline_
.async_wait(
171 boost::bind(&tcp_session::check_deadline
,
172 shared_from_this(), &output_deadline_
));
178 channel_
.leave(shared_from_this());
180 boost::system::error_code ignored_ec
;
181 socket_
.close(ignored_ec
);
182 input_deadline_
.cancel();
183 non_empty_output_queue_
.cancel();
184 output_deadline_
.cancel();
189 return !socket_
.is_open();
192 void deliver(const std::string
& msg
)
194 output_queue_
.push_back(msg
+ "\n");
196 // Signal that the output queue contains messages. Modifying the expiry
197 // will wake the output actor, if it is waiting on the timer.
198 non_empty_output_queue_
.expires_at(boost::posix_time::neg_infin
);
203 // Set a deadline for the read operation.
204 input_deadline_
.expires_from_now(boost::posix_time::seconds(30));
206 // Start an asynchronous operation to read a newline-delimited message.
207 boost::asio::async_read_until(socket_
, input_buffer_
, '\n',
208 boost::bind(&tcp_session::handle_read
, shared_from_this(), _1
));
211 void handle_read(const boost::system::error_code
& ec
)
218 // Extract the newline-delimited message from the buffer.
220 std::istream
is(&input_buffer_
);
221 std::getline(is
, msg
);
225 channel_
.deliver(msg
);
229 // We received a heartbeat message from the client. If there's nothing
230 // else being sent or ready to be sent, send a heartbeat right back.
231 if (output_queue_
.empty())
233 output_queue_
.push_back("\n");
235 // Signal that the output queue contains messages. Modifying the
236 // expiry will wake the output actor, if it is waiting on the timer.
237 non_empty_output_queue_
.expires_at(boost::posix_time::neg_infin
);
254 if (output_queue_
.empty())
256 // There are no messages that are ready to be sent. The actor goes to
257 // sleep by waiting on the non_empty_output_queue_ timer. When a new
258 // message is added, the timer will be modified and the actor will wake.
259 non_empty_output_queue_
.expires_at(boost::posix_time::pos_infin
);
260 non_empty_output_queue_
.async_wait(
261 boost::bind(&tcp_session::await_output
, shared_from_this()));
271 // Set a deadline for the write operation.
272 output_deadline_
.expires_from_now(boost::posix_time::seconds(30));
274 // Start an asynchronous operation to send a message.
275 boost::asio::async_write(socket_
,
276 boost::asio::buffer(output_queue_
.front()),
277 boost::bind(&tcp_session::handle_write
, shared_from_this(), _1
));
280 void handle_write(const boost::system::error_code
& ec
)
287 output_queue_
.pop_front();
297 void check_deadline(deadline_timer
* deadline
)
302 // Check whether the deadline has passed. We compare the deadline against
303 // the current time since a new asynchronous operation may have moved the
304 // deadline before this actor had a chance to run.
305 if (deadline
->expires_at() <= deadline_timer::traits_type::now())
307 // The deadline has passed. Stop the session. The other actors will
308 // terminate as soon as possible.
313 // Put the actor back to sleep.
314 deadline
->async_wait(
315 boost::bind(&tcp_session::check_deadline
,
316 shared_from_this(), deadline
));
322 boost::asio::streambuf input_buffer_
;
323 deadline_timer input_deadline_
;
324 std::deque
<std::string
> output_queue_
;
325 deadline_timer non_empty_output_queue_
;
326 deadline_timer output_deadline_
;
329 typedef boost::shared_ptr
<tcp_session
> tcp_session_ptr
;
331 //----------------------------------------------------------------------
333 class udp_broadcaster
337 udp_broadcaster(boost::asio::io_context
& io_context
,
338 const udp::endpoint
& broadcast_endpoint
)
339 : socket_(io_context
)
341 socket_
.connect(broadcast_endpoint
);
345 void deliver(const std::string
& msg
)
347 boost::system::error_code ignored_ec
;
348 socket_
.send(boost::asio::buffer(msg
), 0, ignored_ec
);
354 //----------------------------------------------------------------------
359 server(boost::asio::io_context
& io_context
,
360 const tcp::endpoint
& listen_endpoint
,
361 const udp::endpoint
& broadcast_endpoint
)
362 : io_context_(io_context
),
363 acceptor_(io_context
, listen_endpoint
)
365 subscriber_ptr
bc(new udp_broadcaster(io_context_
, broadcast_endpoint
));
373 tcp_session_ptr
new_session(new tcp_session(io_context_
, channel_
));
375 acceptor_
.async_accept(new_session
->socket(),
376 boost::bind(&server::handle_accept
, this, new_session
, _1
));
379 void handle_accept(tcp_session_ptr session
,
380 const boost::system::error_code
& ec
)
391 boost::asio::io_context
& io_context_
;
392 tcp::acceptor acceptor_
;
396 //----------------------------------------------------------------------
398 int main(int argc
, char* argv
[])
402 using namespace std
; // For atoi.
406 std::cerr
<< "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
410 boost::asio::io_context io_context
;
412 tcp::endpoint
listen_endpoint(tcp::v4(), atoi(argv
[1]));
414 udp::endpoint
broadcast_endpoint(
415 boost::asio::ip::make_address(argv
[2]), atoi(argv
[3]));
417 server
s(io_context
, listen_endpoint
, broadcast_endpoint
);
421 catch (std::exception
& e
)
423 std::cerr
<< "Exception: " << e
.what() << "\n";