]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/asio/example/cpp03/timeouts/server.cpp
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / boost / libs / asio / example / cpp03 / timeouts / server.cpp
CommitLineData
7c673cae
FG
1//
2// server.cpp
3// ~~~~~~~~~~
4//
1e59de90 5// Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7c673cae
FG
6//
7// Distributed under the Boost Software License, Version 1.0. (See accompanying
8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9//
10
11#include <algorithm>
12#include <cstdlib>
13#include <deque>
14#include <iostream>
15#include <set>
11fdf7f2 16#include <string>
f67539c2 17#include <boost/bind/bind.hpp>
7c673cae
FG
18#include <boost/shared_ptr.hpp>
19#include <boost/enable_shared_from_this.hpp>
11fdf7f2 20#include <boost/asio/buffer.hpp>
b32b8144 21#include <boost/asio/io_context.hpp>
7c673cae
FG
22#include <boost/asio/ip/tcp.hpp>
23#include <boost/asio/ip/udp.hpp>
24#include <boost/asio/read_until.hpp>
11fdf7f2 25#include <boost/asio/steady_timer.hpp>
7c673cae
FG
26#include <boost/asio/write.hpp>
27
11fdf7f2 28using boost::asio::steady_timer;
7c673cae
FG
29using boost::asio::ip::tcp;
30using boost::asio::ip::udp;
31
32//----------------------------------------------------------------------
33
34class subscriber
35{
36public:
37 virtual ~subscriber() {}
38 virtual void deliver(const std::string& msg) = 0;
39};
40
41typedef boost::shared_ptr<subscriber> subscriber_ptr;
42
43//----------------------------------------------------------------------
44
45class channel
46{
47public:
48 void join(subscriber_ptr subscriber)
49 {
50 subscribers_.insert(subscriber);
51 }
52
53 void leave(subscriber_ptr subscriber)
54 {
55 subscribers_.erase(subscriber);
56 }
57
58 void deliver(const std::string& msg)
59 {
60 std::for_each(subscribers_.begin(), subscribers_.end(),
f67539c2
TL
61 boost::bind(&subscriber::deliver,
62 boost::placeholders::_1, boost::ref(msg)));
7c673cae
FG
63 }
64
65private:
66 std::set<subscriber_ptr> subscribers_;
67};
68
69//----------------------------------------------------------------------
70
71//
72// This class manages socket timeouts by applying the concept of a deadline.
73// Some asynchronous operations are given deadlines by which they must complete.
74// Deadlines are enforced by two "actors" that persist for the lifetime of the
75// session object, one for input and one for output:
76//
77// +----------------+ +----------------+
78// | | | |
79// | check_deadline |<---+ | check_deadline |<---+
80// | | | async_wait() | | | async_wait()
81// +----------------+ | on input +----------------+ | on output
82// | | deadline | | deadline
83// +---------+ +---------+
84//
85// If either deadline actor determines that the corresponding deadline has
86// expired, the socket is closed and any outstanding operations are cancelled.
87//
88// The input actor reads messages from the socket, where messages are delimited
89// by the newline character:
90//
91// +------------+
92// | |
93// | start_read |<---+
94// | | |
95// +------------+ |
96// | |
97// async_- | +-------------+
98// read_- | | |
99// until() +--->| handle_read |
100// | |
101// +-------------+
102//
103// The deadline for receiving a complete message is 30 seconds. If a non-empty
104// message is received, it is delivered to all subscribers. If a heartbeat (a
105// message that consists of a single newline character) is received, a heartbeat
106// is enqueued for the client, provided there are no other messages waiting to
107// be sent.
108//
109// The output actor is responsible for sending messages to the client:
110//
111// +--------------+
112// | |<---------------------+
113// | await_output | |
114// | |<---+ |
115// +--------------+ | |
116// | | | async_wait() |
117// | +--------+ |
118// V |
119// +-------------+ +--------------+
120// | | async_write() | |
121// | start_write |-------------->| handle_write |
122// | | | |
123// +-------------+ +--------------+
124//
125// The output actor first waits for an output message to be enqueued. It does
11fdf7f2
TL
126// this by using a steady_timer as an asynchronous condition variable. The
127// steady_timer will be signalled whenever the output queue is non-empty.
7c673cae
FG
128//
129// Once a message is available, it is sent to the client. The deadline for
130// sending a complete message is 30 seconds. After the message is successfully
131// sent, the output actor again waits for the output queue to become non-empty.
132//
133class tcp_session
134 : public subscriber,
135 public boost::enable_shared_from_this<tcp_session>
136{
137public:
b32b8144 138 tcp_session(boost::asio::io_context& io_context, channel& ch)
7c673cae 139 : channel_(ch),
b32b8144
FG
140 socket_(io_context),
141 input_deadline_(io_context),
142 non_empty_output_queue_(io_context),
143 output_deadline_(io_context)
7c673cae 144 {
11fdf7f2
TL
145 input_deadline_.expires_at(steady_timer::time_point::max());
146 output_deadline_.expires_at(steady_timer::time_point::max());
7c673cae 147
11fdf7f2
TL
148 // The non_empty_output_queue_ steady_timer is set to the maximum time
149 // point whenever the output queue is empty. This ensures that the output
150 // actor stays asleep until a message is put into the queue.
151 non_empty_output_queue_.expires_at(steady_timer::time_point::max());
7c673cae
FG
152 }
153
154 tcp::socket& socket()
155 {
156 return socket_;
157 }
158
159 // Called by the server object to initiate the four actors.
160 void start()
161 {
162 channel_.join(shared_from_this());
163
164 start_read();
165
166 input_deadline_.async_wait(
167 boost::bind(&tcp_session::check_deadline,
168 shared_from_this(), &input_deadline_));
169
170 await_output();
171
172 output_deadline_.async_wait(
173 boost::bind(&tcp_session::check_deadline,
174 shared_from_this(), &output_deadline_));
175 }
176
177private:
178 void stop()
179 {
180 channel_.leave(shared_from_this());
181
182 boost::system::error_code ignored_ec;
183 socket_.close(ignored_ec);
184 input_deadline_.cancel();
185 non_empty_output_queue_.cancel();
186 output_deadline_.cancel();
187 }
188
189 bool stopped() const
190 {
191 return !socket_.is_open();
192 }
193
194 void deliver(const std::string& msg)
195 {
196 output_queue_.push_back(msg + "\n");
197
198 // Signal that the output queue contains messages. Modifying the expiry
199 // will wake the output actor, if it is waiting on the timer.
11fdf7f2 200 non_empty_output_queue_.expires_at(steady_timer::time_point::min());
7c673cae
FG
201 }
202
203 void start_read()
204 {
205 // Set a deadline for the read operation.
11fdf7f2 206 input_deadline_.expires_after(boost::asio::chrono::seconds(30));
7c673cae
FG
207
208 // Start an asynchronous operation to read a newline-delimited message.
11fdf7f2
TL
209 boost::asio::async_read_until(socket_,
210 boost::asio::dynamic_buffer(input_buffer_), '\n',
f67539c2
TL
211 boost::bind(&tcp_session::handle_read, shared_from_this(),
212 boost::placeholders::_1, boost::placeholders::_2));
7c673cae
FG
213 }
214
11fdf7f2 215 void handle_read(const boost::system::error_code& ec, std::size_t n)
7c673cae
FG
216 {
217 if (stopped())
218 return;
219
220 if (!ec)
221 {
222 // Extract the newline-delimited message from the buffer.
11fdf7f2
TL
223 std::string msg(input_buffer_.substr(0, n - 1));
224 input_buffer_.erase(0, n);
7c673cae
FG
225
226 if (!msg.empty())
227 {
228 channel_.deliver(msg);
229 }
230 else
231 {
232 // We received a heartbeat message from the client. If there's nothing
233 // else being sent or ready to be sent, send a heartbeat right back.
234 if (output_queue_.empty())
235 {
236 output_queue_.push_back("\n");
237
238 // Signal that the output queue contains messages. Modifying the
239 // expiry will wake the output actor, if it is waiting on the timer.
11fdf7f2 240 non_empty_output_queue_.expires_at(steady_timer::time_point::min());
7c673cae
FG
241 }
242 }
243
244 start_read();
245 }
246 else
247 {
248 stop();
249 }
250 }
251
252 void await_output()
253 {
254 if (stopped())
255 return;
256
257 if (output_queue_.empty())
258 {
259 // There are no messages that are ready to be sent. The actor goes to
260 // sleep by waiting on the non_empty_output_queue_ timer. When a new
261 // message is added, the timer will be modified and the actor will wake.
11fdf7f2 262 non_empty_output_queue_.expires_at(steady_timer::time_point::max());
7c673cae
FG
263 non_empty_output_queue_.async_wait(
264 boost::bind(&tcp_session::await_output, shared_from_this()));
265 }
266 else
267 {
268 start_write();
269 }
270 }
271
272 void start_write()
273 {
274 // Set a deadline for the write operation.
11fdf7f2 275 output_deadline_.expires_after(boost::asio::chrono::seconds(30));
7c673cae
FG
276
277 // Start an asynchronous operation to send a message.
278 boost::asio::async_write(socket_,
279 boost::asio::buffer(output_queue_.front()),
f67539c2
TL
280 boost::bind(&tcp_session::handle_write,
281 shared_from_this(), boost::placeholders::_1));
7c673cae
FG
282 }
283
284 void handle_write(const boost::system::error_code& ec)
285 {
286 if (stopped())
287 return;
288
289 if (!ec)
290 {
291 output_queue_.pop_front();
292
293 await_output();
294 }
295 else
296 {
297 stop();
298 }
299 }
300
11fdf7f2 301 void check_deadline(steady_timer* deadline)
7c673cae
FG
302 {
303 if (stopped())
304 return;
305
306 // Check whether the deadline has passed. We compare the deadline against
307 // the current time since a new asynchronous operation may have moved the
308 // deadline before this actor had a chance to run.
11fdf7f2 309 if (deadline->expiry() <= steady_timer::clock_type::now())
7c673cae
FG
310 {
311 // The deadline has passed. Stop the session. The other actors will
312 // terminate as soon as possible.
313 stop();
314 }
315 else
316 {
317 // Put the actor back to sleep.
318 deadline->async_wait(
319 boost::bind(&tcp_session::check_deadline,
320 shared_from_this(), deadline));
321 }
322 }
323
324 channel& channel_;
325 tcp::socket socket_;
11fdf7f2
TL
326 std::string input_buffer_;
327 steady_timer input_deadline_;
7c673cae 328 std::deque<std::string> output_queue_;
11fdf7f2
TL
329 steady_timer non_empty_output_queue_;
330 steady_timer output_deadline_;
7c673cae
FG
331};
332
333typedef boost::shared_ptr<tcp_session> tcp_session_ptr;
334
335//----------------------------------------------------------------------
336
337class udp_broadcaster
338 : public subscriber
339{
340public:
b32b8144 341 udp_broadcaster(boost::asio::io_context& io_context,
7c673cae 342 const udp::endpoint& broadcast_endpoint)
b32b8144 343 : socket_(io_context)
7c673cae
FG
344 {
345 socket_.connect(broadcast_endpoint);
11fdf7f2 346 socket_.set_option(udp::socket::broadcast(true));
7c673cae
FG
347 }
348
349private:
350 void deliver(const std::string& msg)
351 {
352 boost::system::error_code ignored_ec;
353 socket_.send(boost::asio::buffer(msg), 0, ignored_ec);
354 }
355
356 udp::socket socket_;
357};
358
359//----------------------------------------------------------------------
360
361class server
362{
363public:
b32b8144 364 server(boost::asio::io_context& io_context,
7c673cae
FG
365 const tcp::endpoint& listen_endpoint,
366 const udp::endpoint& broadcast_endpoint)
b32b8144
FG
367 : io_context_(io_context),
368 acceptor_(io_context, listen_endpoint)
7c673cae 369 {
b32b8144 370 subscriber_ptr bc(new udp_broadcaster(io_context_, broadcast_endpoint));
7c673cae
FG
371 channel_.join(bc);
372
373 start_accept();
374 }
375
376 void start_accept()
377 {
b32b8144 378 tcp_session_ptr new_session(new tcp_session(io_context_, channel_));
7c673cae
FG
379
380 acceptor_.async_accept(new_session->socket(),
f67539c2
TL
381 boost::bind(&server::handle_accept,
382 this, new_session, boost::placeholders::_1));
7c673cae
FG
383 }
384
385 void handle_accept(tcp_session_ptr session,
386 const boost::system::error_code& ec)
387 {
388 if (!ec)
389 {
390 session->start();
391 }
392
393 start_accept();
394 }
395
396private:
b32b8144 397 boost::asio::io_context& io_context_;
7c673cae
FG
398 tcp::acceptor acceptor_;
399 channel channel_;
400};
401
402//----------------------------------------------------------------------
403
404int main(int argc, char* argv[])
405{
406 try
407 {
408 using namespace std; // For atoi.
409
410 if (argc != 4)
411 {
412 std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
413 return 1;
414 }
415
b32b8144 416 boost::asio::io_context io_context;
7c673cae
FG
417
418 tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1]));
419
420 udp::endpoint broadcast_endpoint(
b32b8144 421 boost::asio::ip::make_address(argv[2]), atoi(argv[3]));
7c673cae 422
b32b8144 423 server s(io_context, listen_endpoint, broadcast_endpoint);
7c673cae 424
b32b8144 425 io_context.run();
7c673cae
FG
426 }
427 catch (std::exception& e)
428 {
429 std::cerr << "Exception: " << e.what() << "\n";
430 }
431
432 return 0;
433}