]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // |
2 | // server.cpp | |
3 | // ~~~~~~~~~~ | |
4 | // | |
1e59de90 | 5 | // Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com) |
7c673cae FG |
6 | // |
7 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
8 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
9 | // | |
10 | ||
11 | #include <algorithm> | |
12 | #include <cstdlib> | |
13 | #include <deque> | |
14 | #include <iostream> | |
15 | #include <set> | |
11fdf7f2 | 16 | #include <string> |
f67539c2 | 17 | #include <boost/bind/bind.hpp> |
7c673cae FG |
18 | #include <boost/shared_ptr.hpp> |
19 | #include <boost/enable_shared_from_this.hpp> | |
11fdf7f2 | 20 | #include <boost/asio/buffer.hpp> |
b32b8144 | 21 | #include <boost/asio/io_context.hpp> |
7c673cae FG |
22 | #include <boost/asio/ip/tcp.hpp> |
23 | #include <boost/asio/ip/udp.hpp> | |
24 | #include <boost/asio/read_until.hpp> | |
11fdf7f2 | 25 | #include <boost/asio/steady_timer.hpp> |
7c673cae FG |
26 | #include <boost/asio/write.hpp> |
27 | ||
11fdf7f2 | 28 | using boost::asio::steady_timer; |
7c673cae FG |
29 | using boost::asio::ip::tcp; |
30 | using boost::asio::ip::udp; | |
31 | ||
32 | //---------------------------------------------------------------------- | |
33 | ||
// Interface implemented by anything that can receive messages. Concrete
// subscribers decide what "receiving" a message means for them.
class subscriber
{
public:
  // Hands one message to this subscriber.
  virtual void deliver(const std::string& msg) = 0;

  // Virtual so that a concrete subscriber is destroyed correctly when it is
  // released through a pointer to this interface.
  virtual ~subscriber() {}
};
40 | ||
41 | typedef boost::shared_ptr<subscriber> subscriber_ptr; | |
42 | ||
43 | //---------------------------------------------------------------------- | |
44 | ||
45 | class channel | |
46 | { | |
47 | public: | |
48 | void join(subscriber_ptr subscriber) | |
49 | { | |
50 | subscribers_.insert(subscriber); | |
51 | } | |
52 | ||
53 | void leave(subscriber_ptr subscriber) | |
54 | { | |
55 | subscribers_.erase(subscriber); | |
56 | } | |
57 | ||
58 | void deliver(const std::string& msg) | |
59 | { | |
60 | std::for_each(subscribers_.begin(), subscribers_.end(), | |
f67539c2 TL |
61 | boost::bind(&subscriber::deliver, |
62 | boost::placeholders::_1, boost::ref(msg))); | |
7c673cae FG |
63 | } |
64 | ||
65 | private: | |
66 | std::set<subscriber_ptr> subscribers_; | |
67 | }; | |
68 | ||
69 | //---------------------------------------------------------------------- | |
70 | ||
71 | // | |
72 | // This class manages socket timeouts by applying the concept of a deadline. | |
73 | // Some asynchronous operations are given deadlines by which they must complete. | |
74 | // Deadlines are enforced by two "actors" that persist for the lifetime of the | |
75 | // session object, one for input and one for output: | |
76 | // | |
77 | // +----------------+ +----------------+ | |
78 | // | | | | | |
79 | // | check_deadline |<---+ | check_deadline |<---+ | |
80 | // | | | async_wait() | | | async_wait() | |
81 | // +----------------+ | on input +----------------+ | on output | |
82 | // | | deadline | | deadline | |
83 | // +---------+ +---------+ | |
84 | // | |
85 | // If either deadline actor determines that the corresponding deadline has | |
86 | // expired, the socket is closed and any outstanding operations are cancelled. | |
87 | // | |
88 | // The input actor reads messages from the socket, where messages are delimited | |
89 | // by the newline character: | |
90 | // | |
91 | // +------------+ | |
92 | // | | | |
93 | // | start_read |<---+ | |
94 | // | | | | |
95 | // +------------+ | | |
96 | // | | | |
97 | // async_- | +-------------+ | |
98 | // read_- | | | | |
99 | // until() +--->| handle_read | | |
100 | // | | | |
101 | // +-------------+ | |
102 | // | |
103 | // The deadline for receiving a complete message is 30 seconds. If a non-empty | |
104 | // message is received, it is delivered to all subscribers. If a heartbeat (a | |
105 | // message that consists of a single newline character) is received, a heartbeat | |
106 | // is enqueued for the client, provided there are no other messages waiting to | |
107 | // be sent. | |
108 | // | |
109 | // The output actor is responsible for sending messages to the client: | |
110 | // | |
111 | // +--------------+ | |
112 | // | |<---------------------+ | |
113 | // | await_output | | | |
114 | // | |<---+ | | |
115 | // +--------------+ | | | |
116 | // | | | async_wait() | | |
117 | // | +--------+ | | |
118 | // V | | |
119 | // +-------------+ +--------------+ | |
120 | // | | async_write() | | | |
121 | // | start_write |-------------->| handle_write | | |
122 | // | | | | | |
123 | // +-------------+ +--------------+ | |
124 | // | |
125 | // The output actor first waits for an output message to be enqueued. It does | |
11fdf7f2 TL |
126 | // this by using a steady_timer as an asynchronous condition variable. The |
127 | // steady_timer will be signalled whenever the output queue is non-empty. | |
7c673cae FG |
128 | // |
129 | // Once a message is available, it is sent to the client. The deadline for | |
130 | // sending a complete message is 30 seconds. After the message is successfully | |
131 | // sent, the output actor again waits for the output queue to become non-empty. | |
132 | // | |
133 | class tcp_session | |
134 | : public subscriber, | |
135 | public boost::enable_shared_from_this<tcp_session> | |
136 | { | |
137 | public: | |
b32b8144 | 138 | tcp_session(boost::asio::io_context& io_context, channel& ch) |
7c673cae | 139 | : channel_(ch), |
b32b8144 FG |
140 | socket_(io_context), |
141 | input_deadline_(io_context), | |
142 | non_empty_output_queue_(io_context), | |
143 | output_deadline_(io_context) | |
7c673cae | 144 | { |
11fdf7f2 TL |
145 | input_deadline_.expires_at(steady_timer::time_point::max()); |
146 | output_deadline_.expires_at(steady_timer::time_point::max()); | |
7c673cae | 147 | |
11fdf7f2 TL |
148 | // The non_empty_output_queue_ steady_timer is set to the maximum time |
149 | // point whenever the output queue is empty. This ensures that the output | |
150 | // actor stays asleep until a message is put into the queue. | |
151 | non_empty_output_queue_.expires_at(steady_timer::time_point::max()); | |
7c673cae FG |
152 | } |
153 | ||
154 | tcp::socket& socket() | |
155 | { | |
156 | return socket_; | |
157 | } | |
158 | ||
159 | // Called by the server object to initiate the four actors. | |
160 | void start() | |
161 | { | |
162 | channel_.join(shared_from_this()); | |
163 | ||
164 | start_read(); | |
165 | ||
166 | input_deadline_.async_wait( | |
167 | boost::bind(&tcp_session::check_deadline, | |
168 | shared_from_this(), &input_deadline_)); | |
169 | ||
170 | await_output(); | |
171 | ||
172 | output_deadline_.async_wait( | |
173 | boost::bind(&tcp_session::check_deadline, | |
174 | shared_from_this(), &output_deadline_)); | |
175 | } | |
176 | ||
177 | private: | |
178 | void stop() | |
179 | { | |
180 | channel_.leave(shared_from_this()); | |
181 | ||
182 | boost::system::error_code ignored_ec; | |
183 | socket_.close(ignored_ec); | |
184 | input_deadline_.cancel(); | |
185 | non_empty_output_queue_.cancel(); | |
186 | output_deadline_.cancel(); | |
187 | } | |
188 | ||
189 | bool stopped() const | |
190 | { | |
191 | return !socket_.is_open(); | |
192 | } | |
193 | ||
194 | void deliver(const std::string& msg) | |
195 | { | |
196 | output_queue_.push_back(msg + "\n"); | |
197 | ||
198 | // Signal that the output queue contains messages. Modifying the expiry | |
199 | // will wake the output actor, if it is waiting on the timer. | |
11fdf7f2 | 200 | non_empty_output_queue_.expires_at(steady_timer::time_point::min()); |
7c673cae FG |
201 | } |
202 | ||
203 | void start_read() | |
204 | { | |
205 | // Set a deadline for the read operation. | |
11fdf7f2 | 206 | input_deadline_.expires_after(boost::asio::chrono::seconds(30)); |
7c673cae FG |
207 | |
208 | // Start an asynchronous operation to read a newline-delimited message. | |
11fdf7f2 TL |
209 | boost::asio::async_read_until(socket_, |
210 | boost::asio::dynamic_buffer(input_buffer_), '\n', | |
f67539c2 TL |
211 | boost::bind(&tcp_session::handle_read, shared_from_this(), |
212 | boost::placeholders::_1, boost::placeholders::_2)); | |
7c673cae FG |
213 | } |
214 | ||
11fdf7f2 | 215 | void handle_read(const boost::system::error_code& ec, std::size_t n) |
7c673cae FG |
216 | { |
217 | if (stopped()) | |
218 | return; | |
219 | ||
220 | if (!ec) | |
221 | { | |
222 | // Extract the newline-delimited message from the buffer. | |
11fdf7f2 TL |
223 | std::string msg(input_buffer_.substr(0, n - 1)); |
224 | input_buffer_.erase(0, n); | |
7c673cae FG |
225 | |
226 | if (!msg.empty()) | |
227 | { | |
228 | channel_.deliver(msg); | |
229 | } | |
230 | else | |
231 | { | |
232 | // We received a heartbeat message from the client. If there's nothing | |
233 | // else being sent or ready to be sent, send a heartbeat right back. | |
234 | if (output_queue_.empty()) | |
235 | { | |
236 | output_queue_.push_back("\n"); | |
237 | ||
238 | // Signal that the output queue contains messages. Modifying the | |
239 | // expiry will wake the output actor, if it is waiting on the timer. | |
11fdf7f2 | 240 | non_empty_output_queue_.expires_at(steady_timer::time_point::min()); |
7c673cae FG |
241 | } |
242 | } | |
243 | ||
244 | start_read(); | |
245 | } | |
246 | else | |
247 | { | |
248 | stop(); | |
249 | } | |
250 | } | |
251 | ||
252 | void await_output() | |
253 | { | |
254 | if (stopped()) | |
255 | return; | |
256 | ||
257 | if (output_queue_.empty()) | |
258 | { | |
259 | // There are no messages that are ready to be sent. The actor goes to | |
260 | // sleep by waiting on the non_empty_output_queue_ timer. When a new | |
261 | // message is added, the timer will be modified and the actor will wake. | |
11fdf7f2 | 262 | non_empty_output_queue_.expires_at(steady_timer::time_point::max()); |
7c673cae FG |
263 | non_empty_output_queue_.async_wait( |
264 | boost::bind(&tcp_session::await_output, shared_from_this())); | |
265 | } | |
266 | else | |
267 | { | |
268 | start_write(); | |
269 | } | |
270 | } | |
271 | ||
272 | void start_write() | |
273 | { | |
274 | // Set a deadline for the write operation. | |
11fdf7f2 | 275 | output_deadline_.expires_after(boost::asio::chrono::seconds(30)); |
7c673cae FG |
276 | |
277 | // Start an asynchronous operation to send a message. | |
278 | boost::asio::async_write(socket_, | |
279 | boost::asio::buffer(output_queue_.front()), | |
f67539c2 TL |
280 | boost::bind(&tcp_session::handle_write, |
281 | shared_from_this(), boost::placeholders::_1)); | |
7c673cae FG |
282 | } |
283 | ||
284 | void handle_write(const boost::system::error_code& ec) | |
285 | { | |
286 | if (stopped()) | |
287 | return; | |
288 | ||
289 | if (!ec) | |
290 | { | |
291 | output_queue_.pop_front(); | |
292 | ||
293 | await_output(); | |
294 | } | |
295 | else | |
296 | { | |
297 | stop(); | |
298 | } | |
299 | } | |
300 | ||
11fdf7f2 | 301 | void check_deadline(steady_timer* deadline) |
7c673cae FG |
302 | { |
303 | if (stopped()) | |
304 | return; | |
305 | ||
306 | // Check whether the deadline has passed. We compare the deadline against | |
307 | // the current time since a new asynchronous operation may have moved the | |
308 | // deadline before this actor had a chance to run. | |
11fdf7f2 | 309 | if (deadline->expiry() <= steady_timer::clock_type::now()) |
7c673cae FG |
310 | { |
311 | // The deadline has passed. Stop the session. The other actors will | |
312 | // terminate as soon as possible. | |
313 | stop(); | |
314 | } | |
315 | else | |
316 | { | |
317 | // Put the actor back to sleep. | |
318 | deadline->async_wait( | |
319 | boost::bind(&tcp_session::check_deadline, | |
320 | shared_from_this(), deadline)); | |
321 | } | |
322 | } | |
323 | ||
324 | channel& channel_; | |
325 | tcp::socket socket_; | |
11fdf7f2 TL |
326 | std::string input_buffer_; |
327 | steady_timer input_deadline_; | |
7c673cae | 328 | std::deque<std::string> output_queue_; |
11fdf7f2 TL |
329 | steady_timer non_empty_output_queue_; |
330 | steady_timer output_deadline_; | |
7c673cae FG |
331 | }; |
332 | ||
333 | typedef boost::shared_ptr<tcp_session> tcp_session_ptr; | |
334 | ||
335 | //---------------------------------------------------------------------- | |
336 | ||
337 | class udp_broadcaster | |
338 | : public subscriber | |
339 | { | |
340 | public: | |
b32b8144 | 341 | udp_broadcaster(boost::asio::io_context& io_context, |
7c673cae | 342 | const udp::endpoint& broadcast_endpoint) |
b32b8144 | 343 | : socket_(io_context) |
7c673cae FG |
344 | { |
345 | socket_.connect(broadcast_endpoint); | |
11fdf7f2 | 346 | socket_.set_option(udp::socket::broadcast(true)); |
7c673cae FG |
347 | } |
348 | ||
349 | private: | |
350 | void deliver(const std::string& msg) | |
351 | { | |
352 | boost::system::error_code ignored_ec; | |
353 | socket_.send(boost::asio::buffer(msg), 0, ignored_ec); | |
354 | } | |
355 | ||
356 | udp::socket socket_; | |
357 | }; | |
358 | ||
359 | //---------------------------------------------------------------------- | |
360 | ||
361 | class server | |
362 | { | |
363 | public: | |
b32b8144 | 364 | server(boost::asio::io_context& io_context, |
7c673cae FG |
365 | const tcp::endpoint& listen_endpoint, |
366 | const udp::endpoint& broadcast_endpoint) | |
b32b8144 FG |
367 | : io_context_(io_context), |
368 | acceptor_(io_context, listen_endpoint) | |
7c673cae | 369 | { |
b32b8144 | 370 | subscriber_ptr bc(new udp_broadcaster(io_context_, broadcast_endpoint)); |
7c673cae FG |
371 | channel_.join(bc); |
372 | ||
373 | start_accept(); | |
374 | } | |
375 | ||
376 | void start_accept() | |
377 | { | |
b32b8144 | 378 | tcp_session_ptr new_session(new tcp_session(io_context_, channel_)); |
7c673cae FG |
379 | |
380 | acceptor_.async_accept(new_session->socket(), | |
f67539c2 TL |
381 | boost::bind(&server::handle_accept, |
382 | this, new_session, boost::placeholders::_1)); | |
7c673cae FG |
383 | } |
384 | ||
385 | void handle_accept(tcp_session_ptr session, | |
386 | const boost::system::error_code& ec) | |
387 | { | |
388 | if (!ec) | |
389 | { | |
390 | session->start(); | |
391 | } | |
392 | ||
393 | start_accept(); | |
394 | } | |
395 | ||
396 | private: | |
b32b8144 | 397 | boost::asio::io_context& io_context_; |
7c673cae FG |
398 | tcp::acceptor acceptor_; |
399 | channel channel_; | |
400 | }; | |
401 | ||
402 | //---------------------------------------------------------------------- | |
403 | ||
404 | int main(int argc, char* argv[]) | |
405 | { | |
406 | try | |
407 | { | |
408 | using namespace std; // For atoi. | |
409 | ||
410 | if (argc != 4) | |
411 | { | |
412 | std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n"; | |
413 | return 1; | |
414 | } | |
415 | ||
b32b8144 | 416 | boost::asio::io_context io_context; |
7c673cae FG |
417 | |
418 | tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1])); | |
419 | ||
420 | udp::endpoint broadcast_endpoint( | |
b32b8144 | 421 | boost::asio::ip::make_address(argv[2]), atoi(argv[3])); |
7c673cae | 422 | |
b32b8144 | 423 | server s(io_context, listen_endpoint, broadcast_endpoint); |
7c673cae | 424 | |
b32b8144 | 425 | io_context.run(); |
7c673cae FG |
426 | } |
427 | catch (std::exception& e) | |
428 | { | |
429 | std::cerr << "Exception: " << e.what() << "\n"; | |
430 | } | |
431 | ||
432 | return 0; | |
433 | } |