#include <boost/beast/core/buffers_cat.hpp>
#include <boost/beast/core/buffers_prefix.hpp>
#include <boost/beast/core/buffers_suffix.hpp>
-#include <boost/beast/core/handler_ptr.hpp>
#include <boost/beast/core/flat_static_buffer.hpp>
#include <boost/beast/core/type_traits.hpp>
#include <boost/beast/core/detail/clamp.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
+#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/throw_exception.hpp>
namespace beast {
namespace websocket {
-template<class NextLayer>
+namespace detail {
+
+// Compress a buffer sequence
+// Returns: `true` if more calls are needed
+//
+template<>
+template<class ConstBufferSequence>
+bool
+stream_base<true>::
+deflate(
+ boost::asio::mutable_buffer& out,
+ buffers_suffix<ConstBufferSequence>& cb,
+ bool fin,
+ std::size_t& total_in,
+ error_code& ec)
+{
+ using boost::asio::buffer;
+ BOOST_ASSERT(out.size() >= 6);
+ auto& zo = this->pmd_->zo;
+ zlib::z_params zs;
+ zs.avail_in = 0;
+ zs.next_in = nullptr;
+ zs.avail_out = out.size();
+ zs.next_out = out.data();
+ for(auto in : beast::detail::buffers_range(cb))
+ {
+ zs.avail_in = in.size();
+ if(zs.avail_in == 0)
+ continue;
+ zs.next_in = in.data();
+ zo.write(zs, zlib::Flush::none, ec);
+ if(ec)
+ {
+ if(ec != zlib::error::need_buffers)
+ return false;
+ BOOST_ASSERT(zs.avail_out == 0);
+ BOOST_ASSERT(zs.total_out == out.size());
+ ec.assign(0, ec.category());
+ break;
+ }
+ if(zs.avail_out == 0)
+ {
+ BOOST_ASSERT(zs.total_out == out.size());
+ break;
+ }
+ BOOST_ASSERT(zs.avail_in == 0);
+ }
+ total_in = zs.total_in;
+ cb.consume(zs.total_in);
+ if(zs.avail_out > 0 && fin)
+ {
+ auto const remain = boost::asio::buffer_size(cb);
+ if(remain == 0)
+ {
+ // Inspired by Mark Adler
+ // https://github.com/madler/zlib/issues/149
+ //
+ // VFALCO We could do this flush twice depending
+ // on how much space is in the output.
+ zo.write(zs, zlib::Flush::block, ec);
+ BOOST_ASSERT(! ec || ec == zlib::error::need_buffers);
+ if(ec == zlib::error::need_buffers)
+ ec.assign(0, ec.category());
+ if(ec)
+ return false;
+ if(zs.avail_out >= 6)
+ {
+ zo.write(zs, zlib::Flush::full, ec);
+ BOOST_ASSERT(! ec);
+ // remove flush marker
+ zs.total_out -= 4;
+ out = buffer(out.data(), zs.total_out);
+ return false;
+ }
+ }
+ }
+ ec.assign(0, ec.category());
+ out = buffer(out.data(), zs.total_out);
+ return true;
+}
+
+template<>
+inline
+void
+stream_base<true>::
+do_context_takeover_write(role_type role)
+{
+ if((role == role_type::client &&
+ this->pmd_config_.client_no_context_takeover) ||
+ (role == role_type::server &&
+ this->pmd_config_.server_no_context_takeover))
+ {
+ this->pmd_->zo.reset();
+ }
+}
+
+} // detail
+
+//------------------------------------------------------------------------------
+
+template<class NextLayer, bool deflateSupported>
template<class Buffers, class Handler>
-class stream<NextLayer>::write_some_op
+class stream<NextLayer, deflateSupported>::write_some_op
: public boost::asio::coroutine
{
Handler h_;
- stream<NextLayer>& ws_;
+ stream<NextLayer, deflateSupported>& ws_;
buffers_suffix<Buffers> cb_;
detail::frame_header fh_;
detail::prepared_key key_;
std::size_t bytes_transferred_ = 0;
std::size_t remain_;
std::size_t in_;
- token tok_;
int how_;
bool fin_;
bool more_;
bool cont_ = false;
public:
+ static constexpr int id = 2; // for soft_mutex
+
write_some_op(write_some_op&&) = default;
- write_some_op(write_some_op const&) = default;
+ write_some_op(write_some_op const&) = delete;
template<class DeducedHandler>
write_some_op(
DeducedHandler&& h,
- stream<NextLayer>& ws,
+ stream<NextLayer, deflateSupported>& ws,
bool fin,
Buffers const& bs)
: h_(std::forward<DeducedHandler>(h))
, ws_(ws)
, cb_(bs)
- , tok_(ws_.tok_.unique())
, fin_(fin)
{
}
allocator_type
get_allocator() const noexcept
{
- return boost::asio::get_associated_allocator(h_);
+ return (boost::asio::get_associated_allocator)(h_);
}
using executor_type = boost::asio::associated_executor_t<
- Handler, decltype(std::declval<stream<NextLayer>&>().get_executor())>;
+ Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
executor_type
get_executor() const noexcept
{
- return boost::asio::get_associated_executor(
+ return (boost::asio::get_associated_executor)(
h_, ws_.get_executor());
}
return op->cont_ || asio_handler_is_continuation(
std::addressof(op->h_));
}
+
+ template<class Function>
+ friend
+ void asio_handler_invoke(Function&& f, write_some_op* op)
+ {
+ using boost::asio::asio_handler_invoke;
+ asio_handler_invoke(
+ f, std::addressof(op->h_));
+ }
};
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class Buffers, class Handler>
void
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
write_some_op<Buffers, Handler>::
operator()(
error_code ec,
}
// Maybe suspend
- if(! ws_.wr_block_)
+ if(ws_.wr_block_.try_lock(this))
{
- // Acquire the write block
- ws_.wr_block_ = tok_;
-
// Make sure the stream is open
if(! ws_.check_open(ec))
goto upcall;
{
do_suspend:
// Suspend
- BOOST_ASSERT(ws_.wr_block_ != tok_);
BOOST_ASIO_CORO_YIELD
- ws_.paused_wr_.save(std::move(*this));
+ ws_.paused_wr_.emplace(std::move(*this));
// Acquire the write block
- BOOST_ASSERT(! ws_.wr_block_);
- ws_.wr_block_ = tok_;
+ ws_.wr_block_.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD
boost::asio::post(
ws_.get_executor(), std::move(*this));
- BOOST_ASSERT(ws_.wr_block_ == tok_);
+ BOOST_ASSERT(ws_.wr_block_.is_locked(this));
// Make sure the stream is open
if(! ws_.check_open(ec))
fh_.op = detail::opcode::cont;
// Allow outgoing control frames to
// be sent in between message frames
- ws_.wr_block_.reset();
+ ws_.wr_block_.unlock(this);
if( ws_.paused_close_.maybe_invoke() ||
ws_.paused_rd_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke())
{
- BOOST_ASSERT(ws_.wr_block_);
+ BOOST_ASSERT(ws_.wr_block_.is_locked());
goto do_suspend;
}
- ws_.wr_block_ = tok_;
+ ws_.wr_block_.lock(this);
}
goto upcall;
}
fh_.op = detail::opcode::cont;
// Allow outgoing control frames to
// be sent in between message frames:
- ws_.wr_block_.reset();
+ ws_.wr_block_.unlock(this);
if( ws_.paused_close_.maybe_invoke() ||
ws_.paused_rd_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke())
{
- BOOST_ASSERT(ws_.wr_block_);
+ BOOST_ASSERT(ws_.wr_block_.is_locked());
goto do_suspend;
}
- ws_.wr_block_ = tok_;
+ ws_.wr_block_.lock(this);
}
goto upcall;
}
{
b = buffer(ws_.wr_buf_.get(),
ws_.wr_buf_size_);
- more_ = detail::deflate(ws_.pmd_->zo,
- b, cb_, fin_, in_, ec);
+ more_ = ws_.deflate(b, cb_, fin_, in_, ec);
if(! ws_.check_ok(ec))
goto upcall;
n = buffer_size(b);
fh_.rsv1 = false;
// Allow outgoing control frames to
// be sent in between message frames:
- ws_.wr_block_.reset();
+ ws_.wr_block_.unlock(this);
if( ws_.paused_close_.maybe_invoke() ||
ws_.paused_rd_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke())
{
- BOOST_ASSERT(ws_.wr_block_);
+ BOOST_ASSERT(ws_.wr_block_.is_locked());
goto do_suspend;
}
- ws_.wr_block_ = tok_;
+ ws_.wr_block_.lock(this);
}
else
{
- if(fh_.fin && (
- (ws_.role_ == role_type::client &&
- ws_.pmd_config_.client_no_context_takeover) ||
- (ws_.role_ == role_type::server &&
- ws_.pmd_config_.server_no_context_takeover)))
- ws_.pmd_->zo.reset();
+ if(fh_.fin)
+ ws_.do_context_takeover_write(ws_.role_);
goto upcall;
}
}
//--------------------------------------------------------------------------
upcall:
- BOOST_ASSERT(ws_.wr_block_ == tok_);
- ws_.wr_block_.reset();
+ ws_.wr_block_.unlock(this);
ws_.paused_close_.maybe_invoke() ||
ws_.paused_rd_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke();
if(! cont_)
return boost::asio::post(
ws_.stream_.get_executor(),
- bind_handler(h_, ec, bytes_transferred_));
+ bind_handler(std::move(h_), ec, bytes_transferred_));
h_(ec, bytes_transferred_);
}
}
//------------------------------------------------------------------------------
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class ConstBufferSequence>
std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
write_some(bool fin, ConstBufferSequence const& buffers)
{
static_assert(is_sync_stream<next_layer_type>::value,
return bytes_transferred;
}
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class ConstBufferSequence>
std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
write_some(bool fin,
ConstBufferSequence const& buffers, error_code& ec)
{
{
auto b = buffer(
wr_buf_.get(), wr_buf_size_);
- auto const more = detail::deflate(
- pmd_->zo, b, cb, fin,
- bytes_transferred, ec);
+ auto const more = this->deflate(
+ b, cb, fin, bytes_transferred, ec);
if(! check_ok(ec))
return bytes_transferred;
auto const n = buffer_size(b);
fh.op = detail::opcode::cont;
fh.rsv1 = false;
}
- if(fh.fin && (
- (role_ == role_type::client &&
- pmd_config_.client_no_context_takeover) ||
- (role_ == role_type::server &&
- pmd_config_.server_no_context_takeover)))
- pmd_->zo.reset();
+ if(fh.fin)
+ this->do_context_takeover_write(role_);
}
else if(! fh.mask)
{
return bytes_transferred;
}
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class ConstBufferSequence, class WriteHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
WriteHandler, void(error_code, std::size_t))
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
async_write_some(bool fin,
ConstBufferSequence const& bs, WriteHandler&& handler)
{
static_assert(boost::asio::is_const_buffer_sequence<
ConstBufferSequence>::value,
"ConstBufferSequence requirements not met");
- boost::asio::async_completion<WriteHandler,
- void(error_code, std::size_t)> init{handler};
+ BOOST_BEAST_HANDLER_INIT(
+ WriteHandler, void(error_code, std::size_t));
write_some_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE(
WriteHandler, void(error_code, std::size_t))>{
- init.completion_handler, *this, fin, bs}(
+ std::move(init.completion_handler), *this, fin, bs}(
{}, 0, false);
return init.result.get();
}
//------------------------------------------------------------------------------
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class ConstBufferSequence>
std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
write(ConstBufferSequence const& buffers)
{
static_assert(is_sync_stream<next_layer_type>::value,
return bytes_transferred;
}
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class ConstBufferSequence>
std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
write(ConstBufferSequence const& buffers, error_code& ec)
{
static_assert(is_sync_stream<next_layer_type>::value,
return write_some(true, buffers, ec);
}
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class ConstBufferSequence, class WriteHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
WriteHandler, void(error_code, std::size_t))
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
async_write(
ConstBufferSequence const& bs, WriteHandler&& handler)
{
static_assert(boost::asio::is_const_buffer_sequence<
ConstBufferSequence>::value,
"ConstBufferSequence requirements not met");
- boost::asio::async_completion<WriteHandler,
- void(error_code, std::size_t)> init{handler};
+ BOOST_BEAST_HANDLER_INIT(
+ WriteHandler, void(error_code, std::size_t));
write_some_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE(
WriteHandler, void(error_code, std::size_t))>{
- init.completion_handler, *this, true, bs}(
+ std::move(init.completion_handler), *this, true, bs}(
{}, 0, false);
return init.result.get();
}