2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 // Official repository: https://github.com/boostorg/beast
10 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
11 #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
13 #include <boost/beast/core/buffer_traits.hpp>
14 #include <boost/beast/websocket/teardown.hpp>
15 #include <boost/beast/websocket/detail/mask.hpp>
16 #include <boost/beast/websocket/impl/stream_impl.hpp>
17 #include <boost/beast/core/async_base.hpp>
18 #include <boost/beast/core/bind_handler.hpp>
19 #include <boost/beast/core/buffers_prefix.hpp>
20 #include <boost/beast/core/buffers_suffix.hpp>
21 #include <boost/beast/core/flat_static_buffer.hpp>
22 #include <boost/beast/core/read_size.hpp>
23 #include <boost/beast/core/stream_traits.hpp>
24 #include <boost/beast/core/detail/bind_continuation.hpp>
25 #include <boost/beast/core/detail/buffer.hpp>
26 #include <boost/beast/core/detail/clamp.hpp>
27 #include <boost/beast/core/detail/config.hpp>
28 #include <boost/asio/coroutine.hpp>
29 #include <boost/asio/post.hpp>
30 #include <boost/assert.hpp>
31 #include <boost/config.hpp>
32 #include <boost/optional.hpp>
33 #include <boost/throw_exception.hpp>
42 /* Read some message data into a buffer sequence.
44 Also reads and handles control frames.
// Composed operation that reads some message data into a caller-supplied
// mutable buffer sequence. It derives from beast::async_base and from
// asio::coroutine, and it refers to the stream implementation through a
// weak pointer only.
// NOTE(review): a number of lines of this definition (braces, labels,
// `else` keywords, short declarations) are not visible in this view; the
// comments below describe only the statements that are visible.
46 template<class NextLayer, bool deflateSupported>
47 template<class Handler, class MutableBufferSequence>
48 class stream<NextLayer, deflateSupported>::read_some_op
49 : public beast::async_base<
50 Handler, beast::executor_type<stream>>
51 , public asio::coroutine
// Weak reference to the stream implementation.
53 boost::weak_ptr<impl_type> wp_;
// The caller's buffer sequence; the deflate path validates UTF-8 over a
// prefix of it.
54 MutableBufferSequence bs_;
// Suffix view over the same sequence type; used as the copy/read target
// and consumed as inflated output is produced.
55 buffers_suffix<MutableBufferSequence> cb_;
// Running total of payload bytes delivered; passed to complete().
56 std::size_t bytes_written_ = 0;
// NOTE(review): no use of did_read_ is visible in this view.
59 bool did_read_ = false;
62 static constexpr int id = 1; // for soft_mutex
// Constructor: forwards the handler and the next layer's executor to
// async_base, initialises code_ to close_code::none and then starts the
// operation by invoking itself with a default error code, zero bytes
// and `false` as the third argument.
64 template<class Handler_>
67 boost::shared_ptr<impl_type> const& sp,
68 MutableBufferSequence const& bs)
70 Handler, beast::executor_type<stream>>(
71 std::forward<Handler_>(h),
72 sp->stream().get_executor())
76 , code_(close_code::none)
78 (*this)({}, 0, false);
// Coroutine body. bytes_transferred defaults to 0 so the operation can
// be resumed by completions that do not report a byte count.
83 std::size_t bytes_transferred = 0,
86 using beast::detail::clamp;
// Early completion with operation_aborted. NOTE(review): the guarding
// condition is not visible here; read_op in this file locks its weak
// pointer immediately before the same pair of statements.
90 ec = net::error::operation_aborted;
92 return this->complete(cont, ec, bytes_written_);
95 BOOST_ASIO_CORO_REENTER(*this)
97 impl.update_timer(this->get_executor());
100 // Acquire the read lock
101 if(! impl.rd_block.try_lock(this))
// The lock is taken by someone else: park this operation in op_r_rd
// (first yield), then on resumption take the lock and post to the
// executor (second yield) before continuing.
104 BOOST_ASIO_CORO_YIELD
105 impl.op_r_rd.emplace(std::move(*this));
106 impl.rd_block.lock(this);
107 BOOST_ASIO_CORO_YIELD
108 net::post(std::move(*this));
109 BOOST_ASSERT(impl.rd_block.is_locked(this));
111 // VFALCO Is this check correct here?
// NOTE(review): this assertion itself calls check_stop_now(ec) and
// expects it to return true; the same call is repeated on the next line.
112 BOOST_ASSERT(! ec && impl.check_stop_now(ec));
113 if(impl.check_stop_now(ec))
115 BOOST_ASSERT(ec == net::error::operation_aborted);
118 // VFALCO Should never get here
120 // The only way to get read blocked is if
121 // a `close_op` wrote a close frame
122 BOOST_ASSERT(impl.wr_close);
123 BOOST_ASSERT(impl.status_ != status::open);
124 ec = net::error::operation_aborted;
129 // Make sure the stream is not closed
130 if( impl.status_ == status::closed ||
131 impl.status_ == status::failed)
133 ec = net::error::operation_aborted;
138 // if status_ == status::closing, we want to suspend
139 // the read operation until the close completes,
140 // then finish the read with operation_aborted.
143 BOOST_ASSERT(impl.rd_block.is_locked(this));
144 // See if we need to read a frame header. This
145 // condition is structured to give the decompressor
146 // a chance to emit the final empty deflate block
// A header is needed when no payload remains in the current frame and
// either that frame was not final or the message is already done.
148 if(impl.rd_remain == 0 &&
149 (! impl.rd_fh.fin || impl.rd_done))
// Keep reading from the next layer into rd_buf until parse_fh reports a
// complete header; a parse failure is reported through result_.
152 while(! impl.parse_fh(
153 impl.rd_fh, impl.rd_buf, result_))
157 // _Fail the WebSocket Connection_
// Map the parse error to a close code: message_too_big selects too_big,
// the other assignment selects protocol_error (the `else` between them
// is among the lines not visible here).
158 if(result_ == error::message_too_big)
159 code_ = close_code::too_big;
161 code_ = close_code::protocol_error;
164 BOOST_ASSERT(impl.rd_block.is_locked(this));
165 BOOST_ASIO_CORO_YIELD
166 impl.stream().async_read_some(
167 impl.rd_buf.prepare(read_size(
168 impl.rd_buf, impl.rd_buf.max_size())),
170 BOOST_ASSERT(impl.rd_block.is_locked(this));
171 impl.rd_buf.commit(bytes_transferred);
172 if(impl.check_stop_now(ec))
176 // Allow a close operation
177 // to acquire the read block
178 impl.rd_block.unlock(this);
179 if( impl.op_r_close.maybe_invoke())
182 BOOST_ASSERT(impl.rd_block.is_locked());
185 // Acquire read block
186 impl.rd_block.lock(this);
188 // Immediately apply the mask to the portion
189 // of the buffer holding payload data.
190 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
191 detail::mask_inplace(buffers_prefix(
192 clamp(impl.rd_fh.len),
// Control frames (ping, pong, close) are handled here rather than being
// delivered as message data.
195 if(detail::is_control(impl.rd_fh.op))
197 // Clear this otherwise the next
198 // frame will be considered final.
199 impl.rd_fh.fin = false;
// --- Ping ---
202 if(impl.rd_fh.op == detail::opcode::ping)
208 BOOST_ASIO_CORO_YIELD
209 net::post(std::move(*this));
211 // VFALCO call check_stop_now() here?
// Take the ping payload out of rd_buf and consume it.
215 auto const b = buffers_prefix(
216 clamp(impl.rd_fh.len),
218 auto const len = buffer_bytes(b);
219 BOOST_ASSERT(len == impl.rd_fh.len);
221 detail::read_ping(payload, b);
222 impl.rd_buf.consume(len);
223 // Ignore ping when closing
224 if(impl.status_ == status::closing)
// NOTE(review): the head of this call is not visible; the synchronous
// read_some in this file passes the same arguments to impl.ctrl_cb.
228 frame_type::ping, payload);
// Serialize the pong reply, carrying the ping payload, into rd_fb.
230 impl.template write_ping<
231 flat_static_buffer_base>(impl.rd_fb,
232 detail::opcode::pong, payload);
235 // Allow a close operation
236 // to acquire the read block
237 impl.rd_block.unlock(this);
238 impl.op_r_close.maybe_invoke();
240 // Acquire the write lock
241 if(! impl.wr_block.try_lock(this))
// Same two-step suspend as for the read lock, this time parking in
// op_rd and taking wr_block on resumption.
243 BOOST_ASIO_CORO_YIELD
244 impl.op_rd.emplace(std::move(*this));
245 impl.wr_block.lock(this);
246 BOOST_ASIO_CORO_YIELD
247 net::post(std::move(*this));
248 BOOST_ASSERT(impl.wr_block.is_locked(this));
249 if(impl.check_stop_now(ec))
254 BOOST_ASSERT(impl.wr_block.is_locked(this));
// Send the contents of rd_fb on the next layer.
255 BOOST_ASIO_CORO_YIELD
257 impl.stream(), impl.rd_fb.data(),
258 beast::detail::bind_continuation(std::move(*this)));
259 BOOST_ASSERT(impl.wr_block.is_locked(this));
260 if(impl.check_stop_now(ec))
// Release the write lock and give waiting operations a chance to run;
// the || chain stops at the first maybe_invoke() that returns true.
262 impl.wr_block.unlock(this);
263 impl.op_close.maybe_invoke()
264 || impl.op_idle_ping.maybe_invoke()
265 || impl.op_ping.maybe_invoke()
266 || impl.op_wr.maybe_invoke();
267 goto acquire_read_lock;
// --- Pong ---
271 if(impl.rd_fh.op == detail::opcode::pong)
273 // Ignore pong when closing
// Only post to the executor when the control callback will actually be
// called below (same condition).
274 if(! impl.wr_close && impl.ctrl_cb)
278 BOOST_ASIO_CORO_YIELD
279 net::post(std::move(*this));
283 auto const cb = buffers_prefix(clamp(
284 impl.rd_fh.len), impl.rd_buf.data());
285 auto const len = buffer_bytes(cb);
286 BOOST_ASSERT(len == impl.rd_fh.len);
288 detail::read_ping(payload, cb);
289 impl.rd_buf.consume(len);
290 // Ignore pong when closing
291 if(! impl.wr_close && impl.ctrl_cb)
292 impl.ctrl_cb(frame_type::pong, payload);
296 // Handle close frame
297 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
303 BOOST_ASIO_CORO_YIELD
304 net::post(std::move(*this));
// Parse the close payload; rd_close records that a close frame has been
// received, and the assertion expects this to happen only once.
308 auto const cb = buffers_prefix(clamp(
309 impl.rd_fh.len), impl.rd_buf.data());
310 auto const len = buffer_bytes(cb);
311 BOOST_ASSERT(len == impl.rd_fh.len);
312 BOOST_ASSERT(! impl.rd_close);
313 impl.rd_close = true;
315 detail::read_close(cr, cb, result_);
318 // _Fail the WebSocket Connection_
319 code_ = close_code::protocol_error;
323 impl.rd_buf.consume(len);
325 impl.ctrl_cb(frame_type::close,
327 // See if we are already closing
328 if(impl.status_ == status::closing)
330 // _Close the WebSocket Connection_
331 BOOST_ASSERT(impl.wr_close);
332 code_ = close_code::none;
333 result_ = error::closed;
336 // _Start the WebSocket Closing Handshake_
// Uses the peer's close code, with a substitute when the peer sent
// close_code::none (the substitute value is on a line not visible here).
337 code_ = cr.code == close_code::none ?
339 static_cast<close_code>(cr.code);
340 result_ = error::closed;
// --- Data frames ---
344 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
346 // Empty non-final frame
349 impl.rd_done = false;
// Uncompressed message payload.
351 if(! impl.rd_deflated())
353 if(impl.rd_remain > 0)
355 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
356 (std::min)(clamp(impl.rd_remain),
359 // Fill the read buffer first, otherwise we
360 // get fewer bytes at the cost of one I/O.
361 BOOST_ASIO_CORO_YIELD
362 impl.stream().async_read_some(
363 impl.rd_buf.prepare(read_size(
364 impl.rd_buf, impl.rd_buf.max_size())),
366 impl.rd_buf.commit(bytes_transferred);
367 if(impl.check_stop_now(ec))
// Unmask the part of rd_buf that belongs to the current frame.
371 detail::mask_inplace(buffers_prefix(clamp(
372 impl.rd_remain), impl.rd_buf.data()),
375 if(impl.rd_buf.size() > 0)
377 // Copy from the read buffer.
378 // The mask was already applied.
379 bytes_transferred = net::buffer_copy(cb_,
380 impl.rd_buf.data(), clamp(impl.rd_remain));
381 auto const mb = buffers_prefix(
382 bytes_transferred, cb_);
383 impl.rd_remain -= bytes_transferred;
// Text messages are UTF-8 validated as they arrive; finish() is only
// consulted once the final frame has been fully read.
384 if(impl.rd_op == detail::opcode::text)
386 if(! impl.rd_utf8.write(mb) ||
387 (impl.rd_remain == 0 && impl.rd_fh.fin &&
388 ! impl.rd_utf8.finish()))
390 // _Fail the WebSocket Connection_
391 code_ = close_code::bad_payload;
392 result_ = error::bad_frame_payload;
396 bytes_written_ += bytes_transferred;
397 impl.rd_size += bytes_transferred;
398 impl.rd_buf.consume(bytes_transferred);
402 // Read into caller's buffer
403 BOOST_ASSERT(impl.rd_remain > 0);
404 BOOST_ASSERT(buffer_bytes(cb_) > 0);
405 BOOST_ASSERT(buffer_bytes(buffers_prefix(
406 clamp(impl.rd_remain), cb_)) > 0);
// The read is limited to the bytes remaining in the current frame.
407 BOOST_ASIO_CORO_YIELD
408 impl.stream().async_read_some(buffers_prefix(
409 clamp(impl.rd_remain), cb_), std::move(*this));
410 if(impl.check_stop_now(ec))
413 BOOST_ASSERT(bytes_transferred > 0);
414 auto const mb = buffers_prefix(
415 bytes_transferred, cb_);
416 impl.rd_remain -= bytes_transferred;
// Data read straight into the caller's buffer is unmasked in place.
// NOTE(review): any condition guarding this call is not visible here.
418 detail::mask_inplace(mb, impl.rd_key);
419 if(impl.rd_op == detail::opcode::text)
421 if(! impl.rd_utf8.write(mb) ||
422 (impl.rd_remain == 0 && impl.rd_fh.fin &&
423 ! impl.rd_utf8.finish()))
425 // _Fail the WebSocket Connection_
426 code_ = close_code::bad_payload;
427 result_ = error::bad_frame_payload;
431 bytes_written_ += bytes_transferred;
432 impl.rd_size += bytes_transferred;
// The message is done once the final frame has no bytes remaining.
435 impl.rd_done = impl.rd_remain == 0 && impl.rd_fh.fin;
439 // Read compressed message frame payload:
440 // inflate even if rd_fh_.len == 0, otherwise we
441 // never emit the end-of-stream deflate block.
// Loop while the caller's buffer still has room for output.
442 while(buffer_bytes(cb_) > 0)
// Payload remains but rd_buf is empty: read more compressed input first
// (any further part of this condition is not visible here).
444 if( impl.rd_remain > 0 &&
445 impl.rd_buf.size() == 0 &&
449 BOOST_ASIO_CORO_YIELD
450 impl.stream().async_read_some(
451 impl.rd_buf.prepare(read_size(
452 impl.rd_buf, impl.rd_buf.max_size())),
454 if(impl.check_stop_now(ec))
457 BOOST_ASSERT(bytes_transferred > 0);
458 impl.rd_buf.commit(bytes_transferred);
460 detail::mask_inplace(
461 buffers_prefix(clamp(impl.rd_remain),
462 impl.rd_buf.data()), impl.rd_key);
// Point the inflater's output at the first buffer of cb_.
467 auto const out = buffers_front(cb_);
468 zs.next_out = out.data();
469 zs.avail_out = out.size();
470 BOOST_ASSERT(zs.avail_out > 0);
472 if(impl.rd_remain > 0)
474 if(impl.rd_buf.size() > 0)
// Input is the first buffer of rd_buf, limited to the bytes remaining
// in the current frame.
477 auto const in = buffers_prefix(
478 clamp(impl.rd_remain), buffers_front(
479 impl.rd_buf.data()));
480 zs.avail_in = in.size();
481 zs.next_in = in.data();
// No payload left and the frame is final: feed the four-byte empty
// block to the inflater with a sync flush.
488 else if(impl.rd_fh.fin)
490 // append the empty block codes
491 std::uint8_t constexpr
492 empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
493 zs.next_in = empty_block;
494 zs.avail_in = sizeof(empty_block);
495 impl.inflate(zs, zlib::Flush::sync, ec);
498 // https://github.com/madler/zlib/issues/280
// NOTE(review): the condition guarding this assignment is not visible
// here; the synchronous read_some in this file guards the same
// assignment with `zs.total_out > 0`.
500 ec = error::partial_deflate_block;
502 if(impl.check_stop_now(ec))
504 impl.do_context_takeover_read(impl.role);
512 impl.inflate(zs, zlib::Flush::sync, ec);
513 if(impl.check_stop_now(ec))
// Enforce the message size limit when rd_msg_max is non-zero.
// sum_exceeds presumably tests rd_size + total_out against rd_msg_max -
// confirm against its definition.
515 if(impl.rd_msg_max && beast::detail::sum_exceeds(
516 impl.rd_size, zs.total_out, impl.rd_msg_max))
518 // _Fail the WebSocket Connection_
519 code_ = close_code::too_big;
520 result_ = error::message_too_big;
// Account for what the inflater produced and consumed in this pass.
523 cb_.consume(zs.total_out);
524 impl.rd_size += zs.total_out;
525 impl.rd_remain -= zs.total_in;
526 impl.rd_buf.consume(zs.total_in);
527 bytes_written_ += zs.total_out;
529 if(impl.rd_op == detail::opcode::text)
// Validate the inflated text over the first bytes_written_ bytes of the
// caller's original buffer sequence.
532 if(! impl.rd_utf8.write(
533 buffers_prefix(bytes_written_, bs_)) || (
534 impl.rd_done && ! impl.rd_utf8.finish()))
536 // _Fail the WebSocket Connection_
537 code_ = close_code::bad_payload;
538 result_ = error::bad_frame_payload;
// --- Send a close frame and shut the connection down ---
// NOTE(review): the label and any conditions introducing this section
// are not visible here.
546 // Acquire the write lock
547 if(! impl.wr_block.try_lock(this))
549 BOOST_ASIO_CORO_YIELD
550 impl.op_rd.emplace(std::move(*this));
551 impl.wr_block.lock(this);
552 BOOST_ASIO_CORO_YIELD
553 net::post(std::move(*this));
554 BOOST_ASSERT(impl.wr_block.is_locked(this));
555 if(impl.check_stop_now(ec))
559 impl.change_status(status::closing);
563 impl.wr_close = true;
565 // Serialize close frame
567 impl.template write_close<
568 flat_static_buffer_base>(
572 BOOST_ASSERT(impl.wr_block.is_locked(this));
573 BOOST_ASIO_CORO_YIELD
574 net::async_write(impl.stream(), impl.rd_fb.data(),
575 beast::detail::bind_continuation(std::move(*this)));
576 BOOST_ASSERT(impl.wr_block.is_locked(this));
577 if(impl.check_stop_now(ec))
// Tear down the next layer. The using-declaration makes the library's
// async_teardown a candidate for the unqualified call below.
582 using beast::websocket::async_teardown;
583 BOOST_ASSERT(impl.wr_block.is_locked(this));
584 BOOST_ASIO_CORO_YIELD
585 async_teardown(impl.role, impl.stream(),
586 beast::detail::bind_continuation(std::move(*this)));
587 BOOST_ASSERT(impl.wr_block.is_locked(this));
// Special handling of eof from teardown; the body of this branch is not
// visible here.
588 if(ec == net::error::eof)
591 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
// Any error other than error::closed marks the stream failed; otherwise
// it is marked closed (the `else` is among the lines not visible).
596 if(ec && ec != error::closed)
597 impl.change_status(status::failed);
599 impl.change_status(status::closed);
// Final section: release whichever locks this operation holds, give
// waiting operations a chance to run, then invoke the completion
// handler with the accumulated byte count.
603 impl.rd_block.try_unlock(this);
604 impl.op_r_close.maybe_invoke();
605 if(impl.wr_block.try_unlock(this))
606 impl.op_close.maybe_invoke()
607 || impl.op_idle_ping.maybe_invoke()
608 || impl.op_ping.maybe_invoke()
609 || impl.op_wr.maybe_invoke();
610 this->complete(cont, ec, bytes_written_);
615 //------------------------------------------------------------------------------
// Composed operation that reads message data into a DynamicBuffer by
// repeatedly launching read_some_op with itself as the completion
// handler. Like read_some_op it derives from beast::async_base and
// asio::coroutine and refers to the implementation via a weak pointer.
// NOTE(review): several lines of this definition (braces, some member
// declarations, part of the constructor signature) are not visible in
// this view.
617 template<class NextLayer, bool deflateSupported>
618 template<class Handler, class DynamicBuffer>
619 class stream<NextLayer, deflateSupported>::read_op
620 : public beast::async_base<
621 Handler, beast::executor_type<stream>>
622 , public asio::coroutine
// Weak reference to the stream implementation.
624 boost::weak_ptr<impl_type> wp_;
// Total bytes committed to the dynamic buffer so far.
627 std::size_t bytes_written_ = 0;
// Constructor: forwards the handler and the next layer's executor to
// async_base, stores the limit (a limit of 0 is replaced by the largest
// std::size_t value) and starts the operation by invoking itself.
631 template<class Handler_>
634 boost::shared_ptr<impl_type> const& sp,
638 : async_base<Handler,
639 beast::executor_type<stream>>(
640 std::forward<Handler_>(h),
641 sp->stream().get_executor())
644 , limit_(limit ? limit : (
645 std::numeric_limits<std::size_t>::max)())
648 (*this)({}, 0, false);
// Coroutine body; also the completion handler for each read_some_op.
653 std::size_t bytes_transferred = 0,
656 using beast::detail::clamp;
// Try to obtain a strong reference to the implementation.
657 auto sp = wp_.lock();
// Completes with operation_aborted; the condition guarding these two
// statements is not visible here - presumably the lock above failed.
660 ec = net::error::operation_aborted;
662 return this->complete(cont, ec, bytes_written_);
665 using mutable_buffers_type = typename
666 DynamicBuffer::mutable_buffers_type;
667 BOOST_ASIO_CORO_REENTER(*this)
671 // VFALCO TODO use boost::beast::bind_continuation
672 BOOST_ASIO_CORO_YIELD
// Reserve writable space in the dynamic buffer. The size is the
// implementation's hint for this buffer passed through clamp together
// with limit_; error::buffer_overflow is the error handed to the
// prepare helper.
674 auto mb = beast::detail::dynamic_buffer_prepare(b_,
675 clamp(impl.read_size_hint_db(b_), limit_),
676 ec, error::buffer_overflow);
677 if(impl.check_stop_now(ec))
// Launch the lower-level read with this operation moved in as handler.
679 read_some_op<read_op, mutable_buffers_type>(
680 std::move(*this), sp, *mb);
// Make the bytes just read part of the buffer's readable area.
683 b_.commit(bytes_transferred);
684 bytes_written_ += bytes_transferred;
// Repeat until a complete message has been read, unless some_ is set,
// in which case one pass is enough.
688 while(! some_ && ! impl.rd_done);
691 this->complete(cont, ec, bytes_written_);
// Nested helper of stream that checks the handler type at compile time
// and then constructs a composed operation for a MutableBufferSequence,
// using the decayed handler type.
// NOTE(review): the struct's name, its call operator signature and the
// name of the operation being constructed are on lines not visible in
// this view.
696 template<class NextLayer, bool deflateSupported>
697 struct stream<NextLayer, deflateSupported>::
702 class MutableBufferSequence>
706 boost::shared_ptr<impl_type> const& sp,
707 MutableBufferSequence const& b)
709 // If you get an error on the following line it means
710 // that your handler does not meet the documented type
711 // requirements for the handler.
// The handler must be callable as void(error_code, std::size_t).
714 beast::detail::is_invocable<ReadHandler,
715 void(error_code, std::size_t)>::value,
716 "ReadHandler type requirements not met");
719 typename std::decay<ReadHandler>::type,
720 MutableBufferSequence>(
721 std::forward<ReadHandler>(h),
// Nested helper of stream that checks the handler type at compile time
// and then constructs a composed operation with the decayed handler
// type and the forwarded handler.
// NOTE(review): the struct's name, most of its call operator signature
// and the name of the operation being constructed are on lines not
// visible in this view.
727 template<class NextLayer, bool deflateSupported>
728 struct stream<NextLayer, deflateSupported>::
737 boost::shared_ptr<impl_type> const& sp,
742 // If you get an error on the following line it means
743 // that your handler does not meet the documented type
744 // requirements for the handler.
// The handler must be callable as void(error_code, std::size_t).
747 beast::detail::is_invocable<ReadHandler,
748 void(error_code, std::size_t)>::value,
749 "ReadHandler type requirements not met");
752 typename std::decay<ReadHandler>::type,
754 std::forward<ReadHandler>(h),
762 //------------------------------------------------------------------------------
// Throwing overload of the synchronous message read into a dynamic
// buffer. Delegates to read(buffer, ec) and returns its byte count; a
// system_error built from ec is thrown on failure (the line testing ec
// is not visible in this view).
764 template<class NextLayer, bool deflateSupported>
765 template<class DynamicBuffer>
767 stream<NextLayer, deflateSupported>::
768 read(DynamicBuffer& buffer)
// Compile-time checks on the next layer and on the buffer type.
770 static_assert(is_sync_stream<next_layer_type>::value,
771 "SyncStream type requirements not met");
773 net::is_dynamic_buffer<DynamicBuffer>::value,
774 "DynamicBuffer type requirements not met");
776 auto const bytes_written = read(buffer, ec);
778 BOOST_THROW_EXCEPTION(system_error{ec});
779 return bytes_written;
// Non-throwing overload of the synchronous message read into a dynamic
// buffer. Calls read_some(buffer, 0, ec) repeatedly, accumulating the
// byte counts, until is_message_done() returns true; returns the total.
782 template<class NextLayer, bool deflateSupported>
783 template<class DynamicBuffer>
785 stream<NextLayer, deflateSupported>::
786 read(DynamicBuffer& buffer, error_code& ec)
// Compile-time checks on the next layer and on the buffer type.
788 static_assert(is_sync_stream<next_layer_type>::value,
789 "SyncStream type requirements not met");
791 net::is_dynamic_buffer<DynamicBuffer>::value,
792 "DynamicBuffer type requirements not met");
793 std::size_t bytes_written = 0;
// A limit of 0 is passed to read_some; the dynamic-buffer read_some in
// this file replaces its limit with the largest std::size_t value.
796 bytes_written += read_some(buffer, 0, ec);
// Early return with the bytes read so far; the condition guarding it is
// not visible here (presumably a test of ec).
798 return bytes_written;
800 while(! is_message_done());
801 return bytes_written;
// Asynchronous message read into a dynamic buffer. After the
// compile-time checks it hands off to net::async_initiate with the
// completion signature void(error_code, std::size_t).
// NOTE(review): the arguments passed to async_initiate are on lines not
// visible in this view.
804 template<class NextLayer, bool deflateSupported>
805 template<class DynamicBuffer, class ReadHandler>
806 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
807 stream<NextLayer, deflateSupported>::
808 async_read(DynamicBuffer& buffer, ReadHandler&& handler)
// Compile-time checks on the next layer and on the buffer type.
810 static_assert(is_async_stream<next_layer_type>::value,
811 "AsyncStream type requirements not met");
813 net::is_dynamic_buffer<DynamicBuffer>::value,
814 "DynamicBuffer type requirements not met");
815 return net::async_initiate<
817 void(error_code, std::size_t)>(
826 //------------------------------------------------------------------------------
// Throwing overload of the synchronous read-some into a dynamic buffer.
// Delegates to read_some(buffer, limit, ec) and returns its byte count;
// a system_error built from ec is thrown on failure (the line testing
// ec is not visible in this view, nor is the function name line).
828 template<class NextLayer, bool deflateSupported>
829 template<class DynamicBuffer>
831 stream<NextLayer, deflateSupported>::
833 DynamicBuffer& buffer,
// Compile-time checks on the next layer and on the buffer type.
836 static_assert(is_sync_stream<next_layer_type>::value,
837 "SyncStream type requirements not met");
839 net::is_dynamic_buffer<DynamicBuffer>::value,
840 "DynamicBuffer type requirements not met");
842 auto const bytes_written =
843 read_some(buffer, limit, ec);
845 BOOST_THROW_EXCEPTION(system_error{ec});
846 return bytes_written;
// Non-throwing overload of the synchronous read-some into a dynamic
// buffer. Reserves space in the buffer, reads into it through the
// buffer-sequence overload of read_some, commits the bytes read and
// returns their count.
849 template<class NextLayer, bool deflateSupported>
850 template<class DynamicBuffer>
852 stream<NextLayer, deflateSupported>::
854 DynamicBuffer& buffer,
// Compile-time checks on the next layer and on the buffer type.
858 static_assert(is_sync_stream<next_layer_type>::value,
859 "SyncStream type requirements not met");
861 net::is_dynamic_buffer<DynamicBuffer>::value,
862 "DynamicBuffer type requirements not met");
863 using beast::detail::clamp;
// Replace the limit with the largest std::size_t value. The condition
// guarding this assignment is not visible here - presumably a test for
// a zero limit, as in read_op's constructor.
865 limit = (std::numeric_limits<std::size_t>::max)();
// Size to prepare: the stream's hint for this buffer passed through
// clamp together with the limit.
867 clamp(read_size_hint(buffer), limit);
868 BOOST_ASSERT(size > 0);
// error::buffer_overflow is the error handed to the prepare helper.
869 auto mb = beast::detail::dynamic_buffer_prepare(
870 buffer, size, ec, error::buffer_overflow);
871 if(impl_->check_stop_now(ec))
873 auto const bytes_written = read_some(*mb, ec);
// Make the bytes just read part of the buffer's readable area.
874 buffer.commit(bytes_written);
875 return bytes_written;
// Asynchronous read-some into a dynamic buffer. After the compile-time
// checks it hands off to net::async_initiate with the completion
// signature void(error_code, std::size_t).
// NOTE(review): the function name line, part of the parameter list and
// the arguments passed to async_initiate are not visible in this view.
878 template<class NextLayer, bool deflateSupported>
879 template<class DynamicBuffer, class ReadHandler>
880 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
881 stream<NextLayer, deflateSupported>::
883 DynamicBuffer& buffer,
885 ReadHandler&& handler)
// Compile-time checks on the next layer and on the buffer type.
887 static_assert(is_async_stream<next_layer_type>::value,
888 "AsyncStream type requirements not met");
890 net::is_dynamic_buffer<DynamicBuffer>::value,
891 "DynamicBuffer type requirements not met");
892 return net::async_initiate<
894 void(error_code, std::size_t)>(
903 //------------------------------------------------------------------------------
// Throwing overload of the synchronous read-some into a mutable buffer
// sequence. Delegates to read_some(buffers, ec) and returns its byte
// count; a system_error built from ec is thrown on failure (the line
// testing ec is not visible in this view, nor is the function name
// line).
905 template<class NextLayer, bool deflateSupported>
906 template<class MutableBufferSequence>
908 stream<NextLayer, deflateSupported>::
910 MutableBufferSequence const& buffers)
// Compile-time checks on the next layer and on the buffer sequence type.
912 static_assert(is_sync_stream<next_layer_type>::value,
913 "SyncStream type requirements not met");
914 static_assert(net::is_mutable_buffer_sequence<
915 MutableBufferSequence>::value,
916 "MutableBufferSequence type requirements not met");
918 auto const bytes_written = read_some(buffers, ec);
920 BOOST_THROW_EXCEPTION(system_error{ec});
921 return bytes_written;
// Non-throwing synchronous read-some into a mutable buffer sequence.
// Reads and parses frame headers as needed, handles control frames
// (ping, pong, close) inline, and then delivers message payload -
// either copied or read directly for uncompressed messages, or inflated
// for compressed ones. Returns the number of payload bytes written to
// `buffers`; errors are reported through `ec`.
// NOTE(review): a number of lines of this definition (braces, labels,
// `else` keywords, short declarations) are not visible in this view;
// the comments below describe only the statements that are visible.
924 template<class NextLayer, bool deflateSupported>
925 template<class MutableBufferSequence>
927 stream<NextLayer, deflateSupported>::
929 MutableBufferSequence const& buffers,
// Compile-time checks on the next layer and on the buffer sequence type.
932 static_assert(is_sync_stream<next_layer_type>::value,
933 "SyncStream type requirements not met");
934 static_assert(net::is_mutable_buffer_sequence<
935 MutableBufferSequence>::value,
936 "MutableBufferSequence type requirements not met");
937 using beast::detail::clamp;
940 std::size_t bytes_written = 0;
942 // Make sure the stream is open
943 if(impl.check_stop_now(ec))
944 return bytes_written;
946 // See if we need to read a frame header. This
947 // condition is structured to give the decompressor
948 // a chance to emit the final empty deflate block
// A header is needed when no payload remains in the current frame and
// either that frame was not final or the message is already done.
950 if(impl.rd_remain == 0 && (
951 ! impl.rd_fh.fin || impl.rd_done))
// Keep reading from the next layer into rd_buf until parse_fh reports a
// complete header; a parse failure is reported through `result`.
955 while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
959 // _Fail the WebSocket Connection_
// Map the parse error to a close code: message_too_big selects too_big,
// the other assignment selects protocol_error (the `else` between them
// is among the lines not visible here).
960 if(result == error::message_too_big)
961 code = close_code::too_big;
963 code = close_code::protocol_error;
964 do_fail(code, result, ec);
965 return bytes_written;
967 auto const bytes_transferred =
968 impl.stream().read_some(
969 impl.rd_buf.prepare(read_size(
970 impl.rd_buf, impl.rd_buf.max_size())),
972 impl.rd_buf.commit(bytes_transferred);
973 if(impl.check_stop_now(ec))
974 return bytes_written;
976 // Immediately apply the mask to the portion
977 // of the buffer holding payload data.
978 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
979 detail::mask_inplace(buffers_prefix(
980 clamp(impl.rd_fh.len), impl.rd_buf.data()),
// Control frames (ping, pong, close) are handled here rather than being
// delivered as message data.
982 if(detail::is_control(impl.rd_fh.op))
984 // Get control frame payload
985 auto const b = buffers_prefix(
986 clamp(impl.rd_fh.len), impl.rd_buf.data());
987 auto const len = buffer_bytes(b);
988 BOOST_ASSERT(len == impl.rd_fh.len);
990 // Clear this otherwise the next
991 // frame will be considered final.
992 impl.rd_fh.fin = false;
// --- Ping ---
995 if(impl.rd_fh.op == detail::opcode::ping)
998 detail::read_ping(payload, b);
999 impl.rd_buf.consume(len);
1002 // Ignore ping when closing
// Notify the control callback, then serialize a pong carrying the same
// payload into a local frame buffer and write it to the next layer.
// NOTE(review): conditions guarding these statements are not visible.
1006 impl.ctrl_cb(frame_type::ping, payload);
1007 detail::frame_buffer fb;
1008 impl.template write_ping<flat_static_buffer_base>(fb,
1009 detail::opcode::pong, payload);
1010 net::write(impl.stream(), fb.data(), ec);
1011 if(impl.check_stop_now(ec))
1012 return bytes_written;
1015 // Handle pong frame
1016 if(impl.rd_fh.op == detail::opcode::pong)
1019 detail::read_ping(payload, b);
1020 impl.rd_buf.consume(len);
1022 impl.ctrl_cb(frame_type::pong, payload);
1025 // Handle close frame
1026 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
// rd_close records that a close frame has been received; the assertion
// expects this to happen only once.
1028 BOOST_ASSERT(! impl.rd_close);
1029 impl.rd_close = true;
1031 detail::read_close(cr, b, result);
1034 // _Fail the WebSocket Connection_
1035 do_fail(close_code::protocol_error,
1037 return bytes_written;
1040 impl.rd_buf.consume(len);
1042 impl.ctrl_cb(frame_type::close, impl.cr.reason);
1043 BOOST_ASSERT(! impl.wr_close);
1044 // _Start the WebSocket Closing Handshake_
// Uses the peer's close code, substituting close_code::normal when the
// peer sent close_code::none. The head of the call receiving these
// arguments is not visible here.
1046 cr.code == close_code::none ?
1047 close_code::normal :
1048 static_cast<close_code>(cr.code),
1050 return bytes_written;
// --- Data frames ---
1053 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
1055 // Empty non-final frame
1058 impl.rd_done = false;
// Uncompressed message payload.
1064 if(! impl.rd_deflated())
1066 if(impl.rd_remain > 0)
// If rd_buf is empty and can hold more than this call needs (the
// smaller of the frame remainder and the caller's buffer size), fill it
// with one read instead of reading straight into the caller's buffer.
1068 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
1069 (std::min)(clamp(impl.rd_remain),
1070 buffer_bytes(buffers)))
1072 // Fill the read buffer first, otherwise we
1073 // get fewer bytes at the cost of one I/O.
1074 impl.rd_buf.commit(impl.stream().read_some(
1075 impl.rd_buf.prepare(read_size(impl.rd_buf,
1076 impl.rd_buf.max_size())), ec));
1077 if(impl.check_stop_now(ec))
1078 return bytes_written;
// Unmask the part of rd_buf that belongs to the current frame.
// NOTE(review): any condition guarding this call is not visible here.
1080 detail::mask_inplace(
1081 buffers_prefix(clamp(impl.rd_remain),
1082 impl.rd_buf.data()), impl.rd_key);
1084 if(impl.rd_buf.size() > 0)
1086 // Copy from the read buffer.
1087 // The mask was already applied.
1088 auto const bytes_transferred = net::buffer_copy(
1089 buffers, impl.rd_buf.data(),
1090 clamp(impl.rd_remain));
1091 auto const mb = buffers_prefix(
1092 bytes_transferred, buffers);
1093 impl.rd_remain -= bytes_transferred;
// Text messages are UTF-8 validated as they arrive; finish() is only
// consulted once the final frame has been fully read.
1094 if(impl.rd_op == detail::opcode::text)
1096 if(! impl.rd_utf8.write(mb) ||
1097 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1098 ! impl.rd_utf8.finish()))
1100 // _Fail the WebSocket Connection_
1101 do_fail(close_code::bad_payload,
1102 error::bad_frame_payload, ec);
1103 return bytes_written;
1106 bytes_written += bytes_transferred;
1107 impl.rd_size += bytes_transferred;
1108 impl.rd_buf.consume(bytes_transferred);
1112 // Read into caller's buffer
1113 BOOST_ASSERT(impl.rd_remain > 0);
1114 BOOST_ASSERT(buffer_bytes(buffers) > 0);
1115 BOOST_ASSERT(buffer_bytes(buffers_prefix(
1116 clamp(impl.rd_remain), buffers)) > 0);
// The read is limited to the bytes remaining in the current frame.
1117 auto const bytes_transferred =
1118 impl.stream().read_some(buffers_prefix(
1119 clamp(impl.rd_remain), buffers), ec);
1120 // VFALCO What if some bytes were written?
1121 if(impl.check_stop_now(ec))
1122 return bytes_written;
1123 BOOST_ASSERT(bytes_transferred > 0);
1124 auto const mb = buffers_prefix(
1125 bytes_transferred, buffers);
1126 impl.rd_remain -= bytes_transferred;
// Data read straight into the caller's buffer is unmasked in place.
// NOTE(review): any condition guarding this call is not visible here.
1128 detail::mask_inplace(mb, impl.rd_key);
1129 if(impl.rd_op == detail::opcode::text)
1131 if(! impl.rd_utf8.write(mb) ||
1132 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1133 ! impl.rd_utf8.finish()))
1135 // _Fail the WebSocket Connection_
1136 do_fail(close_code::bad_payload,
1137 error::bad_frame_payload, ec);
1138 return bytes_written;
1141 bytes_written += bytes_transferred;
1142 impl.rd_size += bytes_transferred;
// The message is done once the final frame has no bytes remaining.
1145 impl.rd_done = impl.rd_remain == 0 && impl.rd_fh.fin;
1149 // Read compressed message frame payload:
1150 // inflate even if rd_fh_.len == 0, otherwise we
1151 // never emit the end-of-stream deflate block.
1153 bool did_read = false;
// Suffix view over the caller's buffers, consumed as output is produced.
1154 buffers_suffix<MutableBufferSequence> cb(buffers);
// Loop while the caller's buffer still has room for output.
1155 while(buffer_bytes(cb) > 0)
// Point the inflater's output at the first buffer of cb.
1159 auto const out = beast::buffers_front(cb);
1160 zs.next_out = out.data();
1161 zs.avail_out = out.size();
1162 BOOST_ASSERT(zs.avail_out > 0);
1164 if(impl.rd_remain > 0)
1166 if(impl.rd_buf.size() > 0)
// Input is the first buffer of rd_buf, limited to the bytes remaining
// in the current frame.
1169 auto const in = buffers_prefix(
1170 clamp(impl.rd_remain), beast::buffers_front(
1171 impl.rd_buf.data()));
1172 zs.avail_in = in.size();
1173 zs.next_in = in.data();
// rd_buf is empty: read more compressed input from the next layer,
// unmask it, and use it as the inflater's input.
// NOTE(review): the branch structure around this read (including any
// use of did_read) is on lines not visible here.
1178 auto const bytes_transferred =
1179 impl.stream().read_some(
1180 impl.rd_buf.prepare(read_size(
1181 impl.rd_buf, impl.rd_buf.max_size())),
1183 if(impl.check_stop_now(ec))
1184 return bytes_written;
1185 BOOST_ASSERT(bytes_transferred > 0);
1186 impl.rd_buf.commit(bytes_transferred);
1188 detail::mask_inplace(
1189 buffers_prefix(clamp(impl.rd_remain),
1190 impl.rd_buf.data()), impl.rd_key);
1191 auto const in = buffers_prefix(
1192 clamp(impl.rd_remain), buffers_front(
1193 impl.rd_buf.data()));
1194 zs.avail_in = in.size();
1195 zs.next_in = in.data();
// No payload left and the frame is final: feed the four-byte empty
// block to the inflater with a sync flush.
1203 else if(impl.rd_fh.fin)
1205 // append the empty block codes
1206 static std::uint8_t constexpr
1208 0x00, 0x00, 0xff, 0xff };
1209 zs.next_in = empty_block;
1210 zs.avail_in = sizeof(empty_block);
1211 impl.inflate(zs, zlib::Flush::sync, ec);
1214 // https://github.com/madler/zlib/issues/280
// Output produced while inflating the empty block is reported as
// error::partial_deflate_block.
1215 if(zs.total_out > 0)
1216 ec = error::partial_deflate_block;
1218 if(impl.check_stop_now(ec))
1219 return bytes_written;
1220 impl.do_context_takeover_read(impl.role);
1221 impl.rd_done = true;
1228 impl.inflate(zs, zlib::Flush::sync, ec);
1229 if(impl.check_stop_now(ec))
1230 return bytes_written;
// Enforce the message size limit when rd_msg_max is non-zero.
// sum_exceeds presumably tests rd_size + total_out against rd_msg_max -
// confirm against its definition.
1231 if(impl.rd_msg_max && beast::detail::sum_exceeds(
1232 impl.rd_size, zs.total_out, impl.rd_msg_max))
1234 do_fail(close_code::too_big,
1235 error::message_too_big, ec);
1236 return bytes_written;
// Account for what the inflater produced and consumed in this pass.
1238 cb.consume(zs.total_out);
1239 impl.rd_size += zs.total_out;
1240 impl.rd_remain -= zs.total_in;
1241 impl.rd_buf.consume(zs.total_in);
1242 bytes_written += zs.total_out;
1244 if(impl.rd_op == detail::opcode::text)
// Validate the inflated text over the first bytes_written bytes of the
// caller's buffers.
1247 if(! impl.rd_utf8.write(beast::buffers_prefix(
1248 bytes_written, buffers)) || (
1249 impl.rd_done && ! impl.rd_utf8.finish()))
1251 // _Fail the WebSocket Connection_
1252 do_fail(close_code::bad_payload,
1253 error::bad_frame_payload, ec);
1254 return bytes_written;
1258 return bytes_written;
1261 template<class NextLayer, bool deflateSupported>
1262 template<class MutableBufferSequence, class ReadHandler>
1263 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1264 stream<NextLayer, deflateSupported>::
1266 MutableBufferSequence const& buffers,
1267 ReadHandler&& handler)
1269 static_assert(is_async_stream<next_layer_type>::value,
1270 "AsyncStream type requirements not met");
1271 static_assert(net::is_mutable_buffer_sequence<
1272 MutableBufferSequence>::value,
1273 "MutableBufferSequence type requirements not met");
1274 return net::async_initiate<
1276 void(error_code, std::size_t)>(