]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/asio/example/cpp03/porthopper/server.cpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / asio / example / cpp03 / porthopper / server.cpp
1 //
2 // server.cpp
3 // ~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #include <boost/asio.hpp>
12 #include <boost/bind.hpp>
13 #include <boost/shared_ptr.hpp>
14 #include <cmath>
15 #include <cstdlib>
16 #include <exception>
17 #include <iostream>
18 #include <set>
19 #include "protocol.hpp"
20
21 using boost::asio::ip::tcp;
22 using boost::asio::ip::udp;
23
24 typedef boost::shared_ptr<tcp::socket> tcp_socket_ptr;
25 typedef boost::shared_ptr<boost::asio::deadline_timer> timer_ptr;
26 typedef boost::shared_ptr<control_request> control_request_ptr;
27
28 class server
29 {
30 public:
31 // Construct the server to wait for incoming control connections.
32 server(boost::asio::io_context& io_context, unsigned short port)
33 : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
34 timer_(io_context),
35 udp_socket_(io_context, udp::endpoint(udp::v4(), 0)),
36 next_frame_number_(1)
37 {
38 // Start waiting for a new control connection.
39 tcp_socket_ptr new_socket(
40 new tcp::socket(acceptor_.get_executor().context()));
41 acceptor_.async_accept(*new_socket,
42 boost::bind(&server::handle_accept, this,
43 boost::asio::placeholders::error, new_socket));
44
45 // Start the timer used to generate outgoing frames.
46 timer_.expires_from_now(boost::posix_time::milliseconds(100));
47 timer_.async_wait(boost::bind(&server::handle_timer, this));
48 }
49
50 // Handle a new control connection.
51 void handle_accept(const boost::system::error_code& ec, tcp_socket_ptr socket)
52 {
53 if (!ec)
54 {
55 // Start receiving control requests on the connection.
56 control_request_ptr request(new control_request);
57 boost::asio::async_read(*socket, request->to_buffers(),
58 boost::bind(&server::handle_control_request, this,
59 boost::asio::placeholders::error, socket, request));
60 }
61
62 // Start waiting for a new control connection.
63 tcp_socket_ptr new_socket(
64 new tcp::socket(acceptor_.get_executor().context()));
65 acceptor_.async_accept(*new_socket,
66 boost::bind(&server::handle_accept, this,
67 boost::asio::placeholders::error, new_socket));
68 }
69
70 // Handle a new control request.
71 void handle_control_request(const boost::system::error_code& ec,
72 tcp_socket_ptr socket, control_request_ptr request)
73 {
74 if (!ec)
75 {
76 // Delay handling of the control request to simulate network latency.
77 timer_ptr delay_timer(
78 new boost::asio::deadline_timer(acceptor_.get_executor().context()));
79 delay_timer->expires_from_now(boost::posix_time::seconds(2));
80 delay_timer->async_wait(
81 boost::bind(&server::handle_control_request_timer, this,
82 socket, request, delay_timer));
83 }
84 }
85
86 void handle_control_request_timer(tcp_socket_ptr socket,
87 control_request_ptr request, timer_ptr /*delay_timer*/)
88 {
89 // Determine what address this client is connected from, since
90 // subscriptions must be stored on the server as a complete endpoint, not
91 // just a port. We use the non-throwing overload of remote_endpoint() since
92 // it may fail if the socket is no longer connected.
93 boost::system::error_code ec;
94 tcp::endpoint remote_endpoint = socket->remote_endpoint(ec);
95 if (!ec)
96 {
97 // Remove old port subscription, if any.
98 if (unsigned short old_port = request->old_port())
99 {
100 udp::endpoint old_endpoint(remote_endpoint.address(), old_port);
101 subscribers_.erase(old_endpoint);
102 std::cout << "Removing subscription " << old_endpoint << std::endl;
103 }
104
105 // Add new port subscription, if any.
106 if (unsigned short new_port = request->new_port())
107 {
108 udp::endpoint new_endpoint(remote_endpoint.address(), new_port);
109 subscribers_.insert(new_endpoint);
110 std::cout << "Adding subscription " << new_endpoint << std::endl;
111 }
112 }
113
114 // Wait for next control request on this connection.
115 boost::asio::async_read(*socket, request->to_buffers(),
116 boost::bind(&server::handle_control_request, this,
117 boost::asio::placeholders::error, socket, request));
118 }
119
120 // Every time the timer fires we will generate a new frame and send it to all
121 // subscribers.
122 void handle_timer()
123 {
124 // Generate payload.
125 double x = next_frame_number_ * 0.2;
126 double y = std::sin(x);
127 int char_index = static_cast<int>((y + 1.0) * (frame::payload_size / 2));
128 std::string payload;
129 for (int i = 0; i < frame::payload_size; ++i)
130 payload += (i == char_index ? '*' : '.');
131
132 // Create the frame to be sent to all subscribers.
133 frame f(next_frame_number_++, payload);
134
135 // Send frame to all subscribers. We can use synchronous calls here since
136 // UDP send operations typically do not block.
137 std::set<udp::endpoint>::iterator j;
138 for (j = subscribers_.begin(); j != subscribers_.end(); ++j)
139 {
140 boost::system::error_code ec;
141 udp_socket_.send_to(f.to_buffers(), *j, 0, ec);
142 }
143
144 // Wait for next timeout.
145 timer_.expires_from_now(boost::posix_time::milliseconds(100));
146 timer_.async_wait(boost::bind(&server::handle_timer, this));
147 }
148
149 private:
150 // The acceptor used to accept incoming control connections.
151 tcp::acceptor acceptor_;
152
153 // The timer used for generating data.
154 boost::asio::deadline_timer timer_;
155
156 // The socket used to send data to subscribers.
157 udp::socket udp_socket_;
158
159 // The next frame number.
160 unsigned long next_frame_number_;
161
162 // The set of endpoints that are subscribed.
163 std::set<udp::endpoint> subscribers_;
164 };
165
166 int main(int argc, char* argv[])
167 {
168 try
169 {
170 if (argc != 2)
171 {
172 std::cerr << "Usage: server <port>\n";
173 return 1;
174 }
175
176 boost::asio::io_context io_context;
177
178 using namespace std; // For atoi.
179 server s(io_context, atoi(argv[1]));
180
181 io_context.run();
182 }
183 catch (std::exception& e)
184 {
185 std::cerr << "Exception: " << e.what() << std::endl;
186 }
187
188 return 0;
189 }