]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/asio/example/cpp03/porthopper/server.cpp
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / boost / libs / asio / example / cpp03 / porthopper / server.cpp
CommitLineData
7c673cae
FG
1//
2// server.cpp
3// ~~~~~~~~~~
4//
92f5a8d4 5// Copyright (c) 2003-2019 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7c673cae
FG
6//
7// Distributed under the Boost Software License, Version 1.0. (See accompanying
8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9//
10
11#include <boost/asio.hpp>
12#include <boost/bind.hpp>
13#include <boost/shared_ptr.hpp>
14#include <cmath>
15#include <cstdlib>
16#include <exception>
17#include <iostream>
18#include <set>
19#include "protocol.hpp"
20
21using boost::asio::ip::tcp;
22using boost::asio::ip::udp;
23
24typedef boost::shared_ptr<tcp::socket> tcp_socket_ptr;
11fdf7f2 25typedef boost::shared_ptr<boost::asio::steady_timer> timer_ptr;
7c673cae
FG
26typedef boost::shared_ptr<control_request> control_request_ptr;
27
28class server
29{
30public:
31 // Construct the server to wait for incoming control connections.
b32b8144
FG
32 server(boost::asio::io_context& io_context, unsigned short port)
33 : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
34 timer_(io_context),
35 udp_socket_(io_context, udp::endpoint(udp::v4(), 0)),
7c673cae
FG
36 next_frame_number_(1)
37 {
38 // Start waiting for a new control connection.
92f5a8d4 39 tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_executor()));
7c673cae
FG
40 acceptor_.async_accept(*new_socket,
41 boost::bind(&server::handle_accept, this,
42 boost::asio::placeholders::error, new_socket));
43
44 // Start the timer used to generate outgoing frames.
11fdf7f2 45 timer_.expires_after(boost::asio::chrono::milliseconds(100));
7c673cae
FG
46 timer_.async_wait(boost::bind(&server::handle_timer, this));
47 }
48
49 // Handle a new control connection.
50 void handle_accept(const boost::system::error_code& ec, tcp_socket_ptr socket)
51 {
52 if (!ec)
53 {
54 // Start receiving control requests on the connection.
55 control_request_ptr request(new control_request);
56 boost::asio::async_read(*socket, request->to_buffers(),
57 boost::bind(&server::handle_control_request, this,
58 boost::asio::placeholders::error, socket, request));
59 }
60
61 // Start waiting for a new control connection.
92f5a8d4 62 tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_executor()));
7c673cae
FG
63 acceptor_.async_accept(*new_socket,
64 boost::bind(&server::handle_accept, this,
65 boost::asio::placeholders::error, new_socket));
66 }
67
68 // Handle a new control request.
69 void handle_control_request(const boost::system::error_code& ec,
70 tcp_socket_ptr socket, control_request_ptr request)
71 {
72 if (!ec)
73 {
74 // Delay handling of the control request to simulate network latency.
75 timer_ptr delay_timer(
92f5a8d4 76 new boost::asio::steady_timer(acceptor_.get_executor()));
11fdf7f2 77 delay_timer->expires_after(boost::asio::chrono::seconds(2));
7c673cae
FG
78 delay_timer->async_wait(
79 boost::bind(&server::handle_control_request_timer, this,
80 socket, request, delay_timer));
81 }
82 }
83
84 void handle_control_request_timer(tcp_socket_ptr socket,
85 control_request_ptr request, timer_ptr /*delay_timer*/)
86 {
87 // Determine what address this client is connected from, since
88 // subscriptions must be stored on the server as a complete endpoint, not
89 // just a port. We use the non-throwing overload of remote_endpoint() since
90 // it may fail if the socket is no longer connected.
91 boost::system::error_code ec;
92 tcp::endpoint remote_endpoint = socket->remote_endpoint(ec);
93 if (!ec)
94 {
95 // Remove old port subscription, if any.
96 if (unsigned short old_port = request->old_port())
97 {
98 udp::endpoint old_endpoint(remote_endpoint.address(), old_port);
99 subscribers_.erase(old_endpoint);
100 std::cout << "Removing subscription " << old_endpoint << std::endl;
101 }
102
103 // Add new port subscription, if any.
104 if (unsigned short new_port = request->new_port())
105 {
106 udp::endpoint new_endpoint(remote_endpoint.address(), new_port);
107 subscribers_.insert(new_endpoint);
108 std::cout << "Adding subscription " << new_endpoint << std::endl;
109 }
110 }
111
112 // Wait for next control request on this connection.
113 boost::asio::async_read(*socket, request->to_buffers(),
114 boost::bind(&server::handle_control_request, this,
115 boost::asio::placeholders::error, socket, request));
116 }
117
118 // Every time the timer fires we will generate a new frame and send it to all
119 // subscribers.
120 void handle_timer()
121 {
122 // Generate payload.
123 double x = next_frame_number_ * 0.2;
124 double y = std::sin(x);
125 int char_index = static_cast<int>((y + 1.0) * (frame::payload_size / 2));
126 std::string payload;
127 for (int i = 0; i < frame::payload_size; ++i)
128 payload += (i == char_index ? '*' : '.');
129
130 // Create the frame to be sent to all subscribers.
131 frame f(next_frame_number_++, payload);
132
133 // Send frame to all subscribers. We can use synchronous calls here since
134 // UDP send operations typically do not block.
135 std::set<udp::endpoint>::iterator j;
136 for (j = subscribers_.begin(); j != subscribers_.end(); ++j)
137 {
138 boost::system::error_code ec;
139 udp_socket_.send_to(f.to_buffers(), *j, 0, ec);
140 }
141
142 // Wait for next timeout.
11fdf7f2 143 timer_.expires_after(boost::asio::chrono::milliseconds(100));
7c673cae
FG
144 timer_.async_wait(boost::bind(&server::handle_timer, this));
145 }
146
147private:
148 // The acceptor used to accept incoming control connections.
149 tcp::acceptor acceptor_;
150
151 // The timer used for generating data.
11fdf7f2 152 boost::asio::steady_timer timer_;
7c673cae
FG
153
154 // The socket used to send data to subscribers.
155 udp::socket udp_socket_;
156
157 // The next frame number.
158 unsigned long next_frame_number_;
159
160 // The set of endpoints that are subscribed.
161 std::set<udp::endpoint> subscribers_;
162};
163
164int main(int argc, char* argv[])
165{
166 try
167 {
168 if (argc != 2)
169 {
170 std::cerr << "Usage: server <port>\n";
171 return 1;
172 }
173
b32b8144 174 boost::asio::io_context io_context;
7c673cae
FG
175
176 using namespace std; // For atoi.
b32b8144 177 server s(io_context, atoi(argv[1]));
7c673cae 178
b32b8144 179 io_context.run();
7c673cae
FG
180 }
181 catch (std::exception& e)
182 {
183 std::cerr << "Exception: " << e.what() << std::endl;
184 }
185
186 return 0;
187}