]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/beast/example/websocket/server/stackless/websocket_server_stackless.cpp
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / boost / libs / beast / example / websocket / server / stackless / websocket_server_stackless.cpp
1 //
2 // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 //------------------------------------------------------------------------------
11 //
12 // Example: WebSocket server, stackless coroutine
13 //
14 //------------------------------------------------------------------------------
15
16 #include <boost/beast/core.hpp>
17 #include <boost/beast/websocket.hpp>
18 #include <boost/asio/bind_executor.hpp>
19 #include <boost/asio/coroutine.hpp>
20 #include <boost/asio/strand.hpp>
21 #include <boost/asio/ip/tcp.hpp>
22 #include <algorithm>
23 #include <cstdlib>
24 #include <functional>
25 #include <iostream>
26 #include <memory>
27 #include <string>
28 #include <thread>
29 #include <vector>
30
31 using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
32 namespace websocket = boost::beast::websocket; // from <boost/beast/websocket.hpp>
33
34 //------------------------------------------------------------------------------
35
36 // Report a failure
37 void
38 fail(boost::system::error_code ec, char const* what)
39 {
40 std::cerr << what << ": " << ec.message() << "\n";
41 }
42
43 // Echoes back all received WebSocket messages
44 class session
45 : public boost::asio::coroutine
46 , public std::enable_shared_from_this<session>
47 {
48 websocket::stream<tcp::socket> ws_;
49 boost::asio::strand<
50 boost::asio::io_context::executor_type> strand_;
51 boost::beast::multi_buffer buffer_;
52
53 public:
54 // Take ownership of the socket
55 explicit
56 session(tcp::socket socket)
57 : ws_(std::move(socket))
58 , strand_(ws_.get_executor())
59 {
60 }
61
62 // Start the asynchronous operation
63 void
64 run()
65 {
66 loop({}, 0);
67 }
68
69 #include <boost/asio/yield.hpp>
70 void
71 loop(
72 boost::system::error_code ec,
73 std::size_t bytes_transferred)
74 {
75 boost::ignore_unused(bytes_transferred);
76 reenter(*this)
77 {
78 // Accept the websocket handshake
79 yield ws_.async_accept(
80 boost::asio::bind_executor(
81 strand_,
82 std::bind(
83 &session::loop,
84 shared_from_this(),
85 std::placeholders::_1,
86 0)));
87 if(ec)
88 return fail(ec, "accept");
89
90 for(;;)
91 {
92 // Read a message into our buffer
93 yield ws_.async_read(
94 buffer_,
95 boost::asio::bind_executor(
96 strand_,
97 std::bind(
98 &session::loop,
99 shared_from_this(),
100 std::placeholders::_1,
101 std::placeholders::_2)));
102 if(ec == websocket::error::closed)
103 {
104 // This indicates that the session was closed
105 return;
106 }
107 if(ec)
108 fail(ec, "read");
109
110 // Echo the message
111 ws_.text(ws_.got_text());
112 yield ws_.async_write(
113 buffer_.data(),
114 boost::asio::bind_executor(
115 strand_,
116 std::bind(
117 &session::loop,
118 shared_from_this(),
119 std::placeholders::_1,
120 std::placeholders::_2)));
121 if(ec)
122 return fail(ec, "write");
123
124 // Clear the buffer
125 buffer_.consume(buffer_.size());
126 }
127 }
128 }
129 #include <boost/asio/unyield.hpp>
130 };
131
132 //------------------------------------------------------------------------------
133
134 // Accepts incoming connections and launches the sessions
135 class listener
136 : public boost::asio::coroutine
137 , public std::enable_shared_from_this<listener>
138 {
139 tcp::acceptor acceptor_;
140 tcp::socket socket_;
141
142 public:
143 listener(
144 boost::asio::io_context& ioc,
145 tcp::endpoint endpoint)
146 : acceptor_(ioc)
147 , socket_(ioc)
148 {
149 boost::system::error_code ec;
150
151 // Open the acceptor
152 acceptor_.open(endpoint.protocol(), ec);
153 if(ec)
154 {
155 fail(ec, "open");
156 return;
157 }
158
159 // Allow address reuse
160 acceptor_.set_option(boost::asio::socket_base::reuse_address(true));
161 if(ec)
162 {
163 fail(ec, "set_option");
164 return;
165 }
166
167 // Bind to the server address
168 acceptor_.bind(endpoint, ec);
169 if(ec)
170 {
171 fail(ec, "bind");
172 return;
173 }
174
175 // Start listening for connections
176 acceptor_.listen(
177 boost::asio::socket_base::max_listen_connections, ec);
178 if(ec)
179 {
180 fail(ec, "listen");
181 return;
182 }
183 }
184
185 // Start accepting incoming connections
186 void
187 run()
188 {
189 if(! acceptor_.is_open())
190 return;
191 loop();
192 }
193
194 #include <boost/asio/yield.hpp>
195 void
196 loop(boost::system::error_code ec = {})
197 {
198 reenter(*this)
199 {
200 for(;;)
201 {
202 yield acceptor_.async_accept(
203 socket_,
204 std::bind(
205 &listener::loop,
206 shared_from_this(),
207 std::placeholders::_1));
208 if(ec)
209 {
210 fail(ec, "accept");
211 }
212 else
213 {
214 // Create the session and run it
215 std::make_shared<session>(std::move(socket_))->run();
216 }
217 }
218 }
219 }
220 #include <boost/asio/unyield.hpp>
221 };
222
223 //------------------------------------------------------------------------------
224
225 int main(int argc, char* argv[])
226 {
227 // Check command line arguments.
228 if (argc != 4)
229 {
230 std::cerr <<
231 "Usage: websocket-server-stackless <address> <port> <threads>\n" <<
232 "Example:\n" <<
233 " websocket-server-stackless 0.0.0.0 8080 1\n";
234 return EXIT_FAILURE;
235 }
236 auto const address = boost::asio::ip::make_address(argv[1]);
237 auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
238 auto const threads = std::max<int>(1, std::atoi(argv[3]));
239
240 // The io_context is required for all I/O
241 boost::asio::io_context ioc{threads};
242
243 // Create and launch a listening port
244 std::make_shared<listener>(ioc, tcp::endpoint{address, port})->run();
245
246 // Run the I/O service on the requested number of threads
247 std::vector<std::thread> v;
248 v.reserve(threads - 1);
249 for(auto i = threads - 1; i > 0; --i)
250 v.emplace_back(
251 [&ioc]
252 {
253 ioc.run();
254 });
255 ioc.run();
256
257 return EXIT_SUCCESS;
258 }