]>
Commit | Line | Data |
---|---|---|
b32b8144 FG |
1 | // |
2 | // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com) | |
3 | // | |
4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
6 | // | |
7 | // Official repository: https://github.com/boostorg/beast | |
8 | // | |
9 | ||
10 | //------------------------------------------------------------------------------ | |
11 | // | |
12 | // Example: WebSocket server, fast | |
13 | // | |
14 | //------------------------------------------------------------------------------ | |
15 | ||
16 | /* This server contains the following ports: | |
17 | ||
18 | Synchronous <base port + 0> | |
19 | Asynchronous <base port + 1> | |
20 | Coroutine <base port + 2> | |
21 | ||
22 | This program is optimized for the Autobahn|Testsuite | |
23 | benchmarking and WebSocket compliance testing program. | |
24 | ||
25 | See: | |
26 | https://github.com/crossbario/autobahn-testsuite | |
27 | */ | |
28 | ||
29 | #include <boost/beast/core.hpp> | |
30 | #include <boost/beast/http.hpp> | |
31 | #include <boost/beast/version.hpp> | |
32 | #include <boost/beast/websocket.hpp> | |
33 | #include <boost/asio/bind_executor.hpp> | |
34 | #include <boost/asio/spawn.hpp> | |
35 | #include <boost/asio/strand.hpp> | |
36 | #include <boost/asio/ip/tcp.hpp> | |
37 | #include <algorithm> | |
38 | #include <cstdlib> | |
39 | #include <functional> | |
40 | #include <iostream> | |
41 | #include <memory> | |
42 | #include <string> | |
43 | #include <thread> | |
44 | #include <vector> | |
45 | ||
46 | using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp> | |
47 | namespace http = boost::beast::http; // from <boost/beast/http.hpp> | |
48 | namespace websocket = boost::beast::websocket; // from <boost/beast/websocket.hpp> | |
49 | ||
50 | //------------------------------------------------------------------------------ | |
51 | ||
52 | // Report a failure | |
53 | void | |
54 | fail(boost::system::error_code ec, char const* what) | |
55 | { | |
56 | std::cerr << (std::string(what) + ": " + ec.message() + "\n"); | |
57 | } | |
58 | ||
59 | // Adjust settings on the stream | |
60 | template<class NextLayer> | |
61 | void | |
62 | setup_stream(websocket::stream<NextLayer>& ws) | |
63 | { | |
64 | // These values are tuned for Autobahn|Testsuite, and | |
65 | // should also be generally helpful for increased performance. | |
66 | ||
67 | websocket::permessage_deflate pmd; | |
68 | pmd.client_enable = true; | |
69 | pmd.server_enable = true; | |
70 | pmd.compLevel = 3; | |
71 | ws.set_option(pmd); | |
72 | ||
73 | ws.auto_fragment(false); | |
74 | ||
75 | // Autobahn|Testsuite needs this | |
76 | ws.read_message_max(64 * 1024 * 1024); | |
77 | } | |
78 | ||
79 | //------------------------------------------------------------------------------ | |
80 | ||
81 | void | |
82 | do_sync_session(tcp::socket& socket) | |
83 | { | |
84 | boost::system::error_code ec; | |
85 | ||
86 | websocket::stream<tcp::socket> ws{std::move(socket)}; | |
87 | setup_stream(ws); | |
88 | ||
89 | ws.accept_ex( | |
90 | [](websocket::response_type& res) | |
91 | { | |
92 | res.set(http::field::server, | |
93 | "Boost.Beast/" + std::to_string(BOOST_BEAST_VERSION) + "-Sync"); | |
94 | }, | |
95 | ec); | |
96 | if(ec) | |
97 | return fail(ec, "accept"); | |
98 | ||
99 | for(;;) | |
100 | { | |
101 | boost::beast::multi_buffer buffer; | |
102 | ||
103 | ws.read(buffer, ec); | |
104 | if(ec == websocket::error::closed) | |
105 | break; | |
106 | if(ec) | |
107 | return fail(ec, "read"); | |
108 | ws.text(ws.got_text()); | |
109 | ws.write(buffer.data(), ec); | |
110 | if(ec) | |
111 | return fail(ec, "write"); | |
112 | } | |
113 | } | |
114 | ||
115 | void | |
116 | do_sync_listen( | |
117 | boost::asio::io_context& ioc, | |
118 | tcp::endpoint endpoint) | |
119 | { | |
120 | boost::system::error_code ec; | |
121 | tcp::acceptor acceptor{ioc, endpoint}; | |
122 | for(;;) | |
123 | { | |
124 | tcp::socket socket{ioc}; | |
125 | ||
126 | acceptor.accept(socket, ec); | |
127 | if(ec) | |
128 | return fail(ec, "accept"); | |
129 | ||
130 | std::thread{std::bind( | |
131 | &do_sync_session, | |
132 | std::move(socket))}.detach(); | |
133 | } | |
134 | } | |
135 | ||
136 | //------------------------------------------------------------------------------ | |
137 | ||
138 | // Echoes back all received WebSocket messages | |
139 | class async_session : public std::enable_shared_from_this<async_session> | |
140 | { | |
141 | websocket::stream<tcp::socket> ws_; | |
142 | boost::asio::strand< | |
143 | boost::asio::io_context::executor_type> strand_; | |
144 | boost::beast::multi_buffer buffer_; | |
145 | ||
146 | public: | |
147 | // Take ownership of the socket | |
148 | explicit | |
149 | async_session(tcp::socket socket) | |
150 | : ws_(std::move(socket)) | |
151 | , strand_(ws_.get_executor()) | |
152 | { | |
153 | setup_stream(ws_); | |
154 | } | |
155 | ||
156 | // Start the asynchronous operation | |
157 | void | |
158 | run() | |
159 | { | |
160 | // Accept the websocket handshake | |
161 | ws_.async_accept_ex( | |
162 | [](websocket::response_type& res) | |
163 | { | |
164 | res.set(http::field::server, | |
165 | "Boost.Beast/" + std::to_string(BOOST_BEAST_VERSION) + "-Async"); | |
166 | }, | |
167 | boost::asio::bind_executor( | |
168 | strand_, | |
169 | std::bind( | |
170 | &async_session::on_accept, | |
171 | shared_from_this(), | |
172 | std::placeholders::_1))); | |
173 | } | |
174 | ||
175 | void | |
176 | on_accept(boost::system::error_code ec) | |
177 | { | |
178 | if(ec) | |
179 | return fail(ec, "accept"); | |
180 | ||
181 | // Read a message | |
182 | do_read(); | |
183 | } | |
184 | ||
185 | void | |
186 | do_read() | |
187 | { | |
188 | // Read a message into our buffer | |
189 | ws_.async_read( | |
190 | buffer_, | |
191 | boost::asio::bind_executor( | |
192 | strand_, | |
193 | std::bind( | |
194 | &async_session::on_read, | |
195 | shared_from_this(), | |
196 | std::placeholders::_1, | |
197 | std::placeholders::_2))); | |
198 | } | |
199 | ||
200 | void | |
201 | on_read( | |
202 | boost::system::error_code ec, | |
203 | std::size_t bytes_transferred) | |
204 | { | |
205 | boost::ignore_unused(bytes_transferred); | |
206 | ||
207 | // This indicates that the async_session was closed | |
208 | if(ec == websocket::error::closed) | |
209 | return; | |
210 | ||
211 | if(ec) | |
212 | fail(ec, "read"); | |
213 | ||
214 | // Echo the message | |
215 | ws_.text(ws_.got_text()); | |
216 | ws_.async_write( | |
217 | buffer_.data(), | |
218 | boost::asio::bind_executor( | |
219 | strand_, | |
220 | std::bind( | |
221 | &async_session::on_write, | |
222 | shared_from_this(), | |
223 | std::placeholders::_1, | |
224 | std::placeholders::_2))); | |
225 | } | |
226 | ||
227 | void | |
228 | on_write( | |
229 | boost::system::error_code ec, | |
230 | std::size_t bytes_transferred) | |
231 | { | |
232 | boost::ignore_unused(bytes_transferred); | |
233 | ||
234 | if(ec) | |
235 | return fail(ec, "write"); | |
236 | ||
237 | // Clear the buffer | |
238 | buffer_.consume(buffer_.size()); | |
239 | ||
240 | // Do another read | |
241 | do_read(); | |
242 | } | |
243 | }; | |
244 | ||
245 | // Accepts incoming connections and launches the sessions | |
246 | class async_listener : public std::enable_shared_from_this<async_listener> | |
247 | { | |
248 | boost::asio::strand< | |
249 | boost::asio::io_context::executor_type> strand_; | |
250 | tcp::acceptor acceptor_; | |
251 | tcp::socket socket_; | |
252 | ||
253 | public: | |
254 | async_listener( | |
255 | boost::asio::io_context& ioc, | |
256 | tcp::endpoint endpoint) | |
257 | : strand_(ioc.get_executor()) | |
258 | , acceptor_(ioc) | |
259 | , socket_(ioc) | |
260 | { | |
261 | boost::system::error_code ec; | |
262 | ||
263 | // Open the acceptor | |
264 | acceptor_.open(endpoint.protocol(), ec); | |
265 | if(ec) | |
266 | { | |
267 | fail(ec, "open"); | |
268 | return; | |
269 | } | |
270 | ||
271 | // Bind to the server address | |
272 | acceptor_.bind(endpoint, ec); | |
273 | if(ec) | |
274 | { | |
275 | fail(ec, "bind"); | |
276 | return; | |
277 | } | |
278 | ||
279 | // Start listening for connections | |
280 | acceptor_.listen( | |
281 | boost::asio::socket_base::max_listen_connections, ec); | |
282 | if(ec) | |
283 | { | |
284 | fail(ec, "listen"); | |
285 | return; | |
286 | } | |
287 | } | |
288 | ||
289 | // Start accepting incoming connections | |
290 | void | |
291 | run() | |
292 | { | |
293 | if(! acceptor_.is_open()) | |
294 | return; | |
295 | do_accept(); | |
296 | } | |
297 | ||
298 | void | |
299 | do_accept() | |
300 | { | |
301 | acceptor_.async_accept( | |
302 | socket_, | |
303 | boost::asio::bind_executor( | |
304 | strand_, | |
305 | std::bind( | |
306 | &async_listener::on_accept, | |
307 | shared_from_this(), | |
308 | std::placeholders::_1))); | |
309 | } | |
310 | ||
311 | void | |
312 | on_accept(boost::system::error_code ec) | |
313 | { | |
314 | if(ec) | |
315 | { | |
316 | fail(ec, "accept"); | |
317 | } | |
318 | else | |
319 | { | |
320 | // Create the async_session and run it | |
321 | std::make_shared<async_session>(std::move(socket_))->run(); | |
322 | } | |
323 | ||
324 | // Accept another connection | |
325 | do_accept(); | |
326 | } | |
327 | }; | |
328 | ||
329 | //------------------------------------------------------------------------------ | |
330 | ||
331 | void | |
332 | do_coro_session(tcp::socket& socket, boost::asio::yield_context yield) | |
333 | { | |
334 | boost::system::error_code ec; | |
335 | ||
336 | websocket::stream<tcp::socket> ws{std::move(socket)}; | |
337 | setup_stream(ws); | |
338 | ||
339 | ws.async_accept_ex( | |
340 | [&](websocket::response_type& res) | |
341 | { | |
342 | res.set(http::field::server, | |
343 | "Boost.Beast/" + std::to_string(BOOST_BEAST_VERSION) + "-Coro"); | |
344 | }, | |
345 | yield[ec]); | |
346 | if(ec) | |
347 | return fail(ec, "accept"); | |
348 | ||
349 | for(;;) | |
350 | { | |
351 | boost::beast::multi_buffer buffer; | |
352 | ||
353 | ws.async_read(buffer, yield[ec]); | |
354 | if(ec == websocket::error::closed) | |
355 | break; | |
356 | if(ec) | |
357 | return fail(ec, "read"); | |
358 | ||
359 | ws.text(ws.got_text()); | |
360 | ws.async_write(buffer.data(), yield[ec]); | |
361 | if(ec) | |
362 | return fail(ec, "write"); | |
363 | } | |
364 | } | |
365 | ||
366 | void | |
367 | do_coro_listen( | |
368 | boost::asio::io_context& ioc, | |
369 | tcp::endpoint endpoint, | |
370 | boost::asio::yield_context yield) | |
371 | { | |
372 | boost::system::error_code ec; | |
373 | ||
374 | tcp::acceptor acceptor(ioc); | |
375 | acceptor.open(endpoint.protocol(), ec); | |
376 | if(ec) | |
377 | return fail(ec, "open"); | |
378 | ||
379 | acceptor.bind(endpoint, ec); | |
380 | if(ec) | |
381 | return fail(ec, "bind"); | |
382 | ||
383 | acceptor.listen(boost::asio::socket_base::max_listen_connections, ec); | |
384 | if(ec) | |
385 | return fail(ec, "listen"); | |
386 | ||
387 | for(;;) | |
388 | { | |
389 | tcp::socket socket(ioc); | |
390 | ||
391 | acceptor.async_accept(socket, yield[ec]); | |
392 | if(ec) | |
393 | { | |
394 | fail(ec, "accept"); | |
395 | continue; | |
396 | } | |
397 | ||
398 | boost::asio::spawn( | |
399 | acceptor.get_executor().context(), | |
400 | std::bind( | |
401 | &do_coro_session, | |
402 | std::move(socket), | |
403 | std::placeholders::_1)); | |
404 | } | |
405 | } | |
406 | ||
407 | //------------------------------------------------------------------------------ | |
408 | ||
409 | int main(int argc, char* argv[]) | |
410 | { | |
411 | // Check command line arguments. | |
412 | if (argc != 4) | |
413 | { | |
414 | std::cerr << | |
415 | "Usage: websocket-server-fast <address> <starting-port> <threads>\n" << | |
416 | "Example:\n" | |
417 | " websocket-server-fast 0.0.0.0 8080 1\n" | |
418 | " Connect to:\n" | |
419 | " starting-port+0 for synchronous,\n" | |
420 | " starting-port+1 for asynchronous,\n" | |
421 | " starting-port+2 for coroutine.\n"; | |
422 | return EXIT_FAILURE; | |
423 | } | |
424 | auto const address = boost::asio::ip::make_address(argv[1]); | |
425 | auto const port = static_cast<unsigned short>(std::atoi(argv[2])); | |
426 | auto const threads = std::max<int>(1, std::atoi(argv[3])); | |
427 | ||
428 | // The io_context is required for all I/O | |
429 | boost::asio::io_context ioc{threads}; | |
430 | ||
431 | // Create sync port | |
432 | std::thread(std::bind( | |
433 | &do_sync_listen, | |
434 | std::ref(ioc), | |
435 | tcp::endpoint{ | |
436 | address, | |
437 | static_cast<unsigned short>(port + 0u)} | |
438 | )).detach(); | |
439 | ||
440 | // Create async port | |
441 | std::make_shared<async_listener>( | |
442 | ioc, | |
443 | tcp::endpoint{ | |
444 | address, | |
445 | static_cast<unsigned short>(port + 1u)})->run(); | |
446 | ||
447 | // Create coro port | |
448 | boost::asio::spawn(ioc, | |
449 | std::bind( | |
450 | &do_coro_listen, | |
451 | std::ref(ioc), | |
452 | tcp::endpoint{ | |
453 | address, | |
454 | static_cast<unsigned short>(port + 2u)}, | |
455 | std::placeholders::_1)); | |
456 | ||
457 | // Run the I/O service on the requested number of threads | |
458 | std::vector<std::thread> v; | |
459 | v.reserve(threads - 1); | |
460 | for(auto i = threads - 1; i > 0; --i) | |
461 | v.emplace_back( | |
462 | [&ioc] | |
463 | { | |
464 | ioc.run(); | |
465 | }); | |
466 | ioc.run(); | |
467 | ||
468 | return EXIT_SUCCESS; | |
469 | } |