]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/boost/beast/websocket/impl/read.hpp
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / boost / boost / beast / websocket / impl / read.hpp
CommitLineData
92f5a8d4
TL
1//
2// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7// Official repository: https://github.com/boostorg/beast
8//
9
10#ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
11#define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
12
13#include <boost/beast/core/buffer_traits.hpp>
14#include <boost/beast/websocket/teardown.hpp>
15#include <boost/beast/websocket/detail/mask.hpp>
16#include <boost/beast/websocket/impl/stream_impl.hpp>
17#include <boost/beast/core/async_base.hpp>
18#include <boost/beast/core/bind_handler.hpp>
19#include <boost/beast/core/buffers_prefix.hpp>
20#include <boost/beast/core/buffers_suffix.hpp>
21#include <boost/beast/core/flat_static_buffer.hpp>
22#include <boost/beast/core/read_size.hpp>
23#include <boost/beast/core/stream_traits.hpp>
24#include <boost/beast/core/detail/bind_continuation.hpp>
25#include <boost/beast/core/detail/buffer.hpp>
26#include <boost/beast/core/detail/clamp.hpp>
27#include <boost/beast/core/detail/config.hpp>
28#include <boost/asio/coroutine.hpp>
29#include <boost/asio/post.hpp>
30#include <boost/assert.hpp>
31#include <boost/config.hpp>
32#include <boost/optional.hpp>
33#include <boost/throw_exception.hpp>
34#include <algorithm>
35#include <limits>
36#include <memory>
37
38namespace boost {
39namespace beast {
40namespace websocket {
41
42/* Read some message data into a buffer sequence.
43
44 Also reads and handles control frames.
45*/
// Composed asynchronous operation that reads message data into a
// caller-supplied MutableBufferSequence. It derives from asio::coroutine,
// so operator() is re-entered after each intermediate operation it starts.
// Besides delivering payload (copied, read directly, or inflated), it
// replies to ping frames with a pong, passes ping/pong/close frames to
// impl.ctrl_cb when one is set, and runs the close/teardown sequence
// under the `close` label.
46template<class NextLayer, bool deflateSupported>
47template<class Handler, class MutableBufferSequence>
48class stream<NextLayer, deflateSupported>::read_some_op
49 : public beast::async_base<
50 Handler, beast::executor_type<stream>>
51 , public asio::coroutine
52{
// wp_            weak reference to the stream implementation; when it can no
//                longer be locked the handler receives operation_aborted
// bs_            the caller's buffer sequence as passed in (used for the
//                UTF-8 check on the deflate path)
// cb_            suffix view over the same buffers; the deflate path consumes
//                it as output is produced
// bytes_written_ number of payload bytes produced so far, passed to complete()
// result_        error recorded before jumping to `close`; it becomes the
//                final error when teardown itself reports none
// code_          close code serialized into the close frame sent from `close`
// did_read_      set once the deflate loop has read from the stream, which
//                stops that loop from issuing a second read
53 boost::weak_ptr<impl_type> wp_;
54 MutableBufferSequence bs_;
55 buffers_suffix<MutableBufferSequence> cb_;
56 std::size_t bytes_written_ = 0;
57 error_code result_;
58 close_code code_;
59 bool did_read_ = false;
60
61public:
62 static constexpr int id = 1; // for soft_mutex
63
// Stores the handler and buffers, then starts the coroutine right away.
// This first call passes cont=false; later resumptions use the default
// cont=true.
64 template<class Handler_>
65 read_some_op(
66 Handler_&& h,
67 boost::shared_ptr<impl_type> const& sp,
68 MutableBufferSequence const& bs)
69 : async_base<
70 Handler, beast::executor_type<stream>>(
71 std::forward<Handler_>(h),
72 sp->stream().get_executor())
73 , wp_(sp)
74 , bs_(bs)
75 , cb_(bs)
76 , code_(close_code::none)
77 {
78 (*this)({}, 0, false);
79 }
80
// Coroutine body. `ec` and `bytes_transferred` hold the result of the
// intermediate operation that resumed this call; `cont` is forwarded to
// complete().
81 void operator()(
82 error_code ec = {},
83 std::size_t bytes_transferred = 0,
84 bool cont = true)
85 {
86 using beast::detail::clamp;
87 auto sp = wp_.lock();
88 if(! sp)
89 {
90 ec = net::error::operation_aborted;
91 bytes_written_ = 0;
92 return this->complete(cont, ec, bytes_written_);
93 }
94 auto& impl = *sp;
95 BOOST_ASIO_CORO_REENTER(*this)
96 {
97 impl.update_timer(this->get_executor());
98
99 acquire_read_lock:
100 // Acquire the read lock
101 if(! impl.rd_block.try_lock(this))
102 {
103 do_suspend:
104 BOOST_ASIO_CORO_YIELD
20effc67
TL
105 {
106 BOOST_ASIO_HANDLER_LOCATION((
107 __FILE__, __LINE__,
108 "websocket::async_read_some"));
109
110 impl.op_r_rd.emplace(std::move(*this));
111 }
92f5a8d4
TL
112 impl.rd_block.lock(this);
113 BOOST_ASIO_CORO_YIELD
20effc67
TL
114 {
115 BOOST_ASIO_HANDLER_LOCATION((
116 __FILE__, __LINE__,
117 "websocket::async_read_some"));
118
119 net::post(std::move(*this));
120 }
92f5a8d4
TL
121 BOOST_ASSERT(impl.rd_block.is_locked(this));
122
20effc67 123 BOOST_ASSERT(!ec);
92f5a8d4
TL
124 if(impl.check_stop_now(ec))
125 {
126 BOOST_ASSERT(ec == net::error::operation_aborted);
127 goto upcall;
128 }
129 // VFALCO Should never get here
130
131 // The only way to get read blocked is if
132 // a `close_op` wrote a close frame
133 BOOST_ASSERT(impl.wr_close);
134 BOOST_ASSERT(impl.status_ != status::open);
135 ec = net::error::operation_aborted;
136 goto upcall;
137 }
138 else
139 {
140 // Make sure the stream is not closed
141 if( impl.status_ == status::closed ||
142 impl.status_ == status::failed)
143 {
144 ec = net::error::operation_aborted;
145 goto upcall;
146 }
147 }
148
149 // if status_ == status::closing, we want to suspend
150 // the read operation until the close completes,
151 // then finish the read with operation_aborted.
152
153 loop:
154 BOOST_ASSERT(impl.rd_block.is_locked(this));
155 // See if we need to read a frame header. This
156 // condition is structured to give the decompressor
157 // a chance to emit the final empty deflate block
158 //
159 if(impl.rd_remain == 0 &&
160 (! impl.rd_fh.fin || impl.rd_done))
161 {
162 // Read frame header
163 while(! impl.parse_fh(
164 impl.rd_fh, impl.rd_buf, result_))
165 {
166 if(result_)
167 {
168 // _Fail the WebSocket Connection_
169 if(result_ == error::message_too_big)
170 code_ = close_code::too_big;
171 else
172 code_ = close_code::protocol_error;
173 goto close;
174 }
175 BOOST_ASSERT(impl.rd_block.is_locked(this));
176 BOOST_ASIO_CORO_YIELD
20effc67
TL
177 {
178 BOOST_ASIO_HANDLER_LOCATION((
179 __FILE__, __LINE__,
180 "websocket::async_read_some"));
181
182 impl.stream().async_read_some(
183 impl.rd_buf.prepare(read_size(
184 impl.rd_buf, impl.rd_buf.max_size())),
185 std::move(*this));
186 }
92f5a8d4
TL
187 BOOST_ASSERT(impl.rd_block.is_locked(this));
188 impl.rd_buf.commit(bytes_transferred);
189 if(impl.check_stop_now(ec))
190 goto upcall;
191 impl.reset_idle();
192
193 // Allow a close operation
194 // to acquire the read block
195 impl.rd_block.unlock(this);
196 if( impl.op_r_close.maybe_invoke())
197 {
198 // Suspend
199 BOOST_ASSERT(impl.rd_block.is_locked());
200 goto do_suspend;
201 }
202 // Acquire read block
203 impl.rd_block.lock(this);
204 }
205 // Immediately apply the mask to the portion
206 // of the buffer holding payload data.
207 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
208 detail::mask_inplace(buffers_prefix(
209 clamp(impl.rd_fh.len),
210 impl.rd_buf.data()),
211 impl.rd_key);
212 if(detail::is_control(impl.rd_fh.op))
213 {
214 // Clear this otherwise the next
215 // frame will be considered final.
216 impl.rd_fh.fin = false;
217
218 // Handle ping frame
219 if(impl.rd_fh.op == detail::opcode::ping)
220 {
// When a control callback is installed and this is still the
// initiating call (cont == false), post first so the code below
// runs from a resumption (cont == true).
221 if(impl.ctrl_cb)
222 {
223 if(! cont)
224 {
225 BOOST_ASIO_CORO_YIELD
20effc67
TL
226 {
227 BOOST_ASIO_HANDLER_LOCATION((
228 __FILE__, __LINE__,
229 "websocket::async_read_some"));
230
231 net::post(std::move(*this));
232 }
92f5a8d4
TL
233 BOOST_ASSERT(cont);
234 // VFALCO call check_stop_now() here?
235 }
236 }
237 {
238 auto const b = buffers_prefix(
239 clamp(impl.rd_fh.len),
240 impl.rd_buf.data());
241 auto const len = buffer_bytes(b);
242 BOOST_ASSERT(len == impl.rd_fh.len);
243 ping_data payload;
244 detail::read_ping(payload, b);
245 impl.rd_buf.consume(len);
246 // Ignore ping when closing
247 if(impl.status_ == status::closing)
248 goto loop;
249 if(impl.ctrl_cb)
250 impl.ctrl_cb(
251 frame_type::ping, payload);
252 impl.rd_fb.clear();
253 impl.template write_ping<
254 flat_static_buffer_base>(impl.rd_fb,
255 detail::opcode::pong, payload);
256 }
257
258 // Allow a close operation
259 // to acquire the read block
260 impl.rd_block.unlock(this);
261 impl.op_r_close.maybe_invoke();
262
263 // Acquire the write lock
264 if(! impl.wr_block.try_lock(this))
265 {
266 BOOST_ASIO_CORO_YIELD
20effc67
TL
267 {
268 BOOST_ASIO_HANDLER_LOCATION((
269 __FILE__, __LINE__,
270 "websocket::async_read_some"));
271
272 impl.op_rd.emplace(std::move(*this));
273 }
92f5a8d4
TL
274 impl.wr_block.lock(this);
275 BOOST_ASIO_CORO_YIELD
20effc67
TL
276 {
277 BOOST_ASIO_HANDLER_LOCATION((
278 __FILE__, __LINE__,
279 "websocket::async_read_some"));
280
281 net::post(std::move(*this));
282 }
92f5a8d4
TL
283 BOOST_ASSERT(impl.wr_block.is_locked(this));
284 if(impl.check_stop_now(ec))
285 goto upcall;
286 }
287
288 // Send pong
289 BOOST_ASSERT(impl.wr_block.is_locked(this));
290 BOOST_ASIO_CORO_YIELD
20effc67
TL
291 {
292 BOOST_ASIO_HANDLER_LOCATION((
293 __FILE__, __LINE__,
294 "websocket::async_read_some"));
295
296 net::async_write(
1e59de90 297 impl.stream(), net::const_buffer(impl.rd_fb.data()),
20effc67
TL
298 beast::detail::bind_continuation(std::move(*this)));
299 }
92f5a8d4
TL
300 BOOST_ASSERT(impl.wr_block.is_locked(this));
301 if(impl.check_stop_now(ec))
302 goto upcall;
// Pong sent: drop the write lock, resume at most one suspended
// operation (the || chain stops at the first that runs), then go
// back to re-acquire the read lock.
303 impl.wr_block.unlock(this);
304 impl.op_close.maybe_invoke()
305 || impl.op_idle_ping.maybe_invoke()
306 || impl.op_ping.maybe_invoke()
307 || impl.op_wr.maybe_invoke();
308 goto acquire_read_lock;
309 }
310
311 // Handle pong frame
312 if(impl.rd_fh.op == detail::opcode::pong)
313 {
314 // Ignore pong when closing
315 if(! impl.wr_close && impl.ctrl_cb)
316 {
317 if(! cont)
318 {
319 BOOST_ASIO_CORO_YIELD
20effc67
TL
320 {
321 BOOST_ASIO_HANDLER_LOCATION((
322 __FILE__, __LINE__,
323 "websocket::async_read_some"));
324
325 net::post(std::move(*this));
326 }
92f5a8d4
TL
327 BOOST_ASSERT(cont);
328 }
329 }
330 auto const cb = buffers_prefix(clamp(
331 impl.rd_fh.len), impl.rd_buf.data());
332 auto const len = buffer_bytes(cb);
333 BOOST_ASSERT(len == impl.rd_fh.len);
334 ping_data payload;
335 detail::read_ping(payload, cb);
336 impl.rd_buf.consume(len);
337 // Ignore pong when closing
338 if(! impl.wr_close && impl.ctrl_cb)
339 impl.ctrl_cb(frame_type::pong, payload);
340 goto loop;
341 }
342
343 // Handle close frame
344 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
345 {
346 if(impl.ctrl_cb)
347 {
348 if(! cont)
349 {
350 BOOST_ASIO_CORO_YIELD
20effc67
TL
351 {
352 BOOST_ASIO_HANDLER_LOCATION((
353 __FILE__, __LINE__,
354 "websocket::async_read_some"));
355
356 net::post(std::move(*this));
357 }
92f5a8d4
TL
358 BOOST_ASSERT(cont);
359 }
360 }
361 auto const cb = buffers_prefix(clamp(
362 impl.rd_fh.len), impl.rd_buf.data());
363 auto const len = buffer_bytes(cb);
364 BOOST_ASSERT(len == impl.rd_fh.len);
365 BOOST_ASSERT(! impl.rd_close);
366 impl.rd_close = true;
367 close_reason cr;
368 detail::read_close(cr, cb, result_);
369 if(result_)
370 {
371 // _Fail the WebSocket Connection_
372 code_ = close_code::protocol_error;
373 goto close;
374 }
375 impl.cr = cr;
376 impl.rd_buf.consume(len);
377 if(impl.ctrl_cb)
378 impl.ctrl_cb(frame_type::close,
379 impl.cr.reason);
380 // See if we are already closing
381 if(impl.status_ == status::closing)
382 {
383 // _Close the WebSocket Connection_
384 BOOST_ASSERT(impl.wr_close);
385 code_ = close_code::none;
386 result_ = error::closed;
387 goto close;
388 }
389 // _Start the WebSocket Closing Handshake_
390 code_ = cr.code == close_code::none ?
391 close_code::normal :
392 static_cast<close_code>(cr.code);
393 result_ = error::closed;
394 goto close;
395 }
396 }
397 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
398 {
399 // Empty non-final frame
400 goto loop;
401 }
402 impl.rd_done = false;
403 }
// Uncompressed payload: at most one transfer per operation, either
// copied out of impl.rd_buf or read straight into the caller's buffers.
404 if(! impl.rd_deflated())
405 {
406 if(impl.rd_remain > 0)
407 {
408 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
409 (std::min)(clamp(impl.rd_remain),
410 buffer_bytes(cb_)))
411 {
412 // Fill the read buffer first, otherwise we
413 // get fewer bytes at the cost of one I/O.
414 BOOST_ASIO_CORO_YIELD
20effc67
TL
415 {
416 BOOST_ASIO_HANDLER_LOCATION((
417 __FILE__, __LINE__,
418 "websocket::async_read_some"));
419
420 impl.stream().async_read_some(
421 impl.rd_buf.prepare(read_size(
422 impl.rd_buf, impl.rd_buf.max_size())),
423 std::move(*this));
424 }
92f5a8d4
TL
425 impl.rd_buf.commit(bytes_transferred);
426 if(impl.check_stop_now(ec))
427 goto upcall;
428 impl.reset_idle();
429 if(impl.rd_fh.mask)
430 detail::mask_inplace(buffers_prefix(clamp(
431 impl.rd_remain), impl.rd_buf.data()),
432 impl.rd_key);
433 }
434 if(impl.rd_buf.size() > 0)
435 {
436 // Copy from the read buffer.
437 // The mask was already applied.
438 bytes_transferred = net::buffer_copy(cb_,
439 impl.rd_buf.data(), clamp(impl.rd_remain));
440 auto const mb = buffers_prefix(
441 bytes_transferred, cb_);
442 impl.rd_remain -= bytes_transferred;
443 if(impl.rd_op == detail::opcode::text)
444 {
445 if(! impl.rd_utf8.write(mb) ||
446 (impl.rd_remain == 0 && impl.rd_fh.fin &&
447 ! impl.rd_utf8.finish()))
448 {
449 // _Fail the WebSocket Connection_
450 code_ = close_code::bad_payload;
451 result_ = error::bad_frame_payload;
452 goto close;
453 }
454 }
455 bytes_written_ += bytes_transferred;
456 impl.rd_size += bytes_transferred;
457 impl.rd_buf.consume(bytes_transferred);
458 }
459 else
460 {
461 // Read into caller's buffer
462 BOOST_ASSERT(impl.rd_remain > 0);
463 BOOST_ASSERT(buffer_bytes(cb_) > 0);
464 BOOST_ASSERT(buffer_bytes(buffers_prefix(
465 clamp(impl.rd_remain), cb_)) > 0);
466 BOOST_ASIO_CORO_YIELD
20effc67
TL
467 {
468 BOOST_ASIO_HANDLER_LOCATION((
469 __FILE__, __LINE__,
470 "websocket::async_read_some"));
471
472 impl.stream().async_read_some(buffers_prefix(
473 clamp(impl.rd_remain), cb_), std::move(*this));
474 }
92f5a8d4
TL
475 if(impl.check_stop_now(ec))
476 goto upcall;
477 impl.reset_idle();
478 BOOST_ASSERT(bytes_transferred > 0);
479 auto const mb = buffers_prefix(
480 bytes_transferred, cb_);
481 impl.rd_remain -= bytes_transferred;
482 if(impl.rd_fh.mask)
483 detail::mask_inplace(mb, impl.rd_key);
484 if(impl.rd_op == detail::opcode::text)
485 {
486 if(! impl.rd_utf8.write(mb) ||
487 (impl.rd_remain == 0 && impl.rd_fh.fin &&
488 ! impl.rd_utf8.finish()))
489 {
490 // _Fail the WebSocket Connection_
491 code_ = close_code::bad_payload;
492 result_ = error::bad_frame_payload;
493 goto close;
494 }
495 }
496 bytes_written_ += bytes_transferred;
497 impl.rd_size += bytes_transferred;
498 }
499 }
f67539c2
TL
500 BOOST_ASSERT( ! impl.rd_done );
501 if( impl.rd_remain == 0 && impl.rd_fh.fin )
502 impl.rd_done = true;
92f5a8d4
TL
503 }
504 else
505 {
506 // Read compressed message frame payload:
507 // inflate even if rd_fh_.len == 0, otherwise we
508 // never emit the end-of-stream deflate block.
509 while(buffer_bytes(cb_) > 0)
510 {
511 if( impl.rd_remain > 0 &&
512 impl.rd_buf.size() == 0 &&
513 ! did_read_)
514 {
515 // read new
516 BOOST_ASIO_CORO_YIELD
20effc67
TL
517 {
518 BOOST_ASIO_HANDLER_LOCATION((
519 __FILE__, __LINE__,
520 "websocket::async_read_some"));
521
522 impl.stream().async_read_some(
523 impl.rd_buf.prepare(read_size(
524 impl.rd_buf, impl.rd_buf.max_size())),
525 std::move(*this));
526 }
92f5a8d4
TL
527 if(impl.check_stop_now(ec))
528 goto upcall;
529 impl.reset_idle();
530 BOOST_ASSERT(bytes_transferred > 0);
531 impl.rd_buf.commit(bytes_transferred);
532 if(impl.rd_fh.mask)
533 detail::mask_inplace(
534 buffers_prefix(clamp(impl.rd_remain),
535 impl.rd_buf.data()), impl.rd_key);
536 did_read_ = true;
537 }
538 zlib::z_params zs;
539 {
540 auto const out = buffers_front(cb_);
541 zs.next_out = out.data();
542 zs.avail_out = out.size();
543 BOOST_ASSERT(zs.avail_out > 0);
544 }
1e59de90
TL
545 // boolean to track the end of the message.
546 bool fin = false;
92f5a8d4
TL
547 if(impl.rd_remain > 0)
548 {
549 if(impl.rd_buf.size() > 0)
550 {
551 // use what's there
552 auto const in = buffers_prefix(
553 clamp(impl.rd_remain), buffers_front(
554 impl.rd_buf.data()));
555 zs.avail_in = in.size();
556 zs.next_in = in.data();
557 }
558 else
559 {
560 break;
561 }
562 }
563 else if(impl.rd_fh.fin)
564 {
565 // append the empty block codes
1e59de90 566 static std::uint8_t constexpr
92f5a8d4
TL
567 empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
568 zs.next_in = empty_block;
569 zs.avail_in = sizeof(empty_block);
1e59de90 570 fin = true;
92f5a8d4
TL
571 }
572 else
573 {
574 break;
575 }
576 impl.inflate(zs, zlib::Flush::sync, ec);
577 if(impl.check_stop_now(ec))
578 goto upcall;
1e59de90
TL
// The trailing empty block was fed in and produced no output:
// the message is complete.
579 if(fin && zs.total_out == 0) {
580 impl.do_context_takeover_read(impl.role);
581 impl.rd_done = true;
582 break;
583 }
92f5a8d4
TL
584 if(impl.rd_msg_max && beast::detail::sum_exceeds(
585 impl.rd_size, zs.total_out, impl.rd_msg_max))
586 {
587 // _Fail the WebSocket Connection_
588 code_ = close_code::too_big;
589 result_ = error::message_too_big;
590 goto close;
591 }
592 cb_.consume(zs.total_out);
593 impl.rd_size += zs.total_out;
1e59de90
TL
// Input came from the static empty_block when fin is set, so
// there is nothing to remove from rd_buf in that case.
594 if (! fin) {
595 impl.rd_remain -= zs.total_in;
596 impl.rd_buf.consume(zs.total_in);
597 }
92f5a8d4
TL
598 bytes_written_ += zs.total_out;
599 }
600 if(impl.rd_op == detail::opcode::text)
601 {
602 // check utf8
603 if(! impl.rd_utf8.write(
604 buffers_prefix(bytes_written_, bs_)) || (
605 impl.rd_done && ! impl.rd_utf8.finish()))
606 {
607 // _Fail the WebSocket Connection_
608 code_ = close_code::bad_payload;
609 result_ = error::bad_frame_payload;
610 goto close;
611 }
612 }
613 }
614 goto upcall;
615
// Close/fail path: take the write lock, mark the stream as closing,
// send a close frame carrying code_ unless wr_close is already set,
// tear down the next layer, then report result_ when teardown itself
// reported no error (eof from teardown is treated as success).
616 close:
617 // Acquire the write lock
618 if(! impl.wr_block.try_lock(this))
619 {
620 BOOST_ASIO_CORO_YIELD
20effc67
TL
621 {
622 BOOST_ASIO_HANDLER_LOCATION((
623 __FILE__, __LINE__,
624 "websocket::async_read_some"));
625
626 impl.op_rd.emplace(std::move(*this));
627 }
92f5a8d4
TL
628 impl.wr_block.lock(this);
629 BOOST_ASIO_CORO_YIELD
20effc67
TL
630 {
631 BOOST_ASIO_HANDLER_LOCATION((
632 __FILE__, __LINE__,
633 "websocket::async_read_some"));
634
635 net::post(std::move(*this));
636 }
92f5a8d4
TL
637 BOOST_ASSERT(impl.wr_block.is_locked(this));
638 if(impl.check_stop_now(ec))
639 goto upcall;
640 }
641
642 impl.change_status(status::closing);
643
644 if(! impl.wr_close)
645 {
646 impl.wr_close = true;
647
648 // Serialize close frame
649 impl.rd_fb.clear();
650 impl.template write_close<
651 flat_static_buffer_base>(
652 impl.rd_fb, code_);
653
654 // Send close frame
655 BOOST_ASSERT(impl.wr_block.is_locked(this));
656 BOOST_ASIO_CORO_YIELD
20effc67
TL
657 {
658 BOOST_ASIO_HANDLER_LOCATION((
659 __FILE__, __LINE__,
660 "websocket::async_read_some"));
661
1e59de90 662 net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()),
20effc67
TL
663 beast::detail::bind_continuation(std::move(*this)));
664 }
92f5a8d4
TL
665 BOOST_ASSERT(impl.wr_block.is_locked(this));
666 if(impl.check_stop_now(ec))
667 goto upcall;
668 }
669
670 // Teardown
671 using beast::websocket::async_teardown;
672 BOOST_ASSERT(impl.wr_block.is_locked(this));
673 BOOST_ASIO_CORO_YIELD
20effc67
TL
674 {
675 BOOST_ASIO_HANDLER_LOCATION((
676 __FILE__, __LINE__,
677 "websocket::async_read_some"));
678
679 async_teardown(impl.role, impl.stream(),
680 beast::detail::bind_continuation(std::move(*this)));
681 }
92f5a8d4
TL
682 BOOST_ASSERT(impl.wr_block.is_locked(this));
683 if(ec == net::error::eof)
684 {
685 // Rationale:
686 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
687 ec = {};
688 }
689 if(! ec)
690 ec = result_;
691 if(ec && ec != error::closed)
692 impl.change_status(status::failed);
693 else
694 impl.change_status(status::closed);
695 impl.close();
696
// Common exit: release whichever of the read/write locks this operation
// still holds, give suspended operations a chance to resume, then
// complete with the final error and the number of bytes produced.
697 upcall:
698 impl.rd_block.try_unlock(this);
699 impl.op_r_close.maybe_invoke();
700 if(impl.wr_block.try_unlock(this))
701 impl.op_close.maybe_invoke()
702 || impl.op_idle_ping.maybe_invoke()
703 || impl.op_ping.maybe_invoke()
704 || impl.op_wr.maybe_invoke();
705 this->complete(cont, ec, bytes_written_);
706 }
707 }
708};
709
710//------------------------------------------------------------------------------
711
// Composed asynchronous operation that reads into a DynamicBuffer. Each
// pass prepares writable space in the buffer, runs a read_some_op into that
// space, and commits the bytes it produced. With some_ == true a single
// pass is made; otherwise passes repeat until impl.rd_done is set or an
// error occurs.
712template<class NextLayer, bool deflateSupported>
713template<class Handler, class DynamicBuffer>
714class stream<NextLayer, deflateSupported>::read_op
715 : public beast::async_base<
716 Handler, beast::executor_type<stream>>
717 , public asio::coroutine
718{
// wp_            weak reference to the stream implementation
// b_             the caller's dynamic buffer, held by reference
// limit_         upper bound on the space prepared per pass
// bytes_written_ total bytes committed to b_, passed to complete()
// some_          when true, finish after the first pass
719 boost::weak_ptr<impl_type> wp_;
720 DynamicBuffer& b_;
721 std::size_t limit_;
722 std::size_t bytes_written_ = 0;
723 bool some_;
724
725public:
// A limit of 0 is replaced by the largest std::size_t value. The
// coroutine is started from the constructor with cont=false.
726 template<class Handler_>
727 read_op(
728 Handler_&& h,
729 boost::shared_ptr<impl_type> const& sp,
730 DynamicBuffer& b,
731 std::size_t limit,
732 bool some)
733 : async_base<Handler,
734 beast::executor_type<stream>>(
735 std::forward<Handler_>(h),
736 sp->stream().get_executor())
737 , wp_(sp)
738 , b_(b)
739 , limit_(limit ? limit : (
740 std::numeric_limits<std::size_t>::max)())
741 , some_(some)
742 {
743 (*this)({}, 0, false);
744 }
745
// Coroutine body. On resumption `ec` and `bytes_transferred` are the
// result delivered by the inner read_some_op; `cont` is forwarded to
// complete().
746 void operator()(
747 error_code ec = {},
748 std::size_t bytes_transferred = 0,
749 bool cont = true)
750 {
751 using beast::detail::clamp;
752 auto sp = wp_.lock();
753 if(! sp)
754 {
755 ec = net::error::operation_aborted;
756 bytes_written_ = 0;
757 return this->complete(cont, ec, bytes_written_);
758 }
759 auto& impl = *sp;
760 using mutable_buffers_type = typename
761 DynamicBuffer::mutable_buffers_type;
762 BOOST_ASIO_CORO_REENTER(*this)
763 {
764 do
765 {
766 // VFALCO TODO use boost::beast::bind_continuation
767 BOOST_ASIO_CORO_YIELD
768 {
769 auto mb = beast::detail::dynamic_buffer_prepare(b_,
770 clamp(impl.read_size_hint_db(b_), limit_),
771 ec, error::buffer_overflow);
772 if(impl.check_stop_now(ec))
773 goto upcall;
20effc67
TL
774
775 BOOST_ASIO_HANDLER_LOCATION((
776 __FILE__, __LINE__,
777 "websocket::async_read"));
778
92f5a8d4
TL
// This operation is moved in as the completion handler of the
// inner read_some_op, so its completion re-enters operator()
// just after this yield block.
779 read_some_op<read_op, mutable_buffers_type>(
780 std::move(*this), sp, *mb);
781 }
782
783 b_.commit(bytes_transferred);
784 bytes_written_ += bytes_transferred;
785 if(ec)
786 goto upcall;
787 }
788 while(! some_ && ! impl.rd_done);
789
790 upcall:
791 this->complete(cont, ec, bytes_written_);
792 }
793 }
794};
795
796template<class NextLayer, bool deflateSupported>
797struct stream<NextLayer, deflateSupported>::
798 run_read_some_op
799{
800 template<
801 class ReadHandler,
802 class MutableBufferSequence>
803 void
804 operator()(
805 ReadHandler&& h,
806 boost::shared_ptr<impl_type> const& sp,
807 MutableBufferSequence const& b)
808 {
809 // If you get an error on the following line it means
810 // that your handler does not meet the documented type
811 // requirements for the handler.
812
813 static_assert(
814 beast::detail::is_invocable<ReadHandler,
815 void(error_code, std::size_t)>::value,
816 "ReadHandler type requirements not met");
817
818 read_some_op<
819 typename std::decay<ReadHandler>::type,
820 MutableBufferSequence>(
821 std::forward<ReadHandler>(h),
822 sp,
823 b);
824 }
825};
826
827template<class NextLayer, bool deflateSupported>
828struct stream<NextLayer, deflateSupported>::
829 run_read_op
830{
831 template<
832 class ReadHandler,
833 class DynamicBuffer>
834 void
835 operator()(
836 ReadHandler&& h,
837 boost::shared_ptr<impl_type> const& sp,
838 DynamicBuffer* b,
839 std::size_t limit,
840 bool some)
841 {
842 // If you get an error on the following line it means
843 // that your handler does not meet the documented type
844 // requirements for the handler.
845
846 static_assert(
847 beast::detail::is_invocable<ReadHandler,
848 void(error_code, std::size_t)>::value,
849 "ReadHandler type requirements not met");
850
851 read_op<
852 typename std::decay<ReadHandler>::type,
853 DynamicBuffer>(
854 std::forward<ReadHandler>(h),
855 sp,
856 *b,
857 limit,
858 some);
859 }
860};
861
862//------------------------------------------------------------------------------
863
864template<class NextLayer, bool deflateSupported>
865template<class DynamicBuffer>
866std::size_t
867stream<NextLayer, deflateSupported>::
868read(DynamicBuffer& buffer)
869{
870 static_assert(is_sync_stream<next_layer_type>::value,
871 "SyncStream type requirements not met");
872 static_assert(
873 net::is_dynamic_buffer<DynamicBuffer>::value,
874 "DynamicBuffer type requirements not met");
875 error_code ec;
876 auto const bytes_written = read(buffer, ec);
877 if(ec)
878 BOOST_THROW_EXCEPTION(system_error{ec});
879 return bytes_written;
880}
881
882template<class NextLayer, bool deflateSupported>
883template<class DynamicBuffer>
884std::size_t
885stream<NextLayer, deflateSupported>::
886read(DynamicBuffer& buffer, error_code& ec)
887{
888 static_assert(is_sync_stream<next_layer_type>::value,
889 "SyncStream type requirements not met");
890 static_assert(
891 net::is_dynamic_buffer<DynamicBuffer>::value,
892 "DynamicBuffer type requirements not met");
893 std::size_t bytes_written = 0;
894 do
895 {
896 bytes_written += read_some(buffer, 0, ec);
897 if(ec)
898 return bytes_written;
899 }
900 while(! is_message_done());
901 return bytes_written;
902}
903
904template<class NextLayer, bool deflateSupported>
20effc67 905template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
92f5a8d4
TL
906BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
907stream<NextLayer, deflateSupported>::
908async_read(DynamicBuffer& buffer, ReadHandler&& handler)
909{
910 static_assert(is_async_stream<next_layer_type>::value,
911 "AsyncStream type requirements not met");
912 static_assert(
913 net::is_dynamic_buffer<DynamicBuffer>::value,
914 "DynamicBuffer type requirements not met");
915 return net::async_initiate<
916 ReadHandler,
917 void(error_code, std::size_t)>(
918 run_read_op{},
919 handler,
920 impl_,
921 &buffer,
922 0,
923 false);
924}
925
926//------------------------------------------------------------------------------
927
928template<class NextLayer, bool deflateSupported>
929template<class DynamicBuffer>
930std::size_t
931stream<NextLayer, deflateSupported>::
932read_some(
933 DynamicBuffer& buffer,
934 std::size_t limit)
935{
936 static_assert(is_sync_stream<next_layer_type>::value,
937 "SyncStream type requirements not met");
938 static_assert(
939 net::is_dynamic_buffer<DynamicBuffer>::value,
940 "DynamicBuffer type requirements not met");
941 error_code ec;
942 auto const bytes_written =
943 read_some(buffer, limit, ec);
944 if(ec)
945 BOOST_THROW_EXCEPTION(system_error{ec});
946 return bytes_written;
947}
948
949template<class NextLayer, bool deflateSupported>
950template<class DynamicBuffer>
951std::size_t
952stream<NextLayer, deflateSupported>::
953read_some(
954 DynamicBuffer& buffer,
955 std::size_t limit,
956 error_code& ec)
957{
958 static_assert(is_sync_stream<next_layer_type>::value,
959 "SyncStream type requirements not met");
960 static_assert(
961 net::is_dynamic_buffer<DynamicBuffer>::value,
962 "DynamicBuffer type requirements not met");
963 using beast::detail::clamp;
964 if(! limit)
965 limit = (std::numeric_limits<std::size_t>::max)();
966 auto const size =
967 clamp(read_size_hint(buffer), limit);
968 BOOST_ASSERT(size > 0);
969 auto mb = beast::detail::dynamic_buffer_prepare(
970 buffer, size, ec, error::buffer_overflow);
971 if(impl_->check_stop_now(ec))
972 return 0;
973 auto const bytes_written = read_some(*mb, ec);
974 buffer.commit(bytes_written);
975 return bytes_written;
976}
977
978template<class NextLayer, bool deflateSupported>
20effc67 979template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
92f5a8d4
TL
980BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
981stream<NextLayer, deflateSupported>::
982async_read_some(
983 DynamicBuffer& buffer,
984 std::size_t limit,
985 ReadHandler&& handler)
986{
987 static_assert(is_async_stream<next_layer_type>::value,
988 "AsyncStream type requirements not met");
989 static_assert(
990 net::is_dynamic_buffer<DynamicBuffer>::value,
991 "DynamicBuffer type requirements not met");
992 return net::async_initiate<
993 ReadHandler,
994 void(error_code, std::size_t)>(
995 run_read_op{},
996 handler,
997 impl_,
998 &buffer,
999 limit,
1000 true);
1001}
1002
1003//------------------------------------------------------------------------------
1004
1005template<class NextLayer, bool deflateSupported>
1006template<class MutableBufferSequence>
1007std::size_t
1008stream<NextLayer, deflateSupported>::
1009read_some(
1010 MutableBufferSequence const& buffers)
1011{
1012 static_assert(is_sync_stream<next_layer_type>::value,
1013 "SyncStream type requirements not met");
1014 static_assert(net::is_mutable_buffer_sequence<
1015 MutableBufferSequence>::value,
1016 "MutableBufferSequence type requirements not met");
1017 error_code ec;
1018 auto const bytes_written = read_some(buffers, ec);
1019 if(ec)
1020 BOOST_THROW_EXCEPTION(system_error{ec});
1021 return bytes_written;
1022}
1023
1024template<class NextLayer, bool deflateSupported>
1025template<class MutableBufferSequence>
1026std::size_t
1027stream<NextLayer, deflateSupported>::
1028read_some(
1029 MutableBufferSequence const& buffers,
1030 error_code& ec)
1031{
1032 static_assert(is_sync_stream<next_layer_type>::value,
1033 "SyncStream type requirements not met");
1034 static_assert(net::is_mutable_buffer_sequence<
1035 MutableBufferSequence>::value,
1036 "MutableBufferSequence type requirements not met");
1037 using beast::detail::clamp;
1038 auto& impl = *impl_;
1039 close_code code{};
1040 std::size_t bytes_written = 0;
1041 ec = {};
1042 // Make sure the stream is open
1043 if(impl.check_stop_now(ec))
1044 return bytes_written;
1045loop:
1046 // See if we need to read a frame header. This
1047 // condition is structured to give the decompressor
1048 // a chance to emit the final empty deflate block
1049 //
1050 if(impl.rd_remain == 0 && (
1051 ! impl.rd_fh.fin || impl.rd_done))
1052 {
1053 // Read frame header
1054 error_code result;
1055 while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
1056 {
1057 if(result)
1058 {
1059 // _Fail the WebSocket Connection_
1060 if(result == error::message_too_big)
1061 code = close_code::too_big;
1062 else
1063 code = close_code::protocol_error;
1064 do_fail(code, result, ec);
1065 return bytes_written;
1066 }
1067 auto const bytes_transferred =
1068 impl.stream().read_some(
1069 impl.rd_buf.prepare(read_size(
1070 impl.rd_buf, impl.rd_buf.max_size())),
1071 ec);
1072 impl.rd_buf.commit(bytes_transferred);
1073 if(impl.check_stop_now(ec))
1074 return bytes_written;
1075 }
1076 // Immediately apply the mask to the portion
1077 // of the buffer holding payload data.
1078 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
1079 detail::mask_inplace(buffers_prefix(
1080 clamp(impl.rd_fh.len), impl.rd_buf.data()),
1081 impl.rd_key);
1082 if(detail::is_control(impl.rd_fh.op))
1083 {
1084 // Get control frame payload
1085 auto const b = buffers_prefix(
1086 clamp(impl.rd_fh.len), impl.rd_buf.data());
1087 auto const len = buffer_bytes(b);
1088 BOOST_ASSERT(len == impl.rd_fh.len);
1089
1090 // Clear this otherwise the next
1091 // frame will be considered final.
1092 impl.rd_fh.fin = false;
1093
1094 // Handle ping frame
1095 if(impl.rd_fh.op == detail::opcode::ping)
1096 {
1097 ping_data payload;
1098 detail::read_ping(payload, b);
1099 impl.rd_buf.consume(len);
1100 if(impl.wr_close)
1101 {
1102 // Ignore ping when closing
1103 goto loop;
1104 }
1105 if(impl.ctrl_cb)
1106 impl.ctrl_cb(frame_type::ping, payload);
1107 detail::frame_buffer fb;
1108 impl.template write_ping<flat_static_buffer_base>(fb,
1109 detail::opcode::pong, payload);
1110 net::write(impl.stream(), fb.data(), ec);
1111 if(impl.check_stop_now(ec))
1112 return bytes_written;
1113 goto loop;
1114 }
1115 // Handle pong frame
1116 if(impl.rd_fh.op == detail::opcode::pong)
1117 {
1118 ping_data payload;
1119 detail::read_ping(payload, b);
1120 impl.rd_buf.consume(len);
1121 if(impl.ctrl_cb)
1122 impl.ctrl_cb(frame_type::pong, payload);
1123 goto loop;
1124 }
1125 // Handle close frame
1126 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
1127 {
1128 BOOST_ASSERT(! impl.rd_close);
1129 impl.rd_close = true;
1130 close_reason cr;
1131 detail::read_close(cr, b, result);
1132 if(result)
1133 {
1134 // _Fail the WebSocket Connection_
1135 do_fail(close_code::protocol_error,
1136 result, ec);
1137 return bytes_written;
1138 }
1139 impl.cr = cr;
1140 impl.rd_buf.consume(len);
1141 if(impl.ctrl_cb)
1142 impl.ctrl_cb(frame_type::close, impl.cr.reason);
1143 BOOST_ASSERT(! impl.wr_close);
1144 // _Start the WebSocket Closing Handshake_
1145 do_fail(
1146 cr.code == close_code::none ?
1147 close_code::normal :
1148 static_cast<close_code>(cr.code),
1149 error::closed, ec);
1150 return bytes_written;
1151 }
1152 }
1153 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
1154 {
1155 // Empty non-final frame
1156 goto loop;
1157 }
1158 impl.rd_done = false;
1159 }
1160 else
1161 {
1162 ec = {};
1163 }
1164 if(! impl.rd_deflated())
1165 {
1166 if(impl.rd_remain > 0)
1167 {
1168 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
1169 (std::min)(clamp(impl.rd_remain),
1170 buffer_bytes(buffers)))
1171 {
1172 // Fill the read buffer first, otherwise we
1173 // get fewer bytes at the cost of one I/O.
1174 impl.rd_buf.commit(impl.stream().read_some(
1175 impl.rd_buf.prepare(read_size(impl.rd_buf,
1176 impl.rd_buf.max_size())), ec));
1177 if(impl.check_stop_now(ec))
1178 return bytes_written;
1179 if(impl.rd_fh.mask)
1180 detail::mask_inplace(
1181 buffers_prefix(clamp(impl.rd_remain),
1182 impl.rd_buf.data()), impl.rd_key);
1183 }
1184 if(impl.rd_buf.size() > 0)
1185 {
1186 // Copy from the read buffer.
1187 // The mask was already applied.
1188 auto const bytes_transferred = net::buffer_copy(
1189 buffers, impl.rd_buf.data(),
1190 clamp(impl.rd_remain));
1191 auto const mb = buffers_prefix(
1192 bytes_transferred, buffers);
1193 impl.rd_remain -= bytes_transferred;
1194 if(impl.rd_op == detail::opcode::text)
1195 {
1196 if(! impl.rd_utf8.write(mb) ||
1197 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1198 ! impl.rd_utf8.finish()))
1199 {
1200 // _Fail the WebSocket Connection_
1201 do_fail(close_code::bad_payload,
1202 error::bad_frame_payload, ec);
1203 return bytes_written;
1204 }
1205 }
1206 bytes_written += bytes_transferred;
1207 impl.rd_size += bytes_transferred;
1208 impl.rd_buf.consume(bytes_transferred);
1209 }
1210 else
1211 {
1212 // Read into caller's buffer
1213 BOOST_ASSERT(impl.rd_remain > 0);
1214 BOOST_ASSERT(buffer_bytes(buffers) > 0);
1215 BOOST_ASSERT(buffer_bytes(buffers_prefix(
1216 clamp(impl.rd_remain), buffers)) > 0);
1217 auto const bytes_transferred =
1218 impl.stream().read_some(buffers_prefix(
1219 clamp(impl.rd_remain), buffers), ec);
1220 // VFALCO What if some bytes were written?
1221 if(impl.check_stop_now(ec))
1222 return bytes_written;
1223 BOOST_ASSERT(bytes_transferred > 0);
1224 auto const mb = buffers_prefix(
1225 bytes_transferred, buffers);
1226 impl.rd_remain -= bytes_transferred;
1227 if(impl.rd_fh.mask)
1228 detail::mask_inplace(mb, impl.rd_key);
1229 if(impl.rd_op == detail::opcode::text)
1230 {
1231 if(! impl.rd_utf8.write(mb) ||
1232 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1233 ! impl.rd_utf8.finish()))
1234 {
1235 // _Fail the WebSocket Connection_
1236 do_fail(close_code::bad_payload,
1237 error::bad_frame_payload, ec);
1238 return bytes_written;
1239 }
1240 }
1241 bytes_written += bytes_transferred;
1242 impl.rd_size += bytes_transferred;
1243 }
1244 }
f67539c2
TL
1245 BOOST_ASSERT( ! impl.rd_done );
1246 if( impl.rd_remain == 0 && impl.rd_fh.fin )
1247 impl.rd_done = true;
92f5a8d4
TL
1248 }
1249 else
1250 {
1251 // Read compressed message frame payload:
1252 // inflate even if rd_fh_.len == 0, otherwise we
1253 // never emit the end-of-stream deflate block.
1254 //
1255 bool did_read = false;
1256 buffers_suffix<MutableBufferSequence> cb(buffers);
1257 while(buffer_bytes(cb) > 0)
1258 {
1259 zlib::z_params zs;
1260 {
1261 auto const out = beast::buffers_front(cb);
1262 zs.next_out = out.data();
1263 zs.avail_out = out.size();
1264 BOOST_ASSERT(zs.avail_out > 0);
1265 }
1e59de90
TL
1266 // boolean to track the end of the message.
1267 bool fin = false;
92f5a8d4
TL
1268 if(impl.rd_remain > 0)
1269 {
1270 if(impl.rd_buf.size() > 0)
1271 {
1272 // use what's there
1273 auto const in = buffers_prefix(
1274 clamp(impl.rd_remain), beast::buffers_front(
1275 impl.rd_buf.data()));
1276 zs.avail_in = in.size();
1277 zs.next_in = in.data();
1278 }
1279 else if(! did_read)
1280 {
1281 // read new
1282 auto const bytes_transferred =
1283 impl.stream().read_some(
1284 impl.rd_buf.prepare(read_size(
1285 impl.rd_buf, impl.rd_buf.max_size())),
1286 ec);
1287 if(impl.check_stop_now(ec))
1288 return bytes_written;
1289 BOOST_ASSERT(bytes_transferred > 0);
1290 impl.rd_buf.commit(bytes_transferred);
1291 if(impl.rd_fh.mask)
1292 detail::mask_inplace(
1293 buffers_prefix(clamp(impl.rd_remain),
1294 impl.rd_buf.data()), impl.rd_key);
1295 auto const in = buffers_prefix(
1296 clamp(impl.rd_remain), buffers_front(
1297 impl.rd_buf.data()));
1298 zs.avail_in = in.size();
1299 zs.next_in = in.data();
1300 did_read = true;
1301 }
1302 else
1303 {
1304 break;
1305 }
1306 }
1307 else if(impl.rd_fh.fin)
1308 {
1309 // append the empty block codes
1310 static std::uint8_t constexpr
1e59de90 1311 empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
92f5a8d4
TL
1312 zs.next_in = empty_block;
1313 zs.avail_in = sizeof(empty_block);
1e59de90 1314 fin = true;
92f5a8d4
TL
1315 }
1316 else
1317 {
1318 break;
1319 }
1320 impl.inflate(zs, zlib::Flush::sync, ec);
1321 if(impl.check_stop_now(ec))
1322 return bytes_written;
1e59de90
TL
1323 if (fin && zs.total_out == 0) {
1324 impl.do_context_takeover_read(impl.role);
1325 impl.rd_done = true;
1326 break;
1327 }
92f5a8d4
TL
1328 if(impl.rd_msg_max && beast::detail::sum_exceeds(
1329 impl.rd_size, zs.total_out, impl.rd_msg_max))
1330 {
1331 do_fail(close_code::too_big,
1332 error::message_too_big, ec);
1333 return bytes_written;
1334 }
1335 cb.consume(zs.total_out);
1336 impl.rd_size += zs.total_out;
1e59de90
TL
1337 if (! fin) {
1338 impl.rd_remain -= zs.total_in;
1339 impl.rd_buf.consume(zs.total_in);
1340 }
92f5a8d4
TL
1341 bytes_written += zs.total_out;
1342 }
1343 if(impl.rd_op == detail::opcode::text)
1344 {
1345 // check utf8
1346 if(! impl.rd_utf8.write(beast::buffers_prefix(
1347 bytes_written, buffers)) || (
1348 impl.rd_done && ! impl.rd_utf8.finish()))
1349 {
1350 // _Fail the WebSocket Connection_
1351 do_fail(close_code::bad_payload,
1352 error::bad_frame_payload, ec);
1353 return bytes_written;
1354 }
1355 }
1356 }
1357 return bytes_written;
1358}
1359
1360template<class NextLayer, bool deflateSupported>
20effc67 1361template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
92f5a8d4
TL
1362BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1363stream<NextLayer, deflateSupported>::
1364async_read_some(
1365 MutableBufferSequence const& buffers,
1366 ReadHandler&& handler)
1367{
1368 static_assert(is_async_stream<next_layer_type>::value,
1369 "AsyncStream type requirements not met");
1370 static_assert(net::is_mutable_buffer_sequence<
1371 MutableBufferSequence>::value,
1372 "MutableBufferSequence type requirements not met");
1373 return net::async_initiate<
1374 ReadHandler,
1375 void(error_code, std::size_t)>(
1376 run_read_some_op{},
1377 handler,
1378 impl_,
1379 buffers);
1380}
1381
1382} // websocket
1383} // beast
1384} // boost
1385
1386#endif