5 // Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
11 #include <boost/asio.hpp>
12 #include <boost/bind.hpp>
13 #include <boost/shared_ptr.hpp>
19 #include "protocol.hpp"
21 using boost::asio::ip::tcp
;
22 using boost::asio::ip::udp
;
24 typedef boost::shared_ptr
<tcp::socket
> tcp_socket_ptr
;
25 typedef boost::shared_ptr
<boost::asio::deadline_timer
> timer_ptr
;
26 typedef boost::shared_ptr
<control_request
> control_request_ptr
;
31 // Construct the server to wait for incoming control connections.
32 server(boost::asio::io_context
& io_context
, unsigned short port
)
33 : acceptor_(io_context
, tcp::endpoint(tcp::v4(), port
)),
35 udp_socket_(io_context
, udp::endpoint(udp::v4(), 0)),
38 // Start waiting for a new control connection.
39 tcp_socket_ptr
new_socket(
40 new tcp::socket(acceptor_
.get_executor().context()));
41 acceptor_
.async_accept(*new_socket
,
42 boost::bind(&server::handle_accept
, this,
43 boost::asio::placeholders::error
, new_socket
));
45 // Start the timer used to generate outgoing frames.
46 timer_
.expires_from_now(boost::posix_time::milliseconds(100));
47 timer_
.async_wait(boost::bind(&server::handle_timer
, this));
50 // Handle a new control connection.
51 void handle_accept(const boost::system::error_code
& ec
, tcp_socket_ptr socket
)
55 // Start receiving control requests on the connection.
56 control_request_ptr
request(new control_request
);
57 boost::asio::async_read(*socket
, request
->to_buffers(),
58 boost::bind(&server::handle_control_request
, this,
59 boost::asio::placeholders::error
, socket
, request
));
62 // Start waiting for a new control connection.
63 tcp_socket_ptr
new_socket(
64 new tcp::socket(acceptor_
.get_executor().context()));
65 acceptor_
.async_accept(*new_socket
,
66 boost::bind(&server::handle_accept
, this,
67 boost::asio::placeholders::error
, new_socket
));
70 // Handle a new control request.
71 void handle_control_request(const boost::system::error_code
& ec
,
72 tcp_socket_ptr socket
, control_request_ptr request
)
76 // Delay handling of the control request to simulate network latency.
77 timer_ptr
delay_timer(
78 new boost::asio::deadline_timer(acceptor_
.get_executor().context()));
79 delay_timer
->expires_from_now(boost::posix_time::seconds(2));
80 delay_timer
->async_wait(
81 boost::bind(&server::handle_control_request_timer
, this,
82 socket
, request
, delay_timer
));
86 void handle_control_request_timer(tcp_socket_ptr socket
,
87 control_request_ptr request
, timer_ptr
/*delay_timer*/)
89 // Determine what address this client is connected from, since
90 // subscriptions must be stored on the server as a complete endpoint, not
91 // just a port. We use the non-throwing overload of remote_endpoint() since
92 // it may fail if the socket is no longer connected.
93 boost::system::error_code ec
;
94 tcp::endpoint remote_endpoint
= socket
->remote_endpoint(ec
);
97 // Remove old port subscription, if any.
98 if (unsigned short old_port
= request
->old_port())
100 udp::endpoint
old_endpoint(remote_endpoint
.address(), old_port
);
101 subscribers_
.erase(old_endpoint
);
102 std::cout
<< "Removing subscription " << old_endpoint
<< std::endl
;
105 // Add new port subscription, if any.
106 if (unsigned short new_port
= request
->new_port())
108 udp::endpoint
new_endpoint(remote_endpoint
.address(), new_port
);
109 subscribers_
.insert(new_endpoint
);
110 std::cout
<< "Adding subscription " << new_endpoint
<< std::endl
;
114 // Wait for next control request on this connection.
115 boost::asio::async_read(*socket
, request
->to_buffers(),
116 boost::bind(&server::handle_control_request
, this,
117 boost::asio::placeholders::error
, socket
, request
));
120 // Every time the timer fires we will generate a new frame and send it to all
125 double x
= next_frame_number_
* 0.2;
126 double y
= std::sin(x
);
127 int char_index
= static_cast<int>((y
+ 1.0) * (frame::payload_size
/ 2));
129 for (int i
= 0; i
< frame::payload_size
; ++i
)
130 payload
+= (i
== char_index
? '*' : '.');
132 // Create the frame to be sent to all subscribers.
133 frame
f(next_frame_number_
++, payload
);
135 // Send frame to all subscribers. We can use synchronous calls here since
136 // UDP send operations typically do not block.
137 std::set
<udp::endpoint
>::iterator j
;
138 for (j
= subscribers_
.begin(); j
!= subscribers_
.end(); ++j
)
140 boost::system::error_code ec
;
141 udp_socket_
.send_to(f
.to_buffers(), *j
, 0, ec
);
144 // Wait for next timeout.
145 timer_
.expires_from_now(boost::posix_time::milliseconds(100));
146 timer_
.async_wait(boost::bind(&server::handle_timer
, this));
150 // The acceptor used to accept incoming control connections.
151 tcp::acceptor acceptor_
;
153 // The timer used for generating data.
154 boost::asio::deadline_timer timer_
;
156 // The socket used to send data to subscribers.
157 udp::socket udp_socket_
;
159 // The next frame number.
160 unsigned long next_frame_number_
;
162 // The set of endpoints that are subscribed.
163 std::set
<udp::endpoint
> subscribers_
;
166 int main(int argc
, char* argv
[])
172 std::cerr
<< "Usage: server <port>\n";
176 boost::asio::io_context io_context
;
178 using namespace std
; // For atoi.
179 server
s(io_context
, atoi(argv
[1]));
183 catch (std::exception
& e
)
185 std::cerr
<< "Exception: " << e
.what() << std::endl
;