2 // Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 #ifndef BEAST_WEBSOCKET_SYNC_ECHO_SERVER_HPP
9 #define BEAST_WEBSOCKET_SYNC_ECHO_SERVER_HPP
11 #include <beast/core/placeholders.hpp>
12 #include <beast/core/streambuf.hpp>
13 #include <beast/websocket.hpp>
14 #include <boost/lexical_cast.hpp>
15 #include <boost/optional.hpp>
23 #include <type_traits>
25 #include <unordered_map>
30 /** Synchronous WebSocket echo client/server
32 class sync_echo_server
// Shorthand for the Beast error type and the Boost.Asio TCP endpoint,
// address, and socket types used by the rest of the class.
35 using error_code = beast::error_code;
36 using endpoint_type = boost::asio::ip::tcp::endpoint;
37 using address_type = boost::asio::ip::address;
38 using socket_type = boost::asio::ip::tcp::socket;
// Call operators of the decorator type (presumably the `identity` struct
// passed to beast::websocket::decorate in the constructor; its header
// line is not visible here).
//
// Overload for messages whose first template argument is `true`:
// replaces the "User-Agent" field with "sync_echo_client".
43 template<class Body, class Fields>
45 operator()(beast::http::message<
46 true, Body, Fields>& req) const
48 req.fields.replace("User-Agent", "sync_echo_client");
// Overload for messages whose first template argument is `false`:
// replaces the "Server" field with "sync_echo_server".
51 template<class Body, class Fields>
53 operator()(beast::http::message<
54 false, Body, Fields>& resp) const
56 resp.fields.replace("Server", "sync_echo_server");
60 /** A container of type-erased option setters.
62 template<class NextLayer>
// Abstract base for a stored setter: it has a virtual destructor and a
// pure virtual call operator taking a websocket stream over NextLayer.
65 // workaround for std::function bug in msvc
68 virtual ~callable() = default;
69 virtual void operator()(
70 beast::websocket::stream<NextLayer>&) = 0;
// Concrete setter that stores a forwarded callable in t_. The body of
// its call operator is not visible here (presumably it invokes t_ on
// the stream; confirm).
74 class callable_impl : public callable
81 : t_(std::forward<U>(u))
86 operator()(beast::websocket::stream<NextLayer>& ws)
// Copyable and movable function object constructed from an option
// value. The body of its const call operator is not visible here
// (presumably it applies the stored option to ws; confirm).
98 lambda(lambda&&) = default;
99 lambda(lambda const&) = default;
101 lambda(Opt const& opt)
107 operator()(beast::websocket::stream<NextLayer>& ws) const
// One owned setter per option type, keyed by the type's std::type_index.
113 std::unordered_map<std::type_index,
114 std::unique_ptr<callable>> list_;
// Wraps a copy of opt in a callable_impl<lambda<Opt>> and stores it
// under typeid(Opt). Assigning through list_[...] replaces any setter
// previously stored for the same option type.
119 set_option(Opt const& opt)
121 std::unique_ptr<callable> p;
122 p.reset(new callable_impl<lambda<Opt>>{opt});
123 list_[std::type_index{
124 typeid(Opt)}] = std::move(p);
// Visits every stored setter. The loop body is not visible here
// (presumably each setter is invoked on ws; confirm).
128 set_options(beast::websocket::stream<NextLayer>& ws)
130 for(auto const& op : list_)
// io_service whose run() is executed on thread_ once open() succeeds.
136 boost::asio::io_service ios_;
// Listening socket; opened, bound, and put into listen mode by open().
139 boost::asio::ip::tcp::acceptor acceptor_;
// Options recorded via set_option(); do_peer() applies them to each
// connection's websocket stream.
141 options_set<socket_type> opts_;
146 @param log A pointer to a stream to log to, or `nullptr`
149 sync_echo_server(std::ostream* log)
// Builds a decorate option from identity{}. The enclosing call is not
// visible here (presumably opts_.set_option(...), so that every
// connection gets the decorator; confirm).
155 beast::websocket::decorate(identity{}));
// Shutdown path (the declaration line is not visible here; presumably
// the destructor). Work is only needed if the I/O thread was started.
162 if(thread_.joinable())
// The acceptor is closed from inside a lambda handed to another call
// that is not visible here (presumably dispatched onto ios_ so the
// close runs on the I/O thread; confirm). Errors from close land in ec.
166 [&]{ acceptor_.close(ec); });
171 /** Return the listening endpoint.
// Forwards to the acceptor's own local_endpoint(); does not modify
// the server (const member).
174 local_endpoint() const
176 return acceptor_.local_endpoint();
179 /** Set a websocket option.
181 The option will be applied to all new connections.
183 @param opt The option to apply.
// Records the option in opts_; do_peer() later applies the recorded
// options to each connection's stream via opts_.set_options(ws).
187 set_option(Opt const& opt)
189 opts_.set_option(opt);
192 /** Open a listening port.
194 @param ep The address and port to bind to.
196 @param ec Set to the error, if any occurred.
// Sequence: open the acceptor for the endpoint's protocol, enable
// address reuse, bind, listen, start the first asynchronous accept,
// then launch the thread that runs the io_service. The open, bind, and
// listen steps return early through fail() with the step's name.
199 open(endpoint_type const& ep, error_code& ec)
201 acceptor_.open(ep.protocol(), ec);
203 return fail("open", ec);
// NOTE(review): this set_option call takes no error_code, unlike the
// surrounding calls. With Boost.Asio that overload reports failure by
// throwing rather than through ec; confirm this is intended.
204 acceptor_.set_option(
205 boost::asio::socket_base::reuse_address{true});
206 acceptor_.bind(ep, ec);
208 return fail("bind", ec);
// Listen backlog is the platform maximum (max_connections).
210 boost::asio::socket_base::max_connections, ec);
212 return fail("listen", ec);
// Arm the first accept; on_accept() re-arms it after each connection.
213 acceptor_.async_accept(sock_, ep_,
214 std::bind(&sync_echo_server::on_accept, this,
215 beast::asio::placeholders::error));
// The captured reference to this object must stay valid for as long
// as the thread runs ios_.
216 thread_ = std::thread{[&]{ ios_.run(); }};
// Logs "<what>: <error message>" to the log stream.
// The mutex m is held while writing; its declaration is not visible
// here (presumably shared so lines from different connection threads
// do not interleave; confirm). log_ is dereferenced unconditionally on
// the visible lines; any null check for a `nullptr` log would be on a
// line not shown.
221 fail(std::string what, error_code ec)
226 std::lock_guard<std::mutex> lock{m};
227 (*log_) << what << ": " <<
// std::endl also flushes the stream after each message.
228 ec.message() << std::endl;
// Per-connection variant of fail(): a websocket "closed" error is not
// logged. Other errors are forwarded to the two-argument fail() with a
// prefix that starts "[#<id> <endpoint>"; the rest of the prefix
// expression is not visible here.
233 fail(std::string what, error_code ec,
234 int id, endpoint_type const& ep)
237 if(ec != beast::websocket::error::closed)
238 fail("[#" + std::to_string(id) + " " +
239 boost::lexical_cast<std::string>(ep) +
// Completion handler for async_accept. Hands each accepted socket to a
// detached thread that runs do_peer(), then re-arms the accept.
244 on_accept(error_code ec)
// operation_aborted is tested separately from other errors; the action
// taken is on a line not visible here (presumably a silent return,
// e.g. when the acceptor is closed; confirm).
246 if(ec == boost::asio::error::operation_aborted)
249 return fail("accept", ec);
// Members of the local function object run by the connection thread.
// It keeps a reference to the server plus an io_service::work object.
254 sync_echo_server& self;
255 boost::asio::io_service::work work;
256 // Must be destroyed before work otherwise the
257 // io_service could be destroyed before the socket.
260 lambda(sync_echo_server& self_,
261 endpoint_type const& ep_,
// Function-local atomic counter; presumably the source of the
// connection id passed to do_peer (its use is not visible here).
265 static std::atomic<std::size_t> n{0};
// work is built from the socket's io_service before the socket is
// moved into the function object.
270 , work(sock_.get_io_service())
271 , sock(std::move(sock_))
277 self.do_peer(id, ep, std::move(sock));
// One detached thread per accepted connection; sock_ is moved from,
// then reused as the target of the next accept.
280 std::thread{lambda{*this, ep_, std::move(sock_)}}.detach();
281 acceptor_.async_accept(sock_, ep_,
282 std::bind(&sync_echo_server::on_accept, this,
283 beast::asio::placeholders::error));
// Helper used by do_peer() to test a received buffer against a string
// literal such as "TEXT" or "PING". N is the literal's array size, so
// the scratch string t holds N-1 characters. The comparison and return
// statements are not visible here (presumably returns true and consumes
// the prefix from db when it equals s; confirm).
286 template<class DynamicBuffer, std::size_t N>
289 match(DynamicBuffer& db, char const(&s)[N])
291 using boost::asio::buffer;
292 using boost::asio::buffer_copy;
295 beast::static_string<N-1> t;
// Copies bytes into t's storage; the source argument is on a line not
// visible here (presumably db.data()).
297 buffer_copy(buffer(t.data(), t.size()),
// Services one connection: wraps the socket in a websocket stream,
// applies the recorded options, then handles received messages until
// an error occurs. id and ep are used only to label log messages.
306 do_peer(std::size_t id,
307 endpoint_type const& ep, socket_type&& sock)
309 using boost::asio::buffer;
310 using boost::asio::buffer_copy;
311 beast::websocket::stream<
312 socket_type> ws{std::move(sock)};
313 opts_.set_options(ws);
// Reports a failure as "accept". The call that sets ec is not visible
// here (presumably the websocket handshake).
318 fail("accept", ec, id, ep);
323 beast::websocket::opcode op;
// NOTE(review): s is not referenced on any visible line; confirm
// whether it is still needed.
328 auto const s = ec.message();
// Set the outgoing message type to the opcode in op (presumably the
// opcode of the message just read).
331 ws.set_option(beast::websocket::message_type{op});
// Arguments of a call that targets ws.next_layer() rather than ws.
// The call and its guarding condition are not visible here (presumably
// a raw write of the buffer contents; confirm).
335 ws.next_layer(), sb.data(), ec);
// "TEXT": send the buffer contents as a text message.
337 else if(match(sb, "TEXT"))
340 beast::websocket::message_type{
341 beast::websocket::opcode::text});
342 ws.write(sb.data(), ec);
// "PING": copy bytes from the buffer into the ping payload, consume
// the copied bytes from sb, and send a ping.
344 else if(match(sb, "PING"))
346 beast::websocket::ping_data payload;
347 sb.consume(buffer_copy(
348 buffer(payload.data(), payload.size()),
350 ws.ping(payload, ec);
// "CLOSE": handling is on lines not visible here.
352 else if(match(sb, "CLOSE"))
// Otherwise: write the buffer contents back.
358 ws.write(sb.data(), ec);
// Any error other than a websocket "closed" is reported as "read".
363 if(ec && ec != beast::websocket::error::closed)
365 fail("read", ec, id, ep);