]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/beast/example/websocket/server/fast/websocket_server_fast.cpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / beast / example / websocket / server / fast / websocket_server_fast.cpp
1 //
2 // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 //------------------------------------------------------------------------------
11 //
12 // Example: WebSocket server, fast
13 //
14 //------------------------------------------------------------------------------
15
16 /* This server contains the following ports:
17
18 Synchronous <base port + 0>
19 Asynchronous <base port + 1>
20 Coroutine <base port + 2>
21
22 This program is optimized for the Autobahn|Testsuite
23 benchmarking and WebSocket compliants testing program.
24
25 See:
26 https://github.com/crossbario/autobahn-testsuite
27 */
28
29 #include <boost/beast/core.hpp>
30 #include <boost/beast/http.hpp>
31 #include <boost/beast/version.hpp>
32 #include <boost/beast/websocket.hpp>
33 #include <boost/asio/bind_executor.hpp>
34 #include <boost/asio/spawn.hpp>
35 #include <boost/asio/strand.hpp>
36 #include <boost/asio/ip/tcp.hpp>
37 #include <algorithm>
38 #include <cstdlib>
39 #include <functional>
40 #include <iostream>
41 #include <memory>
42 #include <string>
43 #include <thread>
44 #include <vector>
45
46 using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
47 namespace http = boost::beast::http; // from <boost/beast/http.hpp>
48 namespace websocket = boost::beast::websocket; // from <boost/beast/websocket.hpp>
49
50 //------------------------------------------------------------------------------
51
52 // Report a failure
53 void
54 fail(boost::system::error_code ec, char const* what)
55 {
56 std::cerr << (std::string(what) + ": " + ec.message() + "\n");
57 }
58
59 // Adjust settings on the stream
60 template<class NextLayer>
61 void
62 setup_stream(websocket::stream<NextLayer>& ws)
63 {
64 // These values are tuned for Autobahn|Testsuite, and
65 // should also be generally helpful for increased performance.
66
67 websocket::permessage_deflate pmd;
68 pmd.client_enable = true;
69 pmd.server_enable = true;
70 pmd.compLevel = 3;
71 ws.set_option(pmd);
72
73 ws.auto_fragment(false);
74
75 // Autobahn|Testsuite needs this
76 ws.read_message_max(64 * 1024 * 1024);
77 }
78
79 //------------------------------------------------------------------------------
80
81 void
82 do_sync_session(tcp::socket& socket)
83 {
84 boost::system::error_code ec;
85
86 websocket::stream<tcp::socket> ws{std::move(socket)};
87 setup_stream(ws);
88
89 ws.accept_ex(
90 [](websocket::response_type& res)
91 {
92 res.set(http::field::server,
93 "Boost.Beast/" + std::to_string(BOOST_BEAST_VERSION) + "-Sync");
94 },
95 ec);
96 if(ec)
97 return fail(ec, "accept");
98
99 for(;;)
100 {
101 boost::beast::multi_buffer buffer;
102
103 ws.read(buffer, ec);
104 if(ec == websocket::error::closed)
105 break;
106 if(ec)
107 return fail(ec, "read");
108 ws.text(ws.got_text());
109 ws.write(buffer.data(), ec);
110 if(ec)
111 return fail(ec, "write");
112 }
113 }
114
115 void
116 do_sync_listen(
117 boost::asio::io_context& ioc,
118 tcp::endpoint endpoint)
119 {
120 boost::system::error_code ec;
121 tcp::acceptor acceptor{ioc, endpoint};
122 for(;;)
123 {
124 tcp::socket socket{ioc};
125
126 acceptor.accept(socket, ec);
127 if(ec)
128 return fail(ec, "accept");
129
130 std::thread{std::bind(
131 &do_sync_session,
132 std::move(socket))}.detach();
133 }
134 }
135
136 //------------------------------------------------------------------------------
137
138 // Echoes back all received WebSocket messages
139 class async_session : public std::enable_shared_from_this<async_session>
140 {
141 websocket::stream<tcp::socket> ws_;
142 boost::asio::strand<
143 boost::asio::io_context::executor_type> strand_;
144 boost::beast::multi_buffer buffer_;
145
146 public:
147 // Take ownership of the socket
148 explicit
149 async_session(tcp::socket socket)
150 : ws_(std::move(socket))
151 , strand_(ws_.get_executor())
152 {
153 setup_stream(ws_);
154 }
155
156 // Start the asynchronous operation
157 void
158 run()
159 {
160 // Accept the websocket handshake
161 ws_.async_accept_ex(
162 [](websocket::response_type& res)
163 {
164 res.set(http::field::server,
165 "Boost.Beast/" + std::to_string(BOOST_BEAST_VERSION) + "-Async");
166 },
167 boost::asio::bind_executor(
168 strand_,
169 std::bind(
170 &async_session::on_accept,
171 shared_from_this(),
172 std::placeholders::_1)));
173 }
174
175 void
176 on_accept(boost::system::error_code ec)
177 {
178 if(ec)
179 return fail(ec, "accept");
180
181 // Read a message
182 do_read();
183 }
184
185 void
186 do_read()
187 {
188 // Read a message into our buffer
189 ws_.async_read(
190 buffer_,
191 boost::asio::bind_executor(
192 strand_,
193 std::bind(
194 &async_session::on_read,
195 shared_from_this(),
196 std::placeholders::_1,
197 std::placeholders::_2)));
198 }
199
200 void
201 on_read(
202 boost::system::error_code ec,
203 std::size_t bytes_transferred)
204 {
205 boost::ignore_unused(bytes_transferred);
206
207 // This indicates that the async_session was closed
208 if(ec == websocket::error::closed)
209 return;
210
211 if(ec)
212 fail(ec, "read");
213
214 // Echo the message
215 ws_.text(ws_.got_text());
216 ws_.async_write(
217 buffer_.data(),
218 boost::asio::bind_executor(
219 strand_,
220 std::bind(
221 &async_session::on_write,
222 shared_from_this(),
223 std::placeholders::_1,
224 std::placeholders::_2)));
225 }
226
227 void
228 on_write(
229 boost::system::error_code ec,
230 std::size_t bytes_transferred)
231 {
232 boost::ignore_unused(bytes_transferred);
233
234 if(ec)
235 return fail(ec, "write");
236
237 // Clear the buffer
238 buffer_.consume(buffer_.size());
239
240 // Do another read
241 do_read();
242 }
243 };
244
245 // Accepts incoming connections and launches the sessions
246 class async_listener : public std::enable_shared_from_this<async_listener>
247 {
248 boost::asio::strand<
249 boost::asio::io_context::executor_type> strand_;
250 tcp::acceptor acceptor_;
251 tcp::socket socket_;
252
253 public:
254 async_listener(
255 boost::asio::io_context& ioc,
256 tcp::endpoint endpoint)
257 : strand_(ioc.get_executor())
258 , acceptor_(ioc)
259 , socket_(ioc)
260 {
261 boost::system::error_code ec;
262
263 // Open the acceptor
264 acceptor_.open(endpoint.protocol(), ec);
265 if(ec)
266 {
267 fail(ec, "open");
268 return;
269 }
270
271 // Bind to the server address
272 acceptor_.bind(endpoint, ec);
273 if(ec)
274 {
275 fail(ec, "bind");
276 return;
277 }
278
279 // Start listening for connections
280 acceptor_.listen(
281 boost::asio::socket_base::max_listen_connections, ec);
282 if(ec)
283 {
284 fail(ec, "listen");
285 return;
286 }
287 }
288
289 // Start accepting incoming connections
290 void
291 run()
292 {
293 if(! acceptor_.is_open())
294 return;
295 do_accept();
296 }
297
298 void
299 do_accept()
300 {
301 acceptor_.async_accept(
302 socket_,
303 boost::asio::bind_executor(
304 strand_,
305 std::bind(
306 &async_listener::on_accept,
307 shared_from_this(),
308 std::placeholders::_1)));
309 }
310
311 void
312 on_accept(boost::system::error_code ec)
313 {
314 if(ec)
315 {
316 fail(ec, "accept");
317 }
318 else
319 {
320 // Create the async_session and run it
321 std::make_shared<async_session>(std::move(socket_))->run();
322 }
323
324 // Accept another connection
325 do_accept();
326 }
327 };
328
329 //------------------------------------------------------------------------------
330
331 void
332 do_coro_session(tcp::socket& socket, boost::asio::yield_context yield)
333 {
334 boost::system::error_code ec;
335
336 websocket::stream<tcp::socket> ws{std::move(socket)};
337 setup_stream(ws);
338
339 ws.async_accept_ex(
340 [&](websocket::response_type& res)
341 {
342 res.set(http::field::server,
343 "Boost.Beast/" + std::to_string(BOOST_BEAST_VERSION) + "-Coro");
344 },
345 yield[ec]);
346 if(ec)
347 return fail(ec, "accept");
348
349 for(;;)
350 {
351 boost::beast::multi_buffer buffer;
352
353 ws.async_read(buffer, yield[ec]);
354 if(ec == websocket::error::closed)
355 break;
356 if(ec)
357 return fail(ec, "read");
358
359 ws.text(ws.got_text());
360 ws.async_write(buffer.data(), yield[ec]);
361 if(ec)
362 return fail(ec, "write");
363 }
364 }
365
366 void
367 do_coro_listen(
368 boost::asio::io_context& ioc,
369 tcp::endpoint endpoint,
370 boost::asio::yield_context yield)
371 {
372 boost::system::error_code ec;
373
374 tcp::acceptor acceptor(ioc);
375 acceptor.open(endpoint.protocol(), ec);
376 if(ec)
377 return fail(ec, "open");
378
379 acceptor.bind(endpoint, ec);
380 if(ec)
381 return fail(ec, "bind");
382
383 acceptor.listen(boost::asio::socket_base::max_listen_connections, ec);
384 if(ec)
385 return fail(ec, "listen");
386
387 for(;;)
388 {
389 tcp::socket socket(ioc);
390
391 acceptor.async_accept(socket, yield[ec]);
392 if(ec)
393 {
394 fail(ec, "accept");
395 continue;
396 }
397
398 boost::asio::spawn(
399 acceptor.get_executor().context(),
400 std::bind(
401 &do_coro_session,
402 std::move(socket),
403 std::placeholders::_1));
404 }
405 }
406
407 //------------------------------------------------------------------------------
408
409 int main(int argc, char* argv[])
410 {
411 // Check command line arguments.
412 if (argc != 4)
413 {
414 std::cerr <<
415 "Usage: websocket-server-fast <address> <starting-port> <threads>\n" <<
416 "Example:\n"
417 " websocket-server-fast 0.0.0.0 8080 1\n"
418 " Connect to:\n"
419 " starting-port+0 for synchronous,\n"
420 " starting-port+1 for asynchronous,\n"
421 " starting-port+2 for coroutine.\n";
422 return EXIT_FAILURE;
423 }
424 auto const address = boost::asio::ip::make_address(argv[1]);
425 auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
426 auto const threads = std::max<int>(1, std::atoi(argv[3]));
427
428 // The io_context is required for all I/O
429 boost::asio::io_context ioc{threads};
430
431 // Create sync port
432 std::thread(std::bind(
433 &do_sync_listen,
434 std::ref(ioc),
435 tcp::endpoint{
436 address,
437 static_cast<unsigned short>(port + 0u)}
438 )).detach();
439
440 // Create async port
441 std::make_shared<async_listener>(
442 ioc,
443 tcp::endpoint{
444 address,
445 static_cast<unsigned short>(port + 1u)})->run();
446
447 // Create coro port
448 boost::asio::spawn(ioc,
449 std::bind(
450 &do_coro_listen,
451 std::ref(ioc),
452 tcp::endpoint{
453 address,
454 static_cast<unsigned short>(port + 2u)},
455 std::placeholders::_1));
456
457 // Run the I/O service on the requested number of threads
458 std::vector<std::thread> v;
459 v.reserve(threads - 1);
460 for(auto i = threads - 1; i > 0; --i)
461 v.emplace_back(
462 [&ioc]
463 {
464 ioc.run();
465 });
466 ioc.run();
467
468 return EXIT_SUCCESS;
469 }