]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/asio/example/cpp03/timeouts/server.cpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / asio / example / cpp03 / timeouts / server.cpp
1 //
2 // server.cpp
3 // ~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #include <algorithm>
12 #include <cstdlib>
13 #include <deque>
14 #include <iostream>
15 #include <set>
16 #include <boost/bind.hpp>
17 #include <boost/shared_ptr.hpp>
18 #include <boost/enable_shared_from_this.hpp>
19 #include <boost/asio/deadline_timer.hpp>
20 #include <boost/asio/io_context.hpp>
21 #include <boost/asio/ip/tcp.hpp>
22 #include <boost/asio/ip/udp.hpp>
23 #include <boost/asio/read_until.hpp>
24 #include <boost/asio/streambuf.hpp>
25 #include <boost/asio/write.hpp>
26
27 using boost::asio::deadline_timer;
28 using boost::asio::ip::tcp;
29 using boost::asio::ip::udp;
30
31 //----------------------------------------------------------------------
32
// Interface for anything that can receive the messages published on a
// channel. Implementations decide what "delivering" a message means.
class subscriber
{
public:
  // Virtual so that a concrete subscriber can be destroyed through a pointer
  // to this interface.
  virtual ~subscriber()
  {
  }

  // Hands one message to the subscriber. The message text is passed without
  // any trailing delimiter; it is up to the implementation to frame it.
  virtual void deliver(const std::string& msg) = 0;
};
39
40 typedef boost::shared_ptr<subscriber> subscriber_ptr;
41
42 //----------------------------------------------------------------------
43
44 class channel
45 {
46 public:
47 void join(subscriber_ptr subscriber)
48 {
49 subscribers_.insert(subscriber);
50 }
51
52 void leave(subscriber_ptr subscriber)
53 {
54 subscribers_.erase(subscriber);
55 }
56
57 void deliver(const std::string& msg)
58 {
59 std::for_each(subscribers_.begin(), subscribers_.end(),
60 boost::bind(&subscriber::deliver, _1, boost::ref(msg)));
61 }
62
63 private:
64 std::set<subscriber_ptr> subscribers_;
65 };
66
67 //----------------------------------------------------------------------
68
//
// This class manages socket timeouts by applying the concept of a deadline.
// Some asynchronous operations are given deadlines by which they must complete.
// Deadlines are enforced by two "actors" that persist for the lifetime of the
// session object, one for input and one for output:
//
//  +----------------+                     +----------------+
//  |                |                     |                |
//  | check_deadline |<---+                | check_deadline |<---+
//  |                |    | async_wait()   |                |    | async_wait()
//  +----------------+    |  on input      +----------------+    |  on output
//              |         |  deadline                  |         |  deadline
//              +---------+                            +---------+
//
// If either deadline actor determines that the corresponding deadline has
// expired, the socket is closed and any outstanding operations are cancelled.
//
// The input actor reads messages from the socket, where messages are delimited
// by the newline character:
//
//  +------------+
//  |            |
//  | start_read |<---+
//  |            |    |
//  +------------+    |
//          |         |
//  async_- |    +-------------+
//   read_- |    |             |
//  until() +--->| handle_read |
//               |             |
//               +-------------+
//
// The deadline for receiving a complete message is 30 seconds. If a non-empty
// message is received, it is delivered to all subscribers. If a heartbeat (a
// message that consists of a single newline character) is received, a heartbeat
// is enqueued for the client, provided there are no other messages waiting to
// be sent.
//
// The output actor is responsible for sending messages to the client:
//
//  +--------------+
//  |              |<---------------------+
//  | await_output |                      |
//  |              |<---+                 |
//  +--------------+    |                 |
//      |      |        | async_wait()    |
//      |      +--------+                 |
//      V                                 |
//  +-------------+               +--------------+
//  |             | async_write() |              |
//  | start_write |-------------->| handle_write |
//  |             |               |              |
//  +-------------+               +--------------+
//
// The output actor first waits for an output message to be enqueued. It does
// this by using a deadline_timer as an asynchronous condition variable. The
// deadline_timer will be signalled whenever the output queue is non-empty.
//
// Once a message is available, it is sent to the client. The deadline for
// sending a complete message is 30 seconds. After the message is successfully
// sent, the output actor again waits for the output queue to become non-empty.
//
131 class tcp_session
132 : public subscriber,
133 public boost::enable_shared_from_this<tcp_session>
134 {
135 public:
136 tcp_session(boost::asio::io_context& io_context, channel& ch)
137 : channel_(ch),
138 socket_(io_context),
139 input_deadline_(io_context),
140 non_empty_output_queue_(io_context),
141 output_deadline_(io_context)
142 {
143 input_deadline_.expires_at(boost::posix_time::pos_infin);
144 output_deadline_.expires_at(boost::posix_time::pos_infin);
145
146 // The non_empty_output_queue_ deadline_timer is set to pos_infin whenever
147 // the output queue is empty. This ensures that the output actor stays
148 // asleep until a message is put into the queue.
149 non_empty_output_queue_.expires_at(boost::posix_time::pos_infin);
150 }
151
152 tcp::socket& socket()
153 {
154 return socket_;
155 }
156
157 // Called by the server object to initiate the four actors.
158 void start()
159 {
160 channel_.join(shared_from_this());
161
162 start_read();
163
164 input_deadline_.async_wait(
165 boost::bind(&tcp_session::check_deadline,
166 shared_from_this(), &input_deadline_));
167
168 await_output();
169
170 output_deadline_.async_wait(
171 boost::bind(&tcp_session::check_deadline,
172 shared_from_this(), &output_deadline_));
173 }
174
175 private:
176 void stop()
177 {
178 channel_.leave(shared_from_this());
179
180 boost::system::error_code ignored_ec;
181 socket_.close(ignored_ec);
182 input_deadline_.cancel();
183 non_empty_output_queue_.cancel();
184 output_deadline_.cancel();
185 }
186
187 bool stopped() const
188 {
189 return !socket_.is_open();
190 }
191
192 void deliver(const std::string& msg)
193 {
194 output_queue_.push_back(msg + "\n");
195
196 // Signal that the output queue contains messages. Modifying the expiry
197 // will wake the output actor, if it is waiting on the timer.
198 non_empty_output_queue_.expires_at(boost::posix_time::neg_infin);
199 }
200
201 void start_read()
202 {
203 // Set a deadline for the read operation.
204 input_deadline_.expires_from_now(boost::posix_time::seconds(30));
205
206 // Start an asynchronous operation to read a newline-delimited message.
207 boost::asio::async_read_until(socket_, input_buffer_, '\n',
208 boost::bind(&tcp_session::handle_read, shared_from_this(), _1));
209 }
210
211 void handle_read(const boost::system::error_code& ec)
212 {
213 if (stopped())
214 return;
215
216 if (!ec)
217 {
218 // Extract the newline-delimited message from the buffer.
219 std::string msg;
220 std::istream is(&input_buffer_);
221 std::getline(is, msg);
222
223 if (!msg.empty())
224 {
225 channel_.deliver(msg);
226 }
227 else
228 {
229 // We received a heartbeat message from the client. If there's nothing
230 // else being sent or ready to be sent, send a heartbeat right back.
231 if (output_queue_.empty())
232 {
233 output_queue_.push_back("\n");
234
235 // Signal that the output queue contains messages. Modifying the
236 // expiry will wake the output actor, if it is waiting on the timer.
237 non_empty_output_queue_.expires_at(boost::posix_time::neg_infin);
238 }
239 }
240
241 start_read();
242 }
243 else
244 {
245 stop();
246 }
247 }
248
249 void await_output()
250 {
251 if (stopped())
252 return;
253
254 if (output_queue_.empty())
255 {
256 // There are no messages that are ready to be sent. The actor goes to
257 // sleep by waiting on the non_empty_output_queue_ timer. When a new
258 // message is added, the timer will be modified and the actor will wake.
259 non_empty_output_queue_.expires_at(boost::posix_time::pos_infin);
260 non_empty_output_queue_.async_wait(
261 boost::bind(&tcp_session::await_output, shared_from_this()));
262 }
263 else
264 {
265 start_write();
266 }
267 }
268
269 void start_write()
270 {
271 // Set a deadline for the write operation.
272 output_deadline_.expires_from_now(boost::posix_time::seconds(30));
273
274 // Start an asynchronous operation to send a message.
275 boost::asio::async_write(socket_,
276 boost::asio::buffer(output_queue_.front()),
277 boost::bind(&tcp_session::handle_write, shared_from_this(), _1));
278 }
279
280 void handle_write(const boost::system::error_code& ec)
281 {
282 if (stopped())
283 return;
284
285 if (!ec)
286 {
287 output_queue_.pop_front();
288
289 await_output();
290 }
291 else
292 {
293 stop();
294 }
295 }
296
297 void check_deadline(deadline_timer* deadline)
298 {
299 if (stopped())
300 return;
301
302 // Check whether the deadline has passed. We compare the deadline against
303 // the current time since a new asynchronous operation may have moved the
304 // deadline before this actor had a chance to run.
305 if (deadline->expires_at() <= deadline_timer::traits_type::now())
306 {
307 // The deadline has passed. Stop the session. The other actors will
308 // terminate as soon as possible.
309 stop();
310 }
311 else
312 {
313 // Put the actor back to sleep.
314 deadline->async_wait(
315 boost::bind(&tcp_session::check_deadline,
316 shared_from_this(), deadline));
317 }
318 }
319
320 channel& channel_;
321 tcp::socket socket_;
322 boost::asio::streambuf input_buffer_;
323 deadline_timer input_deadline_;
324 std::deque<std::string> output_queue_;
325 deadline_timer non_empty_output_queue_;
326 deadline_timer output_deadline_;
327 };
328
329 typedef boost::shared_ptr<tcp_session> tcp_session_ptr;
330
331 //----------------------------------------------------------------------
332
333 class udp_broadcaster
334 : public subscriber
335 {
336 public:
337 udp_broadcaster(boost::asio::io_context& io_context,
338 const udp::endpoint& broadcast_endpoint)
339 : socket_(io_context)
340 {
341 socket_.connect(broadcast_endpoint);
342 }
343
344 private:
345 void deliver(const std::string& msg)
346 {
347 boost::system::error_code ignored_ec;
348 socket_.send(boost::asio::buffer(msg), 0, ignored_ec);
349 }
350
351 udp::socket socket_;
352 };
353
354 //----------------------------------------------------------------------
355
356 class server
357 {
358 public:
359 server(boost::asio::io_context& io_context,
360 const tcp::endpoint& listen_endpoint,
361 const udp::endpoint& broadcast_endpoint)
362 : io_context_(io_context),
363 acceptor_(io_context, listen_endpoint)
364 {
365 subscriber_ptr bc(new udp_broadcaster(io_context_, broadcast_endpoint));
366 channel_.join(bc);
367
368 start_accept();
369 }
370
371 void start_accept()
372 {
373 tcp_session_ptr new_session(new tcp_session(io_context_, channel_));
374
375 acceptor_.async_accept(new_session->socket(),
376 boost::bind(&server::handle_accept, this, new_session, _1));
377 }
378
379 void handle_accept(tcp_session_ptr session,
380 const boost::system::error_code& ec)
381 {
382 if (!ec)
383 {
384 session->start();
385 }
386
387 start_accept();
388 }
389
390 private:
391 boost::asio::io_context& io_context_;
392 tcp::acceptor acceptor_;
393 channel channel_;
394 };
395
396 //----------------------------------------------------------------------
397
398 int main(int argc, char* argv[])
399 {
400 try
401 {
402 using namespace std; // For atoi.
403
404 if (argc != 4)
405 {
406 std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
407 return 1;
408 }
409
410 boost::asio::io_context io_context;
411
412 tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1]));
413
414 udp::endpoint broadcast_endpoint(
415 boost::asio::ip::make_address(argv[2]), atoi(argv[3]));
416
417 server s(io_context, listen_endpoint, broadcast_endpoint);
418
419 io_context.run();
420 }
421 catch (std::exception& e)
422 {
423 std::cerr << "Exception: " << e.what() << "\n";
424 }
425
426 return 0;
427 }