5 // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
18 #include <boost/asio/buffer.hpp>
19 #include <boost/asio/io_context.hpp>
20 #include <boost/asio/ip/tcp.hpp>
21 #include <boost/asio/ip/udp.hpp>
22 #include <boost/asio/read_until.hpp>
23 #include <boost/asio/steady_timer.hpp>
24 #include <boost/asio/write.hpp>
26 using boost::asio::steady_timer
;
27 using boost::asio::ip::tcp
;
28 using boost::asio::ip::udp
;
30 //----------------------------------------------------------------------
// Interface fragment for a message receiver: a defaulted virtual destructor
// plus a pure-virtual deliver() that concrete subscribers must implement.
// NOTE(review): the enclosing class header is not present in this listing.
35 virtual ~subscriber() = default;
// Hands one message to this subscriber.
36 virtual void deliver(const std::string
& msg
) = 0;
// Shared-ownership handle to a subscriber.
39 typedef std::shared_ptr
<subscriber
> subscriber_ptr
;
41 //----------------------------------------------------------------------
// Adds the given subscriber to the subscribers_ set.
46 void join(subscriber_ptr subscriber
)
48 subscribers_
.insert(subscriber
);
// Removes the given subscriber from the subscribers_ set.
51 void leave(subscriber_ptr subscriber
)
53 subscribers_
.erase(subscriber
);
// Visits every entry of subscribers_ for the given message.
// NOTE(review): the loop body is not present in this listing; presumably it
// forwards msg to each subscriber — confirm against the full source.
56 void deliver(const std::string
& msg
)
58 for (const auto& s
: subscribers_
)
// The currently joined subscribers. Being a std::set, it holds each
// subscriber_ptr at most once.
65 std::set
<subscriber_ptr
> subscribers_
;
68 //----------------------------------------------------------------------
71 // This class manages socket timeouts by applying the concept of a deadline.
72 // Some asynchronous operations are given deadlines by which they must complete.
73 // Deadlines are enforced by two "actors" that persist for the lifetime of the
74 // session object, one for input and one for output:
76 //  +----------------+                      +----------------+
77 //  |                |                      |                |
78 //  | check_deadline |<-------+             | check_deadline |<-------+
79 //  |                |        |             |                |        |
80 //  +----------------+        |             +----------------+        |
81 //               |            |                          |            |
82 //  async_wait() |    +----------------+    async_wait() |    +----------------+
83 //   on input    |    |     lambda     |     on output   |    |     lambda     |
84 //   deadline    +--->|       in       |     deadline    +--->|       in       |
85 //                    | check_deadline |                      | check_deadline |
86 //                    +----------------+                      +----------------+
88 // If either deadline actor determines that the corresponding deadline has
89 // expired, the socket is closed and any outstanding operations are cancelled.
91 // The input actor reads messages from the socket, where messages are delimited
92 // by the newline character:
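94 //  +-------------+
95 //  |             |
96 //  |  read_line  |<----+
97 //  |             |     |
98 //  +-------------+     |
99 //          |           |
100 //  async_- |    +-------------+
101 //   read_- |    |   lambda    |
102 //  until() +--->|     in      |
103 //               |  read_line  |
104 //               +-------------+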
106 // The deadline for receiving a complete message is 30 seconds. If a non-empty
107 // message is received, it is delivered to all subscribers. If a heartbeat (a
108 // message that consists of a single newline character) is received, a heartbeat
109 // is enqueued for the client, provided there are no other messages waiting to
110 // be sent.
112 // The output actor is responsible for sending messages to the client:
114 //  +----------------+
115 //  |                |<---------------------+
116 //  |  await_output  |                      |
117 //  |                |<-------+             |
118 //  +----------------+        |             |
119 //    |            |          |             |
120 //    |    async_- |  +----------------+    |
121 //    |     wait() |  |     lambda     |    |
122 //    |            +->|       in       |    |
123 //    |               |  await_output  |    |
124 //    |               +----------------+    |
125 //    V                                     |
126 //  +--------------+               +--------------+
127 //  |              | async_write() |    lambda    |
128 //  |  write_line  |-------------->|      in      |
129 //  |              |               |  write_line  |
130 //  +--------------+               +--------------+
132 // The output actor first waits for an output message to be enqueued. It does
133 // this by using a steady_timer as an asynchronous condition variable. The
134 // steady_timer will be signalled whenever the output queue is non-empty.
136 // Once a message is available, it is sent to the client. The deadline for
137 // sending a complete message is 30 seconds. After the message is successfully
138 // sent, the output actor again waits for the output queue to become non-empty.
// Base-class fragment: deriving from std::enable_shared_from_this<tcp_session>
// is what makes the shared_from_this() calls in the member functions possible.
// NOTE(review): the class header line itself is not present in this listing.
142 public std::enable_shared_from_this
<tcp_session
>
// Constructor. Takes the socket by value and moves it into socket_; also
// receives a reference to the channel (where it is stored is not visible here).
145 tcp_session(tcp::socket socket
, channel
& ch
)
147 socket_(std::move(socket
))
// Both deadline timers start at the maximum time point, so neither can
// expire until an operation assigns it a real deadline.
149 input_deadline_
.expires_at(steady_timer::time_point::max());
150 output_deadline_
.expires_at(steady_timer::time_point::max());
152 // The non_empty_output_queue_ steady_timer is set to the maximum time
153 // point whenever the output queue is empty. This ensures that the output
154 // actor stays asleep until a message is put into the queue.
155 non_empty_output_queue_
.expires_at(steady_timer::time_point::max());
158 // Called by the server object to initiate the four actors.
// Registers this session with the channel, then starts one deadline watcher
// for the input deadline timer and one for the output deadline timer.
// NOTE(review): the function header and the calls that start the remaining
// two actors are not present in this listing.
161 channel_
.join(shared_from_this());
164 check_deadline(input_deadline_
);
167 check_deadline(output_deadline_
);
// Shutdown sequence: unregister from the channel, close the socket, and
// cancel all three timers.
// NOTE(review): the function header is not present in this listing.
173 channel_
.leave(shared_from_this());
// The close error is captured only so that it can be discarded.
175 boost::system::error_code ignored_error
;
176 socket_
.close(ignored_error
);
177 input_deadline_
.cancel();
178 non_empty_output_queue_
.cancel();
179 output_deadline_
.cancel();
// Evaluates to true once the socket is no longer open.
// NOTE(review): the function header is not present in this listing.
184 return !socket_
.is_open();
// subscriber override: queues msg for transmission to this session's client.
187 void deliver(const std::string
& msg
) override
// A newline is appended to the message before it is queued.
189 output_queue_
.push_back(msg
+ "\n");
191 // Signal that the output queue contains messages. Modifying the expiry
192 // will wake the output actor, if it is waiting on the timer.
193 non_empty_output_queue_
.expires_at(steady_timer::time_point::min());
// Input actor step: arms a 30-second read deadline, then asynchronously reads
// up to and including a '\n' into input_buffer_.
// NOTE(review): the function header and several control-flow lines (the
// stopped/error checks and the branch that separates a normal message from a
// heartbeat) are not present in this listing.
198 // Set a deadline for the read operation.
199 input_deadline_
.expires_after(std::chrono::seconds(30));
201 // Start an asynchronous operation to read a newline-delimited message.
// A shared_ptr to this session is captured by the handler as `self`.
202 auto self(shared_from_this());
203 boost::asio::async_read_until(socket_
,
204 boost::asio::dynamic_buffer(input_buffer_
), '\n',
205 [this, self
](const boost::system::error_code
& error
, std::size_t n
)
207 // Check if the session was stopped while the operation was pending.
213 // Extract the newline-delimited message from the buffer.
// The message is the first n - 1 characters (presumably n counts the '\n'
// delimiter — confirm); all n characters are then erased from the buffer.
214 std::string
msg(input_buffer_
.substr(0, n
- 1));
215 input_buffer_
.erase(0, n
);
// Hand the extracted message to the channel.
219 channel_
.deliver(msg
);
224 // We received a heartbeat message from the client. If there's
225 // nothing else being sent or ready to be sent, send a heartbeat
// back to the client.
227 if (output_queue_
.empty())
229 output_queue_
.push_back("\n");
231 // Signal that the output queue contains messages. Modifying
232 // the expiry will wake the output actor, if it is waiting on
// the timer.
234 non_empty_output_queue_
.expires_at(
235 steady_timer::time_point::min());
// Output actor wait step: waits on the non_empty_output_queue_ timer and,
// when the handler runs, checks whether output_queue_ has anything to send.
// NOTE(review): the function header and the statements that follow each check
// are not present in this listing.
// A shared_ptr to this session is captured by the handler as `self`.
250 auto self(shared_from_this());
251 non_empty_output_queue_
.async_wait(
252 [this, self
](const boost::system::error_code
& /*error*/)
254 // Check if the session was stopped while the operation was pending.
258 if (output_queue_
.empty())
260 // There are no messages that are ready to be sent. The actor goes
261 // to sleep by waiting on the non_empty_output_queue_ timer. When a
262 // new message is added, the timer will be modified and the actor
// will wake.
264 non_empty_output_queue_
.expires_at(steady_timer::time_point::max());
// Output actor send step: arms a 30-second write deadline, then asynchronously
// writes the message at the front of output_queue_ to the socket.
// NOTE(review): the function header and the stopped/error checks in the
// handler are not present in this listing.
276 // Set a deadline for the write operation.
277 output_deadline_
.expires_after(std::chrono::seconds(30));
279 // Start an asynchronous operation to send a message.
// A shared_ptr to this session is captured by the handler as `self`.
280 auto self(shared_from_this());
// The buffer refers to the queue's front element, which stays in the queue
// until the handler pops it below.
281 boost::asio::async_write(socket_
,
282 boost::asio::buffer(output_queue_
.front()),
283 [this, self
](const boost::system::error_code
& error
, std::size_t /*n*/)
285 // Check if the session was stopped while the operation was pending.
// Remove the front message from the queue.
291 output_queue_
.pop_front();
// Deadline actor for one timer. When its handler runs it compares the timer's
// expiry with the current time: if the deadline has passed the session is
// stopped, otherwise the actor goes back to waiting by calling itself again.
// `deadline` is captured by reference; the visible callers pass member timers
// of this session, and a shared_ptr to the session is captured as `self`.
// NOTE(review): the line that starts the wait on `deadline` and the stop call
// are not present in this listing.
302 void check_deadline(steady_timer
& deadline
)
304 auto self(shared_from_this());
306 [this, self
, &deadline
](const boost::system::error_code
& /*error*/)
308 // Check if the session was stopped while the operation was pending.
312 // Check whether the deadline has passed. We compare the deadline
313 // against the current time since a new asynchronous operation may
314 // have moved the deadline before this actor had a chance to run.
315 if (deadline
.expiry() <= steady_timer::clock_type::now())
317 // The deadline has passed. Stop the session. The other actors will
318 // terminate as soon as possible.
323 // Put the actor back to sleep.
324 check_deadline(deadline
);
// Receive buffer: the read path wraps it with dynamic_buffer and extracts
// complete lines from its front.
331 std::string input_buffer_
;
// Deadline timer for read operations, created on the socket's executor.
332 steady_timer input_deadline_
{socket_
.get_executor()};
// Outgoing messages awaiting transmission; written from the front.
333 std::deque
<std::string
> output_queue_
;
// Wake-up signal for the output actor: set to time_point::max() while the
// queue is empty and to time_point::min() when a message is queued.
334 steady_timer non_empty_output_queue_
{socket_
.get_executor()};
// Deadline timer for write operations, created on the socket's executor.
335 steady_timer output_deadline_
{socket_
.get_executor()};
// Shared-ownership handle to a tcp_session.
338 typedef std::shared_ptr
<tcp_session
> tcp_session_ptr
;
340 //----------------------------------------------------------------------
// Sends messages as UDP datagrams to a fixed broadcast endpoint.
// NOTE(review): the rest of the class header (any base classes), the braces
// and the socket_ member declaration are not present in this listing.
342 class udp_broadcaster
// Constructor: creates the UDP socket on io_context, connects it to
// broadcast_endpoint, and enables the broadcast socket option.
346 udp_broadcaster(boost::asio::io_context
& io_context
,
347 const udp::endpoint
& broadcast_endpoint
)
348 : socket_(io_context
)
350 socket_
.connect(broadcast_endpoint
);
351 socket_
.set_option(udp::socket::broadcast(true));
// Sends msg on the connected socket. Any send error is written to
// ignored_error and not examined.
355 void deliver(const std::string
& msg
)
357 boost::system::error_code ignored_error
;
358 socket_
.send(boost::asio::buffer(msg
), 0, ignored_error
);
364 //----------------------------------------------------------------------
// Constructor: stores a reference to the io_context and constructs the
// acceptor on listen_endpoint.
// NOTE(review): the class header, the channel_ member and the function that
// wraps the async_accept call are not present in this listing.
369 server(boost::asio::io_context
& io_context
,
370 const tcp::endpoint
& listen_endpoint
,
371 const udp::endpoint
& broadcast_endpoint
)
372 : io_context_(io_context
),
373 acceptor_(io_context
, listen_endpoint
)
// Creates a shared udp_broadcaster for broadcast_endpoint. The call that
// receives this pointer is not visible here — presumably it is registered
// with the channel; confirm against the full source.
376 std::make_shared
<udp_broadcaster
>(
377 io_context_
, broadcast_endpoint
));
// Asynchronously accepts a connection. The handler wraps the accepted socket
// in a new tcp_session that shares channel_, and starts it.
// `this` is captured raw, so the server must outlive the pending accept.
385 acceptor_
.async_accept(
386 [this](const boost::system::error_code
& error
, tcp::socket socket
)
390 std::make_shared
<tcp_session
>(std::move(socket
), channel_
)->start();
// Reference to the io_context supplied by the caller.
397 boost::asio::io_context
& io_context_
;
// Acceptor used to take incoming TCP connections.
398 tcp::acceptor acceptor_
;
402 //----------------------------------------------------------------------
// Program entry point. Per the usage message it takes three arguments:
// listen port, broadcast address, broadcast port.
// NOTE(review): the argument-count check, the try block and the call that
// runs the io_context are not present in this listing.
404 int main(int argc
, char* argv
[])
408 using namespace std
; // For atoi.
412 std::cerr
<< "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
416 boost::asio::io_context io_context
;
// TCP listen endpoint: IPv4, port parsed from argv[1].
// NOTE(review): atoi gives no error indication for non-numeric or
// out-of-range input, so a bad port argument is not detected here.
418 tcp::endpoint
listen_endpoint(tcp::v4(), atoi(argv
[1]));
// UDP broadcast endpoint: address parsed from argv[2], port from argv[3].
420 udp::endpoint
broadcast_endpoint(
421 boost::asio::ip::make_address(argv
[2]), atoi(argv
[3]));
// Construct the server with both endpoints.
423 server
s(io_context
, listen_endpoint
, broadcast_endpoint
);
// Report any std::exception on standard error.
427 catch (std::exception
& e
)
429 std::cerr
<< "Exception: " << e
.what() << "\n";