]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/boost/libs/beast/example/advanced/server-flex/advanced_server_flex.cpp
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / boost / libs / beast / example / advanced / server-flex / advanced_server_flex.cpp
index 30d6f462ca187e21cf0f49add43ade24dd97f189..b1f4ef7109564430602af7c095e811fb98401011 100644 (file)
 #include <boost/beast/version.hpp>
 #include <boost/asio/bind_executor.hpp>
 #include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/signal_set.hpp>
 #include <boost/asio/ssl/stream.hpp>
 #include <boost/asio/strand.hpp>
 #include <boost/asio/steady_timer.hpp>
+#include <boost/make_unique.hpp>
 #include <boost/config.hpp>
 #include <algorithm>
 #include <cstdlib>
@@ -185,13 +187,16 @@ handle_request(
     if(ec)
         return send(server_error(ec.message()));
 
+    // Cache the size since we need it after the move
+    auto const size = body.size();
+
     // Respond to HEAD request
     if(req.method() == http::verb::head)
     {
         http::response<http::empty_body> res{http::status::ok, req.version()};
         res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
         res.set(http::field::content_type, mime_type(path));
-        res.content_length(body.size());
+        res.content_length(size);
         res.keep_alive(req.keep_alive());
         return send(std::move(res));
     }
@@ -203,7 +208,7 @@ handle_request(
         std::make_tuple(http::status::ok, req.version())};
     res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
     res.set(http::field::content_type, mime_type(path));
-    res.content_length(body.size());
+    res.content_length(size);
     res.keep_alive(req.keep_alive());
     return send(std::move(res));
 }
@@ -234,6 +239,7 @@ class websocket_session
     }
 
     boost::beast::multi_buffer buffer_;
+    char ping_state_ = 0;
 
 protected:
     boost::asio::strand<
@@ -255,6 +261,15 @@ public:
     void
     do_accept(http::request<Body, http::basic_fields<Allocator>> req)
     {
+        // Set the control callback. This will be called
+        // on every incoming ping, pong, and close frame.
+        derived().ws().control_callback(
+            std::bind(
+                &websocket_session::on_control_callback,
+                this,
+                std::placeholders::_1,
+                std::placeholders::_2));
+
         // Set the timer
         timer_.expires_after(std::chrono::seconds(15));
 
@@ -269,6 +284,20 @@ public:
                     std::placeholders::_1)));
     }
 
+    void
+    on_accept(boost::system::error_code ec)
+    {
+        // Happens when the timer closes the socket
+        if(ec == boost::asio::error::operation_aborted)
+            return;
+
+        if(ec)
+            return fail(ec, "accept");
+
+        // Read a message
+        do_read();
+    }
+
     // Called when the timer expires.
     void
     on_timer(boost::system::error_code ec)
@@ -276,9 +305,38 @@ public:
         if(ec && ec != boost::asio::error::operation_aborted)
             return fail(ec, "timer");
 
-        // Verify that the timer really expired since the deadline may have moved.
+        // See if the timer really expired since the deadline may have moved.
         if(timer_.expiry() <= std::chrono::steady_clock::now())
-            derived().do_timeout();
+        {
+            // If this is the first time the timer expired,
+            // send a ping to see if the other end is there.
+            if(derived().ws().is_open() && ping_state_ == 0)
+            {
+                // Note that we are sending a ping
+                ping_state_ = 1;
+
+                // Set the timer
+                timer_.expires_after(std::chrono::seconds(15));
+
+                // Now send the ping
+                derived().ws().async_ping({},
+                    boost::asio::bind_executor(
+                        strand_,
+                        std::bind(
+                            &websocket_session::on_ping,
+                            derived().shared_from_this(),
+                            std::placeholders::_1)));
+            }
+            else
+            {
+                // The timer expired while trying to handshake,
+                // or we sent a ping and it never completed or
+                // we never got back a control frame, so close.
+
+                derived().do_timeout();
+                return;
+            }
+        }
 
         // Wait on the timer
         timer_.async_wait(
@@ -290,26 +348,56 @@ public:
                     std::placeholders::_1)));
     }
 
+    // Called to indicate activity from the remote peer
     void
-    on_accept(boost::system::error_code ec)
+    activity()
+    {
+        // Note that the connection is alive
+        ping_state_ = 0;
+
+        // Set the timer
+        timer_.expires_after(std::chrono::seconds(15));
+    }
+
+    // Called after a ping is sent.
+    void
+    on_ping(boost::system::error_code ec)
     {
         // Happens when the timer closes the socket
         if(ec == boost::asio::error::operation_aborted)
             return;
 
         if(ec)
-            return fail(ec, "accept");
+            return fail(ec, "ping");
 
-        // Read a message
-        do_read();
+        // Note that the ping was sent.
+        if(ping_state_ == 1)
+        {
+            ping_state_ = 2;
+        }
+        else
+        {
+            // ping_state_ could have been set to 0
+            // if an incoming control frame was received
+            // at exactly the same time we sent a ping.
+            BOOST_ASSERT(ping_state_ == 0);
+        }
     }
 
     void
-    do_read()
+    on_control_callback(
+        websocket::frame_type kind,
+        boost::beast::string_view payload)
     {
-        // Set the timer
-        timer_.expires_after(std::chrono::seconds(15));
+        boost::ignore_unused(kind, payload);
 
+        // Note that there is activity
+        activity();
+    }
+
+    void
+    do_read()
+    {
         // Read a message into our buffer
         derived().ws().async_read(
             buffer_,
@@ -340,6 +428,9 @@ public:
         if(ec)
             fail(ec, "read");
 
+        // Note that there is activity
+        activity();
+
         // Echo the message
         derived().ws().text(derived().ws().got_text());
         derived().ws().async_write(
@@ -656,7 +747,8 @@ class http_session
             };
 
             // Allocate and store the work
-            items_.emplace_back(new work_impl(self_, std::move(msg)));
+            items_.push_back(
+                boost::make_unique<work_impl>(self_, std::move(msg)));
 
             // If there was no previous work, start this one
             if(items_.size() == 1)
@@ -695,6 +787,10 @@ public:
         // Set the timer
         timer_.expires_after(std::chrono::seconds(15));
 
+        // Make the request empty before reading,
+        // otherwise the operation behavior is undefined.
+        req_ = {};
+
         // Read a request
         http::async_read(
             derived().stream(),
@@ -1080,6 +1176,14 @@ public:
             return;
         }
 
+        // Allow address reuse
+        acceptor_.set_option(boost::asio::socket_base::reuse_address(true));
+        if(ec)
+        {
+            fail(ec, "set_option");
+            return;
+        }
+
         // Bind to the server address
         acceptor_.bind(endpoint, ec);
         if(ec)
@@ -1173,6 +1277,17 @@ int main(int argc, char* argv[])
         tcp::endpoint{address, port},
         doc_root)->run();
 
+    // Capture SIGINT and SIGTERM to perform a clean shutdown
+    boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);
+    signals.async_wait(
+        [&](boost::system::error_code const&, int)
+        {
+            // Stop the `io_context`. This will cause `run()`
+            // to return immediately, eventually destroying the
+            // `io_context` and all of the sockets in it.
+            ioc.stop();
+        });
+
     // Run the I/O service on the requested number of threads
     std::vector<std::thread> v;
     v.reserve(threads - 1);
@@ -1184,5 +1299,11 @@ int main(int argc, char* argv[])
         });
     ioc.run();
 
+    // (If we get here, it means we got a SIGINT or SIGTERM)
+
+    // Block until all the threads exit
+    for(auto& t : v)
+        t.join();
+
     return EXIT_SUCCESS;
 }