git.proxmox.com Git - ceph.git/blob - ceph/src/boost/boost/beast/websocket/impl/read.ipp
Add patch for failing prerm scripts
[ceph.git] / ceph / src / boost / boost / beast / websocket / impl / read.ipp
1 //
2 // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
11 #define BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
12
13 #include <boost/beast/websocket/teardown.hpp>
14 #include <boost/beast/core/bind_handler.hpp>
15 #include <boost/beast/core/buffers_prefix.hpp>
16 #include <boost/beast/core/buffers_suffix.hpp>
17 #include <boost/beast/core/flat_static_buffer.hpp>
18 #include <boost/beast/core/type_traits.hpp>
19 #include <boost/beast/core/detail/clamp.hpp>
20 #include <boost/beast/core/detail/config.hpp>
21 #include <boost/asio/associated_allocator.hpp>
22 #include <boost/asio/associated_executor.hpp>
23 #include <boost/asio/coroutine.hpp>
24 #include <boost/asio/handler_continuation_hook.hpp>
25 #include <boost/asio/handler_invoke_hook.hpp>
26 #include <boost/asio/post.hpp>
27 #include <boost/assert.hpp>
28 #include <boost/config.hpp>
29 #include <boost/optional.hpp>
30 #include <boost/throw_exception.hpp>
31 #include <algorithm>
32 #include <limits>
33 #include <memory>
34
35 namespace boost {
36 namespace beast {
37 namespace websocket {
38
39 namespace detail {
40
41 template<>
42 inline
43 void
44 stream_base<true>::
45 inflate(
46 zlib::z_params& zs,
47 zlib::Flush flush,
48 error_code& ec)
49 {
50 this->pmd_->zi.write(zs, flush, ec);
51 }
52
53 template<>
54 inline
55 void
56 stream_base<true>::
57 do_context_takeover_read(role_type role)
58 {
59 if((role == role_type::client &&
60 pmd_config_.server_no_context_takeover) ||
61 (role == role_type::server &&
62 pmd_config_.client_no_context_takeover))
63 {
64 pmd_->zi.reset();
65 }
66 }
67
68 } // detail
69
70 //------------------------------------------------------------------------------
71
72 /* Read some message frame data.
73
74 Also reads and handles control frames.
75 */
76 template<class NextLayer, bool deflateSupported>
77 template<
78 class MutableBufferSequence,
79 class Handler>
80 class stream<NextLayer, deflateSupported>::read_some_op
81 : public boost::asio::coroutine
82 {
83 Handler h_;
84 stream<NextLayer, deflateSupported>& ws_;
85 MutableBufferSequence bs_;
86 buffers_suffix<MutableBufferSequence> cb_;
87 std::size_t bytes_written_ = 0;
88 error_code result_;
89 close_code code_;
90 bool did_read_ = false;
91 bool cont_ = false;
92
93 public:
94 static constexpr int id = 1; // for soft_mutex
95
96 read_some_op(read_some_op&&) = default;
97 read_some_op(read_some_op const&) = delete;
98
99 template<class DeducedHandler>
100 read_some_op(
101 DeducedHandler&& h,
102 stream<NextLayer, deflateSupported>& ws,
103 MutableBufferSequence const& bs)
104 : h_(std::forward<DeducedHandler>(h))
105 , ws_(ws)
106 , bs_(bs)
107 , cb_(bs)
108 , code_(close_code::none)
109 {
110 }
111
112 using allocator_type =
113 boost::asio::associated_allocator_t<Handler>;
114
115 allocator_type
116 get_allocator() const noexcept
117 {
118 return (boost::asio::get_associated_allocator)(h_);
119 }
120
121 using executor_type = boost::asio::associated_executor_t<
122 Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
123
124 executor_type
125 get_executor() const noexcept
126 {
127 return (boost::asio::get_associated_executor)(
128 h_, ws_.get_executor());
129 }
130
131 Handler&
132 handler()
133 {
134 return h_;
135 }
136
137 void operator()(
138 error_code ec = {},
139 std::size_t bytes_transferred = 0,
140 bool cont = true);
141
142 friend
143 bool asio_handler_is_continuation(read_some_op* op)
144 {
145 using boost::asio::asio_handler_is_continuation;
146 return op->cont_ || asio_handler_is_continuation(
147 std::addressof(op->h_));
148 }
149
150 template<class Function>
151 friend
152 void asio_handler_invoke(Function&& f, read_some_op* op)
153 {
154 using boost::asio::asio_handler_invoke;
155 asio_handler_invoke(f, std::addressof(op->h_));
156 }
157 };
158
// Coroutine body of read_some_op.
//
// Every invocation stores `cont` into cont_ and re-enters the
// coroutine at the point of the last yield. `ec` and
// `bytes_transferred` carry the result of the operation that
// resumed us; when the op is invoked with no arguments (for
// example after boost::asio::post) the defaults from the
// declaration apply.
//
// Labels:
//   do_maybe_suspend / do_suspend
//            acquire rd_block_, or park this op in paused_r_rd_
//   loop     read a frame header when one is needed, handle
//            control frames, then deliver payload (plain or
//            inflated) into the caller's buffers
//   close    acquire wr_block_, write a close frame unless one
//            was already written, then tear down the next layer
//   upcall   release any blocks held by this op, wake paused
//            operations, and complete h_ with (ec, bytes_written_)
159 template<class NextLayer, bool deflateSupported>
160 template<class MutableBufferSequence, class Handler>
161 void
162 stream<NextLayer, deflateSupported>::
163 read_some_op<MutableBufferSequence, Handler>::
164 operator()(
165 error_code ec,
166 std::size_t bytes_transferred,
167 bool cont)
168 {
169 using beast::detail::clamp;
170 using boost::asio::buffer;
171 using boost::asio::buffer_size;
// Recorded on every entry; read by asio_handler_is_continuation
// and by the upcall path below.
172 cont_ = cont;
173 BOOST_ASIO_CORO_REENTER(*this)
174 {
175 // Maybe suspend
176 do_maybe_suspend:
177 if(ws_.rd_block_.try_lock(this))
178 {
179 // Make sure the stream is not closed
180 if( ws_.status_ == status::closed ||
181 ws_.status_ == status::failed)
182 {
183 ec = boost::asio::error::operation_aborted;
184 goto upcall;
185 }
186 }
187 else
188 {
189 do_suspend:
190 // Suspend
191 BOOST_ASIO_CORO_YIELD
192 ws_.paused_r_rd_.emplace(std::move(*this));
193
194 // Acquire the read block
195 ws_.rd_block_.lock(this);
196
197 // Resume
198 BOOST_ASIO_CORO_YIELD
199 boost::asio::post(
200 ws_.get_executor(), std::move(*this));
201 BOOST_ASSERT(ws_.rd_block_.is_locked(this));
202
203 // The only way to get read blocked is if
204 // a `close_op` wrote a close frame
205 BOOST_ASSERT(ws_.wr_close_);
206 BOOST_ASSERT(ws_.status_ != status::open);
207 ec = boost::asio::error::operation_aborted;
208 goto upcall;
209 }
210
211 // if status_ == status::closing, we want to suspend
212 // the read operation until the close completes,
213 // then finish the read with operation_aborted.
214
215 loop:
216 BOOST_ASSERT(ws_.rd_block_.is_locked(this));
217 // See if we need to read a frame header. This
218 // condition is structured to give the decompressor
219 // a chance to emit the final empty deflate block
220 //
221 if(ws_.rd_remain_ == 0 &&
222 (! ws_.rd_fh_.fin || ws_.rd_done_))
223 {
224 // Read frame header
// parse_fh returning false with result_ unset means more
// bytes are needed; with result_ set it is a hard failure.
225 while(! ws_.parse_fh(
226 ws_.rd_fh_, ws_.rd_buf_, result_))
227 {
228 if(result_)
229 {
230 // _Fail the WebSocket Connection_
231 if(result_ == error::message_too_big)
232 code_ = close_code::too_big;
233 else
234 code_ = close_code::protocol_error;
235 goto close;
236 }
237 BOOST_ASSERT(ws_.rd_block_.is_locked(this));
238 BOOST_ASIO_CORO_YIELD
239 ws_.stream_.async_read_some(
240 ws_.rd_buf_.prepare(read_size(
241 ws_.rd_buf_, ws_.rd_buf_.max_size())),
242 std::move(*this));
243 BOOST_ASSERT(ws_.rd_block_.is_locked(this));
244 if(! ws_.check_ok(ec))
245 goto upcall;
246 ws_.rd_buf_.commit(bytes_transferred);
247
248 // Allow a close operation
249 // to acquire the read block
250 ws_.rd_block_.unlock(this);
251 if( ws_.paused_r_close_.maybe_invoke())
252 {
253 // Suspend
// A paused close op was invoked; the assertion below
// expects it to be holding rd_block_ now.
254 BOOST_ASSERT(ws_.rd_block_.is_locked());
255 goto do_suspend;
256 }
257 // Acquire read block
258 ws_.rd_block_.lock(this);
259 }
260 // Immediately apply the mask to the portion
261 // of the buffer holding payload data.
262 if(ws_.rd_fh_.len > 0 && ws_.rd_fh_.mask)
263 detail::mask_inplace(buffers_prefix(
264 clamp(ws_.rd_fh_.len),
265 ws_.rd_buf_.mutable_data()),
266 ws_.rd_key_);
267 if(detail::is_control(ws_.rd_fh_.op))
268 {
269 // Clear this otherwise the next
270 // frame will be considered final.
271 ws_.rd_fh_.fin = false;
272
273 // Handle ping frame
274 if(ws_.rd_fh_.op == detail::opcode::ping)
275 {
// When a control callback is installed and this entry is
// not a continuation, post first so that the callback runs
// from a resumed entry (cont_ == true, asserted below).
// NOTE(review): presumably this keeps ctrl_cb_ from running
// inside the initiating function - confirm.
276 if(ws_.ctrl_cb_)
277 {
278 if(! cont_)
279 {
280 BOOST_ASIO_CORO_YIELD
281 boost::asio::post(
282 ws_.get_executor(),
283 std::move(*this));
284 BOOST_ASSERT(cont_);
285 }
286 }
287 {
288 auto const b = buffers_prefix(
289 clamp(ws_.rd_fh_.len),
290 ws_.rd_buf_.data());
291 auto const len = buffer_size(b);
292 BOOST_ASSERT(len == ws_.rd_fh_.len);
293 ping_data payload;
294 detail::read_ping(payload, b);
295 ws_.rd_buf_.consume(len);
296 // Ignore ping when closing
297 if(ws_.status_ == status::closing)
298 goto loop;
299 if(ws_.ctrl_cb_)
300 ws_.ctrl_cb_(
301 frame_type::ping, payload);
// Serialize the pong reply (echoing the ping payload)
// into rd_fb_; it is written out further below.
302 ws_.rd_fb_.reset();
303 ws_.template write_ping<
304 flat_static_buffer_base>(ws_.rd_fb_,
305 detail::opcode::pong, payload);
306 }
307
308 // Allow a close operation
309 // to acquire the read block
310 ws_.rd_block_.unlock(this);
311 ws_.paused_r_close_.maybe_invoke();
312
313 // Maybe suspend
314 if(! ws_.wr_block_.try_lock(this))
315 {
316 // Suspend
317 BOOST_ASIO_CORO_YIELD
318 ws_.paused_rd_.emplace(std::move(*this));
319
320 // Acquire the write block
321 ws_.wr_block_.lock(this);
322
323 // Resume
324 BOOST_ASIO_CORO_YIELD
325 boost::asio::post(
326 ws_.get_executor(), std::move(*this));
327 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
328
329 // Make sure the stream is open
330 if(! ws_.check_open(ec))
331 goto upcall;
332 }
333
334 // Send pong
335 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
336 BOOST_ASIO_CORO_YIELD
337 boost::asio::async_write(ws_.stream_,
338 ws_.rd_fb_.data(), std::move(*this));
339 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
340 if(! ws_.check_ok(ec))
341 goto upcall;
342 ws_.wr_block_.unlock(this);
// Short-circuit chain: at most one paused operation is
// invoked, tried in the order close, ping, write.
343 ws_.paused_close_.maybe_invoke() ||
344 ws_.paused_ping_.maybe_invoke() ||
345 ws_.paused_wr_.maybe_invoke();
// The read block was released above, so re-acquire it
// before continuing to read.
346 goto do_maybe_suspend;
347 }
348 // Handle pong frame
349 if(ws_.rd_fh_.op == detail::opcode::pong)
350 {
351 // Ignore pong when closing
352 if(! ws_.wr_close_ && ws_.ctrl_cb_)
353 {
354 if(! cont_)
355 {
356 BOOST_ASIO_CORO_YIELD
357 boost::asio::post(
358 ws_.get_executor(),
359 std::move(*this));
360 BOOST_ASSERT(cont_);
361 }
362 }
363 auto const cb = buffers_prefix(clamp(
364 ws_.rd_fh_.len), ws_.rd_buf_.data());
365 auto const len = buffer_size(cb);
366 BOOST_ASSERT(len == ws_.rd_fh_.len);
367 ping_data payload;
368 detail::read_ping(payload, cb);
369 ws_.rd_buf_.consume(len);
370 // Ignore pong when closing
371 if(! ws_.wr_close_ && ws_.ctrl_cb_)
372 ws_.ctrl_cb_(frame_type::pong, payload);
373 goto loop;
374 }
375 // Handle close frame
376 BOOST_ASSERT(ws_.rd_fh_.op == detail::opcode::close);
377 {
378 if(ws_.ctrl_cb_)
379 {
380 if(! cont_)
381 {
382 BOOST_ASIO_CORO_YIELD
383 boost::asio::post(
384 ws_.get_executor(),
385 std::move(*this));
386 BOOST_ASSERT(cont_);
387 }
388 }
389 auto const cb = buffers_prefix(clamp(
390 ws_.rd_fh_.len), ws_.rd_buf_.data());
391 auto const len = buffer_size(cb);
392 BOOST_ASSERT(len == ws_.rd_fh_.len);
393 BOOST_ASSERT(! ws_.rd_close_);
394 ws_.rd_close_ = true;
395 close_reason cr;
396 detail::read_close(cr, cb, result_);
397 if(result_)
398 {
399 // _Fail the WebSocket Connection_
400 code_ = close_code::protocol_error;
401 goto close;
402 }
403 ws_.cr_ = cr;
404 ws_.rd_buf_.consume(len);
405 if(ws_.ctrl_cb_)
406 ws_.ctrl_cb_(frame_type::close,
407 ws_.cr_.reason);
408 // See if we are already closing
409 if(ws_.status_ == status::closing)
410 {
411 // _Close the WebSocket Connection_
412 BOOST_ASSERT(ws_.wr_close_);
413 code_ = close_code::none;
414 result_ = error::closed;
415 goto close;
416 }
417 // _Start the WebSocket Closing Handshake_
// Reply with the peer's close code, or `normal` when
// the peer supplied none.
418 code_ = cr.code == close_code::none ?
419 close_code::normal :
420 static_cast<close_code>(cr.code);
421 result_ = error::closed;
422 goto close;
423 }
424 }
425 if(ws_.rd_fh_.len == 0 && ! ws_.rd_fh_.fin)
426 {
427 // Empty non-final frame
428 goto loop;
429 }
430 ws_.rd_done_ = false;
431 }
432 if(! ws_.rd_deflated())
433 {
// Uncompressed payload: copy from rd_buf_ when it holds
// data, otherwise read straight into the caller's buffers.
434 if(ws_.rd_remain_ > 0)
435 {
436 if(ws_.rd_buf_.size() == 0 && ws_.rd_buf_.max_size() >
437 (std::min)(clamp(ws_.rd_remain_),
438 buffer_size(cb_)))
439 {
440 // Fill the read buffer first, otherwise we
441 // get fewer bytes at the cost of one I/O.
442 BOOST_ASIO_CORO_YIELD
443 ws_.stream_.async_read_some(
444 ws_.rd_buf_.prepare(read_size(
445 ws_.rd_buf_, ws_.rd_buf_.max_size())),
446 std::move(*this));
447 if(! ws_.check_ok(ec))
448 goto upcall;
449 ws_.rd_buf_.commit(bytes_transferred);
450 if(ws_.rd_fh_.mask)
451 detail::mask_inplace(buffers_prefix(clamp(
452 ws_.rd_remain_), ws_.rd_buf_.mutable_data()),
453 ws_.rd_key_);
454 }
455 if(ws_.rd_buf_.size() > 0)
456 {
457 // Copy from the read buffer.
458 // The mask was already applied.
459 bytes_transferred = buffer_copy(cb_,
460 ws_.rd_buf_.data(), clamp(ws_.rd_remain_));
461 auto const mb = buffers_prefix(
462 bytes_transferred, cb_);
463 ws_.rd_remain_ -= bytes_transferred;
464 if(ws_.rd_op_ == detail::opcode::text)
465 {
// Validate UTF-8 incrementally; finish() is only
// consulted once the final frame is fully consumed.
466 if(! ws_.rd_utf8_.write(mb) ||
467 (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
468 ! ws_.rd_utf8_.finish()))
469 {
470 // _Fail the WebSocket Connection_
471 code_ = close_code::bad_payload;
472 result_ = error::bad_frame_payload;
473 goto close;
474 }
475 }
476 bytes_written_ += bytes_transferred;
477 ws_.rd_size_ += bytes_transferred;
478 ws_.rd_buf_.consume(bytes_transferred);
479 }
480 else
481 {
482 // Read into caller's buffer
483 BOOST_ASSERT(ws_.rd_remain_ > 0);
484 BOOST_ASSERT(buffer_size(cb_) > 0);
485 BOOST_ASSERT(buffer_size(buffers_prefix(
486 clamp(ws_.rd_remain_), cb_)) > 0);
487 BOOST_ASIO_CORO_YIELD
488 ws_.stream_.async_read_some(buffers_prefix(
489 clamp(ws_.rd_remain_), cb_), std::move(*this));
490 if(! ws_.check_ok(ec))
491 goto upcall;
492 BOOST_ASSERT(bytes_transferred > 0);
493 auto const mb = buffers_prefix(
494 bytes_transferred, cb_);
495 ws_.rd_remain_ -= bytes_transferred;
// Data landed directly in the caller's buffers, so the
// mask is removed there rather than in rd_buf_.
496 if(ws_.rd_fh_.mask)
497 detail::mask_inplace(mb, ws_.rd_key_);
498 if(ws_.rd_op_ == detail::opcode::text)
499 {
500 if(! ws_.rd_utf8_.write(mb) ||
501 (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
502 ! ws_.rd_utf8_.finish()))
503 {
504 // _Fail the WebSocket Connection_
505 code_ = close_code::bad_payload;
506 result_ = error::bad_frame_payload;
507 goto close;
508 }
509 }
510 bytes_written_ += bytes_transferred;
511 ws_.rd_size_ += bytes_transferred;
512 }
513 }
514 ws_.rd_done_ = ws_.rd_remain_ == 0 && ws_.rd_fh_.fin;
515 }
516 else
517 {
518 // Read compressed message frame payload:
519 // inflate even if rd_fh_.len == 0, otherwise we
520 // never emit the end-of-stream deflate block.
521 while(buffer_size(cb_) > 0)
522 {
// did_read_ is set after the first read and never cleared
// in this function, so this branch performs at most one
// socket read per operation.
523 if( ws_.rd_remain_ > 0 &&
524 ws_.rd_buf_.size() == 0 &&
525 ! did_read_)
526 {
527 // read new
528 BOOST_ASIO_CORO_YIELD
529 ws_.stream_.async_read_some(
530 ws_.rd_buf_.prepare(read_size(
531 ws_.rd_buf_, ws_.rd_buf_.max_size())),
532 std::move(*this));
533 if(! ws_.check_ok(ec))
534 goto upcall;
535 BOOST_ASSERT(bytes_transferred > 0);
536 ws_.rd_buf_.commit(bytes_transferred);
537 if(ws_.rd_fh_.mask)
538 detail::mask_inplace(
539 buffers_prefix(clamp(ws_.rd_remain_),
540 ws_.rd_buf_.mutable_data()), ws_.rd_key_);
541 did_read_ = true;
542 }
543 zlib::z_params zs;
544 {
// Inflate into the first buffer of what remains of
// the caller's sequence.
545 auto const out = buffers_front(cb_);
546 zs.next_out = out.data();
547 zs.avail_out = out.size();
548 BOOST_ASSERT(zs.avail_out > 0);
549 }
550 if(ws_.rd_remain_ > 0)
551 {
552 if(ws_.rd_buf_.size() > 0)
553 {
554 // use what's there
555 auto const in = buffers_prefix(
556 clamp(ws_.rd_remain_), buffers_front(
557 ws_.rd_buf_.data()));
558 zs.avail_in = in.size();
559 zs.next_in = in.data();
560 }
561 else
562 {
563 break;
564 }
565 }
566 else if(ws_.rd_fh_.fin)
567 {
568 // append the empty block codes
569 static std::uint8_t constexpr
570 empty_block[4] = {
571 0x00, 0x00, 0xff, 0xff };
572 zs.next_in = empty_block;
573 zs.avail_in = sizeof(empty_block);
574 ws_.inflate(zs, zlib::Flush::sync, ec);
575 if(! ec)
576 {
577 // https://github.com/madler/zlib/issues/280
// Any output produced while inflating the empty
// block is reported as partial_deflate_block.
578 if(zs.total_out > 0)
579 ec = error::partial_deflate_block;
580 }
581 if(! ws_.check_ok(ec))
582 goto upcall;
583 ws_.do_context_takeover_read(ws_.role_);
584 ws_.rd_done_ = true;
585 break;
586 }
587 else
588 {
589 break;
590 }
591 ws_.inflate(zs, zlib::Flush::sync, ec);
592 if(! ws_.check_ok(ec))
593 goto upcall;
// A zero rd_msg_max_ disables the message size limit check.
594 if(ws_.rd_msg_max_ && beast::detail::sum_exceeds(
595 ws_.rd_size_, zs.total_out, ws_.rd_msg_max_))
596 {
597 // _Fail the WebSocket Connection_
598 code_ = close_code::too_big;
599 result_ = error::message_too_big;
600 goto close;
601 }
602 cb_.consume(zs.total_out);
603 ws_.rd_size_ += zs.total_out;
604 ws_.rd_remain_ -= zs.total_in;
605 ws_.rd_buf_.consume(zs.total_in);
606 bytes_written_ += zs.total_out;
607 }
608 if(ws_.rd_op_ == detail::opcode::text)
609 {
610 // check utf8
// Validation runs over bs_ (the unconsumed copy of the
// caller's buffers), limited to the bytes produced by
// this operation.
611 if(! ws_.rd_utf8_.write(
612 buffers_prefix(bytes_written_, bs_)) || (
613 ws_.rd_done_ && ! ws_.rd_utf8_.finish()))
614 {
615 // _Fail the WebSocket Connection_
616 code_ = close_code::bad_payload;
617 result_ = error::bad_frame_payload;
618 goto close;
619 }
620 }
621 }
622 goto upcall;
623
// Reached with code_ holding the close code to send and result_
// holding the error to report if teardown itself succeeds.
624 close:
625 if(ws_.wr_block_.try_lock(this))
626 {
627 // Make sure the stream is open
628 BOOST_ASSERT(ws_.status_ == status::open);
629 }
630 else
631 {
632 // Suspend
633 BOOST_ASIO_CORO_YIELD
634 ws_.paused_rd_.emplace(std::move(*this));
635
636 // Acquire the write block
637 ws_.wr_block_.lock(this);
638
639 // Resume
640 BOOST_ASIO_CORO_YIELD
641 boost::asio::post(
642 ws_.get_executor(), std::move(*this));
643 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
644
645 // Make sure the stream is open
646 if(! ws_.check_open(ec))
647 goto upcall;
648 }
649
650 // Set the status
651 ws_.status_ = status::closing;
652
// Only write a close frame if one has not been written yet.
653 if(! ws_.wr_close_)
654 {
655 ws_.wr_close_ = true;
656
657 // Serialize close frame
658 ws_.rd_fb_.reset();
659 ws_.template write_close<
660 flat_static_buffer_base>(
661 ws_.rd_fb_, code_);
662
663 // Send close frame
664 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
665 BOOST_ASIO_CORO_YIELD
666 boost::asio::async_write(
667 ws_.stream_, ws_.rd_fb_.data(),
668 std::move(*this));
669 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
670 if(! ws_.check_ok(ec))
671 goto upcall;
672 }
673
674 // Teardown
675 using beast::websocket::async_teardown;
676 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
677 BOOST_ASIO_CORO_YIELD
678 async_teardown(ws_.role_,
679 ws_.stream_, std::move(*this));
680 BOOST_ASSERT(ws_.wr_block_.is_locked(this));
// An eof result from teardown is treated as success.
681 if(ec == boost::asio::error::eof)
682 {
683 // Rationale:
684 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
685 ec.assign(0, ec.category());
686 }
// A teardown error takes precedence; otherwise report the
// reason that brought us to `close`.
687 if(! ec)
688 ec = result_;
689 if(ec && ec != error::closed)
690 ws_.status_ = status::failed;
691 else
692 ws_.status_ = status::closed;
693 ws_.close();
694
695 upcall:
// try_unlock: this op may or may not hold each block here.
696 ws_.rd_block_.try_unlock(this);
697 ws_.paused_r_close_.maybe_invoke();
698 if(ws_.wr_block_.try_unlock(this))
699 ws_.paused_close_.maybe_invoke() ||
700 ws_.paused_ping_.maybe_invoke() ||
701 ws_.paused_wr_.maybe_invoke();
// When the last entry was not a continuation the handler is
// posted instead of being invoked directly.
702 if(! cont_)
703 return boost::asio::post(
704 ws_.stream_.get_executor(),
705 bind_handler(std::move(h_),
706 ec, bytes_written_));
707 h_(ec, bytes_written_);
708 }
709 }
710
711 //------------------------------------------------------------------------------
712
713 template<class NextLayer, bool deflateSupported>
714 template<
715 class DynamicBuffer,
716 class Handler>
717 class stream<NextLayer, deflateSupported>::read_op
718 : public boost::asio::coroutine
719 {
720 Handler h_;
721 stream<NextLayer, deflateSupported>& ws_;
722 DynamicBuffer& b_;
723 std::size_t limit_;
724 std::size_t bytes_written_ = 0;
725 bool some_;
726
727 public:
728 using allocator_type =
729 boost::asio::associated_allocator_t<Handler>;
730
731 read_op(read_op&&) = default;
732 read_op(read_op const&) = delete;
733
734 template<class DeducedHandler>
735 read_op(
736 DeducedHandler&& h,
737 stream<NextLayer, deflateSupported>& ws,
738 DynamicBuffer& b,
739 std::size_t limit,
740 bool some)
741 : h_(std::forward<DeducedHandler>(h))
742 , ws_(ws)
743 , b_(b)
744 , limit_(limit ? limit : (
745 std::numeric_limits<std::size_t>::max)())
746 , some_(some)
747 {
748 }
749
750 allocator_type
751 get_allocator() const noexcept
752 {
753 return (boost::asio::get_associated_allocator)(h_);
754 }
755
756 using executor_type = boost::asio::associated_executor_t<
757 Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
758
759 executor_type
760 get_executor() const noexcept
761 {
762 return (boost::asio::get_associated_executor)(
763 h_, ws_.get_executor());
764 }
765
766 void operator()(
767 error_code ec = {},
768 std::size_t bytes_transferred = 0);
769
770 friend
771 bool asio_handler_is_continuation(read_op* op)
772 {
773 using boost::asio::asio_handler_is_continuation;
774 return asio_handler_is_continuation(
775 std::addressof(op->h_));
776 }
777
778 template<class Function>
779 friend
780 void asio_handler_invoke(Function&& f, read_op* op)
781 {
782 using boost::asio::asio_handler_invoke;
783 asio_handler_invoke(f, std::addressof(op->h_));
784 }
785 };
786
// Coroutine body of read_op.
//
// Each pass prepares space in b_ (the size hint from the stream,
// clamped to limit_), runs a read_some_op into that space, then
// commits the bytes it produced. The loop ends on error, when
// some_ is set, or when ws_.is_message_done() is true, and h_
// is then invoked with (ec, bytes_written_).
787 template<class NextLayer, bool deflateSupported>
788 template<class DynamicBuffer, class Handler>
789 void
790 stream<NextLayer, deflateSupported>::
791 read_op<DynamicBuffer, Handler>::
792 operator()(
793 error_code ec,
794 std::size_t bytes_transferred)
795 {
796 using beast::detail::clamp;
797 using buffers_type = typename
798 DynamicBuffer::mutable_buffers_type;
// Plain local: empty on every entry. It is only read in the
// yield expression that directly follows its emplace.
799 boost::optional<buffers_type> mb;
800 BOOST_ASIO_CORO_REENTER(*this)
801 {
802 do
803 {
804 try
805 {
806 mb.emplace(b_.prepare(clamp(
807 ws_.read_size_hint(b_), limit_)));
808 }
// prepare() signalling std::length_error is mapped to
// error::buffer_overflow.
809 catch(std::length_error const&)
810 {
811 ec = error::buffer_overflow;
812 }
813 if(ec)
814 {
// Post ourselves with the error bound, so the handler
// is reached through a resumed entry instead of being
// called directly from here.
815 BOOST_ASIO_CORO_YIELD
816 boost::asio::post(
817 ws_.get_executor(),
818 bind_handler(std::move(*this),
819 error::buffer_overflow, 0));
820 break;
821 }
// Run one read_some_op with this op as its handler; the
// trailing `false` is the `cont` argument of its first entry.
822 BOOST_ASIO_CORO_YIELD
823 read_some_op<buffers_type, read_op>{
824 std::move(*this), ws_, *mb}(
825 {}, 0, false);
826 if(ec)
827 break;
828 b_.commit(bytes_transferred);
829 bytes_written_ += bytes_transferred;
830 }
831 while(! some_ && ! ws_.is_message_done());
832 h_(ec, bytes_written_);
833 }
834 }
835
836 //------------------------------------------------------------------------------
837
838 template<class NextLayer, bool deflateSupported>
839 template<class DynamicBuffer>
840 std::size_t
841 stream<NextLayer, deflateSupported>::
842 read(DynamicBuffer& buffer)
843 {
844 static_assert(is_sync_stream<next_layer_type>::value,
845 "SyncStream requirements not met");
846 static_assert(
847 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
848 "DynamicBuffer requirements not met");
849 error_code ec;
850 auto const bytes_written = read(buffer, ec);
851 if(ec)
852 BOOST_THROW_EXCEPTION(system_error{ec});
853 return bytes_written;
854 }
855
856 template<class NextLayer, bool deflateSupported>
857 template<class DynamicBuffer>
858 std::size_t
859 stream<NextLayer, deflateSupported>::
860 read(DynamicBuffer& buffer, error_code& ec)
861 {
862 static_assert(is_sync_stream<next_layer_type>::value,
863 "SyncStream requirements not met");
864 static_assert(
865 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
866 "DynamicBuffer requirements not met");
867 std::size_t bytes_written = 0;
868 do
869 {
870 bytes_written += read_some(buffer, 0, ec);
871 if(ec)
872 return bytes_written;
873 }
874 while(! is_message_done());
875 return bytes_written;
876 }
877
878 template<class NextLayer, bool deflateSupported>
879 template<class DynamicBuffer, class ReadHandler>
880 BOOST_ASIO_INITFN_RESULT_TYPE(
881 ReadHandler, void(error_code, std::size_t))
882 stream<NextLayer, deflateSupported>::
883 async_read(DynamicBuffer& buffer, ReadHandler&& handler)
884 {
885 static_assert(is_async_stream<next_layer_type>::value,
886 "AsyncStream requirements not met");
887 static_assert(
888 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
889 "DynamicBuffer requirements not met");
890 BOOST_BEAST_HANDLER_INIT(
891 ReadHandler, void(error_code, std::size_t));
892 read_op<
893 DynamicBuffer,
894 BOOST_ASIO_HANDLER_TYPE(
895 ReadHandler, void(error_code, std::size_t))>{
896 std::move(init.completion_handler),
897 *this,
898 buffer,
899 0,
900 false}();
901 return init.result.get();
902 }
903
904 //------------------------------------------------------------------------------
905
906 template<class NextLayer, bool deflateSupported>
907 template<class DynamicBuffer>
908 std::size_t
909 stream<NextLayer, deflateSupported>::
910 read_some(
911 DynamicBuffer& buffer,
912 std::size_t limit)
913 {
914 static_assert(is_sync_stream<next_layer_type>::value,
915 "SyncStream requirements not met");
916 static_assert(
917 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
918 "DynamicBuffer requirements not met");
919 error_code ec;
920 auto const bytes_written =
921 read_some(buffer, limit, ec);
922 if(ec)
923 BOOST_THROW_EXCEPTION(system_error{ec});
924 return bytes_written;
925 }
926
927 template<class NextLayer, bool deflateSupported>
928 template<class DynamicBuffer>
929 std::size_t
930 stream<NextLayer, deflateSupported>::
931 read_some(
932 DynamicBuffer& buffer,
933 std::size_t limit,
934 error_code& ec)
935 {
936 static_assert(is_sync_stream<next_layer_type>::value,
937 "SyncStream requirements not met");
938 static_assert(
939 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
940 "DynamicBuffer requirements not met");
941 using beast::detail::clamp;
942 if(! limit)
943 limit = (std::numeric_limits<std::size_t>::max)();
944 auto const size =
945 clamp(read_size_hint(buffer), limit);
946 BOOST_ASSERT(size > 0);
947 boost::optional<typename
948 DynamicBuffer::mutable_buffers_type> mb;
949 try
950 {
951 mb.emplace(buffer.prepare(size));
952 }
953 catch(std::length_error const&)
954 {
955 ec = error::buffer_overflow;
956 return 0;
957 }
958 auto const bytes_written = read_some(*mb, ec);
959 buffer.commit(bytes_written);
960 return bytes_written;
961 }
962
963 template<class NextLayer, bool deflateSupported>
964 template<class DynamicBuffer, class ReadHandler>
965 BOOST_ASIO_INITFN_RESULT_TYPE(
966 ReadHandler, void(error_code, std::size_t))
967 stream<NextLayer, deflateSupported>::
968 async_read_some(
969 DynamicBuffer& buffer,
970 std::size_t limit,
971 ReadHandler&& handler)
972 {
973 static_assert(is_async_stream<next_layer_type>::value,
974 "AsyncStream requirements not met");
975 static_assert(
976 boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
977 "DynamicBuffer requirements not met");
978 BOOST_BEAST_HANDLER_INIT(
979 ReadHandler, void(error_code, std::size_t));
980 read_op<
981 DynamicBuffer,
982 BOOST_ASIO_HANDLER_TYPE(
983 ReadHandler, void(error_code, std::size_t))>{
984 std::move(init.completion_handler),
985 *this,
986 buffer,
987 limit,
988 true}({}, 0);
989 return init.result.get();
990 }
991
992 //------------------------------------------------------------------------------
993
994 template<class NextLayer, bool deflateSupported>
995 template<class MutableBufferSequence>
996 std::size_t
997 stream<NextLayer, deflateSupported>::
998 read_some(
999 MutableBufferSequence const& buffers)
1000 {
1001 static_assert(is_sync_stream<next_layer_type>::value,
1002 "SyncStream requirements not met");
1003 static_assert(boost::asio::is_mutable_buffer_sequence<
1004 MutableBufferSequence>::value,
1005 "MutableBufferSequence requirements not met");
1006 error_code ec;
1007 auto const bytes_written = read_some(buffers, ec);
1008 if(ec)
1009 BOOST_THROW_EXCEPTION(system_error{ec});
1010 return bytes_written;
1011 }
1012
1013 template<class NextLayer, bool deflateSupported>
1014 template<class MutableBufferSequence>
1015 std::size_t
1016 stream<NextLayer, deflateSupported>::
1017 read_some(
1018 MutableBufferSequence const& buffers,
1019 error_code& ec)
1020 {
1021 static_assert(is_sync_stream<next_layer_type>::value,
1022 "SyncStream requirements not met");
1023 static_assert(boost::asio::is_mutable_buffer_sequence<
1024 MutableBufferSequence>::value,
1025 "MutableBufferSequence requirements not met");
1026 using beast::detail::clamp;
1027 using boost::asio::buffer;
1028 using boost::asio::buffer_size;
1029 close_code code{};
1030 std::size_t bytes_written = 0;
1031 ec.assign(0, ec.category());
1032 // Make sure the stream is open
1033 if(! check_open(ec))
1034 return 0;
1035 loop:
1036 // See if we need to read a frame header. This
1037 // condition is structured to give the decompressor
1038 // a chance to emit the final empty deflate block
1039 //
1040 if(rd_remain_ == 0 && (! rd_fh_.fin || rd_done_))
1041 {
1042 // Read frame header
1043 error_code result;
1044 while(! parse_fh(rd_fh_, rd_buf_, result))
1045 {
1046 if(result)
1047 {
1048 // _Fail the WebSocket Connection_
1049 if(result == error::message_too_big)
1050 code = close_code::too_big;
1051 else
1052 code = close_code::protocol_error;
1053 do_fail(code, result, ec);
1054 return bytes_written;
1055 }
1056 auto const bytes_transferred =
1057 stream_.read_some(
1058 rd_buf_.prepare(read_size(
1059 rd_buf_, rd_buf_.max_size())),
1060 ec);
1061 if(! check_ok(ec))
1062 return bytes_written;
1063 rd_buf_.commit(bytes_transferred);
1064 }
1065 // Immediately apply the mask to the portion
1066 // of the buffer holding payload data.
1067 if(rd_fh_.len > 0 && rd_fh_.mask)
1068 detail::mask_inplace(buffers_prefix(
1069 clamp(rd_fh_.len), rd_buf_.mutable_data()),
1070 rd_key_);
1071 if(detail::is_control(rd_fh_.op))
1072 {
1073 // Get control frame payload
1074 auto const b = buffers_prefix(
1075 clamp(rd_fh_.len), rd_buf_.data());
1076 auto const len = buffer_size(b);
1077 BOOST_ASSERT(len == rd_fh_.len);
1078
1079 // Clear this otherwise the next
1080 // frame will be considered final.
1081 rd_fh_.fin = false;
1082
1083 // Handle ping frame
1084 if(rd_fh_.op == detail::opcode::ping)
1085 {
1086 ping_data payload;
1087 detail::read_ping(payload, b);
1088 rd_buf_.consume(len);
1089 if(wr_close_)
1090 {
1091 // Ignore ping when closing
1092 goto loop;
1093 }
1094 if(ctrl_cb_)
1095 ctrl_cb_(frame_type::ping, payload);
1096 detail::frame_buffer fb;
1097 write_ping<flat_static_buffer_base>(fb,
1098 detail::opcode::pong, payload);
1099 boost::asio::write(stream_, fb.data(), ec);
1100 if(! check_ok(ec))
1101 return bytes_written;
1102 goto loop;
1103 }
1104 // Handle pong frame
1105 if(rd_fh_.op == detail::opcode::pong)
1106 {
1107 ping_data payload;
1108 detail::read_ping(payload, b);
1109 rd_buf_.consume(len);
1110 if(ctrl_cb_)
1111 ctrl_cb_(frame_type::pong, payload);
1112 goto loop;
1113 }
1114 // Handle close frame
1115 BOOST_ASSERT(rd_fh_.op == detail::opcode::close);
1116 {
1117 BOOST_ASSERT(! rd_close_);
1118 rd_close_ = true;
1119 close_reason cr;
1120 detail::read_close(cr, b, result);
1121 if(result)
1122 {
1123 // _Fail the WebSocket Connection_
1124 do_fail(close_code::protocol_error,
1125 result, ec);
1126 return bytes_written;
1127 }
1128 cr_ = cr;
1129 rd_buf_.consume(len);
1130 if(ctrl_cb_)
1131 ctrl_cb_(frame_type::close, cr_.reason);
1132 BOOST_ASSERT(! wr_close_);
1133 // _Start the WebSocket Closing Handshake_
1134 do_fail(
1135 cr.code == close_code::none ?
1136 close_code::normal :
1137 static_cast<close_code>(cr.code),
1138 error::closed, ec);
1139 return bytes_written;
1140 }
1141 }
1142 if(rd_fh_.len == 0 && ! rd_fh_.fin)
1143 {
1144 // Empty non-final frame
1145 goto loop;
1146 }
1147 rd_done_ = false;
1148 }
1149 else
1150 {
1151 ec.assign(0, ec.category());
1152 }
1153 if(! this->rd_deflated())
1154 {
1155 if(rd_remain_ > 0)
1156 {
1157 if(rd_buf_.size() == 0 && rd_buf_.max_size() >
1158 (std::min)(clamp(rd_remain_),
1159 buffer_size(buffers)))
1160 {
1161 // Fill the read buffer first, otherwise we
1162 // get fewer bytes at the cost of one I/O.
1163 rd_buf_.commit(stream_.read_some(
1164 rd_buf_.prepare(read_size(rd_buf_,
1165 rd_buf_.max_size())), ec));
1166 if(! check_ok(ec))
1167 return bytes_written;
1168 if(rd_fh_.mask)
1169 detail::mask_inplace(
1170 buffers_prefix(clamp(rd_remain_),
1171 rd_buf_.mutable_data()), rd_key_);
1172 }
1173 if(rd_buf_.size() > 0)
1174 {
1175 // Copy from the read buffer.
1176 // The mask was already applied.
1177 auto const bytes_transferred =
1178 buffer_copy(buffers, rd_buf_.data(),
1179 clamp(rd_remain_));
1180 auto const mb = buffers_prefix(
1181 bytes_transferred, buffers);
1182 rd_remain_ -= bytes_transferred;
1183 if(rd_op_ == detail::opcode::text)
1184 {
1185 if(! rd_utf8_.write(mb) ||
1186 (rd_remain_ == 0 && rd_fh_.fin &&
1187 ! rd_utf8_.finish()))
1188 {
1189 // _Fail the WebSocket Connection_
1190 do_fail(close_code::bad_payload,
1191 error::bad_frame_payload, ec);
1192 return bytes_written;
1193 }
1194 }
1195 bytes_written += bytes_transferred;
1196 rd_size_ += bytes_transferred;
1197 rd_buf_.consume(bytes_transferred);
1198 }
1199 else
1200 {
1201 // Read into caller's buffer
1202 BOOST_ASSERT(rd_remain_ > 0);
1203 BOOST_ASSERT(buffer_size(buffers) > 0);
1204 BOOST_ASSERT(buffer_size(buffers_prefix(
1205 clamp(rd_remain_), buffers)) > 0);
1206 auto const bytes_transferred =
1207 stream_.read_some(buffers_prefix(
1208 clamp(rd_remain_), buffers), ec);
1209 if(! check_ok(ec))
1210 return bytes_written;
1211 BOOST_ASSERT(bytes_transferred > 0);
1212 auto const mb = buffers_prefix(
1213 bytes_transferred, buffers);
1214 rd_remain_ -= bytes_transferred;
1215 if(rd_fh_.mask)
1216 detail::mask_inplace(mb, rd_key_);
1217 if(rd_op_ == detail::opcode::text)
1218 {
1219 if(! rd_utf8_.write(mb) ||
1220 (rd_remain_ == 0 && rd_fh_.fin &&
1221 ! rd_utf8_.finish()))
1222 {
1223 // _Fail the WebSocket Connection_
1224 do_fail(close_code::bad_payload,
1225 error::bad_frame_payload, ec);
1226 return bytes_written;
1227 }
1228 }
1229 bytes_written += bytes_transferred;
1230 rd_size_ += bytes_transferred;
1231 }
1232 }
1233 rd_done_ = rd_remain_ == 0 && rd_fh_.fin;
1234 }
1235 else
1236 {
1237 // Read compressed message frame payload:
1238 // inflate even if rd_fh_.len == 0, otherwise we
1239 // never emit the end-of-stream deflate block.
1240 //
1241 bool did_read = false;
1242 buffers_suffix<MutableBufferSequence> cb{buffers};
1243 while(buffer_size(cb) > 0)
1244 {
1245 zlib::z_params zs;
1246 {
1247 auto const out = buffers_front(cb);
1248 zs.next_out = out.data();
1249 zs.avail_out = out.size();
1250 BOOST_ASSERT(zs.avail_out > 0);
1251 }
1252 if(rd_remain_ > 0)
1253 {
1254 if(rd_buf_.size() > 0)
1255 {
1256 // use what's there
1257 auto const in = buffers_prefix(
1258 clamp(rd_remain_), buffers_front(
1259 rd_buf_.data()));
1260 zs.avail_in = in.size();
1261 zs.next_in = in.data();
1262 }
1263 else if(! did_read)
1264 {
1265 // read new
1266 auto const bytes_transferred =
1267 stream_.read_some(
1268 rd_buf_.prepare(read_size(
1269 rd_buf_, rd_buf_.max_size())),
1270 ec);
1271 if(! check_ok(ec))
1272 return bytes_written;
1273 BOOST_ASSERT(bytes_transferred > 0);
1274 rd_buf_.commit(bytes_transferred);
1275 if(rd_fh_.mask)
1276 detail::mask_inplace(
1277 buffers_prefix(clamp(rd_remain_),
1278 rd_buf_.mutable_data()), rd_key_);
1279 auto const in = buffers_prefix(
1280 clamp(rd_remain_), buffers_front(
1281 rd_buf_.data()));
1282 zs.avail_in = in.size();
1283 zs.next_in = in.data();
1284 did_read = true;
1285 }
1286 else
1287 {
1288 break;
1289 }
1290 }
1291 else if(rd_fh_.fin)
1292 {
1293 // append the empty block codes
1294 static std::uint8_t constexpr
1295 empty_block[4] = {
1296 0x00, 0x00, 0xff, 0xff };
1297 zs.next_in = empty_block;
1298 zs.avail_in = sizeof(empty_block);
1299 this->inflate(zs, zlib::Flush::sync, ec);
1300 if(! ec)
1301 {
1302 // https://github.com/madler/zlib/issues/280
1303 if(zs.total_out > 0)
1304 ec = error::partial_deflate_block;
1305 }
1306 if(! check_ok(ec))
1307 return bytes_written;
1308 this->do_context_takeover_read(role_);
1309 rd_done_ = true;
1310 break;
1311 }
1312 else
1313 {
1314 break;
1315 }
1316 this->inflate(zs, zlib::Flush::sync, ec);
1317 if(! check_ok(ec))
1318 return bytes_written;
1319 if(rd_msg_max_ && beast::detail::sum_exceeds(
1320 rd_size_, zs.total_out, rd_msg_max_))
1321 {
1322 do_fail(close_code::too_big,
1323 error::message_too_big, ec);
1324 return bytes_written;
1325 }
1326 cb.consume(zs.total_out);
1327 rd_size_ += zs.total_out;
1328 rd_remain_ -= zs.total_in;
1329 rd_buf_.consume(zs.total_in);
1330 bytes_written += zs.total_out;
1331 }
1332 if(rd_op_ == detail::opcode::text)
1333 {
1334 // check utf8
1335 if(! rd_utf8_.write(
1336 buffers_prefix(bytes_written, buffers)) || (
1337 rd_done_ && ! rd_utf8_.finish()))
1338 {
1339 // _Fail the WebSocket Connection_
1340 do_fail(close_code::bad_payload,
1341 error::bad_frame_payload, ec);
1342 return bytes_written;
1343 }
1344 }
1345 }
1346 return bytes_written;
1347 }
1348
1349 template<class NextLayer, bool deflateSupported>
1350 template<class MutableBufferSequence, class ReadHandler>
1351 BOOST_ASIO_INITFN_RESULT_TYPE(
1352 ReadHandler, void(error_code, std::size_t))
1353 stream<NextLayer, deflateSupported>::
1354 async_read_some(
1355 MutableBufferSequence const& buffers,
1356 ReadHandler&& handler)
1357 {
1358 static_assert(is_async_stream<next_layer_type>::value,
1359 "AsyncStream requirements not met");
1360 static_assert(boost::asio::is_mutable_buffer_sequence<
1361 MutableBufferSequence>::value,
1362 "MutableBufferSequence requirements not met");
1363 BOOST_BEAST_HANDLER_INIT(
1364 ReadHandler, void(error_code, std::size_t));
1365 read_some_op<MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE(
1366 ReadHandler, void(error_code, std::size_t))>{
1367 std::move(init.completion_handler), *this, buffers}(
1368 {}, 0, false);
1369 return init.result.get();
1370 }
1371
1372 } // websocket
1373 } // beast
1374 } // boost
1375
1376 #endif