]>
Commit | Line | Data |
---|---|---|
1 | // | |
2 | // server.cpp | |
3 | // ~~~~~~~~~~ | |
4 | // | |
5 | // Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com) | |
6 | // | |
7 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
8 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
9 | // | |
10 | ||
11 | #include <algorithm> | |
12 | #include <cstdlib> | |
13 | #include <deque> | |
14 | #include <iostream> | |
15 | #include <set> | |
16 | #include <boost/bind.hpp> | |
17 | #include <boost/shared_ptr.hpp> | |
18 | #include <boost/enable_shared_from_this.hpp> | |
19 | #include <boost/asio/deadline_timer.hpp> | |
20 | #include <boost/asio/io_context.hpp> | |
21 | #include <boost/asio/ip/tcp.hpp> | |
22 | #include <boost/asio/ip/udp.hpp> | |
23 | #include <boost/asio/read_until.hpp> | |
24 | #include <boost/asio/streambuf.hpp> | |
25 | #include <boost/asio/write.hpp> | |
26 | ||
27 | using boost::asio::deadline_timer; | |
28 | using boost::asio::ip::tcp; | |
29 | using boost::asio::ip::udp; | |
30 | ||
31 | //---------------------------------------------------------------------- | |
32 | ||
33 | class subscriber | |
34 | { | |
35 | public: | |
36 | virtual ~subscriber() {} | |
37 | virtual void deliver(const std::string& msg) = 0; | |
38 | }; | |
39 | ||
40 | typedef boost::shared_ptr<subscriber> subscriber_ptr; | |
41 | ||
42 | //---------------------------------------------------------------------- | |
43 | ||
44 | class channel | |
45 | { | |
46 | public: | |
47 | void join(subscriber_ptr subscriber) | |
48 | { | |
49 | subscribers_.insert(subscriber); | |
50 | } | |
51 | ||
52 | void leave(subscriber_ptr subscriber) | |
53 | { | |
54 | subscribers_.erase(subscriber); | |
55 | } | |
56 | ||
57 | void deliver(const std::string& msg) | |
58 | { | |
59 | std::for_each(subscribers_.begin(), subscribers_.end(), | |
60 | boost::bind(&subscriber::deliver, _1, boost::ref(msg))); | |
61 | } | |
62 | ||
63 | private: | |
64 | std::set<subscriber_ptr> subscribers_; | |
65 | }; | |
66 | ||
67 | //---------------------------------------------------------------------- | |
68 | ||
69 | // | |
70 | // This class manages socket timeouts by applying the concept of a deadline. | |
71 | // Some asynchronous operations are given deadlines by which they must complete. | |
72 | // Deadlines are enforced by two "actors" that persist for the lifetime of the | |
73 | // session object, one for input and one for output: | |
74 | // | |
75 | // +----------------+ +----------------+ | |
76 | // | | | | | |
77 | // | check_deadline |<---+ | check_deadline |<---+ | |
78 | // | | | async_wait() | | | async_wait() | |
79 | // +----------------+ | on input +----------------+ | on output | |
80 | // | | deadline | | deadline | |
81 | // +---------+ +---------+ | |
82 | // | |
83 | // If either deadline actor determines that the corresponding deadline has | |
84 | // expired, the socket is closed and any outstanding operations are cancelled. | |
85 | // | |
86 | // The input actor reads messages from the socket, where messages are delimited | |
87 | // by the newline character: | |
88 | // | |
89 | // +------------+ | |
90 | // | | | |
91 | // | start_read |<---+ | |
92 | // | | | | |
93 | // +------------+ | | |
94 | // | | | |
95 | // async_- | +-------------+ | |
96 | // read_- | | | | |
97 | // until() +--->| handle_read | | |
98 | // | | | |
99 | // +-------------+ | |
100 | // | |
101 | // The deadline for receiving a complete message is 30 seconds. If a non-empty | |
102 | // message is received, it is delivered to all subscribers. If a heartbeat (a | |
103 | // message that consists of a single newline character) is received, a heartbeat | |
104 | // is enqueued for the client, provided there are no other messages waiting to | |
105 | // be sent. | |
106 | // | |
107 | // The output actor is responsible for sending messages to the client: | |
108 | // | |
109 | // +--------------+ | |
110 | // | |<---------------------+ | |
111 | // | await_output | | | |
112 | // | |<---+ | | |
113 | // +--------------+ | | | |
114 | // | | | async_wait() | | |
115 | // | +--------+ | | |
116 | // V | | |
117 | // +-------------+ +--------------+ | |
118 | // | | async_write() | | | |
119 | // | start_write |-------------->| handle_write | | |
120 | // | | | | | |
121 | // +-------------+ +--------------+ | |
122 | // | |
123 | // The output actor first waits for an output message to be enqueued. It does | |
124 | // this by using a deadline_timer as an asynchronous condition variable. The | |
125 | // deadline_timer will be signalled whenever the output queue is non-empty. | |
126 | // | |
127 | // Once a message is available, it is sent to the client. The deadline for | |
128 | // sending a complete message is 30 seconds. After the message is successfully | |
129 | // sent, the output actor again waits for the output queue to become non-empty. | |
130 | // | |
131 | class tcp_session | |
132 | : public subscriber, | |
133 | public boost::enable_shared_from_this<tcp_session> | |
134 | { | |
135 | public: | |
136 | tcp_session(boost::asio::io_context& io_context, channel& ch) | |
137 | : channel_(ch), | |
138 | socket_(io_context), | |
139 | input_deadline_(io_context), | |
140 | non_empty_output_queue_(io_context), | |
141 | output_deadline_(io_context) | |
142 | { | |
143 | input_deadline_.expires_at(boost::posix_time::pos_infin); | |
144 | output_deadline_.expires_at(boost::posix_time::pos_infin); | |
145 | ||
146 | // The non_empty_output_queue_ deadline_timer is set to pos_infin whenever | |
147 | // the output queue is empty. This ensures that the output actor stays | |
148 | // asleep until a message is put into the queue. | |
149 | non_empty_output_queue_.expires_at(boost::posix_time::pos_infin); | |
150 | } | |
151 | ||
152 | tcp::socket& socket() | |
153 | { | |
154 | return socket_; | |
155 | } | |
156 | ||
157 | // Called by the server object to initiate the four actors. | |
158 | void start() | |
159 | { | |
160 | channel_.join(shared_from_this()); | |
161 | ||
162 | start_read(); | |
163 | ||
164 | input_deadline_.async_wait( | |
165 | boost::bind(&tcp_session::check_deadline, | |
166 | shared_from_this(), &input_deadline_)); | |
167 | ||
168 | await_output(); | |
169 | ||
170 | output_deadline_.async_wait( | |
171 | boost::bind(&tcp_session::check_deadline, | |
172 | shared_from_this(), &output_deadline_)); | |
173 | } | |
174 | ||
175 | private: | |
176 | void stop() | |
177 | { | |
178 | channel_.leave(shared_from_this()); | |
179 | ||
180 | boost::system::error_code ignored_ec; | |
181 | socket_.close(ignored_ec); | |
182 | input_deadline_.cancel(); | |
183 | non_empty_output_queue_.cancel(); | |
184 | output_deadline_.cancel(); | |
185 | } | |
186 | ||
187 | bool stopped() const | |
188 | { | |
189 | return !socket_.is_open(); | |
190 | } | |
191 | ||
192 | void deliver(const std::string& msg) | |
193 | { | |
194 | output_queue_.push_back(msg + "\n"); | |
195 | ||
196 | // Signal that the output queue contains messages. Modifying the expiry | |
197 | // will wake the output actor, if it is waiting on the timer. | |
198 | non_empty_output_queue_.expires_at(boost::posix_time::neg_infin); | |
199 | } | |
200 | ||
201 | void start_read() | |
202 | { | |
203 | // Set a deadline for the read operation. | |
204 | input_deadline_.expires_from_now(boost::posix_time::seconds(30)); | |
205 | ||
206 | // Start an asynchronous operation to read a newline-delimited message. | |
207 | boost::asio::async_read_until(socket_, input_buffer_, '\n', | |
208 | boost::bind(&tcp_session::handle_read, shared_from_this(), _1)); | |
209 | } | |
210 | ||
211 | void handle_read(const boost::system::error_code& ec) | |
212 | { | |
213 | if (stopped()) | |
214 | return; | |
215 | ||
216 | if (!ec) | |
217 | { | |
218 | // Extract the newline-delimited message from the buffer. | |
219 | std::string msg; | |
220 | std::istream is(&input_buffer_); | |
221 | std::getline(is, msg); | |
222 | ||
223 | if (!msg.empty()) | |
224 | { | |
225 | channel_.deliver(msg); | |
226 | } | |
227 | else | |
228 | { | |
229 | // We received a heartbeat message from the client. If there's nothing | |
230 | // else being sent or ready to be sent, send a heartbeat right back. | |
231 | if (output_queue_.empty()) | |
232 | { | |
233 | output_queue_.push_back("\n"); | |
234 | ||
235 | // Signal that the output queue contains messages. Modifying the | |
236 | // expiry will wake the output actor, if it is waiting on the timer. | |
237 | non_empty_output_queue_.expires_at(boost::posix_time::neg_infin); | |
238 | } | |
239 | } | |
240 | ||
241 | start_read(); | |
242 | } | |
243 | else | |
244 | { | |
245 | stop(); | |
246 | } | |
247 | } | |
248 | ||
249 | void await_output() | |
250 | { | |
251 | if (stopped()) | |
252 | return; | |
253 | ||
254 | if (output_queue_.empty()) | |
255 | { | |
256 | // There are no messages that are ready to be sent. The actor goes to | |
257 | // sleep by waiting on the non_empty_output_queue_ timer. When a new | |
258 | // message is added, the timer will be modified and the actor will wake. | |
259 | non_empty_output_queue_.expires_at(boost::posix_time::pos_infin); | |
260 | non_empty_output_queue_.async_wait( | |
261 | boost::bind(&tcp_session::await_output, shared_from_this())); | |
262 | } | |
263 | else | |
264 | { | |
265 | start_write(); | |
266 | } | |
267 | } | |
268 | ||
269 | void start_write() | |
270 | { | |
271 | // Set a deadline for the write operation. | |
272 | output_deadline_.expires_from_now(boost::posix_time::seconds(30)); | |
273 | ||
274 | // Start an asynchronous operation to send a message. | |
275 | boost::asio::async_write(socket_, | |
276 | boost::asio::buffer(output_queue_.front()), | |
277 | boost::bind(&tcp_session::handle_write, shared_from_this(), _1)); | |
278 | } | |
279 | ||
280 | void handle_write(const boost::system::error_code& ec) | |
281 | { | |
282 | if (stopped()) | |
283 | return; | |
284 | ||
285 | if (!ec) | |
286 | { | |
287 | output_queue_.pop_front(); | |
288 | ||
289 | await_output(); | |
290 | } | |
291 | else | |
292 | { | |
293 | stop(); | |
294 | } | |
295 | } | |
296 | ||
297 | void check_deadline(deadline_timer* deadline) | |
298 | { | |
299 | if (stopped()) | |
300 | return; | |
301 | ||
302 | // Check whether the deadline has passed. We compare the deadline against | |
303 | // the current time since a new asynchronous operation may have moved the | |
304 | // deadline before this actor had a chance to run. | |
305 | if (deadline->expires_at() <= deadline_timer::traits_type::now()) | |
306 | { | |
307 | // The deadline has passed. Stop the session. The other actors will | |
308 | // terminate as soon as possible. | |
309 | stop(); | |
310 | } | |
311 | else | |
312 | { | |
313 | // Put the actor back to sleep. | |
314 | deadline->async_wait( | |
315 | boost::bind(&tcp_session::check_deadline, | |
316 | shared_from_this(), deadline)); | |
317 | } | |
318 | } | |
319 | ||
320 | channel& channel_; | |
321 | tcp::socket socket_; | |
322 | boost::asio::streambuf input_buffer_; | |
323 | deadline_timer input_deadline_; | |
324 | std::deque<std::string> output_queue_; | |
325 | deadline_timer non_empty_output_queue_; | |
326 | deadline_timer output_deadline_; | |
327 | }; | |
328 | ||
329 | typedef boost::shared_ptr<tcp_session> tcp_session_ptr; | |
330 | ||
331 | //---------------------------------------------------------------------- | |
332 | ||
333 | class udp_broadcaster | |
334 | : public subscriber | |
335 | { | |
336 | public: | |
337 | udp_broadcaster(boost::asio::io_context& io_context, | |
338 | const udp::endpoint& broadcast_endpoint) | |
339 | : socket_(io_context) | |
340 | { | |
341 | socket_.connect(broadcast_endpoint); | |
342 | } | |
343 | ||
344 | private: | |
345 | void deliver(const std::string& msg) | |
346 | { | |
347 | boost::system::error_code ignored_ec; | |
348 | socket_.send(boost::asio::buffer(msg), 0, ignored_ec); | |
349 | } | |
350 | ||
351 | udp::socket socket_; | |
352 | }; | |
353 | ||
354 | //---------------------------------------------------------------------- | |
355 | ||
356 | class server | |
357 | { | |
358 | public: | |
359 | server(boost::asio::io_context& io_context, | |
360 | const tcp::endpoint& listen_endpoint, | |
361 | const udp::endpoint& broadcast_endpoint) | |
362 | : io_context_(io_context), | |
363 | acceptor_(io_context, listen_endpoint) | |
364 | { | |
365 | subscriber_ptr bc(new udp_broadcaster(io_context_, broadcast_endpoint)); | |
366 | channel_.join(bc); | |
367 | ||
368 | start_accept(); | |
369 | } | |
370 | ||
371 | void start_accept() | |
372 | { | |
373 | tcp_session_ptr new_session(new tcp_session(io_context_, channel_)); | |
374 | ||
375 | acceptor_.async_accept(new_session->socket(), | |
376 | boost::bind(&server::handle_accept, this, new_session, _1)); | |
377 | } | |
378 | ||
379 | void handle_accept(tcp_session_ptr session, | |
380 | const boost::system::error_code& ec) | |
381 | { | |
382 | if (!ec) | |
383 | { | |
384 | session->start(); | |
385 | } | |
386 | ||
387 | start_accept(); | |
388 | } | |
389 | ||
390 | private: | |
391 | boost::asio::io_context& io_context_; | |
392 | tcp::acceptor acceptor_; | |
393 | channel channel_; | |
394 | }; | |
395 | ||
396 | //---------------------------------------------------------------------- | |
397 | ||
398 | int main(int argc, char* argv[]) | |
399 | { | |
400 | try | |
401 | { | |
402 | using namespace std; // For atoi. | |
403 | ||
404 | if (argc != 4) | |
405 | { | |
406 | std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n"; | |
407 | return 1; | |
408 | } | |
409 | ||
410 | boost::asio::io_context io_context; | |
411 | ||
412 | tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1])); | |
413 | ||
414 | udp::endpoint broadcast_endpoint( | |
415 | boost::asio::ip::make_address(argv[2]), atoi(argv[3])); | |
416 | ||
417 | server s(io_context, listen_endpoint, broadcast_endpoint); | |
418 | ||
419 | io_context.run(); | |
420 | } | |
421 | catch (std::exception& e) | |
422 | { | |
423 | std::cerr << "Exception: " << e.what() << "\n"; | |
424 | } | |
425 | ||
426 | return 0; | |
427 | } |