2 // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 // Official repository: https://github.com/boostorg/beast
10 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
11 #define BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
13 #include <boost/beast/websocket/teardown.hpp>
14 #include <boost/beast/core/bind_handler.hpp>
15 #include <boost/beast/core/buffers_prefix.hpp>
16 #include <boost/beast/core/buffers_suffix.hpp>
17 #include <boost/beast/core/flat_static_buffer.hpp>
18 #include <boost/beast/core/type_traits.hpp>
19 #include <boost/beast/core/detail/clamp.hpp>
20 #include <boost/beast/core/detail/config.hpp>
21 #include <boost/asio/associated_allocator.hpp>
22 #include <boost/asio/associated_executor.hpp>
23 #include <boost/asio/coroutine.hpp>
24 #include <boost/asio/handler_continuation_hook.hpp>
25 #include <boost/asio/handler_invoke_hook.hpp>
26 #include <boost/asio/post.hpp>
27 #include <boost/assert.hpp>
28 #include <boost/config.hpp>
29 #include <boost/optional.hpp>
30 #include <boost/throw_exception.hpp>
50 this->pmd_->zi.write(zs, flush, ec);
// Tests the pmd_config_ settings against the endpoint role: a client
// looks at server_no_context_takeover, a server looks at
// client_no_context_takeover.
// NOTE(review): the statement executed when the test succeeds is not
// visible in this excerpt -- confirm against the full file.
57 do_context_takeover_read(role_type role)
59 if((role == role_type::client &&
60 pmd_config_.server_no_context_takeover) ||
61 (role == role_type::server &&
62 pmd_config_.client_no_context_takeover))
70 //------------------------------------------------------------------------------
72 /* Read some message frame data.
74 Also reads and handles control frames.
// State object for the asynchronous "read some" composed operation.
// It derives from boost::asio::coroutine and stores a reference to the
// websocket stream, the caller's buffer sequence, a buffers_suffix of
// the same sequence type, and a byte counter that starts at zero.
// NOTE(review): several member declarations (handler, cont_, code_,
// result_) are used below but their declarations are not visible here.
76 template<class NextLayer, bool deflateSupported>
78 class MutableBufferSequence,
80 class stream<NextLayer, deflateSupported>::read_some_op
81 : public boost::asio::coroutine
84 stream<NextLayer, deflateSupported>& ws_;
85 MutableBufferSequence bs_;
86 buffers_suffix<MutableBufferSequence> cb_;
87 std::size_t bytes_written_ = 0;
90 bool did_read_ = false;
94 static constexpr int id = 1; // for soft_mutex
// The operation can be moved but not copied.
96 read_some_op(read_some_op&&) = default;
97 read_some_op(read_some_op const&) = delete;
// Constructor: forwards the handler into h_ and starts with
// code_ == close_code::none.
99 template<class DeducedHandler>
102 stream<NextLayer, deflateSupported>& ws,
103 MutableBufferSequence const& bs)
104 : h_(std::forward<DeducedHandler>(h))
108 , code_(close_code::none)
// Allocator hook: report the allocator associated with the handler.
112 using allocator_type =
113 boost::asio::associated_allocator_t<Handler>;
116 get_allocator() const noexcept
118 return (boost::asio::get_associated_allocator)(h_);
// Executor hook: report the executor associated with the handler,
// passing the stream's executor as the second argument.
121 using executor_type = boost::asio::associated_executor_t<
122 Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
125 get_executor() const noexcept
127 return (boost::asio::get_associated_executor)(
128 h_, ws_.get_executor());
139 std::size_t bytes_transferred = 0,
// Continuation hook: true when cont_ is set, otherwise defer to the
// wrapped handler's own hook.
143 bool asio_handler_is_continuation(read_some_op* op)
145 using boost::asio::asio_handler_is_continuation;
146 return op->cont_ || asio_handler_is_continuation(
147 std::addressof(op->h_));
// Invocation hook: run f through the wrapped handler's hook.
150 template<class Function>
152 void asio_handler_invoke(Function&& f, read_some_op* op)
154 using boost::asio::asio_handler_invoke;
155 asio_handler_invoke(f, std::addressof(op->h_));
// Coroutine body of read_some_op. Stages visible in this excerpt:
//  - take the read block, or park the operation in paused_r_rd_ and
//    resume later; a closed or failed stream yields operation_aborted;
//  - when no payload remains, read from the next layer until parse_fh
//    succeeds, then unmask the buffered payload;
//  - dispatch control frames by opcode (ping, pong, close); a ping
//    serializes a pong into rd_fb_ and writes it under the write block;
//  - deliver message payload either directly (unmasking, and UTF-8
//    checking for text) or through the inflater;
//  - on the close/failure path, serialize and write a close frame,
//    tear down the next layer and record the final status;
//  - release the blocks, wake paused operations, invoke the handler.
// NOTE(review): many lines (labels, braces, else branches, guards) are
// absent from this excerpt; the summary covers only what is visible.
159 template<class NextLayer, bool deflateSupported>
160 template<class MutableBufferSequence, class Handler>
162 stream<NextLayer, deflateSupported>::
163 read_some_op<MutableBufferSequence, Handler>::
166 std::size_t bytes_transferred,
169 using beast::detail::clamp;
170 using boost::asio::buffer;
171 using boost::asio::buffer_size;
173 BOOST_ASIO_CORO_REENTER(*this)
// Try to take the read block without suspending.
177 if(ws_.rd_block_.try_lock(this))
179 // Make sure the stream is not closed
180 if( ws_.status_ == status::closed ||
181 ws_.status_ == status::failed)
183 ec = boost::asio::error::operation_aborted;
// Park this operation in paused_r_rd_ until it is resumed.
191 BOOST_ASIO_CORO_YIELD
192 ws_.paused_r_rd_.emplace(std::move(*this));
194 // Acquire the read block
195 ws_.rd_block_.lock(this);
198 BOOST_ASIO_CORO_YIELD
200 ws_.get_executor(), std::move(*this));
201 BOOST_ASSERT(ws_.rd_block_.is_locked(this));
203 // The only way to get read blocked is if
204 // a `close_op` wrote a close frame
205 BOOST_ASSERT(ws_.wr_close_);
206 BOOST_ASSERT(ws_.status_ != status::open);
207 ec = boost::asio::error::operation_aborted;
211 // if status_ == status::closing, we want to suspend
212 // the read operation until the close completes,
213 // then finish the read with operation_aborted.
216 BOOST_ASSERT(ws_.rd_block_.is_locked(this));
217 // See if we need to read a frame header. This
218 // condition is structured to give the decompressor
219 // a chance to emit the final empty deflate block
221 if(ws_.rd_remain_ == 0 &&
222 (! ws_.rd_fh_.fin || ws_.rd_done_))
// Keep reading from the next layer until parse_fh succeeds.
225 while(! ws_.parse_fh(
226 ws_.rd_fh_, ws_.rd_buf_, result_))
230 // _Fail the WebSocket Connection_
231 if(result_ == error::message_too_big)
232 code_ = close_code::too_big;
234 code_ = close_code::protocol_error;
237 BOOST_ASSERT(ws_.rd_block_.is_locked(this));
238 BOOST_ASIO_CORO_YIELD
239 ws_.stream_.async_read_some(
240 ws_.rd_buf_.prepare(read_size(
241 ws_.rd_buf_, ws_.rd_buf_.max_size())),
243 BOOST_ASSERT(ws_.rd_block_.is_locked(this));
244 if(! ws_.check_ok(ec))
246 ws_.rd_buf_.commit(bytes_transferred);
248 // Allow a close operation
249 // to acquire the read block
250 ws_.rd_block_.unlock(this);
251 if( ws_.paused_r_close_.maybe_invoke())
254 BOOST_ASSERT(ws_.rd_block_.is_locked());
257 // Acquire read block
258 ws_.rd_block_.lock(this);
260 // Immediately apply the mask to the portion
261 // of the buffer holding payload data.
262 if(ws_.rd_fh_.len > 0 && ws_.rd_fh_.mask)
263 detail::mask_inplace(buffers_prefix(
264 clamp(ws_.rd_fh_.len),
265 ws_.rd_buf_.mutable_data()),
// Control frames are dispatched by opcode below.
267 if(detail::is_control(ws_.rd_fh_.op))
269 // Clear this otherwise the next
270 // frame will be considered final.
271 ws_.rd_fh_.fin = false;
274 if(ws_.rd_fh_.op == detail::opcode::ping)
280 BOOST_ASIO_CORO_YIELD
288 auto const b = buffers_prefix(
289 clamp(ws_.rd_fh_.len),
291 auto const len = buffer_size(b);
292 BOOST_ASSERT(len == ws_.rd_fh_.len);
294 detail::read_ping(payload, b);
295 ws_.rd_buf_.consume(len);
296 // Ignore ping when closing
297 if(ws_.status_ == status::closing)
301 frame_type::ping, payload);
// Serialize the pong reply, echoing the ping payload, into rd_fb_.
303 ws_.template write_ping<
304 flat_static_buffer_base>(ws_.rd_fb_,
305 detail::opcode::pong, payload);
308 // Allow a close operation
309 // to acquire the read block
310 ws_.rd_block_.unlock(this);
311 ws_.paused_r_close_.maybe_invoke();
// The pong is written while holding the write block.
314 if(! ws_.wr_block_.try_lock(this))
317 BOOST_ASIO_CORO_YIELD
318 ws_.paused_rd_.emplace(std::move(*this));
320 // Acquire the write block
321 ws_.wr_block_.lock(this);
324 BOOST_ASIO_CORO_YIELD
326 ws_.get_executor(), std::move(*this));
327 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
329 // Make sure the stream is open
330 if(! ws_.check_open(ec))
335 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
336 BOOST_ASIO_CORO_YIELD
337 boost::asio::async_write(ws_.stream_,
338 ws_.rd_fb_.data(), std::move(*this));
339 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
340 if(! ws_.check_ok(ec))
// Release the write block and resume at most one paused operation
// (the || chain stops at the first maybe_invoke that returns true).
342 ws_.wr_block_.unlock(this);
343 ws_.paused_close_.maybe_invoke() ||
344 ws_.paused_ping_.maybe_invoke() ||
345 ws_.paused_wr_.maybe_invoke();
346 goto do_maybe_suspend;
349 if(ws_.rd_fh_.op == detail::opcode::pong)
351 // Ignore pong when closing
352 if(! ws_.wr_close_ && ws_.ctrl_cb_)
356 BOOST_ASIO_CORO_YIELD
363 auto const cb = buffers_prefix(clamp(
364 ws_.rd_fh_.len), ws_.rd_buf_.data());
365 auto const len = buffer_size(cb);
366 BOOST_ASSERT(len == ws_.rd_fh_.len);
368 detail::read_ping(payload, cb);
369 ws_.rd_buf_.consume(len);
370 // Ignore pong when closing
371 if(! ws_.wr_close_ && ws_.ctrl_cb_)
372 ws_.ctrl_cb_(frame_type::pong, payload);
375 // Handle close frame
376 BOOST_ASSERT(ws_.rd_fh_.op == detail::opcode::close);
382 BOOST_ASIO_CORO_YIELD
389 auto const cb = buffers_prefix(clamp(
390 ws_.rd_fh_.len), ws_.rd_buf_.data());
391 auto const len = buffer_size(cb);
392 BOOST_ASSERT(len == ws_.rd_fh_.len);
393 BOOST_ASSERT(! ws_.rd_close_);
394 ws_.rd_close_ = true;
396 detail::read_close(cr, cb, result_);
399 // _Fail the WebSocket Connection_
400 code_ = close_code::protocol_error;
404 ws_.rd_buf_.consume(len);
406 ws_.ctrl_cb_(frame_type::close,
408 // See if we are already closing
409 if(ws_.status_ == status::closing)
411 // _Close the WebSocket Connection_
412 BOOST_ASSERT(ws_.wr_close_);
413 code_ = close_code::none;
414 result_ = error::closed;
417 // _Start the WebSocket Closing Handshake_
418 code_ = cr.code == close_code::none ?
420 static_cast<close_code>(cr.code);
421 result_ = error::closed;
425 if(ws_.rd_fh_.len == 0 && ! ws_.rd_fh_.fin)
427 // Empty non-final frame
430 ws_.rd_done_ = false;
// Uncompressed message payload.
432 if(! ws_.rd_deflated())
434 if(ws_.rd_remain_ > 0)
436 if(ws_.rd_buf_.size() == 0 && ws_.rd_buf_.max_size() >
437 (std::min)(clamp(ws_.rd_remain_),
440 // Fill the read buffer first, otherwise we
441 // get fewer bytes at the cost of one I/O.
442 BOOST_ASIO_CORO_YIELD
443 ws_.stream_.async_read_some(
444 ws_.rd_buf_.prepare(read_size(
445 ws_.rd_buf_, ws_.rd_buf_.max_size())),
447 if(! ws_.check_ok(ec))
449 ws_.rd_buf_.commit(bytes_transferred);
451 detail::mask_inplace(buffers_prefix(clamp(
452 ws_.rd_remain_), ws_.rd_buf_.mutable_data()),
455 if(ws_.rd_buf_.size() > 0)
457 // Copy from the read buffer.
458 // The mask was already applied.
459 bytes_transferred = buffer_copy(cb_,
460 ws_.rd_buf_.data(), clamp(ws_.rd_remain_));
461 auto const mb = buffers_prefix(
462 bytes_transferred, cb_);
463 ws_.rd_remain_ -= bytes_transferred;
// Text payload is fed to the UTF-8 checker; finish() is called only
// when the final frame has been fully consumed.
464 if(ws_.rd_op_ == detail::opcode::text)
466 if(! ws_.rd_utf8_.write(mb) ||
467 (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
468 ! ws_.rd_utf8_.finish()))
470 // _Fail the WebSocket Connection_
471 code_ = close_code::bad_payload;
472 result_ = error::bad_frame_payload;
476 bytes_written_ += bytes_transferred;
477 ws_.rd_size_ += bytes_transferred;
478 ws_.rd_buf_.consume(bytes_transferred);
482 // Read into caller's buffer
483 BOOST_ASSERT(ws_.rd_remain_ > 0);
484 BOOST_ASSERT(buffer_size(cb_) > 0);
485 BOOST_ASSERT(buffer_size(buffers_prefix(
486 clamp(ws_.rd_remain_), cb_)) > 0);
487 BOOST_ASIO_CORO_YIELD
488 ws_.stream_.async_read_some(buffers_prefix(
489 clamp(ws_.rd_remain_), cb_), std::move(*this));
490 if(! ws_.check_ok(ec))
492 BOOST_ASSERT(bytes_transferred > 0);
493 auto const mb = buffers_prefix(
494 bytes_transferred, cb_);
495 ws_.rd_remain_ -= bytes_transferred;
497 detail::mask_inplace(mb, ws_.rd_key_);
498 if(ws_.rd_op_ == detail::opcode::text)
500 if(! ws_.rd_utf8_.write(mb) ||
501 (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
502 ! ws_.rd_utf8_.finish()))
504 // _Fail the WebSocket Connection_
505 code_ = close_code::bad_payload;
506 result_ = error::bad_frame_payload;
510 bytes_written_ += bytes_transferred;
511 ws_.rd_size_ += bytes_transferred;
// The message is done once the final frame has no payload left.
514 ws_.rd_done_ = ws_.rd_remain_ == 0 && ws_.rd_fh_.fin;
518 // Read compressed message frame payload:
519 // inflate even if rd_fh_.len == 0, otherwise we
520 // never emit the end-of-stream deflate block.
521 while(buffer_size(cb_) > 0)
523 if( ws_.rd_remain_ > 0 &&
524 ws_.rd_buf_.size() == 0 &&
528 BOOST_ASIO_CORO_YIELD
529 ws_.stream_.async_read_some(
530 ws_.rd_buf_.prepare(read_size(
531 ws_.rd_buf_, ws_.rd_buf_.max_size())),
533 if(! ws_.check_ok(ec))
535 BOOST_ASSERT(bytes_transferred > 0);
536 ws_.rd_buf_.commit(bytes_transferred);
538 detail::mask_inplace(
539 buffers_prefix(clamp(ws_.rd_remain_),
540 ws_.rd_buf_.mutable_data()), ws_.rd_key_);
// Point the inflater's output at the first caller buffer.
545 auto const out = buffers_front(cb_);
546 zs.next_out = out.data();
547 zs.avail_out = out.size();
548 BOOST_ASSERT(zs.avail_out > 0);
550 if(ws_.rd_remain_ > 0)
552 if(ws_.rd_buf_.size() > 0)
555 auto const in = buffers_prefix(
556 clamp(ws_.rd_remain_), buffers_front(
557 ws_.rd_buf_.data()));
558 zs.avail_in = in.size();
559 zs.next_in = in.data();
566 else if(ws_.rd_fh_.fin)
568 // append the empty block codes
569 static std::uint8_t constexpr
571 0x00, 0x00, 0xff, 0xff };
572 zs.next_in = empty_block;
573 zs.avail_in = sizeof(empty_block);
574 ws_.inflate(zs, zlib::Flush::sync, ec);
577 // https://github.com/madler/zlib/issues/280
579 ec = error::partial_deflate_block;
581 if(! ws_.check_ok(ec))
583 ws_.do_context_takeover_read(ws_.role_);
591 ws_.inflate(zs, zlib::Flush::sync, ec);
592 if(! ws_.check_ok(ec))
// Enforce the message size limit when rd_msg_max_ is non-zero.
594 if(ws_.rd_msg_max_ && beast::detail::sum_exceeds(
595 ws_.rd_size_, zs.total_out, ws_.rd_msg_max_))
597 // _Fail the WebSocket Connection_
598 code_ = close_code::too_big;
599 result_ = error::message_too_big;
// Account for what the inflater produced and consumed.
602 cb_.consume(zs.total_out);
603 ws_.rd_size_ += zs.total_out;
604 ws_.rd_remain_ -= zs.total_in;
605 ws_.rd_buf_.consume(zs.total_in);
606 bytes_written_ += zs.total_out;
608 if(ws_.rd_op_ == detail::opcode::text)
611 if(! ws_.rd_utf8_.write(
612 buffers_prefix(bytes_written_, bs_)) || (
613 ws_.rd_done_ && ! ws_.rd_utf8_.finish()))
615 // _Fail the WebSocket Connection_
616 code_ = close_code::bad_payload;
617 result_ = error::bad_frame_payload;
// Close/failure path: the write block is needed to send a close frame.
625 if(ws_.wr_block_.try_lock(this))
627 // Make sure the stream is open
628 BOOST_ASSERT(ws_.status_ == status::open);
633 BOOST_ASIO_CORO_YIELD
634 ws_.paused_rd_.emplace(std::move(*this));
636 // Acquire the write block
637 ws_.wr_block_.lock(this);
640 BOOST_ASIO_CORO_YIELD
642 ws_.get_executor(), std::move(*this));
643 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
645 // Make sure the stream is open
646 if(! ws_.check_open(ec))
651 ws_.status_ = status::closing;
655 ws_.wr_close_ = true;
657 // Serialize close frame
659 ws_.template write_close<
660 flat_static_buffer_base>(
664 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
665 BOOST_ASIO_CORO_YIELD
666 boost::asio::async_write(
667 ws_.stream_, ws_.rd_fb_.data(),
669 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
670 if(! ws_.check_ok(ec))
// Tear down the next layer; an eof result is cleared below.
675 using beast::websocket::async_teardown;
676 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
677 BOOST_ASIO_CORO_YIELD
678 async_teardown(ws_.role_,
679 ws_.stream_, std::move(*this));
680 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
681 if(ec == boost::asio::error::eof)
684 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
685 ec.assign(0, ec.category());
// Any error other than error::closed marks the stream as failed.
689 if(ec && ec != error::closed)
690 ws_.status_ = status::failed;
692 ws_.status_ = status::closed;
// Release the blocks held by this operation and wake paused operations.
696 ws_.rd_block_.try_unlock(this);
697 ws_.paused_r_close_.maybe_invoke();
698 if(ws_.wr_block_.try_unlock(this))
699 ws_.paused_close_.maybe_invoke() ||
700 ws_.paused_ping_.maybe_invoke() ||
701 ws_.paused_wr_.maybe_invoke();
// Deliver the result: one path posts the bound handler to the executor,
// the other invokes it directly.
// NOTE(review): the condition choosing between them is not visible here.
703 return boost::asio::post(
704 ws_.stream_.get_executor(),
705 bind_handler(std::move(h_),
706 ec, bytes_written_));
707 h_(ec, bytes_written_);
711 //------------------------------------------------------------------------------
// State object for the asynchronous read into a DynamicBuffer. It
// derives from boost::asio::coroutine, stores a reference to the
// websocket stream and a running count of bytes written.
// NOTE(review): the declarations of h_, b_, limit_ and some_ are not
// visible in this excerpt.
713 template<class NextLayer, bool deflateSupported>
717 class stream<NextLayer, deflateSupported>::read_op
718 : public boost::asio::coroutine
721 stream<NextLayer, deflateSupported>& ws_;
724 std::size_t bytes_written_ = 0;
728 using allocator_type =
729 boost::asio::associated_allocator_t<Handler>;
// The operation can be moved but not copied.
731 read_op(read_op&&) = default;
732 read_op(read_op const&) = delete;
// Constructor: forwards the handler into h_; a limit of zero is stored
// as the largest std::size_t value.
734 template<class DeducedHandler>
737 stream<NextLayer, deflateSupported>& ws,
741 : h_(std::forward<DeducedHandler>(h))
744 , limit_(limit ? limit : (
745 std::numeric_limits<std::size_t>::max)())
// Allocator hook: report the allocator associated with the handler.
751 get_allocator() const noexcept
753 return (boost::asio::get_associated_allocator)(h_);
// Executor hook: report the executor associated with the handler,
// passing the stream's executor as the second argument.
756 using executor_type = boost::asio::associated_executor_t<
757 Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
760 get_executor() const noexcept
762 return (boost::asio::get_associated_executor)(
763 h_, ws_.get_executor());
768 std::size_t bytes_transferred = 0);
// Continuation hook: defer to the wrapped handler's own hook.
771 bool asio_handler_is_continuation(read_op* op)
773 using boost::asio::asio_handler_is_continuation;
774 return asio_handler_is_continuation(
775 std::addressof(op->h_));
// Invocation hook: run f through the wrapped handler's hook.
778 template<class Function>
780 void asio_handler_invoke(Function&& f, read_op* op)
782 using boost::asio::asio_handler_invoke;
783 asio_handler_invoke(f, std::addressof(op->h_));
// Coroutine body of read_op. Each iteration prepares writable space in
// the dynamic buffer, runs a read_some_op into it, commits the bytes
// transferred and adds them to bytes_written_. The loop continues while
// some_ is false and the stream reports the message is not done; the
// handler is then invoked with the error code and the byte total.
787 template<class NextLayer, bool deflateSupported>
788 template<class DynamicBuffer, class Handler>
790 stream<NextLayer, deflateSupported>::
791 read_op<DynamicBuffer, Handler>::
794 std::size_t bytes_transferred)
796 using beast::detail::clamp;
797 using buffers_type = typename
798 DynamicBuffer::mutable_buffers_type;
// Holder for the prepared buffers; filled in by emplace() below.
799 boost::optional<buffers_type> mb;
800 BOOST_ASIO_CORO_REENTER(*this)
// Size the request from the stream's read_size_hint, clamped by limit_.
806 mb.emplace(b_.prepare(clamp(
807 ws_.read_size_hint(b_), limit_)));
// A std::length_error from prepare() is reported as buffer_overflow.
809 catch(std::length_error const&)
811 ec = error::buffer_overflow;
815 BOOST_ASIO_CORO_YIELD
818 bind_handler(std::move(*this),
819 error::buffer_overflow, 0));
// Run one read_some_op with this operation as its completion handler.
822 BOOST_ASIO_CORO_YIELD
823 read_some_op<buffers_type, read_op>{
824 std::move(*this), ws_, *mb}(
828 b_.commit(bytes_transferred);
829 bytes_written_ += bytes_transferred;
831 while(! some_ && ! ws_.is_message_done());
832 h_(ec, bytes_written_);
836 //------------------------------------------------------------------------------
// Throwing overload of read() for a DynamicBuffer. It checks the stream
// and buffer requirements at compile time, delegates to the error_code
// overload and returns the byte count it reports. Errors are raised as
// system_error.
// NOTE(review): the guard before the throw is not visible in this excerpt.
838 template<class NextLayer, bool deflateSupported>
839 template<class DynamicBuffer>
841 stream<NextLayer, deflateSupported>::
842 read(DynamicBuffer& buffer)
844 static_assert(is_sync_stream<next_layer_type>::value,
845 "SyncStream requirements not met");
847 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
848 "DynamicBuffer requirements not met");
850 auto const bytes_written = read(buffer, ec);
852 BOOST_THROW_EXCEPTION(system_error{ec});
853 return bytes_written;
// Non-throwing overload of read() for a DynamicBuffer. It calls
// read_some(buffer, 0, ec) repeatedly, summing the byte counts, until
// is_message_done() is true, and returns the total.
// NOTE(review): the guard on the early return inside the loop is not
// visible in this excerpt.
856 template<class NextLayer, bool deflateSupported>
857 template<class DynamicBuffer>
859 stream<NextLayer, deflateSupported>::
860 read(DynamicBuffer& buffer, error_code& ec)
862 static_assert(is_sync_stream<next_layer_type>::value,
863 "SyncStream requirements not met");
865 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
866 "DynamicBuffer requirements not met");
867 std::size_t bytes_written = 0;
870 bytes_written += read_some(buffer, 0, ec);
872 return bytes_written;
874 while(! is_message_done());
875 return bytes_written;
// Asynchronous read into a DynamicBuffer. It checks the stream and
// buffer requirements at compile time, sets up the completion handler
// with BOOST_BEAST_HANDLER_INIT, constructs an operation from
// init.completion_handler and returns init.result.get().
// NOTE(review): the operation type and its remaining constructor
// arguments are not visible in this excerpt.
878 template<class NextLayer, bool deflateSupported>
879 template<class DynamicBuffer, class ReadHandler>
880 BOOST_ASIO_INITFN_RESULT_TYPE(
881 ReadHandler, void(error_code, std::size_t))
882 stream<NextLayer, deflateSupported>::
883 async_read(DynamicBuffer& buffer, ReadHandler&& handler)
885 static_assert(is_async_stream<next_layer_type>::value,
886 "AsyncStream requirements not met");
888 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
889 "DynamicBuffer requirements not met");
890 BOOST_BEAST_HANDLER_INIT(
891 ReadHandler, void(error_code, std::size_t));
894 BOOST_ASIO_HANDLER_TYPE(
895 ReadHandler, void(error_code, std::size_t))>{
896 std::move(init.completion_handler),
901 return init.result.get();
904 //------------------------------------------------------------------------------
// Throwing overload taking a DynamicBuffer and a limit. It delegates to
// read_some(buffer, limit, ec) and returns the byte count it reports;
// errors are raised as system_error.
// NOTE(review): the line naming this function is missing from the
// excerpt (presumably read_some), as is the guard before the throw.
906 template<class NextLayer, bool deflateSupported>
907 template<class DynamicBuffer>
909 stream<NextLayer, deflateSupported>::
911 DynamicBuffer& buffer,
914 static_assert(is_sync_stream<next_layer_type>::value,
915 "SyncStream requirements not met");
917 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
918 "DynamicBuffer requirements not met");
920 auto const bytes_written =
921 read_some(buffer, limit, ec);
923 BOOST_THROW_EXCEPTION(system_error{ec});
924 return bytes_written;
// Non-throwing overload taking a DynamicBuffer and a limit. It sizes a
// request from read_size_hint(buffer) clamped by limit, prepares that
// much space, reads into it with the buffer-sequence overload, commits
// the bytes read and returns their count.
// NOTE(review): the line naming this function is missing from the
// excerpt (presumably read_some).
927 template<class NextLayer, bool deflateSupported>
928 template<class DynamicBuffer>
930 stream<NextLayer, deflateSupported>::
932 DynamicBuffer& buffer,
936 static_assert(is_sync_stream<next_layer_type>::value,
937 "SyncStream requirements not met");
939 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
940 "DynamicBuffer requirements not met");
941 using beast::detail::clamp;
// NOTE(review): the guard for this assignment is not visible; presumably
// it applies when the caller passes a limit of zero -- confirm.
943 limit = (std::numeric_limits<std::size_t>::max)();
945 clamp(read_size_hint(buffer), limit);
946 BOOST_ASSERT(size > 0);
947 boost::optional<typename
948 DynamicBuffer::mutable_buffers_type> mb;
951 mb.emplace(buffer.prepare(size));
// A std::length_error from prepare() is reported as buffer_overflow.
953 catch(std::length_error const&)
955 ec = error::buffer_overflow;
958 auto const bytes_written = read_some(*mb, ec);
959 buffer.commit(bytes_written);
960 return bytes_written;
// Asynchronous overload taking a DynamicBuffer and a completion handler.
// It checks the stream and buffer requirements at compile time, sets up
// the completion handler with BOOST_BEAST_HANDLER_INIT, constructs an
// operation from init.completion_handler and returns init.result.get().
// NOTE(review): the line naming this function (presumably
// async_read_some), the operation type and its other arguments are not
// visible in this excerpt.
963 template<class NextLayer, bool deflateSupported>
964 template<class DynamicBuffer, class ReadHandler>
965 BOOST_ASIO_INITFN_RESULT_TYPE(
966 ReadHandler, void(error_code, std::size_t))
967 stream<NextLayer, deflateSupported>::
969 DynamicBuffer& buffer,
971 ReadHandler&& handler)
973 static_assert(is_async_stream<next_layer_type>::value,
974 "AsyncStream requirements not met");
976 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
977 "DynamicBuffer requirements not met");
978 BOOST_BEAST_HANDLER_INIT(
979 ReadHandler, void(error_code, std::size_t));
982 BOOST_ASIO_HANDLER_TYPE(
983 ReadHandler, void(error_code, std::size_t))>{
984 std::move(init.completion_handler),
989 return init.result.get();
992 //------------------------------------------------------------------------------
// Throwing overload taking a MutableBufferSequence. It delegates to
// read_some(buffers, ec) and returns the byte count it reports; errors
// are raised as system_error.
// NOTE(review): the line naming this function is missing from the
// excerpt (presumably read_some), as is the guard before the throw.
994 template<class NextLayer, bool deflateSupported>
995 template<class MutableBufferSequence>
997 stream<NextLayer, deflateSupported>::
999 MutableBufferSequence const& buffers)
1001 static_assert(is_sync_stream<next_layer_type>::value,
1002 "SyncStream requirements not met");
1003 static_assert(boost::asio::is_mutable_buffer_sequence<
1004 MutableBufferSequence>::value,
1005 "MutableBufferSequence requirements not met");
1007 auto const bytes_written = read_some(buffers, ec);
1009 BOOST_THROW_EXCEPTION(system_error{ec});
1010 return bytes_written;
// Synchronous, non-throwing read into a MutableBufferSequence. Stages
// visible in this excerpt:
//  - clear ec and verify the stream is open;
//  - when no payload remains, read from the next layer until parse_fh
//    succeeds, then unmask the buffered payload;
//  - handle control frames (a ping is answered by writing a pong);
//  - copy or read uncompressed payload into the caller's buffers, with
//    unmasking and, for text, UTF-8 checking;
//  - otherwise inflate compressed payload into the caller's buffers,
//    enforcing rd_msg_max_ when it is non-zero.
// Returns the number of bytes placed in the caller's buffers; failures
// go through do_fail or are left in ec.
// NOTE(review): the line naming this function is missing (presumably
// read_some), along with many braces, guards and else branches.
1013 template<class NextLayer, bool deflateSupported>
1014 template<class MutableBufferSequence>
1016 stream<NextLayer, deflateSupported>::
1018 MutableBufferSequence const& buffers,
1021 static_assert(is_sync_stream<next_layer_type>::value,
1022 "SyncStream requirements not met");
1023 static_assert(boost::asio::is_mutable_buffer_sequence<
1024 MutableBufferSequence>::value,
1025 "MutableBufferSequence requirements not met");
1026 using beast::detail::clamp;
1027 using boost::asio::buffer;
1028 using boost::asio::buffer_size;
1030 std::size_t bytes_written = 0;
1031 ec.assign(0, ec.category());
1032 // Make sure the stream is open
1033 if(! check_open(ec))
1036 // See if we need to read a frame header. This
1037 // condition is structured to give the decompressor
1038 // a chance to emit the final empty deflate block
1040 if(rd_remain_ == 0 && (! rd_fh_.fin || rd_done_))
1042 // Read frame header
1044 while(! parse_fh(rd_fh_, rd_buf_, result))
1048 // _Fail the WebSocket Connection_
1049 if(result == error::message_too_big)
1050 code = close_code::too_big;
1052 code = close_code::protocol_error;
1053 do_fail(code, result, ec);
1054 return bytes_written;
1056 auto const bytes_transferred =
1058 rd_buf_.prepare(read_size(
1059 rd_buf_, rd_buf_.max_size())),
1062 return bytes_written;
1063 rd_buf_.commit(bytes_transferred);
1065 // Immediately apply the mask to the portion
1066 // of the buffer holding payload data.
1067 if(rd_fh_.len > 0 && rd_fh_.mask)
1068 detail::mask_inplace(buffers_prefix(
1069 clamp(rd_fh_.len), rd_buf_.mutable_data()),
1071 if(detail::is_control(rd_fh_.op))
1073 // Get control frame payload
1074 auto const b = buffers_prefix(
1075 clamp(rd_fh_.len), rd_buf_.data());
1076 auto const len = buffer_size(b);
1077 BOOST_ASSERT(len == rd_fh_.len);
1079 // Clear this otherwise the next
1080 // frame will be considered final.
1083 // Handle ping frame
1084 if(rd_fh_.op == detail::opcode::ping)
1087 detail::read_ping(payload, b);
1088 rd_buf_.consume(len);
1091 // Ignore ping when closing
1095 ctrl_cb_(frame_type::ping, payload);
// Serialize a pong echoing the ping payload and write it out.
1096 detail::frame_buffer fb;
1097 write_ping<flat_static_buffer_base>(fb,
1098 detail::opcode::pong, payload);
1099 boost::asio::write(stream_, fb.data(), ec);
1101 return bytes_written;
1104 // Handle pong frame
1105 if(rd_fh_.op == detail::opcode::pong)
1108 detail::read_ping(payload, b);
1109 rd_buf_.consume(len);
1111 ctrl_cb_(frame_type::pong, payload);
1114 // Handle close frame
1115 BOOST_ASSERT(rd_fh_.op == detail::opcode::close);
1117 BOOST_ASSERT(! rd_close_);
1120 detail::read_close(cr, b, result);
1123 // _Fail the WebSocket Connection_
1124 do_fail(close_code::protocol_error,
1126 return bytes_written;
1129 rd_buf_.consume(len);
1131 ctrl_cb_(frame_type::close, cr_.reason);
1132 BOOST_ASSERT(! wr_close_);
1133 // _Start the WebSocket Closing Handshake_
// A received code of close_code::none is mapped to close_code::normal.
1135 cr.code == close_code::none ?
1136 close_code::normal :
1137 static_cast<close_code>(cr.code),
1139 return bytes_written;
1142 if(rd_fh_.len == 0 && ! rd_fh_.fin)
1144 // Empty non-final frame
1151 ec.assign(0, ec.category());
// Uncompressed message payload.
1153 if(! this->rd_deflated())
1157 if(rd_buf_.size() == 0 && rd_buf_.max_size() >
1158 (std::min)(clamp(rd_remain_),
1159 buffer_size(buffers)))
1161 // Fill the read buffer first, otherwise we
1162 // get fewer bytes at the cost of one I/O.
1163 rd_buf_.commit(stream_.read_some(
1164 rd_buf_.prepare(read_size(rd_buf_,
1165 rd_buf_.max_size())), ec));
1167 return bytes_written;
1169 detail::mask_inplace(
1170 buffers_prefix(clamp(rd_remain_),
1171 rd_buf_.mutable_data()), rd_key_);
1173 if(rd_buf_.size() > 0)
1175 // Copy from the read buffer.
1176 // The mask was already applied.
1177 auto const bytes_transferred =
1178 buffer_copy(buffers, rd_buf_.data(),
1180 auto const mb = buffers_prefix(
1181 bytes_transferred, buffers);
1182 rd_remain_ -= bytes_transferred;
// Text payload is fed to the UTF-8 checker; finish() is called only
// when the final frame has been fully consumed.
1183 if(rd_op_ == detail::opcode::text)
1185 if(! rd_utf8_.write(mb) ||
1186 (rd_remain_ == 0 && rd_fh_.fin &&
1187 ! rd_utf8_.finish()))
1189 // _Fail the WebSocket Connection_
1190 do_fail(close_code::bad_payload,
1191 error::bad_frame_payload, ec);
1192 return bytes_written;
1195 bytes_written += bytes_transferred;
1196 rd_size_ += bytes_transferred;
1197 rd_buf_.consume(bytes_transferred);
1201 // Read into caller's buffer
1202 BOOST_ASSERT(rd_remain_ > 0);
1203 BOOST_ASSERT(buffer_size(buffers) > 0);
1204 BOOST_ASSERT(buffer_size(buffers_prefix(
1205 clamp(rd_remain_), buffers)) > 0);
1206 auto const bytes_transferred =
1207 stream_.read_some(buffers_prefix(
1208 clamp(rd_remain_), buffers), ec);
1210 return bytes_written;
1211 BOOST_ASSERT(bytes_transferred > 0);
1212 auto const mb = buffers_prefix(
1213 bytes_transferred, buffers);
1214 rd_remain_ -= bytes_transferred;
1216 detail::mask_inplace(mb, rd_key_);
1217 if(rd_op_ == detail::opcode::text)
1219 if(! rd_utf8_.write(mb) ||
1220 (rd_remain_ == 0 && rd_fh_.fin &&
1221 ! rd_utf8_.finish()))
1223 // _Fail the WebSocket Connection_
1224 do_fail(close_code::bad_payload,
1225 error::bad_frame_payload, ec);
1226 return bytes_written;
1229 bytes_written += bytes_transferred;
1230 rd_size_ += bytes_transferred;
// The message is done once the final frame has no payload left.
1233 rd_done_ = rd_remain_ == 0 && rd_fh_.fin;
1237 // Read compressed message frame payload:
1238 // inflate even if rd_fh_.len == 0, otherwise we
1239 // never emit the end-of-stream deflate block.
1241 bool did_read = false;
1242 buffers_suffix<MutableBufferSequence> cb{buffers};
1243 while(buffer_size(cb) > 0)
// Point the inflater's output at the first remaining caller buffer.
1247 auto const out = buffers_front(cb);
1248 zs.next_out = out.data();
1249 zs.avail_out = out.size();
1250 BOOST_ASSERT(zs.avail_out > 0);
1254 if(rd_buf_.size() > 0)
1257 auto const in = buffers_prefix(
1258 clamp(rd_remain_), buffers_front(
1260 zs.avail_in = in.size();
1261 zs.next_in = in.data();
1266 auto const bytes_transferred =
1268 rd_buf_.prepare(read_size(
1269 rd_buf_, rd_buf_.max_size())),
1272 return bytes_written;
1273 BOOST_ASSERT(bytes_transferred > 0);
1274 rd_buf_.commit(bytes_transferred);
1276 detail::mask_inplace(
1277 buffers_prefix(clamp(rd_remain_),
1278 rd_buf_.mutable_data()), rd_key_);
1279 auto const in = buffers_prefix(
1280 clamp(rd_remain_), buffers_front(
1282 zs.avail_in = in.size();
1283 zs.next_in = in.data();
1293 // append the empty block codes
1294 static std::uint8_t constexpr
1296 0x00, 0x00, 0xff, 0xff };
1297 zs.next_in = empty_block;
1298 zs.avail_in = sizeof(empty_block);
1299 this->inflate(zs, zlib::Flush::sync, ec);
1302 // https://github.com/madler/zlib/issues/280
1303 if(zs.total_out > 0)
1304 ec = error::partial_deflate_block;
1307 return bytes_written;
1308 this->do_context_takeover_read(role_);
1316 this->inflate(zs, zlib::Flush::sync, ec);
1318 return bytes_written;
// Enforce the message size limit when rd_msg_max_ is non-zero.
1319 if(rd_msg_max_ && beast::detail::sum_exceeds(
1320 rd_size_, zs.total_out, rd_msg_max_))
1322 do_fail(close_code::too_big,
1323 error::message_too_big, ec);
1324 return bytes_written;
// Account for what the inflater produced and consumed.
1326 cb.consume(zs.total_out);
1327 rd_size_ += zs.total_out;
1328 rd_remain_ -= zs.total_in;
1329 rd_buf_.consume(zs.total_in);
1330 bytes_written += zs.total_out;
1332 if(rd_op_ == detail::opcode::text)
1335 if(! rd_utf8_.write(
1336 buffers_prefix(bytes_written, buffers)) || (
1337 rd_done_ && ! rd_utf8_.finish()))
1339 // _Fail the WebSocket Connection_
1340 do_fail(close_code::bad_payload,
1341 error::bad_frame_payload, ec);
1342 return bytes_written;
1346 return bytes_written;
// Asynchronous overload taking a MutableBufferSequence and a completion
// handler. It checks the stream and buffer requirements at compile
// time, sets up the completion handler with BOOST_BEAST_HANDLER_INIT,
// constructs a read_some_op from init.completion_handler, this stream
// and the buffers, invokes it, and returns init.result.get().
// NOTE(review): the line naming this function (presumably
// async_read_some) and the arguments of the initial invocation are not
// visible in this excerpt.
1349 template<class NextLayer, bool deflateSupported>
1350 template<class MutableBufferSequence, class ReadHandler>
1351 BOOST_ASIO_INITFN_RESULT_TYPE(
1352 ReadHandler, void(error_code, std::size_t))
1353 stream<NextLayer, deflateSupported>::
1355 MutableBufferSequence const& buffers,
1356 ReadHandler&& handler)
1358 static_assert(is_async_stream<next_layer_type>::value,
1359 "AsyncStream requirements not met");
1360 static_assert(boost::asio::is_mutable_buffer_sequence<
1361 MutableBufferSequence>::value,
1362 "MutableBufferSequence requirements not met");
1363 BOOST_BEAST_HANDLER_INIT(
1364 ReadHandler, void(error_code, std::size_t));
1365 read_some_op<MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE(
1366 ReadHandler, void(error_code, std::size_t))>{
1367 std::move(init.completion_handler), *this, buffers}(
1369 return init.result.get();