]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/boost/beast/websocket/impl/read.hpp
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / boost / boost / beast / websocket / impl / read.hpp
1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
11 #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
12
13 #include <boost/beast/core/buffer_traits.hpp>
14 #include <boost/beast/websocket/teardown.hpp>
15 #include <boost/beast/websocket/detail/mask.hpp>
16 #include <boost/beast/websocket/impl/stream_impl.hpp>
17 #include <boost/beast/core/async_base.hpp>
18 #include <boost/beast/core/bind_handler.hpp>
19 #include <boost/beast/core/buffers_prefix.hpp>
20 #include <boost/beast/core/buffers_suffix.hpp>
21 #include <boost/beast/core/flat_static_buffer.hpp>
22 #include <boost/beast/core/read_size.hpp>
23 #include <boost/beast/core/stream_traits.hpp>
24 #include <boost/beast/core/detail/bind_continuation.hpp>
25 #include <boost/beast/core/detail/buffer.hpp>
26 #include <boost/beast/core/detail/clamp.hpp>
27 #include <boost/beast/core/detail/config.hpp>
28 #include <boost/asio/coroutine.hpp>
29 #include <boost/asio/post.hpp>
30 #include <boost/assert.hpp>
31 #include <boost/config.hpp>
32 #include <boost/optional.hpp>
33 #include <boost/throw_exception.hpp>
34 #include <algorithm>
35 #include <limits>
36 #include <memory>
37
38 namespace boost {
39 namespace beast {
40 namespace websocket {
41
42 /* Read some message data into a buffer sequence.
43
44 Also reads and handles control frames.
45 */
46 template<class NextLayer, bool deflateSupported>
47 template<class Handler, class MutableBufferSequence>
48 class stream<NextLayer, deflateSupported>::read_some_op
49 : public beast::async_base<
50 Handler, beast::executor_type<stream>>
51 , public asio::coroutine
52 {
53 boost::weak_ptr<impl_type> wp_;
54 MutableBufferSequence bs_;
55 buffers_suffix<MutableBufferSequence> cb_;
56 std::size_t bytes_written_ = 0;
57 error_code result_;
58 close_code code_;
59 bool did_read_ = false;
60
61 public:
62 static constexpr int id = 1; // for soft_mutex
63
64 template<class Handler_>
65 read_some_op(
66 Handler_&& h,
67 boost::shared_ptr<impl_type> const& sp,
68 MutableBufferSequence const& bs)
69 : async_base<
70 Handler, beast::executor_type<stream>>(
71 std::forward<Handler_>(h),
72 sp->stream().get_executor())
73 , wp_(sp)
74 , bs_(bs)
75 , cb_(bs)
76 , code_(close_code::none)
77 {
78 (*this)({}, 0, false);
79 }
80
81 void operator()(
82 error_code ec = {},
83 std::size_t bytes_transferred = 0,
84 bool cont = true)
85 {
86 using beast::detail::clamp;
87 auto sp = wp_.lock();
88 if(! sp)
89 {
90 ec = net::error::operation_aborted;
91 bytes_written_ = 0;
92 return this->complete(cont, ec, bytes_written_);
93 }
94 auto& impl = *sp;
95 BOOST_ASIO_CORO_REENTER(*this)
96 {
97 impl.update_timer(this->get_executor());
98
99 acquire_read_lock:
100 // Acquire the read lock
101 if(! impl.rd_block.try_lock(this))
102 {
103 do_suspend:
104 BOOST_ASIO_CORO_YIELD
105 impl.op_r_rd.emplace(std::move(*this));
106 impl.rd_block.lock(this);
107 BOOST_ASIO_CORO_YIELD
108 net::post(std::move(*this));
109 BOOST_ASSERT(impl.rd_block.is_locked(this));
110
111 // VFALCO Is this check correct here?
112 BOOST_ASSERT(! ec && impl.check_stop_now(ec));
113 if(impl.check_stop_now(ec))
114 {
115 BOOST_ASSERT(ec == net::error::operation_aborted);
116 goto upcall;
117 }
118 // VFALCO Should never get here
119
120 // The only way to get read blocked is if
121 // a `close_op` wrote a close frame
122 BOOST_ASSERT(impl.wr_close);
123 BOOST_ASSERT(impl.status_ != status::open);
124 ec = net::error::operation_aborted;
125 goto upcall;
126 }
127 else
128 {
129 // Make sure the stream is not closed
130 if( impl.status_ == status::closed ||
131 impl.status_ == status::failed)
132 {
133 ec = net::error::operation_aborted;
134 goto upcall;
135 }
136 }
137
138 // if status_ == status::closing, we want to suspend
139 // the read operation until the close completes,
140 // then finish the read with operation_aborted.
141
142 loop:
143 BOOST_ASSERT(impl.rd_block.is_locked(this));
144 // See if we need to read a frame header. This
145 // condition is structured to give the decompressor
146 // a chance to emit the final empty deflate block
147 //
148 if(impl.rd_remain == 0 &&
149 (! impl.rd_fh.fin || impl.rd_done))
150 {
151 // Read frame header
152 while(! impl.parse_fh(
153 impl.rd_fh, impl.rd_buf, result_))
154 {
155 if(result_)
156 {
157 // _Fail the WebSocket Connection_
158 if(result_ == error::message_too_big)
159 code_ = close_code::too_big;
160 else
161 code_ = close_code::protocol_error;
162 goto close;
163 }
164 BOOST_ASSERT(impl.rd_block.is_locked(this));
165 BOOST_ASIO_CORO_YIELD
166 impl.stream().async_read_some(
167 impl.rd_buf.prepare(read_size(
168 impl.rd_buf, impl.rd_buf.max_size())),
169 std::move(*this));
170 BOOST_ASSERT(impl.rd_block.is_locked(this));
171 impl.rd_buf.commit(bytes_transferred);
172 if(impl.check_stop_now(ec))
173 goto upcall;
174 impl.reset_idle();
175
176 // Allow a close operation
177 // to acquire the read block
178 impl.rd_block.unlock(this);
179 if( impl.op_r_close.maybe_invoke())
180 {
181 // Suspend
182 BOOST_ASSERT(impl.rd_block.is_locked());
183 goto do_suspend;
184 }
185 // Acquire read block
186 impl.rd_block.lock(this);
187 }
188 // Immediately apply the mask to the portion
189 // of the buffer holding payload data.
190 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
191 detail::mask_inplace(buffers_prefix(
192 clamp(impl.rd_fh.len),
193 impl.rd_buf.data()),
194 impl.rd_key);
195 if(detail::is_control(impl.rd_fh.op))
196 {
197 // Clear this otherwise the next
198 // frame will be considered final.
199 impl.rd_fh.fin = false;
200
201 // Handle ping frame
202 if(impl.rd_fh.op == detail::opcode::ping)
203 {
204 if(impl.ctrl_cb)
205 {
206 if(! cont)
207 {
208 BOOST_ASIO_CORO_YIELD
209 net::post(std::move(*this));
210 BOOST_ASSERT(cont);
211 // VFALCO call check_stop_now() here?
212 }
213 }
214 {
215 auto const b = buffers_prefix(
216 clamp(impl.rd_fh.len),
217 impl.rd_buf.data());
218 auto const len = buffer_bytes(b);
219 BOOST_ASSERT(len == impl.rd_fh.len);
220 ping_data payload;
221 detail::read_ping(payload, b);
222 impl.rd_buf.consume(len);
223 // Ignore ping when closing
224 if(impl.status_ == status::closing)
225 goto loop;
226 if(impl.ctrl_cb)
227 impl.ctrl_cb(
228 frame_type::ping, payload);
229 impl.rd_fb.clear();
230 impl.template write_ping<
231 flat_static_buffer_base>(impl.rd_fb,
232 detail::opcode::pong, payload);
233 }
234
235 // Allow a close operation
236 // to acquire the read block
237 impl.rd_block.unlock(this);
238 impl.op_r_close.maybe_invoke();
239
240 // Acquire the write lock
241 if(! impl.wr_block.try_lock(this))
242 {
243 BOOST_ASIO_CORO_YIELD
244 impl.op_rd.emplace(std::move(*this));
245 impl.wr_block.lock(this);
246 BOOST_ASIO_CORO_YIELD
247 net::post(std::move(*this));
248 BOOST_ASSERT(impl.wr_block.is_locked(this));
249 if(impl.check_stop_now(ec))
250 goto upcall;
251 }
252
253 // Send pong
254 BOOST_ASSERT(impl.wr_block.is_locked(this));
255 BOOST_ASIO_CORO_YIELD
256 net::async_write(
257 impl.stream(), impl.rd_fb.data(),
258 beast::detail::bind_continuation(std::move(*this)));
259 BOOST_ASSERT(impl.wr_block.is_locked(this));
260 if(impl.check_stop_now(ec))
261 goto upcall;
262 impl.wr_block.unlock(this);
263 impl.op_close.maybe_invoke()
264 || impl.op_idle_ping.maybe_invoke()
265 || impl.op_ping.maybe_invoke()
266 || impl.op_wr.maybe_invoke();
267 goto acquire_read_lock;
268 }
269
270 // Handle pong frame
271 if(impl.rd_fh.op == detail::opcode::pong)
272 {
273 // Ignore pong when closing
274 if(! impl.wr_close && impl.ctrl_cb)
275 {
276 if(! cont)
277 {
278 BOOST_ASIO_CORO_YIELD
279 net::post(std::move(*this));
280 BOOST_ASSERT(cont);
281 }
282 }
283 auto const cb = buffers_prefix(clamp(
284 impl.rd_fh.len), impl.rd_buf.data());
285 auto const len = buffer_bytes(cb);
286 BOOST_ASSERT(len == impl.rd_fh.len);
287 ping_data payload;
288 detail::read_ping(payload, cb);
289 impl.rd_buf.consume(len);
290 // Ignore pong when closing
291 if(! impl.wr_close && impl.ctrl_cb)
292 impl.ctrl_cb(frame_type::pong, payload);
293 goto loop;
294 }
295
296 // Handle close frame
297 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
298 {
299 if(impl.ctrl_cb)
300 {
301 if(! cont)
302 {
303 BOOST_ASIO_CORO_YIELD
304 net::post(std::move(*this));
305 BOOST_ASSERT(cont);
306 }
307 }
308 auto const cb = buffers_prefix(clamp(
309 impl.rd_fh.len), impl.rd_buf.data());
310 auto const len = buffer_bytes(cb);
311 BOOST_ASSERT(len == impl.rd_fh.len);
312 BOOST_ASSERT(! impl.rd_close);
313 impl.rd_close = true;
314 close_reason cr;
315 detail::read_close(cr, cb, result_);
316 if(result_)
317 {
318 // _Fail the WebSocket Connection_
319 code_ = close_code::protocol_error;
320 goto close;
321 }
322 impl.cr = cr;
323 impl.rd_buf.consume(len);
324 if(impl.ctrl_cb)
325 impl.ctrl_cb(frame_type::close,
326 impl.cr.reason);
327 // See if we are already closing
328 if(impl.status_ == status::closing)
329 {
330 // _Close the WebSocket Connection_
331 BOOST_ASSERT(impl.wr_close);
332 code_ = close_code::none;
333 result_ = error::closed;
334 goto close;
335 }
336 // _Start the WebSocket Closing Handshake_
337 code_ = cr.code == close_code::none ?
338 close_code::normal :
339 static_cast<close_code>(cr.code);
340 result_ = error::closed;
341 goto close;
342 }
343 }
344 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
345 {
346 // Empty non-final frame
347 goto loop;
348 }
349 impl.rd_done = false;
350 }
351 if(! impl.rd_deflated())
352 {
353 if(impl.rd_remain > 0)
354 {
355 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
356 (std::min)(clamp(impl.rd_remain),
357 buffer_bytes(cb_)))
358 {
359 // Fill the read buffer first, otherwise we
360 // get fewer bytes at the cost of one I/O.
361 BOOST_ASIO_CORO_YIELD
362 impl.stream().async_read_some(
363 impl.rd_buf.prepare(read_size(
364 impl.rd_buf, impl.rd_buf.max_size())),
365 std::move(*this));
366 impl.rd_buf.commit(bytes_transferred);
367 if(impl.check_stop_now(ec))
368 goto upcall;
369 impl.reset_idle();
370 if(impl.rd_fh.mask)
371 detail::mask_inplace(buffers_prefix(clamp(
372 impl.rd_remain), impl.rd_buf.data()),
373 impl.rd_key);
374 }
375 if(impl.rd_buf.size() > 0)
376 {
377 // Copy from the read buffer.
378 // The mask was already applied.
379 bytes_transferred = net::buffer_copy(cb_,
380 impl.rd_buf.data(), clamp(impl.rd_remain));
381 auto const mb = buffers_prefix(
382 bytes_transferred, cb_);
383 impl.rd_remain -= bytes_transferred;
384 if(impl.rd_op == detail::opcode::text)
385 {
386 if(! impl.rd_utf8.write(mb) ||
387 (impl.rd_remain == 0 && impl.rd_fh.fin &&
388 ! impl.rd_utf8.finish()))
389 {
390 // _Fail the WebSocket Connection_
391 code_ = close_code::bad_payload;
392 result_ = error::bad_frame_payload;
393 goto close;
394 }
395 }
396 bytes_written_ += bytes_transferred;
397 impl.rd_size += bytes_transferred;
398 impl.rd_buf.consume(bytes_transferred);
399 }
400 else
401 {
402 // Read into caller's buffer
403 BOOST_ASSERT(impl.rd_remain > 0);
404 BOOST_ASSERT(buffer_bytes(cb_) > 0);
405 BOOST_ASSERT(buffer_bytes(buffers_prefix(
406 clamp(impl.rd_remain), cb_)) > 0);
407 BOOST_ASIO_CORO_YIELD
408 impl.stream().async_read_some(buffers_prefix(
409 clamp(impl.rd_remain), cb_), std::move(*this));
410 if(impl.check_stop_now(ec))
411 goto upcall;
412 impl.reset_idle();
413 BOOST_ASSERT(bytes_transferred > 0);
414 auto const mb = buffers_prefix(
415 bytes_transferred, cb_);
416 impl.rd_remain -= bytes_transferred;
417 if(impl.rd_fh.mask)
418 detail::mask_inplace(mb, impl.rd_key);
419 if(impl.rd_op == detail::opcode::text)
420 {
421 if(! impl.rd_utf8.write(mb) ||
422 (impl.rd_remain == 0 && impl.rd_fh.fin &&
423 ! impl.rd_utf8.finish()))
424 {
425 // _Fail the WebSocket Connection_
426 code_ = close_code::bad_payload;
427 result_ = error::bad_frame_payload;
428 goto close;
429 }
430 }
431 bytes_written_ += bytes_transferred;
432 impl.rd_size += bytes_transferred;
433 }
434 }
435 impl.rd_done = impl.rd_remain == 0 && impl.rd_fh.fin;
436 }
437 else
438 {
439 // Read compressed message frame payload:
440 // inflate even if rd_fh_.len == 0, otherwise we
441 // never emit the end-of-stream deflate block.
442 while(buffer_bytes(cb_) > 0)
443 {
444 if( impl.rd_remain > 0 &&
445 impl.rd_buf.size() == 0 &&
446 ! did_read_)
447 {
448 // read new
449 BOOST_ASIO_CORO_YIELD
450 impl.stream().async_read_some(
451 impl.rd_buf.prepare(read_size(
452 impl.rd_buf, impl.rd_buf.max_size())),
453 std::move(*this));
454 if(impl.check_stop_now(ec))
455 goto upcall;
456 impl.reset_idle();
457 BOOST_ASSERT(bytes_transferred > 0);
458 impl.rd_buf.commit(bytes_transferred);
459 if(impl.rd_fh.mask)
460 detail::mask_inplace(
461 buffers_prefix(clamp(impl.rd_remain),
462 impl.rd_buf.data()), impl.rd_key);
463 did_read_ = true;
464 }
465 zlib::z_params zs;
466 {
467 auto const out = buffers_front(cb_);
468 zs.next_out = out.data();
469 zs.avail_out = out.size();
470 BOOST_ASSERT(zs.avail_out > 0);
471 }
472 if(impl.rd_remain > 0)
473 {
474 if(impl.rd_buf.size() > 0)
475 {
476 // use what's there
477 auto const in = buffers_prefix(
478 clamp(impl.rd_remain), buffers_front(
479 impl.rd_buf.data()));
480 zs.avail_in = in.size();
481 zs.next_in = in.data();
482 }
483 else
484 {
485 break;
486 }
487 }
488 else if(impl.rd_fh.fin)
489 {
490 // append the empty block codes
491 std::uint8_t constexpr
492 empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
493 zs.next_in = empty_block;
494 zs.avail_in = sizeof(empty_block);
495 impl.inflate(zs, zlib::Flush::sync, ec);
496 if(! ec)
497 {
498 // https://github.com/madler/zlib/issues/280
499 if(zs.total_out > 0)
500 ec = error::partial_deflate_block;
501 }
502 if(impl.check_stop_now(ec))
503 goto upcall;
504 impl.do_context_takeover_read(impl.role);
505 impl.rd_done = true;
506 break;
507 }
508 else
509 {
510 break;
511 }
512 impl.inflate(zs, zlib::Flush::sync, ec);
513 if(impl.check_stop_now(ec))
514 goto upcall;
515 if(impl.rd_msg_max && beast::detail::sum_exceeds(
516 impl.rd_size, zs.total_out, impl.rd_msg_max))
517 {
518 // _Fail the WebSocket Connection_
519 code_ = close_code::too_big;
520 result_ = error::message_too_big;
521 goto close;
522 }
523 cb_.consume(zs.total_out);
524 impl.rd_size += zs.total_out;
525 impl.rd_remain -= zs.total_in;
526 impl.rd_buf.consume(zs.total_in);
527 bytes_written_ += zs.total_out;
528 }
529 if(impl.rd_op == detail::opcode::text)
530 {
531 // check utf8
532 if(! impl.rd_utf8.write(
533 buffers_prefix(bytes_written_, bs_)) || (
534 impl.rd_done && ! impl.rd_utf8.finish()))
535 {
536 // _Fail the WebSocket Connection_
537 code_ = close_code::bad_payload;
538 result_ = error::bad_frame_payload;
539 goto close;
540 }
541 }
542 }
543 goto upcall;
544
545 close:
546 // Acquire the write lock
547 if(! impl.wr_block.try_lock(this))
548 {
549 BOOST_ASIO_CORO_YIELD
550 impl.op_rd.emplace(std::move(*this));
551 impl.wr_block.lock(this);
552 BOOST_ASIO_CORO_YIELD
553 net::post(std::move(*this));
554 BOOST_ASSERT(impl.wr_block.is_locked(this));
555 if(impl.check_stop_now(ec))
556 goto upcall;
557 }
558
559 impl.change_status(status::closing);
560
561 if(! impl.wr_close)
562 {
563 impl.wr_close = true;
564
565 // Serialize close frame
566 impl.rd_fb.clear();
567 impl.template write_close<
568 flat_static_buffer_base>(
569 impl.rd_fb, code_);
570
571 // Send close frame
572 BOOST_ASSERT(impl.wr_block.is_locked(this));
573 BOOST_ASIO_CORO_YIELD
574 net::async_write(impl.stream(), impl.rd_fb.data(),
575 beast::detail::bind_continuation(std::move(*this)));
576 BOOST_ASSERT(impl.wr_block.is_locked(this));
577 if(impl.check_stop_now(ec))
578 goto upcall;
579 }
580
581 // Teardown
582 using beast::websocket::async_teardown;
583 BOOST_ASSERT(impl.wr_block.is_locked(this));
584 BOOST_ASIO_CORO_YIELD
585 async_teardown(impl.role, impl.stream(),
586 beast::detail::bind_continuation(std::move(*this)));
587 BOOST_ASSERT(impl.wr_block.is_locked(this));
588 if(ec == net::error::eof)
589 {
590 // Rationale:
591 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
592 ec = {};
593 }
594 if(! ec)
595 ec = result_;
596 if(ec && ec != error::closed)
597 impl.change_status(status::failed);
598 else
599 impl.change_status(status::closed);
600 impl.close();
601
602 upcall:
603 impl.rd_block.try_unlock(this);
604 impl.op_r_close.maybe_invoke();
605 if(impl.wr_block.try_unlock(this))
606 impl.op_close.maybe_invoke()
607 || impl.op_idle_ping.maybe_invoke()
608 || impl.op_ping.maybe_invoke()
609 || impl.op_wr.maybe_invoke();
610 this->complete(cont, ec, bytes_written_);
611 }
612 }
613 };
614
615 //------------------------------------------------------------------------------
616
617 template<class NextLayer, bool deflateSupported>
618 template<class Handler, class DynamicBuffer>
619 class stream<NextLayer, deflateSupported>::read_op
620 : public beast::async_base<
621 Handler, beast::executor_type<stream>>
622 , public asio::coroutine
623 {
624 boost::weak_ptr<impl_type> wp_;
625 DynamicBuffer& b_;
626 std::size_t limit_;
627 std::size_t bytes_written_ = 0;
628 bool some_;
629
630 public:
631 template<class Handler_>
632 read_op(
633 Handler_&& h,
634 boost::shared_ptr<impl_type> const& sp,
635 DynamicBuffer& b,
636 std::size_t limit,
637 bool some)
638 : async_base<Handler,
639 beast::executor_type<stream>>(
640 std::forward<Handler_>(h),
641 sp->stream().get_executor())
642 , wp_(sp)
643 , b_(b)
644 , limit_(limit ? limit : (
645 std::numeric_limits<std::size_t>::max)())
646 , some_(some)
647 {
648 (*this)({}, 0, false);
649 }
650
651 void operator()(
652 error_code ec = {},
653 std::size_t bytes_transferred = 0,
654 bool cont = true)
655 {
656 using beast::detail::clamp;
657 auto sp = wp_.lock();
658 if(! sp)
659 {
660 ec = net::error::operation_aborted;
661 bytes_written_ = 0;
662 return this->complete(cont, ec, bytes_written_);
663 }
664 auto& impl = *sp;
665 using mutable_buffers_type = typename
666 DynamicBuffer::mutable_buffers_type;
667 BOOST_ASIO_CORO_REENTER(*this)
668 {
669 do
670 {
671 // VFALCO TODO use boost::beast::bind_continuation
672 BOOST_ASIO_CORO_YIELD
673 {
674 auto mb = beast::detail::dynamic_buffer_prepare(b_,
675 clamp(impl.read_size_hint_db(b_), limit_),
676 ec, error::buffer_overflow);
677 if(impl.check_stop_now(ec))
678 goto upcall;
679 read_some_op<read_op, mutable_buffers_type>(
680 std::move(*this), sp, *mb);
681 }
682
683 b_.commit(bytes_transferred);
684 bytes_written_ += bytes_transferred;
685 if(ec)
686 goto upcall;
687 }
688 while(! some_ && ! impl.rd_done);
689
690 upcall:
691 this->complete(cont, ec, bytes_written_);
692 }
693 }
694 };
695
696 template<class NextLayer, bool deflateSupported>
697 struct stream<NextLayer, deflateSupported>::
698 run_read_some_op
699 {
700 template<
701 class ReadHandler,
702 class MutableBufferSequence>
703 void
704 operator()(
705 ReadHandler&& h,
706 boost::shared_ptr<impl_type> const& sp,
707 MutableBufferSequence const& b)
708 {
709 // If you get an error on the following line it means
710 // that your handler does not meet the documented type
711 // requirements for the handler.
712
713 static_assert(
714 beast::detail::is_invocable<ReadHandler,
715 void(error_code, std::size_t)>::value,
716 "ReadHandler type requirements not met");
717
718 read_some_op<
719 typename std::decay<ReadHandler>::type,
720 MutableBufferSequence>(
721 std::forward<ReadHandler>(h),
722 sp,
723 b);
724 }
725 };
726
727 template<class NextLayer, bool deflateSupported>
728 struct stream<NextLayer, deflateSupported>::
729 run_read_op
730 {
731 template<
732 class ReadHandler,
733 class DynamicBuffer>
734 void
735 operator()(
736 ReadHandler&& h,
737 boost::shared_ptr<impl_type> const& sp,
738 DynamicBuffer* b,
739 std::size_t limit,
740 bool some)
741 {
742 // If you get an error on the following line it means
743 // that your handler does not meet the documented type
744 // requirements for the handler.
745
746 static_assert(
747 beast::detail::is_invocable<ReadHandler,
748 void(error_code, std::size_t)>::value,
749 "ReadHandler type requirements not met");
750
751 read_op<
752 typename std::decay<ReadHandler>::type,
753 DynamicBuffer>(
754 std::forward<ReadHandler>(h),
755 sp,
756 *b,
757 limit,
758 some);
759 }
760 };
761
762 //------------------------------------------------------------------------------
763
764 template<class NextLayer, bool deflateSupported>
765 template<class DynamicBuffer>
766 std::size_t
767 stream<NextLayer, deflateSupported>::
768 read(DynamicBuffer& buffer)
769 {
770 static_assert(is_sync_stream<next_layer_type>::value,
771 "SyncStream type requirements not met");
772 static_assert(
773 net::is_dynamic_buffer<DynamicBuffer>::value,
774 "DynamicBuffer type requirements not met");
775 error_code ec;
776 auto const bytes_written = read(buffer, ec);
777 if(ec)
778 BOOST_THROW_EXCEPTION(system_error{ec});
779 return bytes_written;
780 }
781
782 template<class NextLayer, bool deflateSupported>
783 template<class DynamicBuffer>
784 std::size_t
785 stream<NextLayer, deflateSupported>::
786 read(DynamicBuffer& buffer, error_code& ec)
787 {
788 static_assert(is_sync_stream<next_layer_type>::value,
789 "SyncStream type requirements not met");
790 static_assert(
791 net::is_dynamic_buffer<DynamicBuffer>::value,
792 "DynamicBuffer type requirements not met");
793 std::size_t bytes_written = 0;
794 do
795 {
796 bytes_written += read_some(buffer, 0, ec);
797 if(ec)
798 return bytes_written;
799 }
800 while(! is_message_done());
801 return bytes_written;
802 }
803
804 template<class NextLayer, bool deflateSupported>
805 template<class DynamicBuffer, class ReadHandler>
806 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
807 stream<NextLayer, deflateSupported>::
808 async_read(DynamicBuffer& buffer, ReadHandler&& handler)
809 {
810 static_assert(is_async_stream<next_layer_type>::value,
811 "AsyncStream type requirements not met");
812 static_assert(
813 net::is_dynamic_buffer<DynamicBuffer>::value,
814 "DynamicBuffer type requirements not met");
815 return net::async_initiate<
816 ReadHandler,
817 void(error_code, std::size_t)>(
818 run_read_op{},
819 handler,
820 impl_,
821 &buffer,
822 0,
823 false);
824 }
825
826 //------------------------------------------------------------------------------
827
828 template<class NextLayer, bool deflateSupported>
829 template<class DynamicBuffer>
830 std::size_t
831 stream<NextLayer, deflateSupported>::
832 read_some(
833 DynamicBuffer& buffer,
834 std::size_t limit)
835 {
836 static_assert(is_sync_stream<next_layer_type>::value,
837 "SyncStream type requirements not met");
838 static_assert(
839 net::is_dynamic_buffer<DynamicBuffer>::value,
840 "DynamicBuffer type requirements not met");
841 error_code ec;
842 auto const bytes_written =
843 read_some(buffer, limit, ec);
844 if(ec)
845 BOOST_THROW_EXCEPTION(system_error{ec});
846 return bytes_written;
847 }
848
849 template<class NextLayer, bool deflateSupported>
850 template<class DynamicBuffer>
851 std::size_t
852 stream<NextLayer, deflateSupported>::
853 read_some(
854 DynamicBuffer& buffer,
855 std::size_t limit,
856 error_code& ec)
857 {
858 static_assert(is_sync_stream<next_layer_type>::value,
859 "SyncStream type requirements not met");
860 static_assert(
861 net::is_dynamic_buffer<DynamicBuffer>::value,
862 "DynamicBuffer type requirements not met");
863 using beast::detail::clamp;
864 if(! limit)
865 limit = (std::numeric_limits<std::size_t>::max)();
866 auto const size =
867 clamp(read_size_hint(buffer), limit);
868 BOOST_ASSERT(size > 0);
869 auto mb = beast::detail::dynamic_buffer_prepare(
870 buffer, size, ec, error::buffer_overflow);
871 if(impl_->check_stop_now(ec))
872 return 0;
873 auto const bytes_written = read_some(*mb, ec);
874 buffer.commit(bytes_written);
875 return bytes_written;
876 }
877
878 template<class NextLayer, bool deflateSupported>
879 template<class DynamicBuffer, class ReadHandler>
880 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
881 stream<NextLayer, deflateSupported>::
882 async_read_some(
883 DynamicBuffer& buffer,
884 std::size_t limit,
885 ReadHandler&& handler)
886 {
887 static_assert(is_async_stream<next_layer_type>::value,
888 "AsyncStream type requirements not met");
889 static_assert(
890 net::is_dynamic_buffer<DynamicBuffer>::value,
891 "DynamicBuffer type requirements not met");
892 return net::async_initiate<
893 ReadHandler,
894 void(error_code, std::size_t)>(
895 run_read_op{},
896 handler,
897 impl_,
898 &buffer,
899 limit,
900 true);
901 }
902
903 //------------------------------------------------------------------------------
904
905 template<class NextLayer, bool deflateSupported>
906 template<class MutableBufferSequence>
907 std::size_t
908 stream<NextLayer, deflateSupported>::
909 read_some(
910 MutableBufferSequence const& buffers)
911 {
912 static_assert(is_sync_stream<next_layer_type>::value,
913 "SyncStream type requirements not met");
914 static_assert(net::is_mutable_buffer_sequence<
915 MutableBufferSequence>::value,
916 "MutableBufferSequence type requirements not met");
917 error_code ec;
918 auto const bytes_written = read_some(buffers, ec);
919 if(ec)
920 BOOST_THROW_EXCEPTION(system_error{ec});
921 return bytes_written;
922 }
923
924 template<class NextLayer, bool deflateSupported>
925 template<class MutableBufferSequence>
926 std::size_t
927 stream<NextLayer, deflateSupported>::
928 read_some(
929 MutableBufferSequence const& buffers,
930 error_code& ec)
931 {
932 static_assert(is_sync_stream<next_layer_type>::value,
933 "SyncStream type requirements not met");
934 static_assert(net::is_mutable_buffer_sequence<
935 MutableBufferSequence>::value,
936 "MutableBufferSequence type requirements not met");
937 using beast::detail::clamp;
938 auto& impl = *impl_;
939 close_code code{};
940 std::size_t bytes_written = 0;
941 ec = {};
942 // Make sure the stream is open
943 if(impl.check_stop_now(ec))
944 return bytes_written;
945 loop:
946 // See if we need to read a frame header. This
947 // condition is structured to give the decompressor
948 // a chance to emit the final empty deflate block
949 //
950 if(impl.rd_remain == 0 && (
951 ! impl.rd_fh.fin || impl.rd_done))
952 {
953 // Read frame header
954 error_code result;
955 while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
956 {
957 if(result)
958 {
959 // _Fail the WebSocket Connection_
960 if(result == error::message_too_big)
961 code = close_code::too_big;
962 else
963 code = close_code::protocol_error;
964 do_fail(code, result, ec);
965 return bytes_written;
966 }
967 auto const bytes_transferred =
968 impl.stream().read_some(
969 impl.rd_buf.prepare(read_size(
970 impl.rd_buf, impl.rd_buf.max_size())),
971 ec);
972 impl.rd_buf.commit(bytes_transferred);
973 if(impl.check_stop_now(ec))
974 return bytes_written;
975 }
976 // Immediately apply the mask to the portion
977 // of the buffer holding payload data.
978 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
979 detail::mask_inplace(buffers_prefix(
980 clamp(impl.rd_fh.len), impl.rd_buf.data()),
981 impl.rd_key);
982 if(detail::is_control(impl.rd_fh.op))
983 {
984 // Get control frame payload
985 auto const b = buffers_prefix(
986 clamp(impl.rd_fh.len), impl.rd_buf.data());
987 auto const len = buffer_bytes(b);
988 BOOST_ASSERT(len == impl.rd_fh.len);
989
990 // Clear this otherwise the next
991 // frame will be considered final.
992 impl.rd_fh.fin = false;
993
994 // Handle ping frame
995 if(impl.rd_fh.op == detail::opcode::ping)
996 {
997 ping_data payload;
998 detail::read_ping(payload, b);
999 impl.rd_buf.consume(len);
1000 if(impl.wr_close)
1001 {
1002 // Ignore ping when closing
1003 goto loop;
1004 }
1005 if(impl.ctrl_cb)
1006 impl.ctrl_cb(frame_type::ping, payload);
1007 detail::frame_buffer fb;
1008 impl.template write_ping<flat_static_buffer_base>(fb,
1009 detail::opcode::pong, payload);
1010 net::write(impl.stream(), fb.data(), ec);
1011 if(impl.check_stop_now(ec))
1012 return bytes_written;
1013 goto loop;
1014 }
1015 // Handle pong frame
1016 if(impl.rd_fh.op == detail::opcode::pong)
1017 {
1018 ping_data payload;
1019 detail::read_ping(payload, b);
1020 impl.rd_buf.consume(len);
1021 if(impl.ctrl_cb)
1022 impl.ctrl_cb(frame_type::pong, payload);
1023 goto loop;
1024 }
1025 // Handle close frame
1026 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
1027 {
1028 BOOST_ASSERT(! impl.rd_close);
1029 impl.rd_close = true;
1030 close_reason cr;
1031 detail::read_close(cr, b, result);
1032 if(result)
1033 {
1034 // _Fail the WebSocket Connection_
1035 do_fail(close_code::protocol_error,
1036 result, ec);
1037 return bytes_written;
1038 }
1039 impl.cr = cr;
1040 impl.rd_buf.consume(len);
1041 if(impl.ctrl_cb)
1042 impl.ctrl_cb(frame_type::close, impl.cr.reason);
1043 BOOST_ASSERT(! impl.wr_close);
1044 // _Start the WebSocket Closing Handshake_
1045 do_fail(
1046 cr.code == close_code::none ?
1047 close_code::normal :
1048 static_cast<close_code>(cr.code),
1049 error::closed, ec);
1050 return bytes_written;
1051 }
1052 }
1053 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
1054 {
1055 // Empty non-final frame
1056 goto loop;
1057 }
1058 impl.rd_done = false;
1059 }
1060 else
1061 {
1062 ec = {};
1063 }
1064 if(! impl.rd_deflated())
1065 {
1066 if(impl.rd_remain > 0)
1067 {
1068 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
1069 (std::min)(clamp(impl.rd_remain),
1070 buffer_bytes(buffers)))
1071 {
1072 // Fill the read buffer first, otherwise we
1073 // get fewer bytes at the cost of one I/O.
1074 impl.rd_buf.commit(impl.stream().read_some(
1075 impl.rd_buf.prepare(read_size(impl.rd_buf,
1076 impl.rd_buf.max_size())), ec));
1077 if(impl.check_stop_now(ec))
1078 return bytes_written;
1079 if(impl.rd_fh.mask)
1080 detail::mask_inplace(
1081 buffers_prefix(clamp(impl.rd_remain),
1082 impl.rd_buf.data()), impl.rd_key);
1083 }
1084 if(impl.rd_buf.size() > 0)
1085 {
1086 // Copy from the read buffer.
1087 // The mask was already applied.
1088 auto const bytes_transferred = net::buffer_copy(
1089 buffers, impl.rd_buf.data(),
1090 clamp(impl.rd_remain));
1091 auto const mb = buffers_prefix(
1092 bytes_transferred, buffers);
1093 impl.rd_remain -= bytes_transferred;
1094 if(impl.rd_op == detail::opcode::text)
1095 {
1096 if(! impl.rd_utf8.write(mb) ||
1097 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1098 ! impl.rd_utf8.finish()))
1099 {
1100 // _Fail the WebSocket Connection_
1101 do_fail(close_code::bad_payload,
1102 error::bad_frame_payload, ec);
1103 return bytes_written;
1104 }
1105 }
1106 bytes_written += bytes_transferred;
1107 impl.rd_size += bytes_transferred;
1108 impl.rd_buf.consume(bytes_transferred);
1109 }
1110 else
1111 {
1112 // Read into caller's buffer
1113 BOOST_ASSERT(impl.rd_remain > 0);
1114 BOOST_ASSERT(buffer_bytes(buffers) > 0);
1115 BOOST_ASSERT(buffer_bytes(buffers_prefix(
1116 clamp(impl.rd_remain), buffers)) > 0);
1117 auto const bytes_transferred =
1118 impl.stream().read_some(buffers_prefix(
1119 clamp(impl.rd_remain), buffers), ec);
1120 // VFALCO What if some bytes were written?
1121 if(impl.check_stop_now(ec))
1122 return bytes_written;
1123 BOOST_ASSERT(bytes_transferred > 0);
1124 auto const mb = buffers_prefix(
1125 bytes_transferred, buffers);
1126 impl.rd_remain -= bytes_transferred;
1127 if(impl.rd_fh.mask)
1128 detail::mask_inplace(mb, impl.rd_key);
1129 if(impl.rd_op == detail::opcode::text)
1130 {
1131 if(! impl.rd_utf8.write(mb) ||
1132 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1133 ! impl.rd_utf8.finish()))
1134 {
1135 // _Fail the WebSocket Connection_
1136 do_fail(close_code::bad_payload,
1137 error::bad_frame_payload, ec);
1138 return bytes_written;
1139 }
1140 }
1141 bytes_written += bytes_transferred;
1142 impl.rd_size += bytes_transferred;
1143 }
1144 }
1145 impl.rd_done = impl.rd_remain == 0 && impl.rd_fh.fin;
1146 }
1147 else
1148 {
1149 // Read compressed message frame payload:
1150 // inflate even if rd_fh_.len == 0, otherwise we
1151 // never emit the end-of-stream deflate block.
1152 //
1153 bool did_read = false;
1154 buffers_suffix<MutableBufferSequence> cb(buffers);
1155 while(buffer_bytes(cb) > 0)
1156 {
1157 zlib::z_params zs;
1158 {
1159 auto const out = beast::buffers_front(cb);
1160 zs.next_out = out.data();
1161 zs.avail_out = out.size();
1162 BOOST_ASSERT(zs.avail_out > 0);
1163 }
1164 if(impl.rd_remain > 0)
1165 {
1166 if(impl.rd_buf.size() > 0)
1167 {
1168 // use what's there
1169 auto const in = buffers_prefix(
1170 clamp(impl.rd_remain), beast::buffers_front(
1171 impl.rd_buf.data()));
1172 zs.avail_in = in.size();
1173 zs.next_in = in.data();
1174 }
1175 else if(! did_read)
1176 {
1177 // read new
1178 auto const bytes_transferred =
1179 impl.stream().read_some(
1180 impl.rd_buf.prepare(read_size(
1181 impl.rd_buf, impl.rd_buf.max_size())),
1182 ec);
1183 if(impl.check_stop_now(ec))
1184 return bytes_written;
1185 BOOST_ASSERT(bytes_transferred > 0);
1186 impl.rd_buf.commit(bytes_transferred);
1187 if(impl.rd_fh.mask)
1188 detail::mask_inplace(
1189 buffers_prefix(clamp(impl.rd_remain),
1190 impl.rd_buf.data()), impl.rd_key);
1191 auto const in = buffers_prefix(
1192 clamp(impl.rd_remain), buffers_front(
1193 impl.rd_buf.data()));
1194 zs.avail_in = in.size();
1195 zs.next_in = in.data();
1196 did_read = true;
1197 }
1198 else
1199 {
1200 break;
1201 }
1202 }
1203 else if(impl.rd_fh.fin)
1204 {
1205 // append the empty block codes
1206 static std::uint8_t constexpr
1207 empty_block[4] = {
1208 0x00, 0x00, 0xff, 0xff };
1209 zs.next_in = empty_block;
1210 zs.avail_in = sizeof(empty_block);
1211 impl.inflate(zs, zlib::Flush::sync, ec);
1212 if(! ec)
1213 {
1214 // https://github.com/madler/zlib/issues/280
1215 if(zs.total_out > 0)
1216 ec = error::partial_deflate_block;
1217 }
1218 if(impl.check_stop_now(ec))
1219 return bytes_written;
1220 impl.do_context_takeover_read(impl.role);
1221 impl.rd_done = true;
1222 break;
1223 }
1224 else
1225 {
1226 break;
1227 }
1228 impl.inflate(zs, zlib::Flush::sync, ec);
1229 if(impl.check_stop_now(ec))
1230 return bytes_written;
1231 if(impl.rd_msg_max && beast::detail::sum_exceeds(
1232 impl.rd_size, zs.total_out, impl.rd_msg_max))
1233 {
1234 do_fail(close_code::too_big,
1235 error::message_too_big, ec);
1236 return bytes_written;
1237 }
1238 cb.consume(zs.total_out);
1239 impl.rd_size += zs.total_out;
1240 impl.rd_remain -= zs.total_in;
1241 impl.rd_buf.consume(zs.total_in);
1242 bytes_written += zs.total_out;
1243 }
1244 if(impl.rd_op == detail::opcode::text)
1245 {
1246 // check utf8
1247 if(! impl.rd_utf8.write(beast::buffers_prefix(
1248 bytes_written, buffers)) || (
1249 impl.rd_done && ! impl.rd_utf8.finish()))
1250 {
1251 // _Fail the WebSocket Connection_
1252 do_fail(close_code::bad_payload,
1253 error::bad_frame_payload, ec);
1254 return bytes_written;
1255 }
1256 }
1257 }
1258 return bytes_written;
1259 }
1260
1261 template<class NextLayer, bool deflateSupported>
1262 template<class MutableBufferSequence, class ReadHandler>
1263 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1264 stream<NextLayer, deflateSupported>::
1265 async_read_some(
1266 MutableBufferSequence const& buffers,
1267 ReadHandler&& handler)
1268 {
1269 static_assert(is_async_stream<next_layer_type>::value,
1270 "AsyncStream type requirements not met");
1271 static_assert(net::is_mutable_buffer_sequence<
1272 MutableBufferSequence>::value,
1273 "MutableBufferSequence type requirements not met");
1274 return net::async_initiate<
1275 ReadHandler,
1276 void(error_code, std::size_t)>(
1277 run_read_some_op{},
1278 handler,
1279 impl_,
1280 buffers);
1281 }
1282
1283 } // websocket
1284 } // beast
1285 } // boost
1286
1287 #endif