#include <boost/beast/version.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/signal_set.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/steady_timer.hpp>
+#include <boost/make_unique.hpp>
#include <boost/config.hpp>
#include <algorithm>
#include <cstdlib>
if(ec)
return send(server_error(ec.message()));
+ // Cache the size since we need it after the move
+ auto const size = body.size();
+
// Respond to HEAD request
if(req.method() == http::verb::head)
{
http::response<http::empty_body> res{http::status::ok, req.version()};
res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(http::field::content_type, mime_type(path));
- res.content_length(body.size());
+ res.content_length(size);
res.keep_alive(req.keep_alive());
return send(std::move(res));
}
std::make_tuple(http::status::ok, req.version())};
res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(http::field::content_type, mime_type(path));
- res.content_length(body.size());
+ res.content_length(size);
res.keep_alive(req.keep_alive());
return send(std::move(res));
}
}
boost::beast::multi_buffer buffer_;
+ char ping_state_ = 0;
protected:
boost::asio::strand<
void
do_accept(http::request<Body, http::basic_fields<Allocator>> req)
{
+ // Set the control callback. This will be called
+ // on every incoming ping, pong, and close frame.
+ derived().ws().control_callback(
+ std::bind(
+ &websocket_session::on_control_callback,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
std::placeholders::_1)));
}
+ void
+ on_accept(boost::system::error_code ec)
+ {
+ // Happens when the timer closes the socket
+ if(ec == boost::asio::error::operation_aborted)
+ return;
+
+ if(ec)
+ return fail(ec, "accept");
+
+ // Read a message
+ do_read();
+ }
+
// Called when the timer expires.
void
on_timer(boost::system::error_code ec)
if(ec && ec != boost::asio::error::operation_aborted)
return fail(ec, "timer");
- // Verify that the timer really expired since the deadline may have moved.
+ // See if the timer really expired since the deadline may have moved.
if(timer_.expiry() <= std::chrono::steady_clock::now())
- derived().do_timeout();
+ {
+ // If this is the first time the timer expired,
+ // send a ping to see if the other end is there.
+ if(derived().ws().is_open() && ping_state_ == 0)
+ {
+ // Note that we are sending a ping
+ ping_state_ = 1;
+
+ // Set the timer
+ timer_.expires_after(std::chrono::seconds(15));
+
+ // Now send the ping
+ derived().ws().async_ping({},
+ boost::asio::bind_executor(
+ strand_,
+ std::bind(
+ &websocket_session::on_ping,
+ derived().shared_from_this(),
+ std::placeholders::_1)));
+ }
+ else
+ {
+ // The timer expired while trying to handshake,
+ // or we sent a ping and it never completed or
+ // we never got back a control frame, so close.
+
+ derived().do_timeout();
+ return;
+ }
+ }
// Wait on the timer
timer_.async_wait(
std::placeholders::_1)));
}
+ // Called to indicate activity from the remote peer
void
- on_accept(boost::system::error_code ec)
+ activity()
+ {
+ // Note that the connection is alive
+ ping_state_ = 0;
+
+ // Set the timer
+ timer_.expires_after(std::chrono::seconds(15));
+ }
+
+ // Called after a ping is sent.
+ void
+ on_ping(boost::system::error_code ec)
{
// Happens when the timer closes the socket
if(ec == boost::asio::error::operation_aborted)
return;
if(ec)
- return fail(ec, "accept");
+ return fail(ec, "ping");
- // Read a message
- do_read();
+ // Note that the ping was sent.
+ if(ping_state_ == 1)
+ {
+ ping_state_ = 2;
+ }
+ else
+ {
+ // ping_state_ could have been set to 0
+ // if an incoming control frame was received
+ // at exactly the same time we sent a ping.
+ BOOST_ASSERT(ping_state_ == 0);
+ }
}
void
- do_read()
+ on_control_callback(
+ websocket::frame_type kind,
+ boost::beast::string_view payload)
{
- // Set the timer
- timer_.expires_after(std::chrono::seconds(15));
+ boost::ignore_unused(kind, payload);
+ // Note that there is activity
+ activity();
+ }
+
+ void
+ do_read()
+ {
// Read a message into our buffer
derived().ws().async_read(
buffer_,
if(ec)
fail(ec, "read");
+ // Note that there is activity
+ activity();
+
// Echo the message
derived().ws().text(derived().ws().got_text());
derived().ws().async_write(
};
// Allocate and store the work
- items_.emplace_back(new work_impl(self_, std::move(msg)));
+ items_.push_back(
+ boost::make_unique<work_impl>(self_, std::move(msg)));
// If there was no previous work, start this one
if(items_.size() == 1)
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
+ // Make the request empty before reading,
+ // otherwise the operation behavior is undefined.
+ req_ = {};
+
// Read a request
http::async_read(
derived().stream(),
return;
}
+ // Allow address reuse
+ acceptor_.set_option(boost::asio::socket_base::reuse_address(true));
+ if(ec)
+ {
+ fail(ec, "set_option");
+ return;
+ }
+
// Bind to the server address
acceptor_.bind(endpoint, ec);
if(ec)
tcp::endpoint{address, port},
doc_root)->run();
+ // Capture SIGINT and SIGTERM to perform a clean shutdown
+ boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);
+ signals.async_wait(
+ [&](boost::system::error_code const&, int)
+ {
+ // Stop the `io_context`. This will cause `run()`
+ // to return immediately, eventually destroying the
+ // `io_context` and all of the sockets in it.
+ ioc.stop();
+ });
+
// Run the I/O service on the requested number of threads
std::vector<std::thread> v;
v.reserve(threads - 1);
});
ioc.run();
+ // (If we get here, it means we got a SIGINT or SIGTERM)
+
+ // Block until all the threads exit
+ for(auto& t : v)
+ t.join();
+
return EXIT_SUCCESS;
}