]> git.proxmox.com Git - ceph.git/blob - ceph/src/Beast/test/websocket/websocket_sync_echo_server.hpp
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / Beast / test / websocket / websocket_sync_echo_server.hpp
1 //
2 // Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7
8 #ifndef BEAST_WEBSOCKET_SYNC_ECHO_SERVER_HPP
9 #define BEAST_WEBSOCKET_SYNC_ECHO_SERVER_HPP
10
11 #include <beast/core/placeholders.hpp>
12 #include <beast/core/streambuf.hpp>
13 #include <beast/websocket.hpp>
14 #include <boost/lexical_cast.hpp>
15 #include <boost/optional.hpp>
16 #include <atomic>
17 #include <functional>
18 #include <memory>
19 #include <mutex>
20 #include <ostream>
21 #include <string>
22 #include <thread>
23 #include <type_traits>
24 #include <typeindex>
25 #include <unordered_map>
26 #include <utility>
27
28 namespace websocket {
29
30 /** Synchronous WebSocket echo client/server
31 */
32 class sync_echo_server
33 {
34 public:
35 using error_code = beast::error_code;
36 using endpoint_type = boost::asio::ip::tcp::endpoint;
37 using address_type = boost::asio::ip::address;
38 using socket_type = boost::asio::ip::tcp::socket;
39
40 private:
41 struct identity
42 {
43 template<class Body, class Fields>
44 void
45 operator()(beast::http::message<
46 true, Body, Fields>& req) const
47 {
48 req.fields.replace("User-Agent", "sync_echo_client");
49 }
50
51 template<class Body, class Fields>
52 void
53 operator()(beast::http::message<
54 false, Body, Fields>& resp) const
55 {
56 resp.fields.replace("Server", "sync_echo_server");
57 }
58 };
59
60 /** A container of type-erased option setters.
61 */
62 template<class NextLayer>
63 class options_set
64 {
65 // workaround for std::function bug in msvc
66 struct callable
67 {
68 virtual ~callable() = default;
69 virtual void operator()(
70 beast::websocket::stream<NextLayer>&) = 0;
71 };
72
73 template<class T>
74 class callable_impl : public callable
75 {
76 T t_;
77
78 public:
79 template<class U>
80 callable_impl(U&& u)
81 : t_(std::forward<U>(u))
82 {
83 }
84
85 void
86 operator()(beast::websocket::stream<NextLayer>& ws)
87 {
88 t_(ws);
89 }
90 };
91
92 template<class Opt>
93 class lambda
94 {
95 Opt opt_;
96
97 public:
98 lambda(lambda&&) = default;
99 lambda(lambda const&) = default;
100
101 lambda(Opt const& opt)
102 : opt_(opt)
103 {
104 }
105
106 void
107 operator()(beast::websocket::stream<NextLayer>& ws) const
108 {
109 ws.set_option(opt_);
110 }
111 };
112
113 std::unordered_map<std::type_index,
114 std::unique_ptr<callable>> list_;
115
116 public:
117 template<class Opt>
118 void
119 set_option(Opt const& opt)
120 {
121 std::unique_ptr<callable> p;
122 p.reset(new callable_impl<lambda<Opt>>{opt});
123 list_[std::type_index{
124 typeid(Opt)}] = std::move(p);
125 }
126
127 void
128 set_options(beast::websocket::stream<NextLayer>& ws)
129 {
130 for(auto const& op : list_)
131 (*op.second)(ws);
132 }
133 };
134
135 std::ostream* log_;
136 boost::asio::io_service ios_;
137 socket_type sock_;
138 endpoint_type ep_;
139 boost::asio::ip::tcp::acceptor acceptor_;
140 std::thread thread_;
141 options_set<socket_type> opts_;
142
143 public:
144 /** Constructor.
145
146 @param log A pointer to a stream to log to, or `nullptr`
147 to disable logging.
148 */
149 sync_echo_server(std::ostream* log)
150 : log_(log)
151 , sock_(ios_)
152 , acceptor_(ios_)
153 {
154 opts_.set_option(
155 beast::websocket::decorate(identity{}));
156 }
157
158 /** Destructor.
159 */
160 ~sync_echo_server()
161 {
162 if(thread_.joinable())
163 {
164 error_code ec;
165 ios_.dispatch(
166 [&]{ acceptor_.close(ec); });
167 thread_.join();
168 }
169 }
170
171 /** Return the listening endpoint.
172 */
173 endpoint_type
174 local_endpoint() const
175 {
176 return acceptor_.local_endpoint();
177 }
178
179 /** Set a websocket option.
180
181 The option will be applied to all new connections.
182
183 @param opt The option to apply.
184 */
185 template<class Opt>
186 void
187 set_option(Opt const& opt)
188 {
189 opts_.set_option(opt);
190 }
191
192 /** Open a listening port.
193
194 @param ep The address and port to bind to.
195
196 @param ec Set to the error, if any occurred.
197 */
198 void
199 open(endpoint_type const& ep, error_code& ec)
200 {
201 acceptor_.open(ep.protocol(), ec);
202 if(ec)
203 return fail("open", ec);
204 acceptor_.set_option(
205 boost::asio::socket_base::reuse_address{true});
206 acceptor_.bind(ep, ec);
207 if(ec)
208 return fail("bind", ec);
209 acceptor_.listen(
210 boost::asio::socket_base::max_connections, ec);
211 if(ec)
212 return fail("listen", ec);
213 acceptor_.async_accept(sock_, ep_,
214 std::bind(&sync_echo_server::on_accept, this,
215 beast::asio::placeholders::error));
216 thread_ = std::thread{[&]{ ios_.run(); }};
217 }
218
219 private:
220 void
221 fail(std::string what, error_code ec)
222 {
223 if(log_)
224 {
225 static std::mutex m;
226 std::lock_guard<std::mutex> lock{m};
227 (*log_) << what << ": " <<
228 ec.message() << std::endl;
229 }
230 }
231
232 void
233 fail(std::string what, error_code ec,
234 int id, endpoint_type const& ep)
235 {
236 if(log_)
237 if(ec != beast::websocket::error::closed)
238 fail("[#" + std::to_string(id) + " " +
239 boost::lexical_cast<std::string>(ep) +
240 "] " + what, ec);
241 }
242
243 void
244 on_accept(error_code ec)
245 {
246 if(ec == boost::asio::error::operation_aborted)
247 return;
248 if(ec)
249 return fail("accept", ec);
250 struct lambda
251 {
252 std::size_t id;
253 endpoint_type ep;
254 sync_echo_server& self;
255 boost::asio::io_service::work work;
256 // Must be destroyed before work otherwise the
257 // io_service could be destroyed before the socket.
258 socket_type sock;
259
260 lambda(sync_echo_server& self_,
261 endpoint_type const& ep_,
262 socket_type&& sock_)
263 : id([]
264 {
265 static std::atomic<std::size_t> n{0};
266 return ++n;
267 }())
268 , ep(ep_)
269 , self(self_)
270 , work(sock_.get_io_service())
271 , sock(std::move(sock_))
272 {
273 }
274
275 void operator()()
276 {
277 self.do_peer(id, ep, std::move(sock));
278 }
279 };
280 std::thread{lambda{*this, ep_, std::move(sock_)}}.detach();
281 acceptor_.async_accept(sock_, ep_,
282 std::bind(&sync_echo_server::on_accept, this,
283 beast::asio::placeholders::error));
284 }
285
286 template<class DynamicBuffer, std::size_t N>
287 static
288 bool
289 match(DynamicBuffer& db, char const(&s)[N])
290 {
291 using boost::asio::buffer;
292 using boost::asio::buffer_copy;
293 if(db.size() < N-1)
294 return false;
295 beast::static_string<N-1> t;
296 t.resize(N-1);
297 buffer_copy(buffer(t.data(), t.size()),
298 db.data());
299 if(t != s)
300 return false;
301 db.consume(N-1);
302 return true;
303 }
304
305 void
306 do_peer(std::size_t id,
307 endpoint_type const& ep, socket_type&& sock)
308 {
309 using boost::asio::buffer;
310 using boost::asio::buffer_copy;
311 beast::websocket::stream<
312 socket_type> ws{std::move(sock)};
313 opts_.set_options(ws);
314 error_code ec;
315 ws.accept(ec);
316 if(ec)
317 {
318 fail("accept", ec, id, ep);
319 return;
320 }
321 for(;;)
322 {
323 beast::websocket::opcode op;
324 beast::streambuf sb;
325 ws.read(op, sb, ec);
326 if(ec)
327 {
328 auto const s = ec.message();
329 break;
330 }
331 ws.set_option(beast::websocket::message_type{op});
332 if(match(sb, "RAW"))
333 {
334 boost::asio::write(
335 ws.next_layer(), sb.data(), ec);
336 }
337 else if(match(sb, "TEXT"))
338 {
339 ws.set_option(
340 beast::websocket::message_type{
341 beast::websocket::opcode::text});
342 ws.write(sb.data(), ec);
343 }
344 else if(match(sb, "PING"))
345 {
346 beast::websocket::ping_data payload;
347 sb.consume(buffer_copy(
348 buffer(payload.data(), payload.size()),
349 sb.data()));
350 ws.ping(payload, ec);
351 }
352 else if(match(sb, "CLOSE"))
353 {
354 ws.close({}, ec);
355 }
356 else
357 {
358 ws.write(sb.data(), ec);
359 }
360 if(ec)
361 break;
362 }
363 if(ec && ec != beast::websocket::error::closed)
364 {
365 fail("read", ec, id, ep);
366 }
367 }
368 };
369
370 } // websocket
371
372 #endif