2 // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 // Official repository: https://github.com/boostorg/beast
10 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
11 #define BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
13 #include <boost/beast/websocket/teardown.hpp>
14 #include <boost/beast/core/bind_handler.hpp>
15 #include <boost/beast/core/buffers_prefix.hpp>
16 #include <boost/beast/core/buffers_suffix.hpp>
17 #include <boost/beast/core/flat_static_buffer.hpp>
18 #include <boost/beast/core/type_traits.hpp>
19 #include <boost/beast/core/detail/clamp.hpp>
20 #include <boost/beast/core/detail/config.hpp>
21 #include <boost/asio/associated_allocator.hpp>
22 #include <boost/asio/associated_executor.hpp>
23 #include <boost/asio/coroutine.hpp>
24 #include <boost/asio/handler_continuation_hook.hpp>
25 #include <boost/asio/post.hpp>
26 #include <boost/assert.hpp>
27 #include <boost/config.hpp>
28 #include <boost/optional.hpp>
29 #include <boost/throw_exception.hpp>
// read_some_op: composed operation, written as a boost::asio::coroutine,
// that reads message data into a caller-supplied MutableBufferSequence.
38 /* Read some message frame data.
40 Also reads and handles control frames.
42 template<class NextLayer>
44 class MutableBufferSequence,
46 class stream<NextLayer>::read_some_op
47 : public boost::asio::coroutine
// Operation state: the owning stream, the caller's buffers (bs_), a
// buffers_suffix view of them (cb_) that is consumed as output is produced,
// and the running byte total that is handed to the handler on completion.
50 stream<NextLayer>& ws_;
51 MutableBufferSequence bs_;
52 buffers_suffix<MutableBufferSequence> cb_;
53 std::size_t bytes_written_ = 0;
57 bool did_read_ = false;
// Both copy and move are defaulted; the operation passes itself along with
// std::move(*this) whenever it suspends.
61 read_some_op(read_some_op&&) = default;
62 read_some_op(read_some_op const&) = default;
// Constructor: stores the handler, obtains a token from the stream (tok_)
// and starts with no close code pending.
64 template<class DeducedHandler>
67 stream<NextLayer>& ws,
68 MutableBufferSequence const& bs)
69 : h_(std::forward<DeducedHandler>(h))
73 , tok_(ws_.tok_.unique())
74 , code_(close_code::none)
// Expose the wrapped handler's associated allocator.
78 using allocator_type =
79 boost::asio::associated_allocator_t<Handler>;
82 get_allocator() const noexcept
84 return boost::asio::get_associated_allocator(h_);
// Expose the wrapped handler's associated executor, falling back to the
// stream's executor.
87 using executor_type = boost::asio::associated_executor_t<
88 Handler, decltype(std::declval<stream<NextLayer>&>().get_executor())>;
91 get_executor() const noexcept
93 return boost::asio::get_associated_executor(
94 h_, ws_.get_executor());
105 std::size_t bytes_transferred = 0,
// Continuation hint: true when cont_ is set, otherwise whatever the wrapped
// handler reports.
109 bool asio_handler_is_continuation(read_some_op* op)
111 using boost::asio::asio_handler_is_continuation;
112 return op->cont_ || asio_handler_is_continuation(
113 std::addressof(op->h_));
// Coroutine body of read_some_op. Each resumption re-enters at the last
// yield point (BOOST_ASIO_CORO_REENTER). The visible flow is: acquire the
// read block, parse a frame header when one is due, service control frames,
// move payload into the caller's buffers (directly, or through the
// decompressor for compressed messages), and finally release any held
// blocks and invoke the handler with (ec, bytes_written_).
117 template<class NextLayer>
118 template<class MutableBufferSequence, class Handler>
121 read_some_op<MutableBufferSequence, Handler>::
124 std::size_t bytes_transferred,
127 using beast::detail::clamp;
128 using boost::asio::buffer;
129 using boost::asio::buffer_size;
132 BOOST_ASIO_CORO_REENTER(*this)
138 // Acquire the read block
139 ws_.rd_block_ = tok_;
141 // Make sure the stream is not closed
142 if( ws_.status_ == status::closed ||
143 ws_.status_ == status::failed)
145 ec = boost::asio::error::operation_aborted;
// Path taken when this operation does not hold the read block
// (rd_block_ != tok_): park the operation until it is resumed.
153 BOOST_ASSERT(ws_.rd_block_ != tok_);
154 BOOST_ASIO_CORO_YIELD
155 ws_.paused_r_rd_.save(std::move(*this));
157 // Acquire the read block
158 BOOST_ASSERT(! ws_.rd_block_);
159 ws_.rd_block_ = tok_;
162 BOOST_ASIO_CORO_YIELD
164 ws_.get_executor(), std::move(*this));
165 BOOST_ASSERT(ws_.rd_block_ == tok_);
167 // The only way to get read blocked is if
168 // a `close_op` wrote a close frame
169 BOOST_ASSERT(ws_.wr_close_);
170 BOOST_ASSERT(ws_.status_ != status::open);
171 ec = boost::asio::error::operation_aborted;
175 // if status_ == status::closing, we want to suspend
176 // the read operation until the close completes,
177 // then finish the read with operation_aborted.
180 BOOST_ASSERT(ws_.rd_block_ == tok_);
181 // See if we need to read a frame header. This
182 // condition is structured to give the decompressor
183 // a chance to emit the final empty deflate block
185 if(ws_.rd_remain_ == 0 &&
186 (! ws_.rd_fh_.fin || ws_.rd_done_))
189 while(! ws_.parse_fh(
190 ws_.rd_fh_, ws_.rd_buf_, code))
192 if(code != close_code::none)
194 // _Fail the WebSocket Connection_
// parse_fh() could not complete the header yet: read more bytes from the
// next layer into rd_buf_ and try again.
199 BOOST_ASSERT(ws_.rd_block_ == tok_);
200 BOOST_ASIO_CORO_YIELD
201 ws_.stream_.async_read_some(
202 ws_.rd_buf_.prepare(read_size(
203 ws_.rd_buf_, ws_.rd_buf_.max_size())),
205 BOOST_ASSERT(ws_.rd_block_ == tok_);
206 if(! ws_.check_ok(ec))
208 ws_.rd_buf_.commit(bytes_transferred);
210 // Allow a close operation
211 // to acquire the read block
212 BOOST_ASSERT(ws_.rd_block_ == tok_);
213 ws_.rd_block_.reset();
214 if( ws_.paused_r_close_.maybe_invoke())
217 BOOST_ASSERT(ws_.rd_block_);
220 // Acquire read block
221 ws_.rd_block_ = tok_;
223 // Immediately apply the mask to the portion
224 // of the buffer holding payload data.
225 if(ws_.rd_fh_.len > 0 && ws_.rd_fh_.mask)
226 detail::mask_inplace(buffers_prefix(
227 clamp(ws_.rd_fh_.len),
// Control frames (ping, pong, close) are consumed here; none of the three
// branches below copies anything into the caller's buffers.
230 if(detail::is_control(ws_.rd_fh_.op))
232 // Clear this otherwise the next
233 // frame will be considered final.
234 ws_.rd_fh_.fin = false;
// Ping: extract the payload, notify the control callback, and serialize a
// pong with the same payload into rd_fb_ (written to the stream below).
237 if(ws_.rd_fh_.op == detail::opcode::ping)
240 auto const b = buffers_prefix(
241 clamp(ws_.rd_fh_.len),
243 auto const len = buffer_size(b);
244 BOOST_ASSERT(len == ws_.rd_fh_.len);
246 detail::read_ping(payload, b);
247 ws_.rd_buf_.consume(len);
248 // Ignore ping when closing
249 if(ws_.status_ == status::closing)
252 ws_.ctrl_cb_(frame_type::ping, payload);
254 ws_.template write_ping<
255 flat_static_buffer_base>(ws_.rd_fb_,
256 detail::opcode::pong, payload);
259 //BOOST_ASSERT(! ws_.paused_r_close_);
261 // Allow a close operation
262 // to acquire the read block
263 BOOST_ASSERT(ws_.rd_block_ == tok_);
264 ws_.rd_block_.reset();
265 ws_.paused_r_close_.maybe_invoke();
270 // Acquire the write block
271 ws_.wr_block_ = tok_;
// Path taken when the write block is held elsewhere (wr_block_ != tok_):
// park until resumed, then take the block.
276 BOOST_ASSERT(ws_.wr_block_ != tok_);
277 BOOST_ASIO_CORO_YIELD
278 ws_.paused_rd_.save(std::move(*this));
280 // Acquire the write block
281 BOOST_ASSERT(! ws_.wr_block_);
282 ws_.wr_block_ = tok_;
285 BOOST_ASIO_CORO_YIELD
287 ws_.get_executor(), std::move(*this));
288 BOOST_ASSERT(ws_.wr_block_ == tok_);
290 // Make sure the stream is open
291 if(! ws_.check_open(ec))
// Send the serialized pong held in rd_fb_.
296 BOOST_ASSERT(ws_.wr_block_ == tok_);
297 BOOST_ASIO_CORO_YIELD
298 boost::asio::async_write(ws_.stream_,
299 ws_.rd_fb_.data(), std::move(*this));
300 BOOST_ASSERT(ws_.wr_block_ == tok_);
301 if(! ws_.check_ok(ec))
// Pong sent: drop the write block and offer it to paused operations in the
// order close, ping, write; the || chain stops at the first maybe_invoke()
// that returns true.
303 ws_.wr_block_.reset();
304 ws_.paused_close_.maybe_invoke() ||
305 ws_.paused_ping_.maybe_invoke() ||
306 ws_.paused_wr_.maybe_invoke();
307 goto do_maybe_suspend;
// Pong: extract the payload and report it to the control callback unless a
// close frame has already been written.
310 if(ws_.rd_fh_.op == detail::opcode::pong)
312 auto const cb = buffers_prefix(clamp(
313 ws_.rd_fh_.len), ws_.rd_buf_.data());
314 auto const len = buffer_size(cb);
315 BOOST_ASSERT(len == ws_.rd_fh_.len);
316 code = close_code::none;
318 detail::read_ping(payload, cb);
319 ws_.rd_buf_.consume(len);
320 // Ignore pong when closing
321 if(! ws_.wr_close_ && ws_.ctrl_cb_)
322 ws_.ctrl_cb_(frame_type::pong, payload);
325 // Handle close frame
326 BOOST_ASSERT(ws_.rd_fh_.op == detail::opcode::close);
328 auto const cb = buffers_prefix(clamp(
329 ws_.rd_fh_.len), ws_.rd_buf_.data());
330 auto const len = buffer_size(cb);
331 BOOST_ASSERT(len == ws_.rd_fh_.len);
332 BOOST_ASSERT(! ws_.rd_close_);
333 ws_.rd_close_ = true;
335 detail::read_close(cr, cb, code);
336 if(code != close_code::none)
338 // _Fail the WebSocket Connection_
344 ws_.rd_buf_.consume(len);
346 ws_.ctrl_cb_(frame_type::close,
348 // See if we are already closing
349 if(ws_.status_ == status::closing)
351 // _Close the WebSocket Connection_
352 BOOST_ASSERT(ws_.wr_close_);
353 code_ = close_code::none;
357 // _Start the WebSocket Closing Handshake_
358 code_ = cr.code == close_code::none ?
360 static_cast<close_code>(cr.code);
365 if(ws_.rd_fh_.len == 0 && ! ws_.rd_fh_.fin)
367 // Empty non-final frame
370 ws_.rd_done_ = false;
// Branch taken when read-side pmd_ state is absent or not set; the other
// branch is the compressed path further below. Payload is unmasked and
// delivered as-is.
372 if(! ws_.pmd_ || ! ws_.pmd_->rd_set)
374 if(ws_.rd_remain_ > 0)
376 if(ws_.rd_buf_.size() == 0 && ws_.rd_buf_.max_size() >
377 (std::min)(clamp(ws_.rd_remain_),
380 // Fill the read buffer first, otherwise we
381 // get fewer bytes at the cost of one I/O.
382 BOOST_ASIO_CORO_YIELD
383 ws_.stream_.async_read_some(
384 ws_.rd_buf_.prepare(read_size(
385 ws_.rd_buf_, ws_.rd_buf_.max_size())),
387 if(! ws_.check_ok(ec))
389 ws_.rd_buf_.commit(bytes_transferred);
391 detail::mask_inplace(buffers_prefix(clamp(
392 ws_.rd_remain_), ws_.rd_buf_.data()),
395 if(ws_.rd_buf_.size() > 0)
397 // Copy from the read buffer.
398 // The mask was already applied.
399 bytes_transferred = buffer_copy(cb_,
400 ws_.rd_buf_.data(), clamp(ws_.rd_remain_));
401 auto const mb = buffers_prefix(
402 bytes_transferred, cb_);
403 ws_.rd_remain_ -= bytes_transferred;
// Text messages: feed the new bytes to rd_utf8_ (presumably a UTF-8
// validator) and call finish() once the final frame is fully consumed.
404 if(ws_.rd_op_ == detail::opcode::text)
406 if(! ws_.rd_utf8_.write(mb) ||
407 (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
408 ! ws_.rd_utf8_.finish()))
410 // _Fail the WebSocket Connection_
411 code_ = close_code::bad_payload;
416 bytes_written_ += bytes_transferred;
417 ws_.rd_size_ += bytes_transferred;
418 ws_.rd_buf_.consume(bytes_transferred);
422 // Read into caller's buffer
423 BOOST_ASSERT(ws_.rd_remain_ > 0);
424 BOOST_ASSERT(buffer_size(cb_) > 0);
425 BOOST_ASSERT(buffer_size(buffers_prefix(
426 clamp(ws_.rd_remain_), cb_)) > 0);
427 BOOST_ASIO_CORO_YIELD
428 ws_.stream_.async_read_some(buffers_prefix(
429 clamp(ws_.rd_remain_), cb_), std::move(*this));
430 if(! ws_.check_ok(ec))
432 BOOST_ASSERT(bytes_transferred > 0);
433 auto const mb = buffers_prefix(
434 bytes_transferred, cb_);
435 ws_.rd_remain_ -= bytes_transferred;
437 detail::mask_inplace(mb, ws_.rd_key_);
438 if(ws_.rd_op_ == detail::opcode::text)
440 if(! ws_.rd_utf8_.write(mb) ||
441 (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
442 ! ws_.rd_utf8_.finish()))
444 // _Fail the WebSocket Connection_
445 code_ = close_code::bad_payload;
450 bytes_written_ += bytes_transferred;
451 ws_.rd_size_ += bytes_transferred;
// The message is complete once the final frame has no payload remaining.
454 ws_.rd_done_ = ws_.rd_remain_ == 0 && ws_.rd_fh_.fin;
458 // Read compressed message frame payload:
459 // inflate even if rd_fh_.len == 0, otherwise we
460 // never emit the end-of-stream deflate block.
461 while(buffer_size(cb_) > 0)
463 if( ws_.rd_remain_ > 0 &&
464 ws_.rd_buf_.size() == 0 &&
468 BOOST_ASIO_CORO_YIELD
469 ws_.stream_.async_read_some(
470 ws_.rd_buf_.prepare(read_size(
471 ws_.rd_buf_, ws_.rd_buf_.max_size())),
473 if(! ws_.check_ok(ec))
475 BOOST_ASSERT(bytes_transferred > 0);
476 ws_.rd_buf_.commit(bytes_transferred);
478 detail::mask_inplace(
479 buffers_prefix(clamp(ws_.rd_remain_),
480 ws_.rd_buf_.data()), ws_.rd_key_);
// Point the zlib parameters at the first remaining output buffer.
485 auto const out = buffers_front(cb_);
486 zs.next_out = out.data();
487 zs.avail_out = out.size();
488 BOOST_ASSERT(zs.avail_out > 0);
490 if(ws_.rd_remain_ > 0)
492 if(ws_.rd_buf_.size() > 0)
495 auto const in = buffers_prefix(
496 clamp(ws_.rd_remain_), buffers_front(
497 ws_.rd_buf_.data()));
498 zs.avail_in = in.size();
499 zs.next_in = in.data();
506 else if(ws_.rd_fh_.fin)
508 // append the empty block codes
509 static std::uint8_t constexpr
511 0x00, 0x00, 0xff, 0xff };
512 zs.next_in = empty_block;
513 zs.avail_in = sizeof(empty_block);
514 ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
517 // https://github.com/madler/zlib/issues/280
519 ec = error::partial_deflate_block;
521 if(! ws_.check_ok(ec))
// Reset the inflate state when the peer's side of the connection was
// negotiated with no_context_takeover.
524 (ws_.role_ == role_type::client &&
525 ws_.pmd_config_.server_no_context_takeover) ||
526 (ws_.role_ == role_type::server &&
527 ws_.pmd_config_.client_no_context_takeover))
528 ws_.pmd_->zi.reset();
536 ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
537 if(! ws_.check_ok(ec))
// Enforce the maximum message size; rd_msg_max_ == 0 skips the check.
539 if(ws_.rd_msg_max_ && beast::detail::sum_exceeds(
540 ws_.rd_size_, zs.total_out, ws_.rd_msg_max_))
542 // _Fail the WebSocket Connection_
543 code_ = close_code::too_big;
// Account for what this pass consumed (total_in) and produced (total_out).
547 cb_.consume(zs.total_out);
548 ws_.rd_size_ += zs.total_out;
549 ws_.rd_remain_ -= zs.total_in;
550 ws_.rd_buf_.consume(zs.total_in);
551 bytes_written_ += zs.total_out;
// Text messages: feed everything written by this operation to rd_utf8_.
553 if(ws_.rd_op_ == detail::opcode::text)
556 if(! ws_.rd_utf8_.write(
557 buffers_prefix(bytes_written_, bs_)) || (
558 ws_.rd_done_ && ! ws_.rd_utf8_.finish()))
560 // _Fail the WebSocket Connection_
561 code_ = close_code::bad_payload;
// Closing sequence: take the write block, serialize and send a close frame
// from rd_fb_, then tear down the next layer.
572 // Acquire the write block
573 ws_.wr_block_ = tok_;
575 // Make sure the stream is open
576 BOOST_ASSERT(ws_.status_ == status::open);
581 BOOST_ASSERT(ws_.wr_block_ != tok_);
582 BOOST_ASIO_CORO_YIELD
583 ws_.paused_rd_.save(std::move(*this));
585 // Acquire the write block
586 BOOST_ASSERT(! ws_.wr_block_);
587 ws_.wr_block_ = tok_;
590 BOOST_ASIO_CORO_YIELD
592 ws_.get_executor(), std::move(*this));
593 BOOST_ASSERT(ws_.wr_block_ == tok_);
595 // Make sure the stream is open
596 if(! ws_.check_open(ec))
601 ws_.status_ = status::closing;
605 ws_.wr_close_ = true;
607 // Serialize close frame
609 ws_.template write_close<
610 flat_static_buffer_base>(
614 BOOST_ASSERT(ws_.wr_block_ == tok_);
615 BOOST_ASIO_CORO_YIELD
616 boost::asio::async_write(
617 ws_.stream_, ws_.rd_fb_.data(),
619 BOOST_ASSERT(ws_.wr_block_ == tok_);
620 if(! ws_.check_ok(ec))
625 using beast::websocket::async_teardown;
626 BOOST_ASSERT(ws_.wr_block_ == tok_);
627 BOOST_ASIO_CORO_YIELD
628 async_teardown(ws_.role_,
629 ws_.stream_, std::move(*this));
630 BOOST_ASSERT(ws_.wr_block_ == tok_);
// End-of-file from teardown is cleared to success (see the link below).
631 if(ec == boost::asio::error::eof)
634 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
635 ec.assign(0, ec.category());
// Any error other than error::closed marks the stream as failed.
639 if(ec && ec != error::closed)
640 ws_.status_ = status::failed;
642 ws_.status_ = status::closed;
// Release the read and write blocks if this operation holds them, and wake
// operations paused on them.
646 if(ws_.rd_block_ == tok_)
647 ws_.rd_block_.reset();
648 ws_.paused_r_close_.maybe_invoke();
649 if(ws_.wr_block_ == tok_)
651 ws_.wr_block_.reset();
652 ws_.paused_close_.maybe_invoke() ||
653 ws_.paused_ping_.maybe_invoke() ||
654 ws_.paused_wr_.maybe_invoke();
// NOTE(review): the handler is either posted to the next layer's executor or
// invoked directly with (ec, bytes_written_); the condition selecting
// between the two is not visible in this excerpt.
657 return boost::asio::post(
658 ws_.stream_.get_executor(),
659 bind_handler(std::move(h_),
660 ec, bytes_written_));
661 h_(ec, bytes_written_);
665 //------------------------------------------------------------------------------
// read_op: composed operation, written as a boost::asio::coroutine, that
// reads into a DynamicBuffer by running read_some_op repeatedly (see its
// operator() for the loop).
667 template<class NextLayer>
671 class stream<NextLayer>::read_op
672 : public boost::asio::coroutine
675 stream<NextLayer>& ws_;
// Total bytes committed to the dynamic buffer so far; passed to the handler.
678 std::size_t bytes_written_ = 0;
682 using allocator_type =
683 boost::asio::associated_allocator_t<Handler>;
685 read_op(read_op&&) = default;
686 read_op(read_op const&) = default;
688 template<class DeducedHandler>
691 stream<NextLayer>& ws,
695 : h_(std::forward<DeducedHandler>(h))
// A limit of zero is stored as the largest std::size_t value.
698 , limit_(limit ? limit : (
699 std::numeric_limits<std::size_t>::max)())
// Expose the wrapped handler's associated allocator.
705 get_allocator() const noexcept
707 return boost::asio::get_associated_allocator(h_);
// Expose the wrapped handler's associated executor, falling back to the
// stream's executor.
710 using executor_type = boost::asio::associated_executor_t<
711 Handler, decltype(std::declval<stream<NextLayer>&>().get_executor())>;
714 get_executor() const noexcept
716 return boost::asio::get_associated_executor(
717 h_, ws_.get_executor());
722 std::size_t bytes_transferred = 0);
// Continuation hint: defer entirely to the wrapped handler.
725 bool asio_handler_is_continuation(read_op* op)
727 using boost::asio::asio_handler_is_continuation;
728 return asio_handler_is_continuation(
729 std::addressof(op->h_));
// Coroutine body of read_op. Each pass prepares space in the dynamic buffer
// (bounded by limit_), runs a read_some_op into that space, and commits the
// bytes it reports. The loop repeats until some_ is set or the stream
// reports the message as done, then the handler receives
// (ec, bytes_written_).
733 template<class NextLayer>
734 template<class DynamicBuffer, class Handler>
737 read_op<DynamicBuffer, Handler>::
740 std::size_t bytes_transferred)
742 using beast::detail::clamp;
743 using buffers_type = typename
744 DynamicBuffer::mutable_buffers_type;
// Holds the prepared buffer sequence; optional because prepare() may throw.
745 boost::optional<buffers_type> mb;
746 BOOST_ASIO_CORO_REENTER(*this)
752 mb.emplace(b_.prepare(clamp(
753 ws_.read_size_hint(b_), limit_)));
// prepare() signals that the buffer cannot grow by throwing
// std::length_error; that is translated to error::buffer_overflow.
755 catch(std::length_error const&)
757 ec = error::buffer_overflow;
// Overflow path: resume with (error::buffer_overflow, 0) bound in.
761 BOOST_ASIO_CORO_YIELD
764 bind_handler(std::move(*this),
765 error::buffer_overflow, 0));
// Run one read_some_op into the prepared space, with this operation as its
// completion handler.
768 BOOST_ASIO_CORO_YIELD
769 read_some_op<buffers_type, read_op>{
770 std::move(*this), ws_, *mb}(
774 b_.commit(bytes_transferred);
775 bytes_written_ += bytes_transferred;
777 while(! some_ && ! ws_.is_message_done());
778 h_(ec, bytes_written_);
782 //------------------------------------------------------------------------------
// Throwing overload of read(): forwards to the error_code overload and
// raises system_error on failure. Returns the byte count that overload
// reports.
784 template<class NextLayer>
785 template<class DynamicBuffer>
788 read(DynamicBuffer& buffer)
790 static_assert(is_sync_stream<next_layer_type>::value,
791 "SyncStream requirements not met");
793 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
794 "DynamicBuffer requirements not met");
796 auto const bytes_written = read(buffer, ec);
798 BOOST_THROW_EXCEPTION(system_error{ec});
799 return bytes_written;
// Non-throwing read(): calls read_some(buffer, 0, ec) repeatedly, summing
// the byte counts, until is_message_done() returns true. There is also an
// early return of the running total inside the loop; the check that guards
// it is not visible in this excerpt.
802 template<class NextLayer>
803 template<class DynamicBuffer>
806 read(DynamicBuffer& buffer, error_code& ec)
808 static_assert(is_sync_stream<next_layer_type>::value,
809 "SyncStream requirements not met");
811 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
812 "DynamicBuffer requirements not met");
813 std::size_t bytes_written = 0;
// A limit argument of 0 is passed on every call.
816 bytes_written += read_some(buffer, 0, ec);
818 return bytes_written;
820 while(! is_message_done());
821 return bytes_written;
// Initiating function for an asynchronous read into a DynamicBuffer.
// Checks the stream and buffer requirements at compile time, adapts the
// completion token through async_completion, and returns init.result.get().
// NOTE(review): the statement that launches the operation is only partly
// visible in this excerpt.
824 template<class NextLayer>
825 template<class DynamicBuffer, class ReadHandler>
826 BOOST_ASIO_INITFN_RESULT_TYPE(
827 ReadHandler, void(error_code, std::size_t))
829 async_read(DynamicBuffer& buffer, ReadHandler&& handler)
// Diagnostic text fixed: the word "requirements" was duplicated.
831 static_assert(is_async_stream<next_layer_type>::value,
832 "AsyncStream requirements not met");
834 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
835 "DynamicBuffer requirements not met");
836 boost::asio::async_completion<
837 ReadHandler, void(error_code, std::size_t)> init{handler};
840 BOOST_ASIO_HANDLER_TYPE(
841 ReadHandler, void(error_code, std::size_t))>{
842 init.completion_handler,
847 return init.result.get();
850 //------------------------------------------------------------------------------
// Throwing overload of read_some() for a DynamicBuffer: forwards to the
// error_code overload with the same buffer and limit, and raises
// system_error on failure.
852 template<class NextLayer>
853 template<class DynamicBuffer>
857 DynamicBuffer& buffer,
860 static_assert(is_sync_stream<next_layer_type>::value,
861 "SyncStream requirements not met");
863 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
864 "DynamicBuffer requirements not met");
866 auto const bytes_written =
867 read_some(buffer, limit, ec);
869 BOOST_THROW_EXCEPTION(system_error{ec});
870 return bytes_written;
// Non-throwing read_some() for a DynamicBuffer: prepares up to
// clamp(read_size_hint(buffer), limit) bytes in the buffer, reads into that
// space with the buffer-sequence overload, commits what was read and
// returns the byte count.
873 template<class NextLayer>
874 template<class DynamicBuffer>
878 DynamicBuffer& buffer,
882 static_assert(is_sync_stream<next_layer_type>::value,
883 "SyncStream requirements not met");
885 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
886 "DynamicBuffer requirements not met");
887 using beast::detail::clamp;
// NOTE(review): limit is widened to the largest std::size_t value here; the
// condition guarding this assignment is not visible in this excerpt.
889 limit = (std::numeric_limits<std::size_t>::max)();
891 clamp(read_size_hint(buffer), limit);
892 BOOST_ASSERT(size > 0);
// Optional because prepare() may throw before a value exists.
893 boost::optional<typename
894 DynamicBuffer::mutable_buffers_type> mb;
897 mb.emplace(buffer.prepare(size));
// std::length_error from prepare() is reported as error::buffer_overflow.
899 catch(std::length_error const&)
901 ec = error::buffer_overflow;
904 auto const bytes_written = read_some(*mb, ec);
905 buffer.commit(bytes_written);
906 return bytes_written;
// Initiating function for an asynchronous partial read into a DynamicBuffer.
// Checks the stream and buffer requirements at compile time, adapts the
// completion token through async_completion, and returns init.result.get().
// NOTE(review): the statement that launches the operation is only partly
// visible in this excerpt.
909 template<class NextLayer>
910 template<class DynamicBuffer, class ReadHandler>
911 BOOST_ASIO_INITFN_RESULT_TYPE(
912 ReadHandler, void(error_code, std::size_t))
915 DynamicBuffer& buffer,
917 ReadHandler&& handler)
// Diagnostic text fixed: the word "requirements" was duplicated.
919 static_assert(is_async_stream<next_layer_type>::value,
920 "AsyncStream requirements not met");
922 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
923 "DynamicBuffer requirements not met");
924 boost::asio::async_completion<ReadHandler,
925 void(error_code, std::size_t)> init{handler};
928 BOOST_ASIO_HANDLER_TYPE(
929 ReadHandler, void(error_code, std::size_t))>{
930 init.completion_handler,
935 return init.result.get();
938 //------------------------------------------------------------------------------
// Throwing overload of read_some() for a MutableBufferSequence: forwards to
// the error_code overload and raises system_error on failure.
940 template<class NextLayer>
941 template<class MutableBufferSequence>
945 MutableBufferSequence const& buffers)
947 static_assert(is_sync_stream<next_layer_type>::value,
948 "SyncStream requirements not met");
949 static_assert(boost::asio::is_mutable_buffer_sequence<
950 MutableBufferSequence>::value,
951 "MutableBufferSequence requirements not met");
953 auto const bytes_written = read_some(buffers, ec);
955 BOOST_THROW_EXCEPTION(system_error{ec});
956 return bytes_written;
// Synchronous read_some() into a caller-supplied buffer sequence. The
// visible flow is: parse a frame header when one is due, service control
// frames inline, then deliver payload, either directly when read-side pmd_
// state is absent or not set, or through pmd_->zi for compressed messages.
// Returns the number of bytes written into `buffers`; failures are reported
// through `ec`.
959 template<class NextLayer>
960 template<class MutableBufferSequence>
964 MutableBufferSequence const& buffers,
967 static_assert(is_sync_stream<next_layer_type>::value,
968 "SyncStream requirements not met");
969 static_assert(boost::asio::is_mutable_buffer_sequence<
970 MutableBufferSequence>::value,
971 "MutableBufferSequence requirements not met");
972 using beast::detail::clamp;
973 using boost::asio::buffer;
974 using boost::asio::buffer_size;
976 std::size_t bytes_written = 0;
977 ec.assign(0, ec.category());
978 // Make sure the stream is open
982 // See if we need to read a frame header. This
983 // condition is structured to give the decompressor
984 // a chance to emit the final empty deflate block
986 if(rd_remain_ == 0 && (! rd_fh_.fin || rd_done_))
989 while(! parse_fh(rd_fh_, rd_buf_, code))
991 if(code != close_code::none)
993 // _Fail the WebSocket Connection_
994 do_fail(code, error::failed, ec);
995 return bytes_written;
// parse_fh() could not complete the header yet: read more bytes from the
// next layer into rd_buf_ and try again.
997 auto const bytes_transferred =
999 rd_buf_.prepare(read_size(
1000 rd_buf_, rd_buf_.max_size())),
1003 return bytes_written;
1004 rd_buf_.commit(bytes_transferred);
1006 // Immediately apply the mask to the portion
1007 // of the buffer holding payload data.
1008 if(rd_fh_.len > 0 && rd_fh_.mask)
1009 detail::mask_inplace(buffers_prefix(
1010 clamp(rd_fh_.len), rd_buf_.data()),
// Control frames (ping, pong, close) are consumed here; none of the three
// branches below copies anything into `buffers`.
1012 if(detail::is_control(rd_fh_.op))
1014 // Get control frame payload
1015 auto const b = buffers_prefix(
1016 clamp(rd_fh_.len), rd_buf_.data());
1017 auto const len = buffer_size(b);
1018 BOOST_ASSERT(len == rd_fh_.len);
1020 // Clear this otherwise the next
1021 // frame will be considered final.
1024 // Handle ping frame
1025 if(rd_fh_.op == detail::opcode::ping)
1028 detail::read_ping(payload, b);
1029 rd_buf_.consume(len);
1032 // Ignore ping when closing
1036 ctrl_cb_(frame_type::ping, payload);
// Reply with a pong carrying the ping's payload, written synchronously.
1037 detail::frame_buffer fb;
1038 write_ping<flat_static_buffer_base>(fb,
1039 detail::opcode::pong, payload);
1040 boost::asio::write(stream_, fb.data(), ec);
1042 return bytes_written;
1045 // Handle pong frame
1046 if(rd_fh_.op == detail::opcode::pong)
1049 detail::read_ping(payload, b);
1050 rd_buf_.consume(len);
1052 ctrl_cb_(frame_type::pong, payload);
1055 // Handle close frame
1056 BOOST_ASSERT(rd_fh_.op == detail::opcode::close);
1058 BOOST_ASSERT(! rd_close_);
1061 detail::read_close(cr, b, code);
1062 if(code != close_code::none)
1064 // _Fail the WebSocket Connection_
1065 do_fail(code, error::failed, ec);
1066 return bytes_written;
1069 rd_buf_.consume(len);
1071 ctrl_cb_(frame_type::close, cr_.reason);
1072 BOOST_ASSERT(! wr_close_);
1073 // _Start the WebSocket Closing Handshake_
// A close frame without a code is answered with close_code::normal.
1075 cr.code == close_code::none ?
1076 close_code::normal :
1077 static_cast<close_code>(cr.code),
1079 return bytes_written;
1082 if(rd_fh_.len == 0 && ! rd_fh_.fin)
1084 // Empty non-final frame
1091 ec.assign(0, ec.category());
// Branch taken when read-side pmd_ state is absent or not set; the other
// branch is the compressed path further below. Payload is unmasked and
// delivered as-is.
1093 if(! pmd_ || ! pmd_->rd_set)
1097 if(rd_buf_.size() == 0 && rd_buf_.max_size() >
1098 (std::min)(clamp(rd_remain_),
1099 buffer_size(buffers)))
1101 // Fill the read buffer first, otherwise we
1102 // get fewer bytes at the cost of one I/O.
1103 rd_buf_.commit(stream_.read_some(
1104 rd_buf_.prepare(read_size(rd_buf_,
1105 rd_buf_.max_size())), ec));
1107 return bytes_written;
1109 detail::mask_inplace(
1110 buffers_prefix(clamp(rd_remain_),
1111 rd_buf_.data()), rd_key_);
1113 if(rd_buf_.size() > 0)
1115 // Copy from the read buffer.
1116 // The mask was already applied.
1117 auto const bytes_transferred =
1118 buffer_copy(buffers, rd_buf_.data(),
1120 auto const mb = buffers_prefix(
1121 bytes_transferred, buffers);
1122 rd_remain_ -= bytes_transferred;
// Text messages: feed the new bytes to rd_utf8_ (presumably a UTF-8
// validator) and call finish() once the final frame is fully consumed.
1123 if(rd_op_ == detail::opcode::text)
1125 if(! rd_utf8_.write(mb) ||
1126 (rd_remain_ == 0 && rd_fh_.fin &&
1127 ! rd_utf8_.finish()))
1129 // _Fail the WebSocket Connection_
1131 close_code::bad_payload,
1134 return bytes_written;
1137 bytes_written += bytes_transferred;
1138 rd_size_ += bytes_transferred;
1139 rd_buf_.consume(bytes_transferred);
1143 // Read into caller's buffer
1144 BOOST_ASSERT(rd_remain_ > 0);
1145 BOOST_ASSERT(buffer_size(buffers) > 0);
1146 BOOST_ASSERT(buffer_size(buffers_prefix(
1147 clamp(rd_remain_), buffers)) > 0);
1148 auto const bytes_transferred =
1149 stream_.read_some(buffers_prefix(
1150 clamp(rd_remain_), buffers), ec);
1152 return bytes_written;
1153 BOOST_ASSERT(bytes_transferred > 0);
1154 auto const mb = buffers_prefix(
1155 bytes_transferred, buffers);
1156 rd_remain_ -= bytes_transferred;
1158 detail::mask_inplace(mb, rd_key_);
1159 if(rd_op_ == detail::opcode::text)
1161 if(! rd_utf8_.write(mb) ||
1162 (rd_remain_ == 0 && rd_fh_.fin &&
1163 ! rd_utf8_.finish()))
1165 // _Fail the WebSocket Connection_
1166 do_fail(close_code::bad_payload,
1168 return bytes_written;
1171 bytes_written += bytes_transferred;
1172 rd_size_ += bytes_transferred;
// The message is complete once the final frame has no payload remaining.
1175 rd_done_ = rd_remain_ == 0 && rd_fh_.fin;
1179 // Read compressed message frame payload:
1180 // inflate even if rd_fh_.len == 0, otherwise we
1181 // never emit the end-of-stream deflate block.
1183 bool did_read = false;
// Consumable view of the caller's buffers; shrinks as output is produced.
1184 buffers_suffix<MutableBufferSequence> cb{buffers};
1185 while(buffer_size(cb) > 0)
// Point the zlib parameters at the first remaining output buffer.
1189 auto const out = buffers_front(cb);
1190 zs.next_out = out.data();
1191 zs.avail_out = out.size();
1192 BOOST_ASSERT(zs.avail_out > 0);
1196 if(rd_buf_.size() > 0)
1199 auto const in = buffers_prefix(
1200 clamp(rd_remain_), buffers_front(
1202 zs.avail_in = in.size();
1203 zs.next_in = in.data();
1208 auto const bytes_transferred =
1210 rd_buf_.prepare(read_size(
1211 rd_buf_, rd_buf_.max_size())),
1214 return bytes_written;
1215 BOOST_ASSERT(bytes_transferred > 0);
1216 rd_buf_.commit(bytes_transferred);
1218 detail::mask_inplace(
1219 buffers_prefix(clamp(rd_remain_),
1220 rd_buf_.data()), rd_key_);
1221 auto const in = buffers_prefix(
1222 clamp(rd_remain_), buffers_front(
1224 zs.avail_in = in.size();
1225 zs.next_in = in.data();
1235 // append the empty block codes
1236 static std::uint8_t constexpr
1238 0x00, 0x00, 0xff, 0xff };
1239 zs.next_in = empty_block;
1240 zs.avail_in = sizeof(empty_block);
1241 pmd_->zi.write(zs, zlib::Flush::sync, ec);
1244 // https://github.com/madler/zlib/issues/280
1245 if(zs.total_out > 0)
1246 ec = error::partial_deflate_block;
1249 return bytes_written;
// This condition tests whether the peer's side of the connection was
// negotiated with no_context_takeover; the statement it guards is not
// visible in this excerpt.
1251 (role_ == role_type::client &&
1252 pmd_config_.server_no_context_takeover) ||
1253 (role_ == role_type::server &&
1254 pmd_config_.client_no_context_takeover))
1263 pmd_->zi.write(zs, zlib::Flush::sync, ec);
1265 return bytes_written;
// Enforce the maximum message size; rd_msg_max_ == 0 skips the check.
1266 if(rd_msg_max_ && beast::detail::sum_exceeds(
1267 rd_size_, zs.total_out, rd_msg_max_))
1269 do_fail(close_code::too_big,
1271 return bytes_written;
// Account for what this pass consumed (total_in) and produced (total_out).
1273 cb.consume(zs.total_out);
1274 rd_size_ += zs.total_out;
1275 rd_remain_ -= zs.total_in;
1276 rd_buf_.consume(zs.total_in);
1277 bytes_written += zs.total_out;
// Text messages: feed everything written by this call to rd_utf8_.
1279 if(rd_op_ == detail::opcode::text)
1282 if(! rd_utf8_.write(
1283 buffers_prefix(bytes_written, buffers)) || (
1284 rd_done_ && ! rd_utf8_.finish()))
1286 // _Fail the WebSocket Connection_
1287 do_fail(close_code::bad_payload,
1289 return bytes_written;
1293 return bytes_written;
// Initiating function for an asynchronous partial read into a
// MutableBufferSequence. Checks the stream and buffer requirements at
// compile time, adapts the completion token through async_completion,
// constructs a read_some_op over `buffers` with the adapted handler and
// invokes it, then returns init.result.get().
1296 template<class NextLayer>
1297 template<class MutableBufferSequence, class ReadHandler>
1298 BOOST_ASIO_INITFN_RESULT_TYPE(
1299 ReadHandler, void(error_code, std::size_t))
1302 MutableBufferSequence const& buffers,
1303 ReadHandler&& handler)
// Diagnostic text fixed: the word "requirements" was duplicated.
1305 static_assert(is_async_stream<next_layer_type>::value,
1306 "AsyncStream requirements not met");
1307 static_assert(boost::asio::is_mutable_buffer_sequence<
1308 MutableBufferSequence>::value,
1309 "MutableBufferSequence requirements not met");
1310 boost::asio::async_completion<ReadHandler,
1311 void(error_code, std::size_t)> init{handler};
1312 read_some_op<MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE(
1313 ReadHandler, void(error_code, std::size_t))>{
1314 init.completion_handler,*this, buffers}(
1316 return init.result.get();