]> git.proxmox.com Git - ceph.git/blame - ceph/src/Beast/examples/websocket_sync_echo_server.hpp
bump version to 12.2.2-pve1
[ceph.git] / ceph / src / Beast / examples / websocket_sync_echo_server.hpp
CommitLineData
7c673cae
FG
1//
2// Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7
8#ifndef WEBSOCKET_SYNC_ECHO_SERVER_HPP
9#define WEBSOCKET_SYNC_ECHO_SERVER_HPP
10
11#include <beast/core/placeholders.hpp>
12#include <beast/core/streambuf.hpp>
13#include <beast/websocket.hpp>
14#include <boost/lexical_cast.hpp>
15#include <boost/optional.hpp>
16#include <atomic>
17#include <functional>
18#include <memory>
19#include <mutex>
20#include <ostream>
21#include <string>
22#include <thread>
23#include <type_traits>
24#include <typeindex>
25#include <unordered_map>
26#include <utility>
27
28namespace websocket {
29
30/** Synchronous WebSocket echo client/server
31*/
32class sync_echo_server
33{
34public:
35 using error_code = beast::error_code;
36 using endpoint_type = boost::asio::ip::tcp::endpoint;
37 using address_type = boost::asio::ip::address;
38 using socket_type = boost::asio::ip::tcp::socket;
39
40private:
41 struct identity
42 {
43 template<class Body, class Fields>
44 void
45 operator()(beast::http::message<
46 true, Body, Fields>& req) const
47 {
48 req.fields.replace("User-Agent", "sync_echo_client");
49 }
50
51 template<class Body, class Fields>
52 void
53 operator()(beast::http::message<
54 false, Body, Fields>& resp) const
55 {
56 resp.fields.replace("Server", "sync_echo_server");
57 }
58 };
59
60 /** A container of type-erased option setters.
61 */
62 template<class NextLayer>
63 class options_set
64 {
65 // workaround for std::function bug in msvc
66 struct callable
67 {
68 virtual ~callable() = default;
69 virtual void operator()(
70 beast::websocket::stream<NextLayer>&) = 0;
71 };
72
73 template<class T>
74 class callable_impl : public callable
75 {
76 T t_;
77
78 public:
79 template<class U>
80 callable_impl(U&& u)
81 : t_(std::forward<U>(u))
82 {
83 }
84
85 void
86 operator()(beast::websocket::stream<NextLayer>& ws)
87 {
88 t_(ws);
89 }
90 };
91
92 template<class Opt>
93 class lambda
94 {
95 Opt opt_;
96
97 public:
98 lambda(lambda&&) = default;
99 lambda(lambda const&) = default;
100
101 lambda(Opt const& opt)
102 : opt_(opt)
103 {
104 }
105
106 void
107 operator()(beast::websocket::stream<NextLayer>& ws) const
108 {
109 ws.set_option(opt_);
110 }
111 };
112
113 std::unordered_map<std::type_index,
114 std::unique_ptr<callable>> list_;
115
116 public:
117 template<class Opt>
118 void
119 set_option(Opt const& opt)
120 {
121 std::unique_ptr<callable> p;
122 p.reset(new callable_impl<lambda<Opt>>{opt});
123 list_[std::type_index{
124 typeid(Opt)}] = std::move(p);
125 }
126
127 void
128 set_options(beast::websocket::stream<NextLayer>& ws)
129 {
130 for(auto const& op : list_)
131 (*op.second)(ws);
132 }
133 };
134
135 std::ostream* log_;
136 boost::asio::io_service ios_;
137 socket_type sock_;
138 endpoint_type ep_;
139 boost::asio::ip::tcp::acceptor acceptor_;
140 std::thread thread_;
141 options_set<socket_type> opts_;
142
143public:
144 /** Constructor.
145
146 @param log A pointer to a stream to log to, or `nullptr`
147 to disable logging.
148 */
149 sync_echo_server(std::ostream* log)
150 : log_(log)
151 , sock_(ios_)
152 , acceptor_(ios_)
153 {
154 opts_.set_option(
155 beast::websocket::decorate(identity{}));
156 }
157
158 /** Destructor.
159 */
160 ~sync_echo_server()
161 {
162 if(thread_.joinable())
163 {
164 error_code ec;
165 ios_.dispatch(
166 [&]{ acceptor_.close(ec); });
167 thread_.join();
168 }
169 }
170
171 /** Return the listening endpoint.
172 */
173 endpoint_type
174 local_endpoint() const
175 {
176 return acceptor_.local_endpoint();
177 }
178
179 /** Set a websocket option.
180
181 The option will be applied to all new connections.
182
183 @param opt The option to apply.
184 */
185 template<class Opt>
186 void
187 set_option(Opt const& opt)
188 {
189 opts_.set_option(opt);
190 }
191
192 /** Open a listening port.
193
194 @param ep The address and port to bind to.
195
196 @param ec Set to the error, if any occurred.
197 */
198 void
199 open(endpoint_type const& ep, error_code& ec)
200 {
201 acceptor_.open(ep.protocol(), ec);
202 if(ec)
203 return fail("open", ec);
204 acceptor_.set_option(
205 boost::asio::socket_base::reuse_address{true});
206 acceptor_.bind(ep, ec);
207 if(ec)
208 return fail("bind", ec);
209 acceptor_.listen(
210 boost::asio::socket_base::max_connections, ec);
211 if(ec)
212 return fail("listen", ec);
213 acceptor_.async_accept(sock_, ep_,
214 std::bind(&sync_echo_server::on_accept, this,
215 beast::asio::placeholders::error));
216 thread_ = std::thread{[&]{ ios_.run(); }};
217 }
218
219private:
220 void
221 fail(std::string what, error_code ec)
222 {
223 if(log_)
224 {
225 static std::mutex m;
226 std::lock_guard<std::mutex> lock{m};
227 (*log_) << what << ": " <<
228 ec.message() << std::endl;
229 }
230 }
231
232 void
233 fail(std::string what, error_code ec,
234 int id, endpoint_type const& ep)
235 {
236 if(log_)
237 if(ec != beast::websocket::error::closed)
238 fail("[#" + std::to_string(id) + " " +
239 boost::lexical_cast<std::string>(ep) +
240 "] " + what, ec);
241 }
242
243 void
244 on_accept(error_code ec)
245 {
246 if(ec == boost::asio::error::operation_aborted)
247 return;
248 if(ec)
249 return fail("accept", ec);
250 struct lambda
251 {
252 std::size_t id;
253 endpoint_type ep;
254 sync_echo_server& self;
255 boost::asio::io_service::work work;
256 // Must be destroyed before work otherwise the
257 // io_service could be destroyed before the socket.
258 socket_type sock;
259
260 lambda(sync_echo_server& self_,
261 endpoint_type const& ep_,
262 socket_type&& sock_)
263 : id([]
264 {
265 static std::atomic<std::size_t> n{0};
266 return ++n;
267 }())
268 , ep(ep_)
269 , self(self_)
270 , work(sock_.get_io_service())
271 , sock(std::move(sock_))
272 {
273 }
274
275 void operator()()
276 {
277 self.do_peer(id, ep, std::move(sock));
278 }
279 };
280 std::thread{lambda{*this, ep_, std::move(sock_)}}.detach();
281 acceptor_.async_accept(sock_, ep_,
282 std::bind(&sync_echo_server::on_accept, this,
283 beast::asio::placeholders::error));
284 }
285
286 void
287 do_peer(std::size_t id,
288 endpoint_type const& ep, socket_type&& sock)
289 {
290 using boost::asio::buffer;
291 using boost::asio::buffer_copy;
292 beast::websocket::stream<
293 socket_type> ws{std::move(sock)};
294 opts_.set_options(ws);
295 error_code ec;
296 ws.accept(ec);
297 if(ec)
298 {
299 fail("accept", ec, id, ep);
300 return;
301 }
302 for(;;)
303 {
304 beast::websocket::opcode op;
305 beast::streambuf sb;
306 ws.read(op, sb, ec);
307 if(ec)
308 {
309 auto const s = ec.message();
310 break;
311 }
312 ws.set_option(beast::websocket::message_type{op});
313 ws.write(sb.data(), ec);
314 if(ec)
315 break;
316 }
317 if(ec && ec != beast::websocket::error::closed)
318 {
319 fail("read", ec, id, ep);
320 }
321 }
322};
323
324} // websocket
325
326#endif