]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/boost/beast/websocket/impl/read.hpp
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / boost / boost / beast / websocket / impl / read.hpp
CommitLineData
92f5a8d4
TL
1//
2// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7// Official repository: https://github.com/boostorg/beast
8//
9
10#ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
11#define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
12
13#include <boost/beast/core/buffer_traits.hpp>
14#include <boost/beast/websocket/teardown.hpp>
15#include <boost/beast/websocket/detail/mask.hpp>
16#include <boost/beast/websocket/impl/stream_impl.hpp>
17#include <boost/beast/core/async_base.hpp>
18#include <boost/beast/core/bind_handler.hpp>
19#include <boost/beast/core/buffers_prefix.hpp>
20#include <boost/beast/core/buffers_suffix.hpp>
21#include <boost/beast/core/flat_static_buffer.hpp>
22#include <boost/beast/core/read_size.hpp>
23#include <boost/beast/core/stream_traits.hpp>
24#include <boost/beast/core/detail/bind_continuation.hpp>
25#include <boost/beast/core/detail/buffer.hpp>
26#include <boost/beast/core/detail/clamp.hpp>
27#include <boost/beast/core/detail/config.hpp>
28#include <boost/asio/coroutine.hpp>
29#include <boost/asio/post.hpp>
30#include <boost/assert.hpp>
31#include <boost/config.hpp>
32#include <boost/optional.hpp>
33#include <boost/throw_exception.hpp>
34#include <algorithm>
35#include <limits>
36#include <memory>
37
38namespace boost {
39namespace beast {
40namespace websocket {
41
42/* Read some message data into a buffer sequence.
43
44 Also reads and handles control frames.
45*/
// Composed asynchronous operation that reads some message payload into the
// caller's buffer sequence and handles control frames (ping, pong, close)
// encountered along the way.
//
// Implemented as a stackless coroutine (asio::coroutine with
// BOOST_ASIO_CORO_REENTER / BOOST_ASIO_CORO_YIELD). The operation keeps only
// a weak_ptr to the stream implementation; when that pointer has expired at
// resumption, it completes with net::error::operation_aborted and 0 bytes.
//
// Control flow inside operator() uses these labels:
//   acquire_read_lock / do_suspend - obtain rd_block or park in op_r_rd
//   loop                           - parse the next frame header / payload
//   close                          - write a close frame (if not yet written)
//                                    and tear down the next layer
//   upcall                         - release locks, wake pending operations
//                                    and invoke the completion handler
46template<class NextLayer, bool deflateSupported>
47template<class Handler, class MutableBufferSequence>
48class stream<NextLayer, deflateSupported>::read_some_op
49 : public beast::async_base<
50 Handler, beast::executor_type<stream>>
51 , public asio::coroutine
52{
// wp_            weak reference to the stream implementation
// bs_            the caller's buffer sequence as passed in
// cb_            suffix view over the same buffers; consumed as inflated
//                output is produced in the deflate path
// bytes_written_ running count reported to the completion handler
// result_        error recorded before jumping to `close`; becomes the final
//                error when teardown itself reports none
// code_          close code serialized into the close frame
// did_read_      set once the deflate path has performed a read, so that the
//                loop does not issue a second read
53 boost::weak_ptr<impl_type> wp_;
54 MutableBufferSequence bs_;
55 buffers_suffix<MutableBufferSequence> cb_;
56 std::size_t bytes_written_ = 0;
57 error_code result_;
58 close_code code_;
59 bool did_read_ = false;
60
61public:
62 static constexpr int id = 1; // for soft_mutex
63
// Stores the handler and buffers, then immediately runs the first step of
// the coroutine with cont == false.
64 template<class Handler_>
65 read_some_op(
66 Handler_&& h,
67 boost::shared_ptr<impl_type> const& sp,
68 MutableBufferSequence const& bs)
69 : async_base<
70 Handler, beast::executor_type<stream>>(
71 std::forward<Handler_>(h),
72 sp->stream().get_executor())
73 , wp_(sp)
74 , bs_(bs)
75 , cb_(bs)
76 , code_(close_code::none)
77 {
78 (*this)({}, 0, false);
79 }
80
// Coroutine body / completion handler for the intermediate operations.
// `cont` is false only for the initial call made from the constructor.
81 void operator()(
82 error_code ec = {},
83 std::size_t bytes_transferred = 0,
84 bool cont = true)
85 {
86 using beast::detail::clamp;
87 auto sp = wp_.lock();
88 if(! sp)
89 {
// The implementation object no longer exists: abort with zero bytes.
90 ec = net::error::operation_aborted;
91 bytes_written_ = 0;
92 return this->complete(cont, ec, bytes_written_);
93 }
94 auto& impl = *sp;
95 BOOST_ASIO_CORO_REENTER(*this)
96 {
97 impl.update_timer(this->get_executor());
98
99 acquire_read_lock:
100 // Acquire the read lock
101 if(! impl.rd_block.try_lock(this))
102 {
103 do_suspend:
104 BOOST_ASIO_CORO_YIELD
105 impl.op_r_rd.emplace(std::move(*this));
106 impl.rd_block.lock(this);
107 BOOST_ASIO_CORO_YIELD
108 net::post(std::move(*this));
109 BOOST_ASSERT(impl.rd_block.is_locked(this));
110
111 // VFALCO Is this check correct here?
// NOTE(review): check_stop_now(ec) is evaluated inside BOOST_ASSERT here,
// so this first call only happens in builds where assertions are enabled,
// and the assertion requires it to return true. The unconditional call on
// the next statement is what release builds rely on - confirm that calling
// check_stop_now twice in assert-enabled builds is intended.
112 BOOST_ASSERT(! ec && impl.check_stop_now(ec));
113 if(impl.check_stop_now(ec))
114 {
115 BOOST_ASSERT(ec == net::error::operation_aborted);
116 goto upcall;
117 }
118 // VFALCO Should never get here
119
120 // The only way to get read blocked is if
121 // a `close_op` wrote a close frame
122 BOOST_ASSERT(impl.wr_close);
123 BOOST_ASSERT(impl.status_ != status::open);
124 ec = net::error::operation_aborted;
125 goto upcall;
126 }
127 else
128 {
129 // Make sure the stream is not closed
130 if( impl.status_ == status::closed ||
131 impl.status_ == status::failed)
132 {
133 ec = net::error::operation_aborted;
134 goto upcall;
135 }
136 }
137
138 // if status_ == status::closing, we want to suspend
139 // the read operation until the close completes,
140 // then finish the read with operation_aborted.
141
142 loop:
143 BOOST_ASSERT(impl.rd_block.is_locked(this));
144 // See if we need to read a frame header. This
145 // condition is structured to give the decompressor
146 // a chance to emit the final empty deflate block
147 //
148 if(impl.rd_remain == 0 &&
149 (! impl.rd_fh.fin || impl.rd_done))
150 {
151 // Read frame header
152 while(! impl.parse_fh(
153 impl.rd_fh, impl.rd_buf, result_))
154 {
155 if(result_)
156 {
157 // _Fail the WebSocket Connection_
158 if(result_ == error::message_too_big)
159 code_ = close_code::too_big;
160 else
161 code_ = close_code::protocol_error;
162 goto close;
163 }
164 BOOST_ASSERT(impl.rd_block.is_locked(this));
165 BOOST_ASIO_CORO_YIELD
166 impl.stream().async_read_some(
167 impl.rd_buf.prepare(read_size(
168 impl.rd_buf, impl.rd_buf.max_size())),
169 std::move(*this));
170 BOOST_ASSERT(impl.rd_block.is_locked(this));
171 impl.rd_buf.commit(bytes_transferred);
172 if(impl.check_stop_now(ec))
173 goto upcall;
174 impl.reset_idle();
175
176 // Allow a close operation
177 // to acquire the read block
178 impl.rd_block.unlock(this);
179 if( impl.op_r_close.maybe_invoke())
180 {
181 // Suspend
182 BOOST_ASSERT(impl.rd_block.is_locked());
183 goto do_suspend;
184 }
185 // Acquire read block
186 impl.rd_block.lock(this);
187 }
188 // Immediately apply the mask to the portion
189 // of the buffer holding payload data.
190 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
191 detail::mask_inplace(buffers_prefix(
192 clamp(impl.rd_fh.len),
193 impl.rd_buf.data()),
194 impl.rd_key);
195 if(detail::is_control(impl.rd_fh.op))
196 {
197 // Clear this otherwise the next
198 // frame will be considered final.
199 impl.rd_fh.fin = false;
200
201 // Handle ping frame
202 if(impl.rd_fh.op == detail::opcode::ping)
203 {
// When a control callback is installed and this is still the initiating
// call (cont == false), post first so the code below runs as a continuation.
204 if(impl.ctrl_cb)
205 {
206 if(! cont)
207 {
208 BOOST_ASIO_CORO_YIELD
209 net::post(std::move(*this));
210 BOOST_ASSERT(cont);
211 // VFALCO call check_stop_now() here?
212 }
213 }
214 {
215 auto const b = buffers_prefix(
216 clamp(impl.rd_fh.len),
217 impl.rd_buf.data());
218 auto const len = buffer_bytes(b);
219 BOOST_ASSERT(len == impl.rd_fh.len);
220 ping_data payload;
221 detail::read_ping(payload, b);
222 impl.rd_buf.consume(len);
223 // Ignore ping when closing
224 if(impl.status_ == status::closing)
225 goto loop;
226 if(impl.ctrl_cb)
227 impl.ctrl_cb(
228 frame_type::ping, payload);
229 impl.rd_fb.clear();
230 impl.template write_ping<
231 flat_static_buffer_base>(impl.rd_fb,
232 detail::opcode::pong, payload);
233 }
234
235 // Allow a close operation
236 // to acquire the read block
237 impl.rd_block.unlock(this);
238 impl.op_r_close.maybe_invoke();
239
240 // Acquire the write lock
241 if(! impl.wr_block.try_lock(this))
242 {
243 BOOST_ASIO_CORO_YIELD
244 impl.op_rd.emplace(std::move(*this));
245 impl.wr_block.lock(this);
246 BOOST_ASIO_CORO_YIELD
247 net::post(std::move(*this));
248 BOOST_ASSERT(impl.wr_block.is_locked(this));
249 if(impl.check_stop_now(ec))
250 goto upcall;
251 }
252
253 // Send pong
254 BOOST_ASSERT(impl.wr_block.is_locked(this));
255 BOOST_ASIO_CORO_YIELD
256 net::async_write(
257 impl.stream(), impl.rd_fb.data(),
258 beast::detail::bind_continuation(std::move(*this)));
259 BOOST_ASSERT(impl.wr_block.is_locked(this));
260 if(impl.check_stop_now(ec))
261 goto upcall;
262 impl.wr_block.unlock(this);
// Wake at most one pending operation; || stops at the first that runs.
263 impl.op_close.maybe_invoke()
264 || impl.op_idle_ping.maybe_invoke()
265 || impl.op_ping.maybe_invoke()
266 || impl.op_wr.maybe_invoke();
267 goto acquire_read_lock;
268 }
269
270 // Handle pong frame
271 if(impl.rd_fh.op == detail::opcode::pong)
272 {
273 // Ignore pong when closing
274 if(! impl.wr_close && impl.ctrl_cb)
275 {
276 if(! cont)
277 {
278 BOOST_ASIO_CORO_YIELD
279 net::post(std::move(*this));
280 BOOST_ASSERT(cont);
281 }
282 }
283 auto const cb = buffers_prefix(clamp(
284 impl.rd_fh.len), impl.rd_buf.data());
285 auto const len = buffer_bytes(cb);
286 BOOST_ASSERT(len == impl.rd_fh.len);
287 ping_data payload;
288 detail::read_ping(payload, cb);
289 impl.rd_buf.consume(len);
290 // Ignore pong when closing
291 if(! impl.wr_close && impl.ctrl_cb)
292 impl.ctrl_cb(frame_type::pong, payload);
293 goto loop;
294 }
295
296 // Handle close frame
297 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
298 {
299 if(impl.ctrl_cb)
300 {
301 if(! cont)
302 {
303 BOOST_ASIO_CORO_YIELD
304 net::post(std::move(*this));
305 BOOST_ASSERT(cont);
306 }
307 }
308 auto const cb = buffers_prefix(clamp(
309 impl.rd_fh.len), impl.rd_buf.data());
310 auto const len = buffer_bytes(cb);
311 BOOST_ASSERT(len == impl.rd_fh.len);
312 BOOST_ASSERT(! impl.rd_close);
313 impl.rd_close = true;
314 close_reason cr;
315 detail::read_close(cr, cb, result_);
316 if(result_)
317 {
318 // _Fail the WebSocket Connection_
319 code_ = close_code::protocol_error;
320 goto close;
321 }
322 impl.cr = cr;
323 impl.rd_buf.consume(len);
324 if(impl.ctrl_cb)
325 impl.ctrl_cb(frame_type::close,
326 impl.cr.reason);
327 // See if we are already closing
328 if(impl.status_ == status::closing)
329 {
330 // _Close the WebSocket Connection_
331 BOOST_ASSERT(impl.wr_close);
332 code_ = close_code::none;
333 result_ = error::closed;
334 goto close;
335 }
336 // _Start the WebSocket Closing Handshake_
337 code_ = cr.code == close_code::none ?
338 close_code::normal :
339 static_cast<close_code>(cr.code);
340 result_ = error::closed;
341 goto close;
342 }
343 }
344 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
345 {
346 // Empty non-final frame
347 goto loop;
348 }
349 impl.rd_done = false;
350 }
351 if(! impl.rd_deflated())
352 {
353 if(impl.rd_remain > 0)
354 {
355 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
356 (std::min)(clamp(impl.rd_remain),
357 buffer_bytes(cb_)))
358 {
359 // Fill the read buffer first, otherwise we
360 // get fewer bytes at the cost of one I/O.
361 BOOST_ASIO_CORO_YIELD
362 impl.stream().async_read_some(
363 impl.rd_buf.prepare(read_size(
364 impl.rd_buf, impl.rd_buf.max_size())),
365 std::move(*this));
366 impl.rd_buf.commit(bytes_transferred);
367 if(impl.check_stop_now(ec))
368 goto upcall;
369 impl.reset_idle();
370 if(impl.rd_fh.mask)
371 detail::mask_inplace(buffers_prefix(clamp(
372 impl.rd_remain), impl.rd_buf.data()),
373 impl.rd_key);
374 }
375 if(impl.rd_buf.size() > 0)
376 {
377 // Copy from the read buffer.
378 // The mask was already applied.
379 bytes_transferred = net::buffer_copy(cb_,
380 impl.rd_buf.data(), clamp(impl.rd_remain));
381 auto const mb = buffers_prefix(
382 bytes_transferred, cb_);
383 impl.rd_remain -= bytes_transferred;
384 if(impl.rd_op == detail::opcode::text)
385 {
386 if(! impl.rd_utf8.write(mb) ||
387 (impl.rd_remain == 0 && impl.rd_fh.fin &&
388 ! impl.rd_utf8.finish()))
389 {
390 // _Fail the WebSocket Connection_
391 code_ = close_code::bad_payload;
392 result_ = error::bad_frame_payload;
393 goto close;
394 }
395 }
396 bytes_written_ += bytes_transferred;
397 impl.rd_size += bytes_transferred;
398 impl.rd_buf.consume(bytes_transferred);
399 }
400 else
401 {
402 // Read into caller's buffer
403 BOOST_ASSERT(impl.rd_remain > 0);
404 BOOST_ASSERT(buffer_bytes(cb_) > 0);
405 BOOST_ASSERT(buffer_bytes(buffers_prefix(
406 clamp(impl.rd_remain), cb_)) > 0);
407 BOOST_ASIO_CORO_YIELD
408 impl.stream().async_read_some(buffers_prefix(
409 clamp(impl.rd_remain), cb_), std::move(*this));
410 if(impl.check_stop_now(ec))
411 goto upcall;
412 impl.reset_idle();
413 BOOST_ASSERT(bytes_transferred > 0);
414 auto const mb = buffers_prefix(
415 bytes_transferred, cb_);
416 impl.rd_remain -= bytes_transferred;
// Data read straight into the caller's buffer has not been unmasked yet.
417 if(impl.rd_fh.mask)
418 detail::mask_inplace(mb, impl.rd_key);
419 if(impl.rd_op == detail::opcode::text)
420 {
421 if(! impl.rd_utf8.write(mb) ||
422 (impl.rd_remain == 0 && impl.rd_fh.fin &&
423 ! impl.rd_utf8.finish()))
424 {
425 // _Fail the WebSocket Connection_
426 code_ = close_code::bad_payload;
427 result_ = error::bad_frame_payload;
428 goto close;
429 }
430 }
431 bytes_written_ += bytes_transferred;
432 impl.rd_size += bytes_transferred;
433 }
434 }
f67539c2
TL
435 BOOST_ASSERT( ! impl.rd_done );
436 if( impl.rd_remain == 0 && impl.rd_fh.fin )
437 impl.rd_done = true;
92f5a8d4
TL
438 }
439 else
440 {
441 // Read compressed message frame payload:
442 // inflate even if rd_fh_.len == 0, otherwise we
443 // never emit the end-of-stream deflate block.
444 while(buffer_bytes(cb_) > 0)
445 {
446 if( impl.rd_remain > 0 &&
447 impl.rd_buf.size() == 0 &&
448 ! did_read_)
449 {
450 // read new
451 BOOST_ASIO_CORO_YIELD
452 impl.stream().async_read_some(
453 impl.rd_buf.prepare(read_size(
454 impl.rd_buf, impl.rd_buf.max_size())),
455 std::move(*this));
456 if(impl.check_stop_now(ec))
457 goto upcall;
458 impl.reset_idle();
459 BOOST_ASSERT(bytes_transferred > 0);
460 impl.rd_buf.commit(bytes_transferred);
461 if(impl.rd_fh.mask)
462 detail::mask_inplace(
463 buffers_prefix(clamp(impl.rd_remain),
464 impl.rd_buf.data()), impl.rd_key);
465 did_read_ = true;
466 }
467 zlib::z_params zs;
468 {
469 auto const out = buffers_front(cb_);
470 zs.next_out = out.data();
471 zs.avail_out = out.size();
472 BOOST_ASSERT(zs.avail_out > 0);
473 }
474 if(impl.rd_remain > 0)
475 {
476 if(impl.rd_buf.size() > 0)
477 {
478 // use what's there
479 auto const in = buffers_prefix(
480 clamp(impl.rd_remain), buffers_front(
481 impl.rd_buf.data()));
482 zs.avail_in = in.size();
483 zs.next_in = in.data();
484 }
485 else
486 {
487 break;
488 }
489 }
490 else if(impl.rd_fh.fin)
491 {
492 // append the empty block codes
493 std::uint8_t constexpr
494 empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
495 zs.next_in = empty_block;
496 zs.avail_in = sizeof(empty_block);
497 impl.inflate(zs, zlib::Flush::sync, ec);
498 if(! ec)
499 {
500 // https://github.com/madler/zlib/issues/280
501 if(zs.total_out > 0)
502 ec = error::partial_deflate_block;
503 }
504 if(impl.check_stop_now(ec))
505 goto upcall;
506 impl.do_context_takeover_read(impl.role);
507 impl.rd_done = true;
508 break;
509 }
510 else
511 {
512 break;
513 }
514 impl.inflate(zs, zlib::Flush::sync, ec);
515 if(impl.check_stop_now(ec))
516 goto upcall;
517 if(impl.rd_msg_max && beast::detail::sum_exceeds(
518 impl.rd_size, zs.total_out, impl.rd_msg_max))
519 {
520 // _Fail the WebSocket Connection_
521 code_ = close_code::too_big;
522 result_ = error::message_too_big;
523 goto close;
524 }
525 cb_.consume(zs.total_out);
526 impl.rd_size += zs.total_out;
527 impl.rd_remain -= zs.total_in;
528 impl.rd_buf.consume(zs.total_in);
529 bytes_written_ += zs.total_out;
530 }
531 if(impl.rd_op == detail::opcode::text)
532 {
533 // check utf8
534 if(! impl.rd_utf8.write(
535 buffers_prefix(bytes_written_, bs_)) || (
536 impl.rd_done && ! impl.rd_utf8.finish()))
537 {
538 // _Fail the WebSocket Connection_
539 code_ = close_code::bad_payload;
540 result_ = error::bad_frame_payload;
541 goto close;
542 }
543 }
544 }
545 goto upcall;
546
// Failure / close path: code_ and result_ were set by the jump site.
547 close:
548 // Acquire the write lock
549 if(! impl.wr_block.try_lock(this))
550 {
551 BOOST_ASIO_CORO_YIELD
552 impl.op_rd.emplace(std::move(*this));
553 impl.wr_block.lock(this);
554 BOOST_ASIO_CORO_YIELD
555 net::post(std::move(*this));
556 BOOST_ASSERT(impl.wr_block.is_locked(this));
557 if(impl.check_stop_now(ec))
558 goto upcall;
559 }
560
561 impl.change_status(status::closing);
562
563 if(! impl.wr_close)
564 {
565 impl.wr_close = true;
566
567 // Serialize close frame
568 impl.rd_fb.clear();
569 impl.template write_close<
570 flat_static_buffer_base>(
571 impl.rd_fb, code_);
572
573 // Send close frame
574 BOOST_ASSERT(impl.wr_block.is_locked(this));
575 BOOST_ASIO_CORO_YIELD
576 net::async_write(impl.stream(), impl.rd_fb.data(),
577 beast::detail::bind_continuation(std::move(*this)));
578 BOOST_ASSERT(impl.wr_block.is_locked(this));
579 if(impl.check_stop_now(ec))
580 goto upcall;
581 }
582
583 // Teardown
584 using beast::websocket::async_teardown;
585 BOOST_ASSERT(impl.wr_block.is_locked(this));
586 BOOST_ASIO_CORO_YIELD
587 async_teardown(impl.role, impl.stream(),
588 beast::detail::bind_continuation(std::move(*this)));
589 BOOST_ASSERT(impl.wr_block.is_locked(this));
590 if(ec == net::error::eof)
591 {
592 // Rationale:
593 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
594 ec = {};
595 }
// A clean teardown reports the error recorded before the jump to `close`.
596 if(! ec)
597 ec = result_;
598 if(ec && ec != error::closed)
599 impl.change_status(status::failed);
600 else
601 impl.change_status(status::closed);
602 impl.close();
603
// Common exit: drop whichever locks this operation holds, give pending
// operations a chance to run, then deliver the result to the handler.
604 upcall:
605 impl.rd_block.try_unlock(this);
606 impl.op_r_close.maybe_invoke();
607 if(impl.wr_block.try_unlock(this))
608 impl.op_close.maybe_invoke()
609 || impl.op_idle_ping.maybe_invoke()
610 || impl.op_ping.maybe_invoke()
611 || impl.op_wr.maybe_invoke();
612 this->complete(cont, ec, bytes_written_);
613 }
614 }
615};
616
617//------------------------------------------------------------------------------
618
// Composed asynchronous operation that reads message data into a dynamic
// buffer by repeatedly running read_some_op with itself as the handler.
//
// Each iteration prepares writable space in the dynamic buffer (sized by
// impl.read_size_hint_db(b_) clamped to limit_), runs one read_some_op into
// it, then commits the bytes that were transferred. When `some_` is true the
// loop runs once; otherwise it repeats until impl.rd_done is set or an error
// occurs.
619template<class NextLayer, bool deflateSupported>
620template<class Handler, class DynamicBuffer>
621class stream<NextLayer, deflateSupported>::read_op
622 : public beast::async_base<
623 Handler, beast::executor_type<stream>>
624 , public asio::coroutine
625{
// wp_            weak reference to the stream implementation
// b_             the caller's dynamic buffer (held by reference)
// limit_         upper bound on the size prepared per iteration
// bytes_written_ total bytes committed so far
// some_          true to stop after a single read_some_op
626 boost::weak_ptr<impl_type> wp_;
627 DynamicBuffer& b_;
628 std::size_t limit_;
629 std::size_t bytes_written_ = 0;
630 bool some_;
631
632public:
// A `limit` of zero is stored as the largest std::size_t value.
// The constructor runs the first coroutine step with cont == false.
633 template<class Handler_>
634 read_op(
635 Handler_&& h,
636 boost::shared_ptr<impl_type> const& sp,
637 DynamicBuffer& b,
638 std::size_t limit,
639 bool some)
640 : async_base<Handler,
641 beast::executor_type<stream>>(
642 std::forward<Handler_>(h),
643 sp->stream().get_executor())
644 , wp_(sp)
645 , b_(b)
646 , limit_(limit ? limit : (
647 std::numeric_limits<std::size_t>::max)())
648 , some_(some)
649 {
650 (*this)({}, 0, false);
651 }
652
// Coroutine body; also the completion handler for each read_some_op.
653 void operator()(
654 error_code ec = {},
655 std::size_t bytes_transferred = 0,
656 bool cont = true)
657 {
658 using beast::detail::clamp;
659 auto sp = wp_.lock();
660 if(! sp)
661 {
// The implementation object no longer exists: abort with zero bytes.
662 ec = net::error::operation_aborted;
663 bytes_written_ = 0;
664 return this->complete(cont, ec, bytes_written_);
665 }
666 auto& impl = *sp;
667 using mutable_buffers_type = typename
668 DynamicBuffer::mutable_buffers_type;
669 BOOST_ASIO_CORO_REENTER(*this)
670 {
671 do
672 {
673 // VFALCO TODO use boost::beast::bind_continuation
674 BOOST_ASIO_CORO_YIELD
675 {
// dynamic_buffer_prepare reports failure through ec using
// error::buffer_overflow; check_stop_now(ec) then routes to upcall.
676 auto mb = beast::detail::dynamic_buffer_prepare(b_,
677 clamp(impl.read_size_hint_db(b_), limit_),
678 ec, error::buffer_overflow);
679 if(impl.check_stop_now(ec))
680 goto upcall;
// *this is moved into the nested operation as its handler.
681 read_some_op<read_op, mutable_buffers_type>(
682 std::move(*this), sp, *mb);
683 }
684
685 b_.commit(bytes_transferred);
686 bytes_written_ += bytes_transferred;
687 if(ec)
688 goto upcall;
689 }
690 while(! some_ && ! impl.rd_done);
691
692 upcall:
693 this->complete(cont, ec, bytes_written_);
694 }
695 }
696};
697
698template<class NextLayer, bool deflateSupported>
699struct stream<NextLayer, deflateSupported>::
700 run_read_some_op
701{
702 template<
703 class ReadHandler,
704 class MutableBufferSequence>
705 void
706 operator()(
707 ReadHandler&& h,
708 boost::shared_ptr<impl_type> const& sp,
709 MutableBufferSequence const& b)
710 {
711 // If you get an error on the following line it means
712 // that your handler does not meet the documented type
713 // requirements for the handler.
714
715 static_assert(
716 beast::detail::is_invocable<ReadHandler,
717 void(error_code, std::size_t)>::value,
718 "ReadHandler type requirements not met");
719
720 read_some_op<
721 typename std::decay<ReadHandler>::type,
722 MutableBufferSequence>(
723 std::forward<ReadHandler>(h),
724 sp,
725 b);
726 }
727};
728
729template<class NextLayer, bool deflateSupported>
730struct stream<NextLayer, deflateSupported>::
731 run_read_op
732{
733 template<
734 class ReadHandler,
735 class DynamicBuffer>
736 void
737 operator()(
738 ReadHandler&& h,
739 boost::shared_ptr<impl_type> const& sp,
740 DynamicBuffer* b,
741 std::size_t limit,
742 bool some)
743 {
744 // If you get an error on the following line it means
745 // that your handler does not meet the documented type
746 // requirements for the handler.
747
748 static_assert(
749 beast::detail::is_invocable<ReadHandler,
750 void(error_code, std::size_t)>::value,
751 "ReadHandler type requirements not met");
752
753 read_op<
754 typename std::decay<ReadHandler>::type,
755 DynamicBuffer>(
756 std::forward<ReadHandler>(h),
757 sp,
758 *b,
759 limit,
760 some);
761 }
762};
763
764//------------------------------------------------------------------------------
765
766template<class NextLayer, bool deflateSupported>
767template<class DynamicBuffer>
768std::size_t
769stream<NextLayer, deflateSupported>::
770read(DynamicBuffer& buffer)
771{
772 static_assert(is_sync_stream<next_layer_type>::value,
773 "SyncStream type requirements not met");
774 static_assert(
775 net::is_dynamic_buffer<DynamicBuffer>::value,
776 "DynamicBuffer type requirements not met");
777 error_code ec;
778 auto const bytes_written = read(buffer, ec);
779 if(ec)
780 BOOST_THROW_EXCEPTION(system_error{ec});
781 return bytes_written;
782}
783
784template<class NextLayer, bool deflateSupported>
785template<class DynamicBuffer>
786std::size_t
787stream<NextLayer, deflateSupported>::
788read(DynamicBuffer& buffer, error_code& ec)
789{
790 static_assert(is_sync_stream<next_layer_type>::value,
791 "SyncStream type requirements not met");
792 static_assert(
793 net::is_dynamic_buffer<DynamicBuffer>::value,
794 "DynamicBuffer type requirements not met");
795 std::size_t bytes_written = 0;
796 do
797 {
798 bytes_written += read_some(buffer, 0, ec);
799 if(ec)
800 return bytes_written;
801 }
802 while(! is_message_done());
803 return bytes_written;
804}
805
// Start an asynchronous read into a dynamic buffer.
//
// After checking the stream and buffer type requirements, the operation is
// initiated through net::async_initiate with run_read_op, passing the stream
// implementation, the address of `buffer`, a limit of 0 and some == false.
// With these arguments read_op treats the limit as the largest std::size_t
// value and keeps reading until the implementation's rd_done flag is set or
// an error occurs.
806template<class NextLayer, bool deflateSupported>
807template<class DynamicBuffer, class ReadHandler>
808BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
809stream<NextLayer, deflateSupported>::
810async_read(DynamicBuffer& buffer, ReadHandler&& handler)
811{
812 static_assert(is_async_stream<next_layer_type>::value,
813 "AsyncStream type requirements not met");
814 static_assert(
815 net::is_dynamic_buffer<DynamicBuffer>::value,
816 "DynamicBuffer type requirements not met");
817 return net::async_initiate<
818 ReadHandler,
819 void(error_code, std::size_t)>(
820 run_read_op{},
821 handler,
822 impl_,
// The buffer is passed by address; run_read_op dereferences it.
823 &buffer,
824 0,
825 false);
826}
827
828//------------------------------------------------------------------------------
829
830template<class NextLayer, bool deflateSupported>
831template<class DynamicBuffer>
832std::size_t
833stream<NextLayer, deflateSupported>::
834read_some(
835 DynamicBuffer& buffer,
836 std::size_t limit)
837{
838 static_assert(is_sync_stream<next_layer_type>::value,
839 "SyncStream type requirements not met");
840 static_assert(
841 net::is_dynamic_buffer<DynamicBuffer>::value,
842 "DynamicBuffer type requirements not met");
843 error_code ec;
844 auto const bytes_written =
845 read_some(buffer, limit, ec);
846 if(ec)
847 BOOST_THROW_EXCEPTION(system_error{ec});
848 return bytes_written;
849}
850
851template<class NextLayer, bool deflateSupported>
852template<class DynamicBuffer>
853std::size_t
854stream<NextLayer, deflateSupported>::
855read_some(
856 DynamicBuffer& buffer,
857 std::size_t limit,
858 error_code& ec)
859{
860 static_assert(is_sync_stream<next_layer_type>::value,
861 "SyncStream type requirements not met");
862 static_assert(
863 net::is_dynamic_buffer<DynamicBuffer>::value,
864 "DynamicBuffer type requirements not met");
865 using beast::detail::clamp;
866 if(! limit)
867 limit = (std::numeric_limits<std::size_t>::max)();
868 auto const size =
869 clamp(read_size_hint(buffer), limit);
870 BOOST_ASSERT(size > 0);
871 auto mb = beast::detail::dynamic_buffer_prepare(
872 buffer, size, ec, error::buffer_overflow);
873 if(impl_->check_stop_now(ec))
874 return 0;
875 auto const bytes_written = read_some(*mb, ec);
876 buffer.commit(bytes_written);
877 return bytes_written;
878}
879
// Start an asynchronous read of some message data into a dynamic buffer.
//
// After checking the stream and buffer type requirements, the operation is
// initiated through net::async_initiate with run_read_op, passing the stream
// implementation, the address of `buffer`, the caller's `limit` and
// some == true, so read_op performs a single read_some_op before completing.
880template<class NextLayer, bool deflateSupported>
881template<class DynamicBuffer, class ReadHandler>
882BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
883stream<NextLayer, deflateSupported>::
884async_read_some(
885 DynamicBuffer& buffer,
886 std::size_t limit,
887 ReadHandler&& handler)
888{
889 static_assert(is_async_stream<next_layer_type>::value,
890 "AsyncStream type requirements not met");
891 static_assert(
892 net::is_dynamic_buffer<DynamicBuffer>::value,
893 "DynamicBuffer type requirements not met");
894 return net::async_initiate<
895 ReadHandler,
896 void(error_code, std::size_t)>(
897 run_read_op{},
898 handler,
899 impl_,
// The buffer is passed by address; run_read_op dereferences it.
900 &buffer,
901 limit,
902 true);
903}
904
905//------------------------------------------------------------------------------
906
907template<class NextLayer, bool deflateSupported>
908template<class MutableBufferSequence>
909std::size_t
910stream<NextLayer, deflateSupported>::
911read_some(
912 MutableBufferSequence const& buffers)
913{
914 static_assert(is_sync_stream<next_layer_type>::value,
915 "SyncStream type requirements not met");
916 static_assert(net::is_mutable_buffer_sequence<
917 MutableBufferSequence>::value,
918 "MutableBufferSequence type requirements not met");
919 error_code ec;
920 auto const bytes_written = read_some(buffers, ec);
921 if(ec)
922 BOOST_THROW_EXCEPTION(system_error{ec});
923 return bytes_written;
924}
925
926template<class NextLayer, bool deflateSupported>
927template<class MutableBufferSequence>
928std::size_t
929stream<NextLayer, deflateSupported>::
930read_some(
931 MutableBufferSequence const& buffers,
932 error_code& ec)
933{
934 static_assert(is_sync_stream<next_layer_type>::value,
935 "SyncStream type requirements not met");
936 static_assert(net::is_mutable_buffer_sequence<
937 MutableBufferSequence>::value,
938 "MutableBufferSequence type requirements not met");
939 using beast::detail::clamp;
940 auto& impl = *impl_;
941 close_code code{};
942 std::size_t bytes_written = 0;
943 ec = {};
944 // Make sure the stream is open
945 if(impl.check_stop_now(ec))
946 return bytes_written;
947loop:
948 // See if we need to read a frame header. This
949 // condition is structured to give the decompressor
950 // a chance to emit the final empty deflate block
951 //
952 if(impl.rd_remain == 0 && (
953 ! impl.rd_fh.fin || impl.rd_done))
954 {
955 // Read frame header
956 error_code result;
957 while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
958 {
959 if(result)
960 {
961 // _Fail the WebSocket Connection_
962 if(result == error::message_too_big)
963 code = close_code::too_big;
964 else
965 code = close_code::protocol_error;
966 do_fail(code, result, ec);
967 return bytes_written;
968 }
969 auto const bytes_transferred =
970 impl.stream().read_some(
971 impl.rd_buf.prepare(read_size(
972 impl.rd_buf, impl.rd_buf.max_size())),
973 ec);
974 impl.rd_buf.commit(bytes_transferred);
975 if(impl.check_stop_now(ec))
976 return bytes_written;
977 }
978 // Immediately apply the mask to the portion
979 // of the buffer holding payload data.
980 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
981 detail::mask_inplace(buffers_prefix(
982 clamp(impl.rd_fh.len), impl.rd_buf.data()),
983 impl.rd_key);
984 if(detail::is_control(impl.rd_fh.op))
985 {
986 // Get control frame payload
987 auto const b = buffers_prefix(
988 clamp(impl.rd_fh.len), impl.rd_buf.data());
989 auto const len = buffer_bytes(b);
990 BOOST_ASSERT(len == impl.rd_fh.len);
991
992 // Clear this otherwise the next
993 // frame will be considered final.
994 impl.rd_fh.fin = false;
995
996 // Handle ping frame
997 if(impl.rd_fh.op == detail::opcode::ping)
998 {
999 ping_data payload;
1000 detail::read_ping(payload, b);
1001 impl.rd_buf.consume(len);
1002 if(impl.wr_close)
1003 {
1004 // Ignore ping when closing
1005 goto loop;
1006 }
1007 if(impl.ctrl_cb)
1008 impl.ctrl_cb(frame_type::ping, payload);
1009 detail::frame_buffer fb;
1010 impl.template write_ping<flat_static_buffer_base>(fb,
1011 detail::opcode::pong, payload);
1012 net::write(impl.stream(), fb.data(), ec);
1013 if(impl.check_stop_now(ec))
1014 return bytes_written;
1015 goto loop;
1016 }
1017 // Handle pong frame
1018 if(impl.rd_fh.op == detail::opcode::pong)
1019 {
1020 ping_data payload;
1021 detail::read_ping(payload, b);
1022 impl.rd_buf.consume(len);
1023 if(impl.ctrl_cb)
1024 impl.ctrl_cb(frame_type::pong, payload);
1025 goto loop;
1026 }
1027 // Handle close frame
1028 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
1029 {
1030 BOOST_ASSERT(! impl.rd_close);
1031 impl.rd_close = true;
1032 close_reason cr;
1033 detail::read_close(cr, b, result);
1034 if(result)
1035 {
1036 // _Fail the WebSocket Connection_
1037 do_fail(close_code::protocol_error,
1038 result, ec);
1039 return bytes_written;
1040 }
1041 impl.cr = cr;
1042 impl.rd_buf.consume(len);
1043 if(impl.ctrl_cb)
1044 impl.ctrl_cb(frame_type::close, impl.cr.reason);
1045 BOOST_ASSERT(! impl.wr_close);
1046 // _Start the WebSocket Closing Handshake_
1047 do_fail(
1048 cr.code == close_code::none ?
1049 close_code::normal :
1050 static_cast<close_code>(cr.code),
1051 error::closed, ec);
1052 return bytes_written;
1053 }
1054 }
1055 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
1056 {
1057 // Empty non-final frame
1058 goto loop;
1059 }
1060 impl.rd_done = false;
1061 }
1062 else
1063 {
1064 ec = {};
1065 }
1066 if(! impl.rd_deflated())
1067 {
1068 if(impl.rd_remain > 0)
1069 {
1070 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
1071 (std::min)(clamp(impl.rd_remain),
1072 buffer_bytes(buffers)))
1073 {
1074 // Fill the read buffer first, otherwise we
1075 // get fewer bytes at the cost of one I/O.
1076 impl.rd_buf.commit(impl.stream().read_some(
1077 impl.rd_buf.prepare(read_size(impl.rd_buf,
1078 impl.rd_buf.max_size())), ec));
1079 if(impl.check_stop_now(ec))
1080 return bytes_written;
1081 if(impl.rd_fh.mask)
1082 detail::mask_inplace(
1083 buffers_prefix(clamp(impl.rd_remain),
1084 impl.rd_buf.data()), impl.rd_key);
1085 }
1086 if(impl.rd_buf.size() > 0)
1087 {
1088 // Copy from the read buffer.
1089 // The mask was already applied.
1090 auto const bytes_transferred = net::buffer_copy(
1091 buffers, impl.rd_buf.data(),
1092 clamp(impl.rd_remain));
1093 auto const mb = buffers_prefix(
1094 bytes_transferred, buffers);
1095 impl.rd_remain -= bytes_transferred;
1096 if(impl.rd_op == detail::opcode::text)
1097 {
1098 if(! impl.rd_utf8.write(mb) ||
1099 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1100 ! impl.rd_utf8.finish()))
1101 {
1102 // _Fail the WebSocket Connection_
1103 do_fail(close_code::bad_payload,
1104 error::bad_frame_payload, ec);
1105 return bytes_written;
1106 }
1107 }
1108 bytes_written += bytes_transferred;
1109 impl.rd_size += bytes_transferred;
1110 impl.rd_buf.consume(bytes_transferred);
1111 }
1112 else
1113 {
1114 // Read into caller's buffer
1115 BOOST_ASSERT(impl.rd_remain > 0);
1116 BOOST_ASSERT(buffer_bytes(buffers) > 0);
1117 BOOST_ASSERT(buffer_bytes(buffers_prefix(
1118 clamp(impl.rd_remain), buffers)) > 0);
1119 auto const bytes_transferred =
1120 impl.stream().read_some(buffers_prefix(
1121 clamp(impl.rd_remain), buffers), ec);
1122 // VFALCO What if some bytes were written?
1123 if(impl.check_stop_now(ec))
1124 return bytes_written;
1125 BOOST_ASSERT(bytes_transferred > 0);
1126 auto const mb = buffers_prefix(
1127 bytes_transferred, buffers);
1128 impl.rd_remain -= bytes_transferred;
1129 if(impl.rd_fh.mask)
1130 detail::mask_inplace(mb, impl.rd_key);
1131 if(impl.rd_op == detail::opcode::text)
1132 {
1133 if(! impl.rd_utf8.write(mb) ||
1134 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1135 ! impl.rd_utf8.finish()))
1136 {
1137 // _Fail the WebSocket Connection_
1138 do_fail(close_code::bad_payload,
1139 error::bad_frame_payload, ec);
1140 return bytes_written;
1141 }
1142 }
1143 bytes_written += bytes_transferred;
1144 impl.rd_size += bytes_transferred;
1145 }
1146 }
f67539c2
TL
1147 BOOST_ASSERT( ! impl.rd_done );
1148 if( impl.rd_remain == 0 && impl.rd_fh.fin )
1149 impl.rd_done = true;
92f5a8d4
TL
1150 }
1151 else
1152 {
1153 // Read compressed message frame payload:
1154 // inflate even if rd_fh_.len == 0, otherwise we
1155 // never emit the end-of-stream deflate block.
1156 //
1157 bool did_read = false;
1158 buffers_suffix<MutableBufferSequence> cb(buffers);
1159 while(buffer_bytes(cb) > 0)
1160 {
1161 zlib::z_params zs;
1162 {
1163 auto const out = beast::buffers_front(cb);
1164 zs.next_out = out.data();
1165 zs.avail_out = out.size();
1166 BOOST_ASSERT(zs.avail_out > 0);
1167 }
1168 if(impl.rd_remain > 0)
1169 {
1170 if(impl.rd_buf.size() > 0)
1171 {
1172 // use what's there
1173 auto const in = buffers_prefix(
1174 clamp(impl.rd_remain), beast::buffers_front(
1175 impl.rd_buf.data()));
1176 zs.avail_in = in.size();
1177 zs.next_in = in.data();
1178 }
1179 else if(! did_read)
1180 {
1181 // read new
1182 auto const bytes_transferred =
1183 impl.stream().read_some(
1184 impl.rd_buf.prepare(read_size(
1185 impl.rd_buf, impl.rd_buf.max_size())),
1186 ec);
1187 if(impl.check_stop_now(ec))
1188 return bytes_written;
1189 BOOST_ASSERT(bytes_transferred > 0);
1190 impl.rd_buf.commit(bytes_transferred);
1191 if(impl.rd_fh.mask)
1192 detail::mask_inplace(
1193 buffers_prefix(clamp(impl.rd_remain),
1194 impl.rd_buf.data()), impl.rd_key);
1195 auto const in = buffers_prefix(
1196 clamp(impl.rd_remain), buffers_front(
1197 impl.rd_buf.data()));
1198 zs.avail_in = in.size();
1199 zs.next_in = in.data();
1200 did_read = true;
1201 }
1202 else
1203 {
1204 break;
1205 }
1206 }
1207 else if(impl.rd_fh.fin)
1208 {
1209 // append the empty block codes
1210 static std::uint8_t constexpr
1211 empty_block[4] = {
1212 0x00, 0x00, 0xff, 0xff };
1213 zs.next_in = empty_block;
1214 zs.avail_in = sizeof(empty_block);
1215 impl.inflate(zs, zlib::Flush::sync, ec);
1216 if(! ec)
1217 {
1218 // https://github.com/madler/zlib/issues/280
1219 if(zs.total_out > 0)
1220 ec = error::partial_deflate_block;
1221 }
1222 if(impl.check_stop_now(ec))
1223 return bytes_written;
1224 impl.do_context_takeover_read(impl.role);
1225 impl.rd_done = true;
1226 break;
1227 }
1228 else
1229 {
1230 break;
1231 }
1232 impl.inflate(zs, zlib::Flush::sync, ec);
1233 if(impl.check_stop_now(ec))
1234 return bytes_written;
1235 if(impl.rd_msg_max && beast::detail::sum_exceeds(
1236 impl.rd_size, zs.total_out, impl.rd_msg_max))
1237 {
1238 do_fail(close_code::too_big,
1239 error::message_too_big, ec);
1240 return bytes_written;
1241 }
1242 cb.consume(zs.total_out);
1243 impl.rd_size += zs.total_out;
1244 impl.rd_remain -= zs.total_in;
1245 impl.rd_buf.consume(zs.total_in);
1246 bytes_written += zs.total_out;
1247 }
1248 if(impl.rd_op == detail::opcode::text)
1249 {
1250 // check utf8
1251 if(! impl.rd_utf8.write(beast::buffers_prefix(
1252 bytes_written, buffers)) || (
1253 impl.rd_done && ! impl.rd_utf8.finish()))
1254 {
1255 // _Fail the WebSocket Connection_
1256 do_fail(close_code::bad_payload,
1257 error::bad_frame_payload, ec);
1258 return bytes_written;
1259 }
1260 }
1261 }
1262 return bytes_written;
1263}
1264
1265template<class NextLayer, bool deflateSupported>
1266template<class MutableBufferSequence, class ReadHandler>
1267BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1268stream<NextLayer, deflateSupported>::
1269async_read_some(
1270 MutableBufferSequence const& buffers,
1271 ReadHandler&& handler)
1272{
1273 static_assert(is_async_stream<next_layer_type>::value,
1274 "AsyncStream type requirements not met");
1275 static_assert(net::is_mutable_buffer_sequence<
1276 MutableBufferSequence>::value,
1277 "MutableBufferSequence type requirements not met");
1278 return net::async_initiate<
1279 ReadHandler,
1280 void(error_code, std::size_t)>(
1281 run_read_some_op{},
1282 handler,
1283 impl_,
1284 buffers);
1285}
1286
1287} // websocket
1288} // beast
1289} // boost
1290
1291#endif