]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // |
2 | // Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com) | |
3 | // | |
4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
6 | // | |
7 | ||
8 | #ifndef BEAST_WEBSOCKET_ASYNC_ECHO_SERVER_HPP | |
9 | #define BEAST_WEBSOCKET_ASYNC_ECHO_SERVER_HPP | |
10 | ||
11 | #include <beast/core/placeholders.hpp> | |
12 | #include <beast/core/streambuf.hpp> | |
13 | #include <beast/websocket/stream.hpp> | |
14 | #include <boost/lexical_cast.hpp> | |
15 | #include <boost/optional.hpp> | |
16 | #include <atomic> | |
17 | #include <functional> | |
18 | #include <memory> | |
19 | #include <mutex> | |
20 | #include <ostream> | |
21 | #include <string> | |
22 | #include <thread> | |
23 | #include <type_traits> | |
24 | #include <typeindex> | |
25 | #include <unordered_map> | |
26 | #include <utility> | |
27 | ||
28 | namespace websocket { | |
29 | ||
30 | /** Asynchronous WebSocket echo client/server | |
31 | */ | |
32 | class async_echo_server | |
33 | { | |
34 | public: | |
// Convenience aliases for the Beast and Asio types used by this class.
using error_code = beast::error_code;
using address_type = boost::asio::ip::address;
using socket_type = boost::asio::ip::tcp::socket;
using endpoint_type = boost::asio::ip::tcp::endpoint;
39 | ||
40 | private: | |
41 | struct identity | |
42 | { | |
43 | template<class Body, class Fields> | |
44 | void | |
45 | operator()(beast::http::message< | |
46 | true, Body, Fields>& req) const | |
47 | { | |
48 | req.fields.replace("User-Agent", "async_echo_client"); | |
49 | } | |
50 | ||
51 | template<class Body, class Fields> | |
52 | void | |
53 | operator()(beast::http::message< | |
54 | false, Body, Fields>& resp) const | |
55 | { | |
56 | resp.fields.replace("Server", "async_echo_server"); | |
57 | } | |
58 | }; | |
59 | ||
60 | /** A container of type-erased option setters. | |
61 | */ | |
62 | template<class NextLayer> | |
63 | class options_set | |
64 | { | |
65 | // workaround for std::function bug in msvc | |
66 | struct callable | |
67 | { | |
68 | virtual ~callable() = default; | |
69 | virtual void operator()( | |
70 | beast::websocket::stream<NextLayer>&) = 0; | |
71 | }; | |
72 | ||
73 | template<class T> | |
74 | class callable_impl : public callable | |
75 | { | |
76 | T t_; | |
77 | ||
78 | public: | |
79 | template<class U> | |
80 | callable_impl(U&& u) | |
81 | : t_(std::forward<U>(u)) | |
82 | { | |
83 | } | |
84 | ||
85 | void | |
86 | operator()(beast::websocket::stream<NextLayer>& ws) | |
87 | { | |
88 | t_(ws); | |
89 | } | |
90 | }; | |
91 | ||
92 | template<class Opt> | |
93 | class lambda | |
94 | { | |
95 | Opt opt_; | |
96 | ||
97 | public: | |
98 | lambda(lambda&&) = default; | |
99 | lambda(lambda const&) = default; | |
100 | ||
101 | lambda(Opt const& opt) | |
102 | : opt_(opt) | |
103 | { | |
104 | } | |
105 | ||
106 | void | |
107 | operator()(beast::websocket::stream<NextLayer>& ws) const | |
108 | { | |
109 | ws.set_option(opt_); | |
110 | } | |
111 | }; | |
112 | ||
113 | std::unordered_map<std::type_index, | |
114 | std::unique_ptr<callable>> list_; | |
115 | ||
116 | public: | |
117 | template<class Opt> | |
118 | void | |
119 | set_option(Opt const& opt) | |
120 | { | |
121 | std::unique_ptr<callable> p; | |
122 | p.reset(new callable_impl<lambda<Opt>>{opt}); | |
123 | list_[std::type_index{ | |
124 | typeid(Opt)}] = std::move(p); | |
125 | } | |
126 | ||
127 | void | |
128 | set_options(beast::websocket::stream<NextLayer>& ws) | |
129 | { | |
130 | for(auto const& op : list_) | |
131 | (*op.second)(ws); | |
132 | } | |
133 | }; | |
134 | ||
// NOTE: declaration order matters. `ios_` must be declared before
// `sock_`, `acceptor_` and `work_`, which are all constructed from it.

// Optional log destination; a null pointer disables logging.
std::ostream* log_;
// The io_service which the worker threads run.
boost::asio::io_service ios_;
// Socket passed to async_accept to receive the next connection.
socket_type sock_;
// Endpoint passed to async_accept alongside `sock_`.
endpoint_type ep_;
boost::asio::ip::tcp::acceptor acceptor_;
// Worker threads, each calling ios_.run().
std::vector<std::thread> thread_;
// Work object on `ios_`; cleared in the destructor.
boost::optional<boost::asio::io_service::work> work_;
// Options applied to the websocket stream of every new peer.
options_set<socket_type> opts_;
143 | ||
144 | public: | |
145 | async_echo_server(async_echo_server const&) = delete; | |
146 | async_echo_server& operator=(async_echo_server const&) = delete; | |
147 | ||
148 | /** Constructor. | |
149 | ||
150 | @param log A pointer to a stream to log to, or `nullptr` | |
151 | to disable logging. | |
152 | ||
153 | @param threads The number of threads in the io_service. | |
154 | */ | |
155 | async_echo_server(std::ostream* log, | |
156 | std::size_t threads) | |
157 | : log_(log) | |
158 | , sock_(ios_) | |
159 | , acceptor_(ios_) | |
160 | , work_(ios_) | |
161 | { | |
162 | opts_.set_option( | |
163 | beast::websocket::decorate(identity{})); | |
164 | thread_.reserve(threads); | |
165 | for(std::size_t i = 0; i < threads; ++i) | |
166 | thread_.emplace_back( | |
167 | [&]{ ios_.run(); }); | |
168 | } | |
169 | ||
170 | /** Destructor. | |
171 | */ | |
172 | ~async_echo_server() | |
173 | { | |
174 | work_ = boost::none; | |
175 | error_code ec; | |
176 | ios_.dispatch( | |
177 | [&]{ acceptor_.close(ec); }); | |
178 | for(auto& t : thread_) | |
179 | t.join(); | |
180 | } | |
181 | ||
182 | /** Return the listening endpoint. | |
183 | */ | |
184 | endpoint_type | |
185 | local_endpoint() const | |
186 | { | |
187 | return acceptor_.local_endpoint(); | |
188 | } | |
189 | ||
190 | /** Set a websocket option. | |
191 | ||
192 | The option will be applied to all new connections. | |
193 | ||
194 | @param opt The option to apply. | |
195 | */ | |
196 | template<class Opt> | |
197 | void | |
198 | set_option(Opt const& opt) | |
199 | { | |
200 | opts_.set_option(opt); | |
201 | } | |
202 | ||
203 | /** Open a listening port. | |
204 | ||
205 | @param ep The address and port to bind to. | |
206 | ||
207 | @param ec Set to the error, if any occurred. | |
208 | */ | |
209 | void | |
210 | open(endpoint_type const& ep, error_code& ec) | |
211 | { | |
212 | acceptor_.open(ep.protocol(), ec); | |
213 | if(ec) | |
214 | return fail("open", ec); | |
215 | acceptor_.set_option( | |
216 | boost::asio::socket_base::reuse_address{true}); | |
217 | acceptor_.bind(ep, ec); | |
218 | if(ec) | |
219 | return fail("bind", ec); | |
220 | acceptor_.listen( | |
221 | boost::asio::socket_base::max_connections, ec); | |
222 | if(ec) | |
223 | return fail("listen", ec); | |
224 | acceptor_.async_accept(sock_, ep_, | |
225 | std::bind(&async_echo_server::on_accept, this, | |
226 | beast::asio::placeholders::error)); | |
227 | } | |
228 | ||
229 | private: | |
230 | class peer | |
231 | { | |
232 | struct data | |
233 | { | |
234 | async_echo_server& server; | |
235 | endpoint_type ep; | |
236 | int state = 0; | |
237 | beast::websocket::stream<socket_type> ws; | |
238 | boost::asio::io_service::strand strand; | |
239 | beast::websocket::opcode op; | |
240 | beast::streambuf db; | |
241 | std::size_t id; | |
242 | ||
243 | data(async_echo_server& server_, | |
244 | endpoint_type const& ep_, | |
245 | socket_type&& sock_) | |
246 | : server(server_) | |
247 | , ep(ep_) | |
248 | , ws(std::move(sock_)) | |
249 | , strand(ws.get_io_service()) | |
250 | , id([] | |
251 | { | |
252 | static std::atomic<std::size_t> n{0}; | |
253 | return ++n; | |
254 | }()) | |
255 | { | |
256 | } | |
257 | }; | |
258 | ||
259 | // VFALCO This could be unique_ptr in [Net.TS] | |
260 | std::shared_ptr<data> d_; | |
261 | ||
262 | public: | |
263 | peer(peer&&) = default; | |
264 | peer(peer const&) = default; | |
265 | peer& operator=(peer&&) = delete; | |
266 | peer& operator=(peer const&) = delete; | |
267 | ||
268 | template<class... Args> | |
269 | explicit | |
270 | peer(async_echo_server& server, | |
271 | endpoint_type const& ep, socket_type&& sock, | |
272 | Args&&... args) | |
273 | : d_(std::make_shared<data>(server, ep, | |
274 | std::forward<socket_type>(sock), | |
275 | std::forward<Args>(args)...)) | |
276 | { | |
277 | auto& d = *d_; | |
278 | d.server.opts_.set_options(d.ws); | |
279 | run(); | |
280 | } | |
281 | ||
282 | void run() | |
283 | { | |
284 | auto& d = *d_; | |
285 | d.ws.async_accept(std::move(*this)); | |
286 | } | |
287 | ||
288 | template<class DynamicBuffer, std::size_t N> | |
289 | static | |
290 | bool | |
291 | match(DynamicBuffer& db, char const(&s)[N]) | |
292 | { | |
293 | using boost::asio::buffer; | |
294 | using boost::asio::buffer_copy; | |
295 | if(db.size() < N-1) | |
296 | return false; | |
297 | beast::static_string<N-1> t; | |
298 | t.resize(N-1); | |
299 | buffer_copy(buffer(t.data(), t.size()), | |
300 | db.data()); | |
301 | if(t != s) | |
302 | return false; | |
303 | db.consume(N-1); | |
304 | return true; | |
305 | } | |
306 | ||
307 | void operator()(error_code ec, std::size_t) | |
308 | { | |
309 | (*this)(ec); | |
310 | } | |
311 | ||
312 | void operator()(error_code ec) | |
313 | { | |
314 | using boost::asio::buffer; | |
315 | using boost::asio::buffer_copy; | |
316 | auto& d = *d_; | |
317 | switch(d.state) | |
318 | { | |
319 | // did accept | |
320 | case 0: | |
321 | if(ec) | |
322 | return fail("async_accept", ec); | |
323 | ||
324 | // start | |
325 | case 1: | |
326 | if(ec) | |
327 | return fail("async_handshake", ec); | |
328 | d.db.consume(d.db.size()); | |
329 | // read message | |
330 | d.state = 2; | |
331 | d.ws.async_read(d.op, d.db, | |
332 | d.strand.wrap(std::move(*this))); | |
333 | return; | |
334 | ||
335 | // got message | |
336 | case 2: | |
337 | if(ec == beast::websocket::error::closed) | |
338 | return; | |
339 | if(ec) | |
340 | return fail("async_read", ec); | |
341 | if(match(d.db, "RAW")) | |
342 | { | |
343 | d.state = 1; | |
344 | boost::asio::async_write(d.ws.next_layer(), | |
345 | d.db.data(), d.strand.wrap(std::move(*this))); | |
346 | return; | |
347 | } | |
348 | else if(match(d.db, "TEXT")) | |
349 | { | |
350 | d.state = 1; | |
351 | d.ws.set_option( | |
352 | beast::websocket::message_type{ | |
353 | beast::websocket::opcode::text}); | |
354 | d.ws.async_write( | |
355 | d.db.data(), d.strand.wrap(std::move(*this))); | |
356 | return; | |
357 | } | |
358 | else if(match(d.db, "PING")) | |
359 | { | |
360 | beast::websocket::ping_data payload; | |
361 | d.db.consume(buffer_copy( | |
362 | buffer(payload.data(), payload.size()), | |
363 | d.db.data())); | |
364 | d.state = 1; | |
365 | d.ws.async_ping(payload, | |
366 | d.strand.wrap(std::move(*this))); | |
367 | return; | |
368 | } | |
369 | else if(match(d.db, "CLOSE")) | |
370 | { | |
371 | d.state = 1; | |
372 | d.ws.async_close({}, | |
373 | d.strand.wrap(std::move(*this))); | |
374 | return; | |
375 | } | |
376 | // write message | |
377 | d.state = 1; | |
378 | d.ws.set_option( | |
379 | beast::websocket::message_type(d.op)); | |
380 | d.ws.async_write(d.db.data(), | |
381 | d.strand.wrap(std::move(*this))); | |
382 | return; | |
383 | } | |
384 | } | |
385 | ||
386 | private: | |
387 | void | |
388 | fail(std::string what, error_code ec) | |
389 | { | |
390 | auto& d = *d_; | |
391 | if(d.server.log_) | |
392 | if(ec != beast::websocket::error::closed) | |
393 | d.server.fail("[#" + std::to_string(d.id) + | |
394 | " " + boost::lexical_cast<std::string>(d.ep) + | |
395 | "] " + what, ec); | |
396 | } | |
397 | }; | |
398 | ||
399 | void | |
400 | fail(std::string what, error_code ec) | |
401 | { | |
402 | if(log_) | |
403 | { | |
404 | static std::mutex m; | |
405 | std::lock_guard<std::mutex> lock{m}; | |
406 | (*log_) << what << ": " << | |
407 | ec.message() << std::endl; | |
408 | } | |
409 | } | |
410 | ||
411 | void | |
412 | on_accept(error_code ec) | |
413 | { | |
414 | if(! acceptor_.is_open()) | |
415 | return; | |
416 | if(ec == boost::asio::error::operation_aborted) | |
417 | return; | |
418 | if(ec) | |
419 | fail("accept", ec); | |
420 | peer{*this, ep_, std::move(sock_)}; | |
421 | acceptor_.async_accept(sock_, ep_, | |
422 | std::bind(&async_echo_server::on_accept, this, | |
423 | beast::asio::placeholders::error)); | |
424 | } | |
425 | }; | |
426 | ||
427 | } // websocket | |
428 | ||
429 | #endif |