]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/boost/beast/websocket/impl/read.ipp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / boost / beast / websocket / impl / read.ipp
1 //
2 // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
11 #define BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
12
13 #include <boost/beast/websocket/teardown.hpp>
14 #include <boost/beast/core/bind_handler.hpp>
15 #include <boost/beast/core/buffers_prefix.hpp>
16 #include <boost/beast/core/buffers_suffix.hpp>
17 #include <boost/beast/core/flat_static_buffer.hpp>
18 #include <boost/beast/core/type_traits.hpp>
19 #include <boost/beast/core/detail/clamp.hpp>
20 #include <boost/beast/core/detail/config.hpp>
21 #include <boost/asio/associated_allocator.hpp>
22 #include <boost/asio/associated_executor.hpp>
23 #include <boost/asio/coroutine.hpp>
24 #include <boost/asio/handler_continuation_hook.hpp>
25 #include <boost/asio/post.hpp>
26 #include <boost/assert.hpp>
27 #include <boost/config.hpp>
28 #include <boost/optional.hpp>
29 #include <boost/throw_exception.hpp>
30 #include <algorithm>
31 #include <limits>
32 #include <memory>
33
34 namespace boost {
35 namespace beast {
36 namespace websocket {
37
/* Read some message frame data.

   Also reads and handles control frames.
*/
template<class NextLayer>
template<
    class MutableBufferSequence,
    class Handler>
class stream<NextLayer>::read_some_op
    : public boost::asio::coroutine
{
    Handler h_;                         // final completion handler
    stream<NextLayer>& ws_;             // stream being read from
    MutableBufferSequence bs_;          // caller's buffer sequence (whole)
    buffers_suffix<MutableBufferSequence> cb_;  // unfilled remainder of bs_
    std::size_t bytes_written_ = 0;     // total payload bytes delivered so far
    error_code ev_;                     // error to report after the close path runs
    token tok_;                         // identifies this op when holding rd/wr blocks
    close_code code_;                   // close code sent when failing the connection
    bool did_read_ = false;             // deflate path: rd_buf_ filled once already
    bool cont_ = false;                 // true when invoked as a continuation

public:
    read_some_op(read_some_op&&) = default;
    read_some_op(read_some_op const&) = default;

    // Construct the operation; acquires a unique token
    // from the stream to identify this op in the
    // read/write block bookkeeping.
    template<class DeducedHandler>
    read_some_op(
        DeducedHandler&& h,
        stream<NextLayer>& ws,
        MutableBufferSequence const& bs)
        : h_(std::forward<DeducedHandler>(h))
        , ws_(ws)
        , bs_(bs)
        , cb_(bs)
        , tok_(ws_.tok_.unique())
        , code_(close_code::none)
    {
    }

    // Propagate the caller's associated allocator.
    using allocator_type =
        boost::asio::associated_allocator_t<Handler>;

    allocator_type
    get_allocator() const noexcept
    {
        return boost::asio::get_associated_allocator(h_);
    }

    // Propagate the caller's associated executor, falling
    // back to the stream's executor.
    using executor_type = boost::asio::associated_executor_t<
        Handler, decltype(std::declval<stream<NextLayer>&>().get_executor())>;

    executor_type
    get_executor() const noexcept
    {
        return boost::asio::get_associated_executor(
            h_, ws_.get_executor());
    }

    // Access the wrapped handler (used when this op is
    // itself wrapped by another composed operation).
    Handler&
    handler()
    {
        return h_;
    }

    // Entry point / resume point of the state machine.
    // `cont` is false only on the initiating call.
    void operator()(
        error_code ec = {},
        std::size_t bytes_transferred = 0,
        bool cont = true);

    friend
    bool asio_handler_is_continuation(read_some_op* op)
    {
        using boost::asio::asio_handler_is_continuation;
        return op->cont_ || asio_handler_is_continuation(
            std::addressof(op->h_));
    }
};
116
// Stackless-coroutine state machine which reads some
// message payload into the caller's buffers, transparently
// handling interleaved ping/pong/close control frames and
// (optionally) permessage-deflate decompression.
//
// ec / bytes_transferred carry the result of the most
// recent intermediate async operation; cont is false only
// on the initiating invocation (used to decide whether the
// final handler must be post()-ed).
template<class NextLayer>
template<class MutableBufferSequence, class Handler>
void
stream<NextLayer>::
read_some_op<MutableBufferSequence, Handler>::
operator()(
    error_code ec,
    std::size_t bytes_transferred,
    bool cont)
{
    using beast::detail::clamp;
    using boost::asio::buffer;
    using boost::asio::buffer_size;
    close_code code{};
    cont_ = cont;
    BOOST_ASIO_CORO_REENTER(*this)
    {
        // Maybe suspend
    do_maybe_suspend:
        if(! ws_.rd_block_)
        {
            // Acquire the read block
            ws_.rd_block_ = tok_;

            // Make sure the stream is not closed
            if( ws_.status_ == status::closed ||
                ws_.status_ == status::failed)
            {
                ec = boost::asio::error::operation_aborted;
                goto upcall;
            }
        }
        else
        {
        do_suspend:
            // Suspend
            BOOST_ASSERT(ws_.rd_block_ != tok_);
            BOOST_ASIO_CORO_YIELD
            ws_.paused_r_rd_.save(std::move(*this));

            // Acquire the read block
            BOOST_ASSERT(! ws_.rd_block_);
            ws_.rd_block_ = tok_;

            // Resume
            BOOST_ASIO_CORO_YIELD
            boost::asio::post(
                ws_.get_executor(), std::move(*this));
            BOOST_ASSERT(ws_.rd_block_ == tok_);

            // The only way to get read blocked is if
            // a `close_op` wrote a close frame
            BOOST_ASSERT(ws_.wr_close_);
            BOOST_ASSERT(ws_.status_ != status::open);
            ec = boost::asio::error::operation_aborted;
            goto upcall;
        }

        // if status_ == status::closing, we want to suspend
        // the read operation until the close completes,
        // then finish the read with operation_aborted.

    loop:
        BOOST_ASSERT(ws_.rd_block_ == tok_);
        // See if we need to read a frame header. This
        // condition is structured to give the decompressor
        // a chance to emit the final empty deflate block
        //
        if(ws_.rd_remain_ == 0 &&
            (! ws_.rd_fh_.fin || ws_.rd_done_))
        {
            // Read frame header
            while(! ws_.parse_fh(
                ws_.rd_fh_, ws_.rd_buf_, code))
            {
                if(code != close_code::none)
                {
                    // _Fail the WebSocket Connection_
                    code_ = code;
                    ev_ = error::failed;
                    goto close;
                }
                // Not enough bytes for a header yet;
                // read more from the next layer.
                BOOST_ASSERT(ws_.rd_block_ == tok_);
                BOOST_ASIO_CORO_YIELD
                ws_.stream_.async_read_some(
                    ws_.rd_buf_.prepare(read_size(
                        ws_.rd_buf_, ws_.rd_buf_.max_size())),
                    std::move(*this));
                BOOST_ASSERT(ws_.rd_block_ == tok_);
                if(! ws_.check_ok(ec))
                    goto upcall;
                ws_.rd_buf_.commit(bytes_transferred);

                // Allow a close operation
                // to acquire the read block
                BOOST_ASSERT(ws_.rd_block_ == tok_);
                ws_.rd_block_.reset();
                if( ws_.paused_r_close_.maybe_invoke())
                {
                    // Suspend
                    BOOST_ASSERT(ws_.rd_block_);
                    goto do_suspend;
                }
                // Acquire read block
                ws_.rd_block_ = tok_;
            }
            // Immediately apply the mask to the portion
            // of the buffer holding payload data.
            if(ws_.rd_fh_.len > 0 && ws_.rd_fh_.mask)
                detail::mask_inplace(buffers_prefix(
                    clamp(ws_.rd_fh_.len),
                        ws_.rd_buf_.data()),
                            ws_.rd_key_);
            if(detail::is_control(ws_.rd_fh_.op))
            {
                // Clear this otherwise the next
                // frame will be considered final.
                ws_.rd_fh_.fin = false;

                // Handle ping frame
                if(ws_.rd_fh_.op == detail::opcode::ping)
                {
                    {
                        auto const b = buffers_prefix(
                            clamp(ws_.rd_fh_.len),
                                ws_.rd_buf_.data());
                        auto const len = buffer_size(b);
                        BOOST_ASSERT(len == ws_.rd_fh_.len);
                        ping_data payload;
                        detail::read_ping(payload, b);
                        ws_.rd_buf_.consume(len);
                        // Ignore ping when closing
                        if(ws_.status_ == status::closing)
                            goto loop;
                        if(ws_.ctrl_cb_)
                            ws_.ctrl_cb_(frame_type::ping, payload);
                        // Serialize the pong reply into the
                        // stream-owned frame buffer.
                        ws_.rd_fb_.reset();
                        ws_.template write_ping<
                            flat_static_buffer_base>(ws_.rd_fb_,
                                detail::opcode::pong, payload);
                    }

                    //BOOST_ASSERT(! ws_.paused_r_close_);

                    // Allow a close operation
                    // to acquire the read block
                    BOOST_ASSERT(ws_.rd_block_ == tok_);
                    ws_.rd_block_.reset();
                    ws_.paused_r_close_.maybe_invoke();

                    // Maybe suspend
                    if(! ws_.wr_block_)
                    {
                        // Acquire the write block
                        ws_.wr_block_ = tok_;
                    }
                    else
                    {
                        // Suspend
                        BOOST_ASSERT(ws_.wr_block_ != tok_);
                        BOOST_ASIO_CORO_YIELD
                        ws_.paused_rd_.save(std::move(*this));

                        // Acquire the write block
                        BOOST_ASSERT(! ws_.wr_block_);
                        ws_.wr_block_ = tok_;

                        // Resume
                        BOOST_ASIO_CORO_YIELD
                        boost::asio::post(
                            ws_.get_executor(), std::move(*this));
                        BOOST_ASSERT(ws_.wr_block_ == tok_);

                        // Make sure the stream is open
                        if(! ws_.check_open(ec))
                            goto upcall;
                    }

                    // Send pong
                    BOOST_ASSERT(ws_.wr_block_ == tok_);
                    BOOST_ASIO_CORO_YIELD
                    boost::asio::async_write(ws_.stream_,
                        ws_.rd_fb_.data(), std::move(*this));
                    BOOST_ASSERT(ws_.wr_block_ == tok_);
                    if(! ws_.check_ok(ec))
                        goto upcall;
                    // Release the write block and wake any
                    // suspended writer, then retry the read.
                    ws_.wr_block_.reset();
                    ws_.paused_close_.maybe_invoke() ||
                        ws_.paused_ping_.maybe_invoke() ||
                        ws_.paused_wr_.maybe_invoke();
                    goto do_maybe_suspend;
                }
                // Handle pong frame
                if(ws_.rd_fh_.op == detail::opcode::pong)
                {
                    auto const cb = buffers_prefix(clamp(
                        ws_.rd_fh_.len), ws_.rd_buf_.data());
                    auto const len = buffer_size(cb);
                    BOOST_ASSERT(len == ws_.rd_fh_.len);
                    code = close_code::none;
                    ping_data payload;
                    detail::read_ping(payload, cb);
                    ws_.rd_buf_.consume(len);
                    // Ignore pong when closing
                    if(! ws_.wr_close_ && ws_.ctrl_cb_)
                        ws_.ctrl_cb_(frame_type::pong, payload);
                    goto loop;
                }
                // Handle close frame
                BOOST_ASSERT(ws_.rd_fh_.op == detail::opcode::close);
                {
                    auto const cb = buffers_prefix(clamp(
                        ws_.rd_fh_.len), ws_.rd_buf_.data());
                    auto const len = buffer_size(cb);
                    BOOST_ASSERT(len == ws_.rd_fh_.len);
                    BOOST_ASSERT(! ws_.rd_close_);
                    ws_.rd_close_ = true;
                    close_reason cr;
                    detail::read_close(cr, cb, code);
                    if(code != close_code::none)
                    {
                        // _Fail the WebSocket Connection_
                        code_ = code;
                        ev_ = error::failed;
                        goto close;
                    }
                    ws_.cr_ = cr;
                    ws_.rd_buf_.consume(len);
                    if(ws_.ctrl_cb_)
                        ws_.ctrl_cb_(frame_type::close,
                            ws_.cr_.reason);
                    // See if we are already closing
                    if(ws_.status_ == status::closing)
                    {
                        // _Close the WebSocket Connection_
                        BOOST_ASSERT(ws_.wr_close_);
                        code_ = close_code::none;
                        ev_ = error::closed;
                        goto close;
                    }
                    // _Start the WebSocket Closing Handshake_
                    code_ = cr.code == close_code::none ?
                        close_code::normal :
                        static_cast<close_code>(cr.code);
                    ev_ = error::closed;
                    goto close;
                }
            }
            if(ws_.rd_fh_.len == 0 && ! ws_.rd_fh_.fin)
            {
                // Empty non-final frame
                goto loop;
            }
            ws_.rd_done_ = false;
        }
        if(! ws_.pmd_ || ! ws_.pmd_->rd_set)
        {
            // Uncompressed payload path.
            if(ws_.rd_remain_ > 0)
            {
                if(ws_.rd_buf_.size() == 0 && ws_.rd_buf_.max_size() >
                    (std::min)(clamp(ws_.rd_remain_),
                        buffer_size(cb_)))
                {
                    // Fill the read buffer first, otherwise we
                    // get fewer bytes at the cost of one I/O.
                    BOOST_ASIO_CORO_YIELD
                    ws_.stream_.async_read_some(
                        ws_.rd_buf_.prepare(read_size(
                            ws_.rd_buf_, ws_.rd_buf_.max_size())),
                        std::move(*this));
                    if(! ws_.check_ok(ec))
                        goto upcall;
                    ws_.rd_buf_.commit(bytes_transferred);
                    if(ws_.rd_fh_.mask)
                        detail::mask_inplace(buffers_prefix(clamp(
                            ws_.rd_remain_), ws_.rd_buf_.data()),
                                ws_.rd_key_);
                }
                if(ws_.rd_buf_.size() > 0)
                {
                    // Copy from the read buffer.
                    // The mask was already applied.
                    bytes_transferred = buffer_copy(cb_,
                        ws_.rd_buf_.data(), clamp(ws_.rd_remain_));
                    auto const mb = buffers_prefix(
                        bytes_transferred, cb_);
                    ws_.rd_remain_ -= bytes_transferred;
                    if(ws_.rd_op_ == detail::opcode::text)
                    {
                        // Incrementally validate UTF-8;
                        // finish() only at end of message.
                        if(! ws_.rd_utf8_.write(mb) ||
                            (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
                                ! ws_.rd_utf8_.finish()))
                        {
                            // _Fail the WebSocket Connection_
                            code_ = close_code::bad_payload;
                            ev_ = error::failed;
                            goto close;
                        }
                    }
                    bytes_written_ += bytes_transferred;
                    ws_.rd_size_ += bytes_transferred;
                    ws_.rd_buf_.consume(bytes_transferred);
                }
                else
                {
                    // Read into caller's buffer
                    BOOST_ASSERT(ws_.rd_remain_ > 0);
                    BOOST_ASSERT(buffer_size(cb_) > 0);
                    BOOST_ASSERT(buffer_size(buffers_prefix(
                        clamp(ws_.rd_remain_), cb_)) > 0);
                    BOOST_ASIO_CORO_YIELD
                    ws_.stream_.async_read_some(buffers_prefix(
                        clamp(ws_.rd_remain_), cb_), std::move(*this));
                    if(! ws_.check_ok(ec))
                        goto upcall;
                    BOOST_ASSERT(bytes_transferred > 0);
                    auto const mb = buffers_prefix(
                        bytes_transferred, cb_);
                    ws_.rd_remain_ -= bytes_transferred;
                    if(ws_.rd_fh_.mask)
                        detail::mask_inplace(mb, ws_.rd_key_);
                    if(ws_.rd_op_ == detail::opcode::text)
                    {
                        if(! ws_.rd_utf8_.write(mb) ||
                            (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
                                ! ws_.rd_utf8_.finish()))
                        {
                            // _Fail the WebSocket Connection_
                            code_ = close_code::bad_payload;
                            ev_ = error::failed;
                            goto close;
                        }
                    }
                    bytes_written_ += bytes_transferred;
                    ws_.rd_size_ += bytes_transferred;
                }
            }
            ws_.rd_done_ = ws_.rd_remain_ == 0 && ws_.rd_fh_.fin;
        }
        else
        {
            // Read compressed message frame payload:
            // inflate even if rd_fh_.len == 0, otherwise we
            // never emit the end-of-stream deflate block.
            while(buffer_size(cb_) > 0)
            {
                if( ws_.rd_remain_ > 0 &&
                    ws_.rd_buf_.size() == 0 &&
                    ! did_read_)
                {
                    // read new
                    BOOST_ASIO_CORO_YIELD
                    ws_.stream_.async_read_some(
                        ws_.rd_buf_.prepare(read_size(
                            ws_.rd_buf_, ws_.rd_buf_.max_size())),
                        std::move(*this));
                    if(! ws_.check_ok(ec))
                        goto upcall;
                    BOOST_ASSERT(bytes_transferred > 0);
                    ws_.rd_buf_.commit(bytes_transferred);
                    if(ws_.rd_fh_.mask)
                        detail::mask_inplace(
                            buffers_prefix(clamp(ws_.rd_remain_),
                                ws_.rd_buf_.data()), ws_.rd_key_);
                    did_read_ = true;
                }
                zlib::z_params zs;
                {
                    // Inflate into the front buffer of the
                    // remaining caller sequence.
                    auto const out = buffers_front(cb_);
                    zs.next_out = out.data();
                    zs.avail_out = out.size();
                    BOOST_ASSERT(zs.avail_out > 0);
                }
                if(ws_.rd_remain_ > 0)
                {
                    if(ws_.rd_buf_.size() > 0)
                    {
                        // use what's there
                        auto const in = buffers_prefix(
                            clamp(ws_.rd_remain_), buffers_front(
                                ws_.rd_buf_.data()));
                        zs.avail_in = in.size();
                        zs.next_in = in.data();
                    }
                    else
                    {
                        break;
                    }
                }
                else if(ws_.rd_fh_.fin)
                {
                    // append the empty block codes
                    static std::uint8_t constexpr
                        empty_block[4] = {
                            0x00, 0x00, 0xff, 0xff };
                    zs.next_in = empty_block;
                    zs.avail_in = sizeof(empty_block);
                    ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
                    if(! ec)
                    {
                        // https://github.com/madler/zlib/issues/280
                        if(zs.total_out > 0)
                            ec = error::partial_deflate_block;
                    }
                    if(! ws_.check_ok(ec))
                        goto upcall;
                    // Honor the negotiated no-context-takeover
                    // option by resetting the inflate stream.
                    if(
                        (ws_.role_ == role_type::client &&
                            ws_.pmd_config_.server_no_context_takeover) ||
                        (ws_.role_ == role_type::server &&
                            ws_.pmd_config_.client_no_context_takeover))
                        ws_.pmd_->zi.reset();
                    ws_.rd_done_ = true;
                    break;
                }
                else
                {
                    break;
                }
                ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
                if(! ws_.check_ok(ec))
                    goto upcall;
                if(ws_.rd_msg_max_ && beast::detail::sum_exceeds(
                    ws_.rd_size_, zs.total_out, ws_.rd_msg_max_))
                {
                    // _Fail the WebSocket Connection_
                    code_ = close_code::too_big;
                    ev_ = error::failed;
                    goto close;
                }
                cb_.consume(zs.total_out);
                ws_.rd_size_ += zs.total_out;
                ws_.rd_remain_ -= zs.total_in;
                ws_.rd_buf_.consume(zs.total_in);
                bytes_written_ += zs.total_out;
            }
            if(ws_.rd_op_ == detail::opcode::text)
            {
                // check utf8
                if(! ws_.rd_utf8_.write(
                    buffers_prefix(bytes_written_, bs_)) || (
                        ws_.rd_done_ && ! ws_.rd_utf8_.finish()))
                {
                    // _Fail the WebSocket Connection_
                    code_ = close_code::bad_payload;
                    ev_ = error::failed;
                    goto close;
                }
            }
        }
        goto upcall;

    close:
        // Fail/close path: acquire the write block, send a
        // close frame if we have not already, tear down the
        // connection, then report ev_ to the handler.
        if(! ws_.wr_block_)
        {
            // Acquire the write block
            ws_.wr_block_ = tok_;

            // Make sure the stream is open
            BOOST_ASSERT(ws_.status_ == status::open);
        }
        else
        {
            // Suspend
            BOOST_ASSERT(ws_.wr_block_ != tok_);
            BOOST_ASIO_CORO_YIELD
            ws_.paused_rd_.save(std::move(*this));

            // Acquire the write block
            BOOST_ASSERT(! ws_.wr_block_);
            ws_.wr_block_ = tok_;

            // Resume
            BOOST_ASIO_CORO_YIELD
            boost::asio::post(
                ws_.get_executor(), std::move(*this));
            BOOST_ASSERT(ws_.wr_block_ == tok_);

            // Make sure the stream is open
            if(! ws_.check_open(ec))
                goto upcall;
        }

        // Set the status
        ws_.status_ = status::closing;

        if(! ws_.wr_close_)
        {
            ws_.wr_close_ = true;

            // Serialize close frame
            ws_.rd_fb_.reset();
            ws_.template write_close<
                flat_static_buffer_base>(
                    ws_.rd_fb_, code_);

            // Send close frame
            BOOST_ASSERT(ws_.wr_block_ == tok_);
            BOOST_ASIO_CORO_YIELD
            boost::asio::async_write(
                ws_.stream_, ws_.rd_fb_.data(),
                    std::move(*this));
            BOOST_ASSERT(ws_.wr_block_ == tok_);
            if(! ws_.check_ok(ec))
                goto upcall;
        }

        // Teardown
        using beast::websocket::async_teardown;
        BOOST_ASSERT(ws_.wr_block_ == tok_);
        BOOST_ASIO_CORO_YIELD
        async_teardown(ws_.role_,
            ws_.stream_, std::move(*this));
        BOOST_ASSERT(ws_.wr_block_ == tok_);
        if(ec == boost::asio::error::eof)
        {
            // Rationale:
            // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
            ec.assign(0, ec.category());
        }
        if(! ec)
            ec = ev_;
        if(ec && ec != error::closed)
            ws_.status_ = status::failed;
        else
            ws_.status_ = status::closed;
        ws_.close();

    upcall:
        // Release any blocks we hold and wake suspended
        // operations before invoking the final handler.
        if(ws_.rd_block_ == tok_)
            ws_.rd_block_.reset();
        ws_.paused_r_close_.maybe_invoke();
        if(ws_.wr_block_ == tok_)
        {
            ws_.wr_block_.reset();
            ws_.paused_close_.maybe_invoke() ||
                ws_.paused_ping_.maybe_invoke() ||
                ws_.paused_wr_.maybe_invoke();
        }
        // If we were called directly by the initiating
        // function, the handler must not be invoked inline.
        if(! cont_)
            return boost::asio::post(
                ws_.stream_.get_executor(),
                bind_handler(std::move(h_),
                    ec, bytes_written_));
        h_(ec, bytes_written_);
    }
}
664
665 //------------------------------------------------------------------------------
666
// Composed operation which reads a complete message (or,
// when some_ is true, just part of one) into a DynamicBuffer
// by repeatedly invoking read_some_op.
template<class NextLayer>
template<
    class DynamicBuffer,
    class Handler>
class stream<NextLayer>::read_op
    : public boost::asio::coroutine
{
    Handler h_;                     // final completion handler
    stream<NextLayer>& ws_;         // stream being read from
    DynamicBuffer& b_;              // caller's dynamic buffer
    std::size_t limit_;             // max bytes per individual read
    std::size_t bytes_written_ = 0; // total bytes committed to b_
    bool some_;                     // true: stop after one read_some

public:
    using allocator_type =
        boost::asio::associated_allocator_t<Handler>;

    read_op(read_op&&) = default;
    read_op(read_op const&) = default;

    // A limit of zero is interpreted as "no limit".
    template<class DeducedHandler>
    read_op(
        DeducedHandler&& h,
        stream<NextLayer>& ws,
        DynamicBuffer& b,
        std::size_t limit,
        bool some)
        : h_(std::forward<DeducedHandler>(h))
        , ws_(ws)
        , b_(b)
        , limit_(limit ? limit : (
            std::numeric_limits<std::size_t>::max)())
        , some_(some)
    {
    }

    allocator_type
    get_allocator() const noexcept
    {
        return boost::asio::get_associated_allocator(h_);
    }

    // Propagate the caller's associated executor, falling
    // back to the stream's executor.
    using executor_type = boost::asio::associated_executor_t<
        Handler, decltype(std::declval<stream<NextLayer>&>().get_executor())>;

    executor_type
    get_executor() const noexcept
    {
        return boost::asio::get_associated_executor(
            h_, ws_.get_executor());
    }

    // Entry point / resume point of the state machine.
    void operator()(
        error_code ec = {},
        std::size_t bytes_transferred = 0);

    friend
    bool asio_handler_is_continuation(read_op* op)
    {
        using boost::asio::asio_handler_is_continuation;
        return asio_handler_is_continuation(
            std::addressof(op->h_));
    }
};
732
// Loop: prepare output space in the dynamic buffer, run one
// read_some_op, commit the result; repeat until the message
// is done (or after the first read when some_ is true).
template<class NextLayer>
template<class DynamicBuffer, class Handler>
void
stream<NextLayer>::
read_op<DynamicBuffer, Handler>::
operator()(
    error_code ec,
    std::size_t bytes_transferred)
{
    using beast::detail::clamp;
    using buffers_type = typename
        DynamicBuffer::mutable_buffers_type;
    boost::optional<buffers_type> mb;
    BOOST_ASIO_CORO_REENTER(*this)
    {
        do
        {
            try
            {
                // Reserve space sized by the stream's hint,
                // capped by the caller's limit. May throw
                // when the buffer's max size is exceeded.
                mb.emplace(b_.prepare(clamp(
                    ws_.read_size_hint(b_), limit_)));
            }
            catch(std::length_error const&)
            {
                ec = error::buffer_overflow;
            }
            if(ec)
            {
                // Deliver buffer_overflow through a post so
                // the handler is never invoked inline.
                BOOST_ASIO_CORO_YIELD
                boost::asio::post(
                    ws_.get_executor(),
                    bind_handler(std::move(*this),
                        error::buffer_overflow, 0));
                break;
            }
            BOOST_ASIO_CORO_YIELD
            read_some_op<buffers_type, read_op>{
                std::move(*this), ws_, *mb}(
                    {}, 0, false);
            if(ec)
                break;
            b_.commit(bytes_transferred);
            bytes_written_ += bytes_transferred;
        }
        while(! some_ && ! ws_.is_message_done());
        h_(ec, bytes_written_);
    }
}
781
782 //------------------------------------------------------------------------------
783
784 template<class NextLayer>
785 template<class DynamicBuffer>
786 std::size_t
787 stream<NextLayer>::
788 read(DynamicBuffer& buffer)
789 {
790 static_assert(is_sync_stream<next_layer_type>::value,
791 "SyncStream requirements not met");
792 static_assert(
793 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
794 "DynamicBuffer requirements not met");
795 error_code ec;
796 auto const bytes_written = read(buffer, ec);
797 if(ec)
798 BOOST_THROW_EXCEPTION(system_error{ec});
799 return bytes_written;
800 }
801
802 template<class NextLayer>
803 template<class DynamicBuffer>
804 std::size_t
805 stream<NextLayer>::
806 read(DynamicBuffer& buffer, error_code& ec)
807 {
808 static_assert(is_sync_stream<next_layer_type>::value,
809 "SyncStream requirements not met");
810 static_assert(
811 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
812 "DynamicBuffer requirements not met");
813 std::size_t bytes_written = 0;
814 do
815 {
816 bytes_written += read_some(buffer, 0, ec);
817 if(ec)
818 return bytes_written;
819 }
820 while(! is_message_done());
821 return bytes_written;
822 }
823
824 template<class NextLayer>
825 template<class DynamicBuffer, class ReadHandler>
826 BOOST_ASIO_INITFN_RESULT_TYPE(
827 ReadHandler, void(error_code, std::size_t))
828 stream<NextLayer>::
829 async_read(DynamicBuffer& buffer, ReadHandler&& handler)
830 {
831 static_assert(is_async_stream<next_layer_type>::value,
832 "AsyncStream requirements requirements not met");
833 static_assert(
834 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
835 "DynamicBuffer requirements not met");
836 boost::asio::async_completion<
837 ReadHandler, void(error_code, std::size_t)> init{handler};
838 read_op<
839 DynamicBuffer,
840 BOOST_ASIO_HANDLER_TYPE(
841 ReadHandler, void(error_code, std::size_t))>{
842 init.completion_handler,
843 *this,
844 buffer,
845 0,
846 false}();
847 return init.result.get();
848 }
849
850 //------------------------------------------------------------------------------
851
852 template<class NextLayer>
853 template<class DynamicBuffer>
854 std::size_t
855 stream<NextLayer>::
856 read_some(
857 DynamicBuffer& buffer,
858 std::size_t limit)
859 {
860 static_assert(is_sync_stream<next_layer_type>::value,
861 "SyncStream requirements not met");
862 static_assert(
863 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
864 "DynamicBuffer requirements not met");
865 error_code ec;
866 auto const bytes_written =
867 read_some(buffer, limit, ec);
868 if(ec)
869 BOOST_THROW_EXCEPTION(system_error{ec});
870 return bytes_written;
871 }
872
873 template<class NextLayer>
874 template<class DynamicBuffer>
875 std::size_t
876 stream<NextLayer>::
877 read_some(
878 DynamicBuffer& buffer,
879 std::size_t limit,
880 error_code& ec)
881 {
882 static_assert(is_sync_stream<next_layer_type>::value,
883 "SyncStream requirements not met");
884 static_assert(
885 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
886 "DynamicBuffer requirements not met");
887 using beast::detail::clamp;
888 if(! limit)
889 limit = (std::numeric_limits<std::size_t>::max)();
890 auto const size =
891 clamp(read_size_hint(buffer), limit);
892 BOOST_ASSERT(size > 0);
893 boost::optional<typename
894 DynamicBuffer::mutable_buffers_type> mb;
895 try
896 {
897 mb.emplace(buffer.prepare(size));
898 }
899 catch(std::length_error const&)
900 {
901 ec = error::buffer_overflow;
902 return 0;
903 }
904 auto const bytes_written = read_some(*mb, ec);
905 buffer.commit(bytes_written);
906 return bytes_written;
907 }
908
909 template<class NextLayer>
910 template<class DynamicBuffer, class ReadHandler>
911 BOOST_ASIO_INITFN_RESULT_TYPE(
912 ReadHandler, void(error_code, std::size_t))
913 stream<NextLayer>::
914 async_read_some(
915 DynamicBuffer& buffer,
916 std::size_t limit,
917 ReadHandler&& handler)
918 {
919 static_assert(is_async_stream<next_layer_type>::value,
920 "AsyncStream requirements requirements not met");
921 static_assert(
922 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
923 "DynamicBuffer requirements not met");
924 boost::asio::async_completion<ReadHandler,
925 void(error_code, std::size_t)> init{handler};
926 read_op<
927 DynamicBuffer,
928 BOOST_ASIO_HANDLER_TYPE(
929 ReadHandler, void(error_code, std::size_t))>{
930 init.completion_handler,
931 *this,
932 buffer,
933 limit,
934 true}({}, 0);
935 return init.result.get();
936 }
937
938 //------------------------------------------------------------------------------
939
940 template<class NextLayer>
941 template<class MutableBufferSequence>
942 std::size_t
943 stream<NextLayer>::
944 read_some(
945 MutableBufferSequence const& buffers)
946 {
947 static_assert(is_sync_stream<next_layer_type>::value,
948 "SyncStream requirements not met");
949 static_assert(boost::asio::is_mutable_buffer_sequence<
950 MutableBufferSequence>::value,
951 "MutableBufferSequence requirements not met");
952 error_code ec;
953 auto const bytes_written = read_some(buffers, ec);
954 if(ec)
955 BOOST_THROW_EXCEPTION(system_error{ec});
956 return bytes_written;
957 }
958
// Synchronous counterpart of read_some_op: reads some
// message payload into `buffers`, handling interleaved
// control frames and optional permessage-deflate
// decompression. Errors are reported through `ec`; the
// return value is the number of payload bytes written.
template<class NextLayer>
template<class MutableBufferSequence>
std::size_t
stream<NextLayer>::
read_some(
    MutableBufferSequence const& buffers,
    error_code& ec)
{
    static_assert(is_sync_stream<next_layer_type>::value,
        "SyncStream requirements not met");
    static_assert(boost::asio::is_mutable_buffer_sequence<
            MutableBufferSequence>::value,
        "MutableBufferSequence requirements not met");
    using beast::detail::clamp;
    using boost::asio::buffer;
    using boost::asio::buffer_size;
    close_code code{};
    std::size_t bytes_written = 0;
    ec.assign(0, ec.category());
    // Make sure the stream is open
    if(! check_open(ec))
        return 0;
loop:
    // See if we need to read a frame header. This
    // condition is structured to give the decompressor
    // a chance to emit the final empty deflate block
    //
    if(rd_remain_ == 0 && (! rd_fh_.fin || rd_done_))
    {
        // Read frame header
        while(! parse_fh(rd_fh_, rd_buf_, code))
        {
            if(code != close_code::none)
            {
                // _Fail the WebSocket Connection_
                do_fail(code, error::failed, ec);
                return bytes_written;
            }
            // Not enough bytes for a header yet; read more.
            auto const bytes_transferred =
                stream_.read_some(
                    rd_buf_.prepare(read_size(
                        rd_buf_, rd_buf_.max_size())),
                    ec);
            if(! check_ok(ec))
                return bytes_written;
            rd_buf_.commit(bytes_transferred);
        }
        // Immediately apply the mask to the portion
        // of the buffer holding payload data.
        if(rd_fh_.len > 0 && rd_fh_.mask)
            detail::mask_inplace(buffers_prefix(
                clamp(rd_fh_.len), rd_buf_.data()),
                    rd_key_);
        if(detail::is_control(rd_fh_.op))
        {
            // Get control frame payload
            auto const b = buffers_prefix(
                clamp(rd_fh_.len), rd_buf_.data());
            auto const len = buffer_size(b);
            BOOST_ASSERT(len == rd_fh_.len);

            // Clear this otherwise the next
            // frame will be considered final.
            rd_fh_.fin = false;

            // Handle ping frame
            if(rd_fh_.op == detail::opcode::ping)
            {
                ping_data payload;
                detail::read_ping(payload, b);
                rd_buf_.consume(len);
                if(wr_close_)
                {
                    // Ignore ping when closing
                    goto loop;
                }
                if(ctrl_cb_)
                    ctrl_cb_(frame_type::ping, payload);
                // Reply with a pong carrying the same payload
                detail::frame_buffer fb;
                write_ping<flat_static_buffer_base>(fb,
                    detail::opcode::pong, payload);
                boost::asio::write(stream_, fb.data(), ec);
                if(! check_ok(ec))
                    return bytes_written;
                goto loop;
            }
            // Handle pong frame
            if(rd_fh_.op == detail::opcode::pong)
            {
                ping_data payload;
                detail::read_ping(payload, b);
                rd_buf_.consume(len);
                if(ctrl_cb_)
                    ctrl_cb_(frame_type::pong, payload);
                goto loop;
            }
            // Handle close frame
            BOOST_ASSERT(rd_fh_.op == detail::opcode::close);
            {
                BOOST_ASSERT(! rd_close_);
                rd_close_ = true;
                close_reason cr;
                detail::read_close(cr, b, code);
                if(code != close_code::none)
                {
                    // _Fail the WebSocket Connection_
                    do_fail(code, error::failed, ec);
                    return bytes_written;
                }
                cr_ = cr;
                rd_buf_.consume(len);
                if(ctrl_cb_)
                    ctrl_cb_(frame_type::close, cr_.reason);
                BOOST_ASSERT(! wr_close_);
                // _Start the WebSocket Closing Handshake_
                do_fail(
                    cr.code == close_code::none ?
                        close_code::normal :
                        static_cast<close_code>(cr.code),
                    error::closed, ec);
                return bytes_written;
            }
        }
        if(rd_fh_.len == 0 && ! rd_fh_.fin)
        {
            // Empty non-final frame
            goto loop;
        }
        rd_done_ = false;
    }
    else
    {
        ec.assign(0, ec.category());
    }
    if(! pmd_ || ! pmd_->rd_set)
    {
        // Uncompressed payload path.
        if(rd_remain_ > 0)
        {
            if(rd_buf_.size() == 0 && rd_buf_.max_size() >
                (std::min)(clamp(rd_remain_),
                    buffer_size(buffers)))
            {
                // Fill the read buffer first, otherwise we
                // get fewer bytes at the cost of one I/O.
                rd_buf_.commit(stream_.read_some(
                    rd_buf_.prepare(read_size(rd_buf_,
                        rd_buf_.max_size())), ec));
                if(! check_ok(ec))
                    return bytes_written;
                if(rd_fh_.mask)
                    detail::mask_inplace(
                        buffers_prefix(clamp(rd_remain_),
                            rd_buf_.data()), rd_key_);
            }
            if(rd_buf_.size() > 0)
            {
                // Copy from the read buffer.
                // The mask was already applied.
                auto const bytes_transferred =
                    buffer_copy(buffers, rd_buf_.data(),
                        clamp(rd_remain_));
                auto const mb = buffers_prefix(
                    bytes_transferred, buffers);
                rd_remain_ -= bytes_transferred;
                if(rd_op_ == detail::opcode::text)
                {
                    // Incrementally validate UTF-8;
                    // finish() only at end of message.
                    if(! rd_utf8_.write(mb) ||
                        (rd_remain_ == 0 && rd_fh_.fin &&
                            ! rd_utf8_.finish()))
                    {
                        // _Fail the WebSocket Connection_
                        do_fail(
                            close_code::bad_payload,
                            error::failed,
                            ec);
                        return bytes_written;
                    }
                }
                bytes_written += bytes_transferred;
                rd_size_ += bytes_transferred;
                rd_buf_.consume(bytes_transferred);
            }
            else
            {
                // Read into caller's buffer
                BOOST_ASSERT(rd_remain_ > 0);
                BOOST_ASSERT(buffer_size(buffers) > 0);
                BOOST_ASSERT(buffer_size(buffers_prefix(
                    clamp(rd_remain_), buffers)) > 0);
                auto const bytes_transferred =
                    stream_.read_some(buffers_prefix(
                        clamp(rd_remain_), buffers), ec);
                if(! check_ok(ec))
                    return bytes_written;
                BOOST_ASSERT(bytes_transferred > 0);
                auto const mb = buffers_prefix(
                    bytes_transferred, buffers);
                rd_remain_ -= bytes_transferred;
                if(rd_fh_.mask)
                    detail::mask_inplace(mb, rd_key_);
                if(rd_op_ == detail::opcode::text)
                {
                    if(! rd_utf8_.write(mb) ||
                        (rd_remain_ == 0 && rd_fh_.fin &&
                            ! rd_utf8_.finish()))
                    {
                        // _Fail the WebSocket Connection_
                        do_fail(close_code::bad_payload,
                            error::failed, ec);
                        return bytes_written;
                    }
                }
                bytes_written += bytes_transferred;
                rd_size_ += bytes_transferred;
            }
        }
        rd_done_ = rd_remain_ == 0 && rd_fh_.fin;
    }
    else
    {
        // Read compressed message frame payload:
        // inflate even if rd_fh_.len == 0, otherwise we
        // never emit the end-of-stream deflate block.
        //
        bool did_read = false;
        buffers_suffix<MutableBufferSequence> cb{buffers};
        while(buffer_size(cb) > 0)
        {
            zlib::z_params zs;
            {
                // Inflate into the front buffer of the
                // remaining caller sequence.
                auto const out = buffers_front(cb);
                zs.next_out = out.data();
                zs.avail_out = out.size();
                BOOST_ASSERT(zs.avail_out > 0);
            }
            if(rd_remain_ > 0)
            {
                if(rd_buf_.size() > 0)
                {
                    // use what's there
                    auto const in = buffers_prefix(
                        clamp(rd_remain_), buffers_front(
                            rd_buf_.data()));
                    zs.avail_in = in.size();
                    zs.next_in = in.data();
                }
                else if(! did_read)
                {
                    // read new
                    auto const bytes_transferred =
                        stream_.read_some(
                            rd_buf_.prepare(read_size(
                                rd_buf_, rd_buf_.max_size())),
                            ec);
                    if(! check_ok(ec))
                        return bytes_written;
                    BOOST_ASSERT(bytes_transferred > 0);
                    rd_buf_.commit(bytes_transferred);
                    if(rd_fh_.mask)
                        detail::mask_inplace(
                            buffers_prefix(clamp(rd_remain_),
                                rd_buf_.data()), rd_key_);
                    auto const in = buffers_prefix(
                        clamp(rd_remain_), buffers_front(
                            rd_buf_.data()));
                    zs.avail_in = in.size();
                    zs.next_in = in.data();
                    did_read = true;
                }
                else
                {
                    break;
                }
            }
            else if(rd_fh_.fin)
            {
                // append the empty block codes
                static std::uint8_t constexpr
                    empty_block[4] = {
                        0x00, 0x00, 0xff, 0xff };
                zs.next_in = empty_block;
                zs.avail_in = sizeof(empty_block);
                pmd_->zi.write(zs, zlib::Flush::sync, ec);
                if(! ec)
                {
                    // https://github.com/madler/zlib/issues/280
                    if(zs.total_out > 0)
                        ec = error::partial_deflate_block;
                }
                if(! check_ok(ec))
                    return bytes_written;
                // Honor the negotiated no-context-takeover
                // option by resetting the inflate stream.
                if(
                    (role_ == role_type::client &&
                        pmd_config_.server_no_context_takeover) ||
                    (role_ == role_type::server &&
                        pmd_config_.client_no_context_takeover))
                    pmd_->zi.reset();
                rd_done_ = true;
                break;
            }
            else
            {
                break;
            }
            pmd_->zi.write(zs, zlib::Flush::sync, ec);
            if(! check_ok(ec))
                return bytes_written;
            if(rd_msg_max_ && beast::detail::sum_exceeds(
                rd_size_, zs.total_out, rd_msg_max_))
            {
                // _Fail the WebSocket Connection_
                do_fail(close_code::too_big,
                    error::failed, ec);
                return bytes_written;
            }
            cb.consume(zs.total_out);
            rd_size_ += zs.total_out;
            rd_remain_ -= zs.total_in;
            rd_buf_.consume(zs.total_in);
            bytes_written += zs.total_out;
        }
        if(rd_op_ == detail::opcode::text)
        {
            // check utf8
            if(! rd_utf8_.write(
                buffers_prefix(bytes_written, buffers)) || (
                    rd_done_ && ! rd_utf8_.finish()))
            {
                // _Fail the WebSocket Connection_
                do_fail(close_code::bad_payload,
                    error::failed, ec);
                return bytes_written;
            }
        }
    }
    return bytes_written;
}
1295
1296 template<class NextLayer>
1297 template<class MutableBufferSequence, class ReadHandler>
1298 BOOST_ASIO_INITFN_RESULT_TYPE(
1299 ReadHandler, void(error_code, std::size_t))
1300 stream<NextLayer>::
1301 async_read_some(
1302 MutableBufferSequence const& buffers,
1303 ReadHandler&& handler)
1304 {
1305 static_assert(is_async_stream<next_layer_type>::value,
1306 "AsyncStream requirements requirements not met");
1307 static_assert(boost::asio::is_mutable_buffer_sequence<
1308 MutableBufferSequence>::value,
1309 "MutableBufferSequence requirements not met");
1310 boost::asio::async_completion<ReadHandler,
1311 void(error_code, std::size_t)> init{handler};
1312 read_some_op<MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE(
1313 ReadHandler, void(error_code, std::size_t))>{
1314 init.completion_handler,*this, buffers}(
1315 {}, 0, false);
1316 return init.result.get();
1317 }
1318
1319 } // websocket
1320 } // beast
1321 } // boost
1322
1323 #endif