2 // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 // Official repository: https://github.com/boostorg/beast
10 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_WRITE_IPP
11 #define BOOST_BEAST_WEBSOCKET_IMPL_WRITE_IPP
13 #include <boost/beast/core/bind_handler.hpp>
14 #include <boost/beast/core/buffers_cat.hpp>
15 #include <boost/beast/core/buffers_prefix.hpp>
16 #include <boost/beast/core/buffers_suffix.hpp>
17 #include <boost/beast/core/flat_static_buffer.hpp>
18 #include <boost/beast/core/type_traits.hpp>
19 #include <boost/beast/core/detail/clamp.hpp>
20 #include <boost/beast/core/detail/config.hpp>
21 #include <boost/beast/websocket/detail/frame.hpp>
22 #include <boost/asio/associated_allocator.hpp>
23 #include <boost/asio/associated_executor.hpp>
24 #include <boost/asio/coroutine.hpp>
25 #include <boost/asio/handler_continuation_hook.hpp>
26 #include <boost/asio/handler_invoke_hook.hpp>
27 #include <boost/assert.hpp>
28 #include <boost/config.hpp>
29 #include <boost/throw_exception.hpp>
39 // Compress a buffer sequence
40 // Returns: `true` if more calls are needed
// Runs the caller's input sequence `cb` through the permessage-deflate
// zlib stream (this->pmd_->zo) into the output window `out`, then
// shrinks `out` to the number of bytes actually produced and reports
// the bytes consumed through `total_in`.
// NOTE(review): this excerpt is elided — the enclosing signature and
// several statements are missing between the numbered lines; comments
// describe only what the visible code shows.
43 template<class ConstBufferSequence>
47 boost::asio::mutable_buffer& out,
48 buffers_suffix<ConstBufferSequence>& cb,
50 std::size_t& total_in,
53 using boost::asio::buffer;
// Caller must leave room for the trailing empty-block flush marker
// (the `>= 6` requirement is asserted again below before Flush::full).
54 BOOST_ASSERT(out.size() >= 6);
55 auto& zo = this->pmd_->zo;
59 zs.avail_out = out.size();
60 zs.next_out = out.data();
// Feed each contiguous buffer of the input to the compressor until
// the input runs out or the output window fills.
61 for(auto in : beast::detail::buffers_range(cb))
63 zs.avail_in = in.size();
66 zs.next_in = in.data();
67 zo.write(zs, zlib::Flush::none, ec);
70 if(ec != zlib::error::need_buffers)
// need_buffers here signals a full output window, not a failure,
// so it is cleared rather than propagated.
72 BOOST_ASSERT(zs.avail_out == 0);
73 BOOST_ASSERT(zs.total_out == out.size());
74 ec.assign(0, ec.category());
79 BOOST_ASSERT(zs.total_out == out.size());
82 BOOST_ASSERT(zs.avail_in == 0);
// Report and consume exactly the bytes zlib accepted from the input.
84 total_in = zs.total_in;
85 cb.consume(zs.total_in);
// Finishing a message with spare output space: flush the compressor
// so the final frame carries complete DEFLATE data.
86 if(zs.avail_out > 0 && fin)
88 auto const remain = boost::asio::buffer_size(cb);
91 // Inspired by Mark Adler
92 // https://github.com/madler/zlib/issues/149
94 // VFALCO We could do this flush twice depending
95 // on how much space is in the output.
96 zo.write(zs, zlib::Flush::block, ec);
97 BOOST_ASSERT(! ec || ec == zlib::error::need_buffers);
98 if(ec == zlib::error::need_buffers)
99 ec.assign(0, ec.category());
// Only issue the full flush when the guaranteed marker space fits.
102 if(zs.avail_out >= 6)
104 zo.write(zs, zlib::Flush::full, ec);
106 // remove flush marker
108 out = buffer(out.data(), zs.total_out);
113 ec.assign(0, ec.category());
// Shrink `out` to the bytes the compressor actually wrote.
114 out = buffer(out.data(), zs.total_out);
// Reset the write-side deflate stream after a message when the
// negotiated permessage-deflate options disable context takeover for
// this peer's role (RFC 7692 no_context_takeover).
// NOTE(review): elided excerpt — the template header and enclosing
// braces are not visible between the numbered lines.
122 do_context_takeover_write(role_type role)
124 if((role == role_type::client &&
125 this->pmd_config_.client_no_context_takeover) ||
126 (role == role_type::server &&
127 this->pmd_config_.server_no_context_takeover))
// Drop the compressor's sliding window so the next message
// starts with a fresh dictionary.
129 this->pmd_->zo.reset();
135 //------------------------------------------------------------------------------
137 template<class NextLayer, bool deflateSupported>
138 template<class Buffers, class Handler>
// Stateful composed operation implementing stream::async_write_some.
// Holds the stream, the caller's buffer sequence (as a consumable
// suffix), the frame header being built, and the masking key.
// Implemented as a stackless Asio coroutine (operator() re-enters).
// NOTE(review): this excerpt is elided — several members (e.g. h_,
// fin_, how_, cont_) and the constructor body are not visible here.
139 class stream<NextLayer, deflateSupported>::write_some_op
140 : public boost::asio::coroutine
143 stream<NextLayer, deflateSupported>& ws_;
144 buffers_suffix<Buffers> cb_;
145 detail::frame_header fh_;
146 detail::prepared_key key_;
147 std::size_t bytes_transferred_ = 0;
// Distinct id used with the stream's soft_mutex write block so this
// operation can be identified as the lock owner.
156 static constexpr int id = 2; // for soft_mutex
// Movable but not copyable: the operation is passed through the
// executor by move.
158 write_some_op(write_some_op&&) = default;
159 write_some_op(write_some_op const&) = delete;
161 template<class DeducedHandler>
164 stream<NextLayer, deflateSupported>& ws,
167 : h_(std::forward<DeducedHandler>(h))
// Propagate the final handler's associated allocator, per the Asio
// composed-operation conventions.
174 using allocator_type =
175 boost::asio::associated_allocator_t<Handler>;
178 get_allocator() const noexcept
180 return (boost::asio::get_associated_allocator)(h_);
// Propagate the handler's associated executor, falling back to the
// stream's executor when the handler has none.
183 using executor_type = boost::asio::associated_executor_t<
184 Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
187 get_executor() const noexcept
189 return (boost::asio::get_associated_executor)(
190 h_, ws_.get_executor());
201 std::size_t bytes_transferred = 0,
// Legacy Asio hooks: forward continuation and invocation queries
// to the wrapped handler.
205 bool asio_handler_is_continuation(write_some_op* op)
207 using boost::asio::asio_handler_is_continuation;
208 return op->cont_ || asio_handler_is_continuation(
209 std::addressof(op->h_));
212 template<class Function>
214 void asio_handler_invoke(Function&& f, write_some_op* op)
216 using boost::asio::asio_handler_invoke;
218 f, std::addressof(op->h_));
222 template<class NextLayer, bool deflateSupported>
223 template<class Buffers, class Handler>
// Coroutine body of the async write operation. On each re-entry it
// resumes after the last BOOST_ASIO_CORO_YIELD. The flow is: build
// the frame header, pick a write algorithm (how_), acquire the write
// block, then loop writing frames until the input is consumed.
// NOTE(review): elided excerpt — many statements (switch headers,
// braces, goto/upcall labels) are missing between the numbered lines;
// comments below describe only the visible code.
225 stream<NextLayer, deflateSupported>::
226 write_some_op<Buffers, Handler>::
229 std::size_t bytes_transferred,
232 using beast::detail::clamp;
233 using boost::asio::buffer;
234 using boost::asio::buffer_copy;
235 using boost::asio::buffer_size;
236 using boost::asio::mutable_buffer;
246 boost::asio::mutable_buffer b;
248 BOOST_ASIO_CORO_REENTER(*this)
250 // Set up the outgoing frame header
// rsv1 marks the frame as compressed (permessage-deflate).
254 fh_.rsv1 = ws_.wr_compress_;
// A continued message uses opcode `cont`; otherwise the caller's
// opcode (text/binary).
262 fh_.op = ws_.wr_cont_ ?
263 detail::opcode::cont : ws_.wr_opcode_;
// Clients must mask outgoing frames (RFC 6455).
265 ws_.role_ == role_type::client;
267 // Choose a write algorithm
276 how_ = do_nomask_nofrag;
// Fragment only when the payload exceeds the write buffer.
280 BOOST_ASSERT(ws_.wr_buf_size_ != 0);
281 remain_ = buffer_size(cb_);
282 if(remain_ > ws_.wr_buf_size_)
283 how_ = do_nomask_frag;
285 how_ = do_nomask_nofrag;
292 how_ = do_mask_nofrag;
296 BOOST_ASSERT(ws_.wr_buf_size_ != 0);
297 remain_ = buffer_size(cb_);
298 if(remain_ > ws_.wr_buf_size_)
301 how_ = do_mask_nofrag;
// Fast path: try to take the write block without suspending.
306 if(ws_.wr_block_.try_lock(this))
308 // Make sure the stream is open
309 if(! ws_.check_open(ec))
// Otherwise park this op until the current writer finishes.
316 BOOST_ASIO_CORO_YIELD
317 ws_.paused_wr_.emplace(std::move(*this));
319 // Acquire the write block
320 ws_.wr_block_.lock(this);
// Re-post so the handler is not invoked from within the resuming
// caller's frame.
323 BOOST_ASIO_CORO_YIELD
325 ws_.get_executor(), std::move(*this));
326 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
328 // Make sure the stream is open
329 if(! ws_.check_open(ec))
333 //------------------------------------------------------------------
// Case 1: single unmasked frame containing the whole payload.
335 if(how_ == do_nomask_nofrag)
338 fh_.len = buffer_size(cb_);
340 detail::write<flat_static_buffer_base>(
342 ws_.wr_cont_ = ! fin_;
344 BOOST_ASIO_CORO_YIELD
345 boost::asio::async_write(ws_.stream_,
346 buffers_cat(ws_.wr_fb_.data(), cb_),
348 if(! ws_.check_ok(ec))
350 bytes_transferred_ += clamp(fh_.len);
354 //------------------------------------------------------------------
// Case 2: unmasked, split into multiple frames of wr_buf_size_.
356 else if(how_ == do_nomask_frag)
360 n = clamp(remain_, ws_.wr_buf_size_);
// fin is set only on the last fragment of a fin message.
363 fh_.fin = fin_ ? remain_ == 0 : false;
365 detail::write<flat_static_buffer_base>(
367 ws_.wr_cont_ = ! fin_;
369 BOOST_ASIO_CORO_YIELD
370 boost::asio::async_write(
371 ws_.stream_, buffers_cat(
372 ws_.wr_fb_.data(), buffers_prefix(
373 clamp(fh_.len), cb_)),
375 if(! ws_.check_ok(ec))
// Locals don't survive a yield; recompute n from fh_.len.
377 n = clamp(fh_.len); // because yield
378 bytes_transferred_ += n;
382 fh_.op = detail::opcode::cont;
383 // Allow outgoing control frames to
384 // be sent in between message frames
385 ws_.wr_block_.unlock(this);
386 if( ws_.paused_close_.maybe_invoke() ||
387 ws_.paused_rd_.maybe_invoke() ||
388 ws_.paused_ping_.maybe_invoke())
390 BOOST_ASSERT(ws_.wr_block_.is_locked());
393 ws_.wr_block_.lock(this);
398 //------------------------------------------------------------------
// Case 3: masked single frame; payload is copied into the write
// buffer in chunks, masked in place, and sent piecewise.
400 else if(how_ == do_mask_nofrag)
402 remain_ = buffer_size(cb_);
405 fh_.key = ws_.wr_gen_();
406 detail::prepare_key(key_, fh_.key);
408 detail::write<flat_static_buffer_base>(
410 n = clamp(remain_, ws_.wr_buf_size_);
412 ws_.wr_buf_.get(), n), cb_);
413 detail::mask_inplace(buffer(
414 ws_.wr_buf_.get(), n), key_);
416 ws_.wr_cont_ = ! fin_;
417 // Send frame header and partial payload
418 BOOST_ASIO_CORO_YIELD
419 boost::asio::async_write(
420 ws_.stream_, buffers_cat(ws_.wr_fb_.data(),
421 buffer(ws_.wr_buf_.get(), n)),
423 if(! ws_.check_ok(ec))
// Subtract the header bytes: only payload counts for the caller.
425 bytes_transferred_ +=
426 bytes_transferred - ws_.wr_fb_.size();
429 cb_.consume(ws_.wr_buf_size_);
430 n = clamp(remain_, ws_.wr_buf_size_);
432 ws_.wr_buf_.get(), n), cb_);
// The same prepared key continues across chunks of one frame.
433 detail::mask_inplace(buffer(
434 ws_.wr_buf_.get(), n), key_);
436 // Send partial payload
437 BOOST_ASIO_CORO_YIELD
438 boost::asio::async_write(ws_.stream_,
439 buffer(ws_.wr_buf_.get(), n),
441 if(! ws_.check_ok(ec))
443 bytes_transferred_ += bytes_transferred;
448 //------------------------------------------------------------------
// Case 4: masked and fragmented; each frame gets a fresh key.
450 else if(how_ == do_mask_frag)
454 n = clamp(remain_, ws_.wr_buf_size_);
457 fh_.key = ws_.wr_gen_();
458 fh_.fin = fin_ ? remain_ == 0 : false;
459 detail::prepare_key(key_, fh_.key);
461 ws_.wr_buf_.get(), n), cb_);
462 detail::mask_inplace(buffer(
463 ws_.wr_buf_.get(), n), key_);
465 detail::write<flat_static_buffer_base>(
467 ws_.wr_cont_ = ! fin_;
469 BOOST_ASIO_CORO_YIELD
470 boost::asio::async_write(ws_.stream_,
471 buffers_cat(ws_.wr_fb_.data(),
472 buffer(ws_.wr_buf_.get(), n)),
474 if(! ws_.check_ok(ec))
476 n = bytes_transferred - ws_.wr_fb_.size();
477 bytes_transferred_ += n;
481 fh_.op = detail::opcode::cont;
482 // Allow outgoing control frames to
483 // be sent in between message frames:
484 ws_.wr_block_.unlock(this);
485 if( ws_.paused_close_.maybe_invoke() ||
486 ws_.paused_rd_.maybe_invoke() ||
487 ws_.paused_ping_.maybe_invoke())
489 BOOST_ASSERT(ws_.wr_block_.is_locked());
492 ws_.wr_block_.lock(this);
497 //------------------------------------------------------------------
// Case 5: permessage-deflate. Compress into the write buffer, mask
// if client, frame, and send; loop while deflate reports more.
499 else if(how_ == do_deflate)
503 b = buffer(ws_.wr_buf_.get(),
505 more_ = ws_.deflate(b, cb_, fin_, in_, ec);
506 if(! ws_.check_ok(ec))
511 // The input was consumed, but there
512 // is no output due to compression
514 BOOST_ASSERT(! fin_);
515 BOOST_ASSERT(buffer_size(cb_) == 0);
520 fh_.key = ws_.wr_gen_();
521 detail::prepared_key key;
522 detail::prepare_key(key, fh_.key);
523 detail::mask_inplace(b, key);
529 flat_static_buffer_base>(ws_.wr_fb_, fh_);
530 ws_.wr_cont_ = ! fin_;
532 BOOST_ASIO_CORO_YIELD
533 boost::asio::async_write(ws_.stream_,
534 buffers_cat(ws_.wr_fb_.data(), b),
536 if(! ws_.check_ok(ec))
// Report uncompressed input bytes, not wire bytes.
538 bytes_transferred_ += in_;
541 fh_.op = detail::opcode::cont;
543 // Allow outgoing control frames to
544 // be sent in between message frames:
545 ws_.wr_block_.unlock(this);
546 if( ws_.paused_close_.maybe_invoke() ||
547 ws_.paused_rd_.maybe_invoke() ||
548 ws_.paused_ping_.maybe_invoke())
550 BOOST_ASSERT(ws_.wr_block_.is_locked());
553 ws_.wr_block_.lock(this);
// Honor no_context_takeover once the message is complete.
558 ws_.do_context_takeover_write(ws_.role_);
564 //--------------------------------------------------------------------------
// Completion: release the write block, resume any parked ops, and
// invoke the final handler (posted when needed, direct otherwise).
567 ws_.wr_block_.unlock(this);
568 ws_.paused_close_.maybe_invoke() ||
569 ws_.paused_rd_.maybe_invoke() ||
570 ws_.paused_ping_.maybe_invoke();
572 return boost::asio::post(
573 ws_.stream_.get_executor(),
574 bind_handler(std::move(h_), ec, bytes_transferred_));
575 h_(ec, bytes_transferred_);
579 //------------------------------------------------------------------------------
581 template<class NextLayer, bool deflateSupported>
582 template<class ConstBufferSequence>
// Throwing overload of write_some: delegates to the error_code
// overload and converts a failure into a system_error exception.
// NOTE(review): elided excerpt — the return type line and ec
// declaration are not visible between the numbered lines.
584 stream<NextLayer, deflateSupported>::
585 write_some(bool fin, ConstBufferSequence const& buffers)
587 static_assert(is_sync_stream<next_layer_type>::value,
588 "SyncStream requirements not met");
589 static_assert(boost::asio::is_const_buffer_sequence<
590 ConstBufferSequence>::value,
591 "ConstBufferSequence requirements not met");
593 auto const bytes_transferred =
594 write_some(fin, buffers, ec);
596 BOOST_THROW_EXCEPTION(system_error{ec});
597 return bytes_transferred;
600 template<class NextLayer, bool deflateSupported>
601 template<class ConstBufferSequence>
// Synchronous write of one message frame (or a fragmented series):
// the blocking counterpart of write_some_op, with the same four
// algorithms (deflate / no-mask / masked / masked+fragmented).
// Returns the number of payload bytes consumed from `buffers`.
// NOTE(review): elided excerpt — branch headers, loops, and several
// statements are missing between the numbered lines; comments
// describe only the visible code.
603 stream<NextLayer, deflateSupported>::
605 ConstBufferSequence const& buffers, error_code& ec)
607 static_assert(is_sync_stream<next_layer_type>::value,
608 "SyncStream requirements not met");
609 static_assert(boost::asio::is_const_buffer_sequence<
610 ConstBufferSequence>::value,
611 "ConstBufferSequence requirements not met");
612 using beast::detail::clamp;
613 using boost::asio::buffer;
614 using boost::asio::buffer_copy;
615 using boost::asio::buffer_size;
616 std::size_t bytes_transferred = 0;
617 ec.assign(0, ec.category());
618 // Make sure the stream is open
620 return bytes_transferred;
// Build the frame header: compressed bit, opcode, client masking.
621 detail::frame_header fh;
625 fh.rsv1 = wr_compress_;
634 detail::opcode::cont : wr_opcode_;
635 fh.mask = role_ == role_type::client;
636 auto remain = buffer_size(buffers);
// --- deflate path: compress into the write buffer, then frame ---
640 ConstBufferSequence> cb{buffers};
644 wr_buf_.get(), wr_buf_size_);
645 auto const more = this->deflate(
646 b, cb, fin, bytes_transferred, ec);
648 return bytes_transferred;
649 auto const n = buffer_size(b);
652 // The input was consumed, but there
653 // is no output due to compression
656 BOOST_ASSERT(buffer_size(cb) == 0);
// Mask the compressed payload in place when required.
663 detail::prepared_key key;
664 detail::prepare_key(key, fh.key);
665 detail::mask_inplace(b, key);
669 detail::fh_buffer fh_buf;
671 flat_static_buffer_base>(fh_buf, fh);
673 boost::asio::write(stream_,
674 buffers_cat(fh_buf.data(), b), ec);
676 return bytes_transferred;
679 fh.op = detail::opcode::cont;
// Honor no_context_takeover at end of message.
683 this->do_context_takeover_write(role_);
689 // no mask, no autofrag
// --- single unmasked frame: header + caller's buffers directly ---
692 detail::fh_buffer fh_buf;
694 flat_static_buffer_base>(fh_buf, fh);
696 boost::asio::write(stream_,
697 buffers_cat(fh_buf.data(), buffers), ec);
699 return bytes_transferred;
700 bytes_transferred += remain;
// --- unmasked, fragmented into wr_buf_size_ chunks ---
705 BOOST_ASSERT(wr_buf_size_ != 0);
707 ConstBufferSequence> cb{buffers};
710 auto const n = clamp(remain, wr_buf_size_);
713 fh.fin = fin ? remain == 0 : false;
714 detail::fh_buffer fh_buf;
716 flat_static_buffer_base>(fh_buf, fh);
718 boost::asio::write(stream_,
719 buffers_cat(fh_buf.data(),
720 buffers_prefix(n, cb)), ec);
722 return bytes_transferred;
723 bytes_transferred += n;
726 fh.op = detail::opcode::cont;
// --- masked single frame: copy/mask chunks via the write buffer ---
737 detail::prepared_key key;
738 detail::prepare_key(key, fh.key);
739 detail::fh_buffer fh_buf;
741 flat_static_buffer_base>(fh_buf, fh);
743 ConstBufferSequence> cb{buffers};
745 auto const n = clamp(remain, wr_buf_size_);
746 auto const b = buffer(wr_buf_.get(), n);
750 detail::mask_inplace(b, key);
// First chunk goes with the frame header.
752 boost::asio::write(stream_,
753 buffers_cat(fh_buf.data(), b), ec);
755 return bytes_transferred;
756 bytes_transferred += n;
// Subsequent chunks are payload only, same key continues.
760 auto const n = clamp(remain, wr_buf_size_);
761 auto const b = buffer(wr_buf_.get(), n);
765 detail::mask_inplace(b, key);
766 boost::asio::write(stream_, b, ec);
768 return bytes_transferred;
769 bytes_transferred += n;
// --- masked and fragmented: fresh key and header per frame ---
775 BOOST_ASSERT(wr_buf_size_ != 0);
777 ConstBufferSequence> cb{buffers};
781 detail::prepared_key key;
782 detail::prepare_key(key, fh.key);
783 auto const n = clamp(remain, wr_buf_size_);
784 auto const b = buffer(wr_buf_.get(), n);
786 detail::mask_inplace(b, key);
789 fh.fin = fin ? remain == 0 : false;
791 detail::fh_buffer fh_buf;
793 flat_static_buffer_base>(fh_buf, fh);
794 boost::asio::write(stream_,
795 buffers_cat(fh_buf.data(), b), ec);
797 return bytes_transferred;
798 bytes_transferred += n;
801 fh.op = detail::opcode::cont;
805 return bytes_transferred;
808 template<class NextLayer, bool deflateSupported>
809 template<class ConstBufferSequence, class WriteHandler>
// Initiating function for an asynchronous partial-message write:
// validates requirements, constructs a write_some_op with the
// caller's (possibly transformed) handler, and starts it immediately.
810 BOOST_ASIO_INITFN_RESULT_TYPE(
811 WriteHandler, void(error_code, std::size_t))
812 stream<NextLayer, deflateSupported>::
813 async_write_some(bool fin,
814 ConstBufferSequence const& bs, WriteHandler&& handler)
816 static_assert(is_async_stream<next_layer_type>::value,
817 "AsyncStream requirements not met");
818 static_assert(boost::asio::is_const_buffer_sequence<
819 ConstBufferSequence>::value,
820 "ConstBufferSequence requirements not met");
// BOOST_BEAST_HANDLER_INIT sets up `init` for the completion-token
// protocol (supports futures, coroutines, plain handlers).
821 BOOST_BEAST_HANDLER_INIT(
822 WriteHandler, void(error_code, std::size_t));
823 write_some_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE(
824 WriteHandler, void(error_code, std::size_t))>{
825 std::move(init.completion_handler), *this, fin, bs}(
827 return init.result.get();
830 //------------------------------------------------------------------------------
832 template<class NextLayer, bool deflateSupported>
833 template<class ConstBufferSequence>
// Throwing overload of write (complete message): delegates to the
// error_code overload and throws system_error on failure.
// NOTE(review): elided excerpt — the return type line and ec
// declaration are not visible between the numbered lines.
835 stream<NextLayer, deflateSupported>::
836 write(ConstBufferSequence const& buffers)
838 static_assert(is_sync_stream<next_layer_type>::value,
839 "SyncStream requirements not met");
840 static_assert(boost::asio::is_const_buffer_sequence<
841 ConstBufferSequence>::value,
842 "ConstBufferSequence requirements not met");
844 auto const bytes_transferred = write(buffers, ec);
846 BOOST_THROW_EXCEPTION(system_error{ec});
847 return bytes_transferred;
850 template<class NextLayer, bool deflateSupported>
851 template<class ConstBufferSequence>
// Synchronous write of a complete message: simply write_some with
// fin == true, so the single call carries the whole payload.
853 stream<NextLayer, deflateSupported>::
854 write(ConstBufferSequence const& buffers, error_code& ec)
856 static_assert(is_sync_stream<next_layer_type>::value,
857 "SyncStream requirements not met");
858 static_assert(boost::asio::is_const_buffer_sequence<
859 ConstBufferSequence>::value,
860 "ConstBufferSequence requirements not met");
861 return write_some(true, buffers, ec);
864 template<class NextLayer, bool deflateSupported>
865 template<class ConstBufferSequence, class WriteHandler>
// Asynchronous write of a complete message: starts a write_some_op
// with fin == true, mirroring async_write_some.
866 BOOST_ASIO_INITFN_RESULT_TYPE(
867 WriteHandler, void(error_code, std::size_t))
868 stream<NextLayer, deflateSupported>::
870 ConstBufferSequence const& bs, WriteHandler&& handler)
872 static_assert(is_async_stream<next_layer_type>::value,
873 "AsyncStream requirements not met");
874 static_assert(boost::asio::is_const_buffer_sequence<
875 ConstBufferSequence>::value,
876 "ConstBufferSequence requirements not met");
// Completion-token setup, identical to async_write_some.
877 BOOST_BEAST_HANDLER_INIT(
878 WriteHandler, void(error_code, std::size_t));
879 write_some_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE(
880 WriteHandler, void(error_code, std::size_t))>{
881 std::move(init.completion_handler), *this, true, bs}(
883 return init.result.get();