]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/boost/boost/beast/websocket/impl/write.ipp
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / boost / boost / beast / websocket / impl / write.ipp
index b04f2826fcb1d29a9fe8845690b935406f31ce84..d12f2f9e12e5a65047f1d869377337286a2efa36 100644 (file)
@@ -14,7 +14,6 @@
 #include <boost/beast/core/buffers_cat.hpp>
 #include <boost/beast/core/buffers_prefix.hpp>
 #include <boost/beast/core/buffers_suffix.hpp>
-#include <boost/beast/core/handler_ptr.hpp>
 #include <boost/beast/core/flat_static_buffer.hpp>
 #include <boost/beast/core/type_traits.hpp>
 #include <boost/beast/core/detail/clamp.hpp>
@@ -24,6 +23,7 @@
 #include <boost/asio/associated_executor.hpp>
 #include <boost/asio/coroutine.hpp>
 #include <boost/asio/handler_continuation_hook.hpp>
+#include <boost/asio/handler_invoke_hook.hpp>
 #include <boost/assert.hpp>
 #include <boost/config.hpp>
 #include <boost/throw_exception.hpp>
@@ -34,39 +34,139 @@ namespace boost {
 namespace beast {
 namespace websocket {
 
-template<class NextLayer>
+namespace detail {
+
+// Compress a buffer sequence
+// Returns: `true` if more calls are needed
+//
+template<>
+template<class ConstBufferSequence>
+bool
+stream_base<true>::
+deflate(
+    boost::asio::mutable_buffer& out,
+    buffers_suffix<ConstBufferSequence>& cb,
+    bool fin,
+    std::size_t& total_in,
+    error_code& ec)
+{
+    using boost::asio::buffer;
+    BOOST_ASSERT(out.size() >= 6);
+    auto& zo = this->pmd_->zo;
+    zlib::z_params zs;
+    zs.avail_in = 0;
+    zs.next_in = nullptr;
+    zs.avail_out = out.size();
+    zs.next_out = out.data();
+    for(auto in : beast::detail::buffers_range(cb))
+    {
+        zs.avail_in = in.size();
+        if(zs.avail_in == 0)
+            continue;
+        zs.next_in = in.data();
+        zo.write(zs, zlib::Flush::none, ec);
+        if(ec)
+        {
+            if(ec != zlib::error::need_buffers)
+                return false;
+            BOOST_ASSERT(zs.avail_out == 0);
+            BOOST_ASSERT(zs.total_out == out.size());
+            ec.assign(0, ec.category());
+            break;
+        }
+        if(zs.avail_out == 0)
+        {
+            BOOST_ASSERT(zs.total_out == out.size());
+            break;
+        }
+        BOOST_ASSERT(zs.avail_in == 0);
+    }
+    total_in = zs.total_in;
+    cb.consume(zs.total_in);
+    if(zs.avail_out > 0 && fin)
+    {
+        auto const remain = boost::asio::buffer_size(cb);
+        if(remain == 0)
+        {
+            // Inspired by Mark Adler
+            // https://github.com/madler/zlib/issues/149
+            //
+            // VFALCO We could do this flush twice depending
+            //        on how much space is in the output.
+            zo.write(zs, zlib::Flush::block, ec);
+            BOOST_ASSERT(! ec || ec == zlib::error::need_buffers);
+            if(ec == zlib::error::need_buffers)
+                ec.assign(0, ec.category());
+            if(ec)
+                return false;
+            if(zs.avail_out >= 6)
+            {
+                zo.write(zs, zlib::Flush::full, ec);
+                BOOST_ASSERT(! ec);
+                // remove flush marker
+                zs.total_out -= 4;
+                out = buffer(out.data(), zs.total_out);
+                return false;
+            }
+        }
+    }
+    ec.assign(0, ec.category());
+    out = buffer(out.data(), zs.total_out);
+    return true;
+}
+
+template<>
+inline
+void
+stream_base<true>::
+do_context_takeover_write(role_type role)
+{
+    if((role == role_type::client &&
+        this->pmd_config_.client_no_context_takeover) ||
+       (role == role_type::server &&
+        this->pmd_config_.server_no_context_takeover))
+    {
+        this->pmd_->zo.reset();
+    }
+}
+
+} // detail
+
+//------------------------------------------------------------------------------
+
+template<class NextLayer, bool deflateSupported>
 template<class Buffers, class Handler>
-class stream<NextLayer>::write_some_op
+class stream<NextLayer, deflateSupported>::write_some_op
     : public boost::asio::coroutine
 {
     Handler h_;
-    stream<NextLayer>& ws_;
+    stream<NextLayer, deflateSupported>& ws_;
     buffers_suffix<Buffers> cb_;
     detail::frame_header fh_;
     detail::prepared_key key_;
     std::size_t bytes_transferred_ = 0;
     std::size_t remain_;
     std::size_t in_;
-    token tok_;
     int how_;
     bool fin_;
     bool more_;
     bool cont_ = false;
 
 public:
+    static constexpr int id = 2; // for soft_mutex
+
     write_some_op(write_some_op&&) = default;
-    write_some_op(write_some_op const&) = default;
+    write_some_op(write_some_op const&) = delete;
 
     template<class DeducedHandler>
     write_some_op(
         DeducedHandler&& h,
-        stream<NextLayer>& ws,
+        stream<NextLayer, deflateSupported>& ws,
         bool fin,
         Buffers const& bs)
         : h_(std::forward<DeducedHandler>(h))
         , ws_(ws)
         , cb_(bs)
-        , tok_(ws_.tok_.unique())
         , fin_(fin)
     {
     }
@@ -77,16 +177,16 @@ public:
     allocator_type
     get_allocator() const noexcept
     {
-        return boost::asio::get_associated_allocator(h_);
+        return (boost::asio::get_associated_allocator)(h_);
     }
 
     using executor_type = boost::asio::associated_executor_t<
-        Handler, decltype(std::declval<stream<NextLayer>&>().get_executor())>;
+        Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
 
     executor_type
     get_executor() const noexcept
     {
-        return boost::asio::get_associated_executor(
+        return (boost::asio::get_associated_executor)(
             h_, ws_.get_executor());
     }
 
@@ -108,12 +208,21 @@ public:
         return op->cont_ || asio_handler_is_continuation(
             std::addressof(op->h_));
     }
+
+    template<class Function>
+    friend
+    void asio_handler_invoke(Function&& f, write_some_op* op)
+    {
+        using boost::asio::asio_handler_invoke;
+        asio_handler_invoke(
+            f, std::addressof(op->h_));
+    }
 };
 
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
 template<class Buffers, class Handler>
 void
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
 write_some_op<Buffers, Handler>::
 operator()(
     error_code ec,
@@ -194,11 +303,8 @@ operator()(
         }
 
         // Maybe suspend
-        if(! ws_.wr_block_)
+        if(ws_.wr_block_.try_lock(this))
         {
-            // Acquire the write block
-            ws_.wr_block_ = tok_;
-
             // Make sure the stream is open
             if(! ws_.check_open(ec))
                 goto upcall;
@@ -207,19 +313,17 @@ operator()(
         {
         do_suspend:
             // Suspend
-            BOOST_ASSERT(ws_.wr_block_ != tok_);
             BOOST_ASIO_CORO_YIELD
-            ws_.paused_wr_.save(std::move(*this));
+            ws_.paused_wr_.emplace(std::move(*this));
 
             // Acquire the write block
-            BOOST_ASSERT(! ws_.wr_block_);
-            ws_.wr_block_ = tok_;
+            ws_.wr_block_.lock(this);
 
             // Resume
             BOOST_ASIO_CORO_YIELD
             boost::asio::post(
                 ws_.get_executor(), std::move(*this));
-            BOOST_ASSERT(ws_.wr_block_ == tok_);
+            BOOST_ASSERT(ws_.wr_block_.is_locked(this));
 
             // Make sure the stream is open
             if(! ws_.check_open(ec))
@@ -278,15 +382,15 @@ operator()(
                 fh_.op = detail::opcode::cont;
                 // Allow outgoing control frames to
                 // be sent in between message frames
-                ws_.wr_block_.reset();
+                ws_.wr_block_.unlock(this);
                 if( ws_.paused_close_.maybe_invoke() ||
                     ws_.paused_rd_.maybe_invoke() ||
                     ws_.paused_ping_.maybe_invoke())
                 {
-                    BOOST_ASSERT(ws_.wr_block_);
+                    BOOST_ASSERT(ws_.wr_block_.is_locked());
                     goto do_suspend;
                 }
-                ws_.wr_block_ = tok_;
+                ws_.wr_block_.lock(this);
             }
             goto upcall;
         }
@@ -377,15 +481,15 @@ operator()(
                 fh_.op = detail::opcode::cont;
                 // Allow outgoing control frames to
                 // be sent in between message frames:
-                ws_.wr_block_.reset();
+                ws_.wr_block_.unlock(this);
                 if( ws_.paused_close_.maybe_invoke() ||
                     ws_.paused_rd_.maybe_invoke() ||
                     ws_.paused_ping_.maybe_invoke())
                 {
-                    BOOST_ASSERT(ws_.wr_block_);
+                    BOOST_ASSERT(ws_.wr_block_.is_locked());
                     goto do_suspend;
                 }
-                ws_.wr_block_ = tok_;
+                ws_.wr_block_.lock(this);
             }
             goto upcall;
         }
@@ -398,8 +502,7 @@ operator()(
             {
                 b = buffer(ws_.wr_buf_.get(),
                     ws_.wr_buf_size_);
-                more_ = detail::deflate(ws_.pmd_->zo,
-                    b, cb_, fin_, in_, ec);
+                more_ = ws_.deflate(b, cb_, fin_, in_, ec);
                 if(! ws_.check_ok(ec))
                     goto upcall;
                 n = buffer_size(b);
@@ -439,24 +542,20 @@ operator()(
                     fh_.rsv1 = false;
                     // Allow outgoing control frames to
                     // be sent in between message frames:
-                    ws_.wr_block_.reset();
+                    ws_.wr_block_.unlock(this);
                     if( ws_.paused_close_.maybe_invoke() ||
                         ws_.paused_rd_.maybe_invoke() ||
                         ws_.paused_ping_.maybe_invoke())
                     {
-                        BOOST_ASSERT(ws_.wr_block_);
+                        BOOST_ASSERT(ws_.wr_block_.is_locked());
                         goto do_suspend;
                     }
-                    ws_.wr_block_ = tok_;
+                    ws_.wr_block_.lock(this);
                 }
                 else
                 {
-                    if(fh_.fin && (
-                        (ws_.role_ == role_type::client &&
-                            ws_.pmd_config_.client_no_context_takeover) ||
-                        (ws_.role_ == role_type::server &&
-                            ws_.pmd_config_.server_no_context_takeover)))
-                        ws_.pmd_->zo.reset();
+                    if(fh_.fin)
+                        ws_.do_context_takeover_write(ws_.role_);
                     goto upcall;
                 }
             }
@@ -465,25 +564,24 @@ operator()(
     //--------------------------------------------------------------------------
 
     upcall:
-        BOOST_ASSERT(ws_.wr_block_ == tok_);
-        ws_.wr_block_.reset();
+        ws_.wr_block_.unlock(this);
         ws_.paused_close_.maybe_invoke() ||
             ws_.paused_rd_.maybe_invoke() ||
             ws_.paused_ping_.maybe_invoke();
         if(! cont_)
             return boost::asio::post(
                 ws_.stream_.get_executor(),
-                bind_handler(h_, ec, bytes_transferred_));
+                bind_handler(std::move(h_), ec, bytes_transferred_));
         h_(ec, bytes_transferred_);
     }
 }
 
 //------------------------------------------------------------------------------
 
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
 template<class ConstBufferSequence>
 std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
 write_some(bool fin, ConstBufferSequence const& buffers)
 {
     static_assert(is_sync_stream<next_layer_type>::value,
@@ -499,10 +597,10 @@ write_some(bool fin, ConstBufferSequence const& buffers)
     return bytes_transferred;
 }
 
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
 template<class ConstBufferSequence>
 std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
 write_some(bool fin,
     ConstBufferSequence const& buffers, error_code& ec)
 {
@@ -544,9 +642,8 @@ write_some(bool fin,
         {
             auto b = buffer(
                 wr_buf_.get(), wr_buf_size_);
-            auto const more = detail::deflate(
-                pmd_->zo, b, cb, fin,
-                    bytes_transferred, ec);
+            auto const more = this->deflate(
+                b, cb, fin, bytes_transferred, ec);
             if(! check_ok(ec))
                 return bytes_transferred;
             auto const n = buffer_size(b);
@@ -582,12 +679,8 @@ write_some(bool fin,
             fh.op = detail::opcode::cont;
             fh.rsv1 = false;
         }
-        if(fh.fin && (
-            (role_ == role_type::client &&
-                pmd_config_.client_no_context_takeover) ||
-            (role_ == role_type::server &&
-                pmd_config_.server_no_context_takeover)))
-            pmd_->zo.reset();
+        if(fh.fin)
+            this->do_context_takeover_write(role_);
     }
     else if(! fh.mask)
     {
@@ -712,11 +805,11 @@ write_some(bool fin,
     return bytes_transferred;
 }
 
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
 template<class ConstBufferSequence, class WriteHandler>
 BOOST_ASIO_INITFN_RESULT_TYPE(
     WriteHandler, void(error_code, std::size_t))
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
 async_write_some(bool fin,
     ConstBufferSequence const& bs, WriteHandler&& handler)
 {
@@ -725,21 +818,21 @@ async_write_some(bool fin,
     static_assert(boost::asio::is_const_buffer_sequence<
         ConstBufferSequence>::value,
             "ConstBufferSequence requirements not met");
-    boost::asio::async_completion<WriteHandler,
-        void(error_code, std::size_t)> init{handler};
+    BOOST_BEAST_HANDLER_INIT(
+        WriteHandler, void(error_code, std::size_t));
     write_some_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE(
         WriteHandler, void(error_code, std::size_t))>{
-            init.completion_handler, *this, fin, bs}(
+            std::move(init.completion_handler), *this, fin, bs}(
                 {}, 0, false);
     return init.result.get();
 }
 
 //------------------------------------------------------------------------------
 
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
 template<class ConstBufferSequence>
 std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
 write(ConstBufferSequence const& buffers)
 {
     static_assert(is_sync_stream<next_layer_type>::value,
@@ -754,10 +847,10 @@ write(ConstBufferSequence const& buffers)
     return bytes_transferred;
 }
 
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
 template<class ConstBufferSequence>
 std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
 write(ConstBufferSequence const& buffers, error_code& ec)
 {
     static_assert(is_sync_stream<next_layer_type>::value,
@@ -768,11 +861,11 @@ write(ConstBufferSequence const& buffers, error_code& ec)
     return write_some(true, buffers, ec);
 }
 
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
 template<class ConstBufferSequence, class WriteHandler>
 BOOST_ASIO_INITFN_RESULT_TYPE(
     WriteHandler, void(error_code, std::size_t))
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
 async_write(
     ConstBufferSequence const& bs, WriteHandler&& handler)
 {
@@ -781,11 +874,11 @@ async_write(
     static_assert(boost::asio::is_const_buffer_sequence<
         ConstBufferSequence>::value,
             "ConstBufferSequence requirements not met");
-    boost::asio::async_completion<WriteHandler,
-        void(error_code, std::size_t)> init{handler};
+    BOOST_BEAST_HANDLER_INIT(
+        WriteHandler, void(error_code, std::size_t));
     write_some_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE(
         WriteHandler, void(error_code, std::size_t))>{
-            init.completion_handler, *this, true, bs}(
+            std::move(init.completion_handler), *this, true, bs}(
                 {}, 0, false);
     return init.result.get();
 }