]> git.proxmox.com Git - ceph.git/blame - ceph/src/Beast/include/beast/websocket/impl/read.ipp
bump version to 12.2.2-pve1
[ceph.git] / ceph / src / Beast / include / beast / websocket / impl / read.ipp
CommitLineData
7c673cae
FG
1//
2// Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7
8#ifndef BEAST_WEBSOCKET_IMPL_READ_IPP
9#define BEAST_WEBSOCKET_IMPL_READ_IPP
10
11#include <beast/websocket/teardown.hpp>
12#include <beast/core/buffer_concepts.hpp>
13#include <beast/core/handler_helpers.hpp>
14#include <beast/core/handler_ptr.hpp>
15#include <beast/core/prepare_buffers.hpp>
16#include <beast/core/static_streambuf.hpp>
17#include <beast/core/stream_concepts.hpp>
18#include <beast/core/detail/clamp.hpp>
19#include <boost/assert.hpp>
20#include <boost/optional.hpp>
21#include <limits>
22#include <memory>
23
24namespace beast {
25namespace websocket {
26
27//------------------------------------------------------------------------------
28
29// Reads a single message frame,
30// processes any received control frames.
31//
32template<class NextLayer>
33template<class DynamicBuffer, class Handler>
34class stream<NextLayer>::read_frame_op
35{
36 using fb_type =
37 detail::frame_streambuf;
38
39 using fmb_type =
40 typename fb_type::mutable_buffers_type;
41
42 using dmb_type =
43 typename DynamicBuffer::mutable_buffers_type;
44
45 struct data : op
46 {
47 bool cont;
48 stream<NextLayer>& ws;
49 frame_info& fi;
50 DynamicBuffer& db;
51 fb_type fb;
52 std::uint64_t remain;
53 detail::frame_header fh;
54 detail::prepared_key key;
55 boost::optional<dmb_type> dmb;
56 boost::optional<fmb_type> fmb;
57 int state = 0;
58
59 data(Handler& handler, stream<NextLayer>& ws_,
60 frame_info& fi_, DynamicBuffer& sb_)
61 : cont(beast_asio_helpers::
62 is_continuation(handler))
63 , ws(ws_)
64 , fi(fi_)
65 , db(sb_)
66 {
67 }
68 };
69
70 handler_ptr<data, Handler> d_;
71
72public:
73 read_frame_op(read_frame_op&&) = default;
74 read_frame_op(read_frame_op const&) = default;
75
76 template<class DeducedHandler, class... Args>
77 read_frame_op(DeducedHandler&& h,
78 stream<NextLayer>& ws, Args&&... args)
79 : d_(std::forward<DeducedHandler>(h),
80 ws, std::forward<Args>(args)...)
81 {
82 (*this)(error_code{}, 0, false);
83 }
84
85 void operator()()
86 {
87 (*this)(error_code{}, 0, true);
88 }
89
90 void operator()(error_code const& ec)
91 {
92 (*this)(ec, 0, true);
93 }
94
95 void operator()(error_code ec,
96 std::size_t bytes_transferred);
97
98 void operator()(error_code ec,
99 std::size_t bytes_transferred, bool again);
100
101 friend
102 void* asio_handler_allocate(
103 std::size_t size, read_frame_op* op)
104 {
105 return beast_asio_helpers::
106 allocate(size, op->d_.handler());
107 }
108
109 friend
110 void asio_handler_deallocate(
111 void* p, std::size_t size, read_frame_op* op)
112 {
113 return beast_asio_helpers::
114 deallocate(p, size, op->d_.handler());
115 }
116
117 friend
118 bool asio_handler_is_continuation(read_frame_op* op)
119 {
120 return op->d_->cont;
121 }
122
123 template<class Function>
124 friend
125 void asio_handler_invoke(Function&& f, read_frame_op* op)
126 {
127 return beast_asio_helpers::
128 invoke(f, op->d_.handler());
129 }
130};
131
132template<class NextLayer>
133template<class DynamicBuffer, class Handler>
134void
135stream<NextLayer>::read_frame_op<DynamicBuffer, Handler>::
136operator()(error_code ec, std::size_t bytes_transferred)
137{
138 auto& d = *d_;
139 if(ec)
140 d.ws.failed_ = true;
141 (*this)(ec, bytes_transferred, true);
142}
143
144template<class NextLayer>
145template<class DynamicBuffer, class Handler>
146void
147stream<NextLayer>::read_frame_op<DynamicBuffer, Handler>::
148operator()(error_code ec,
149 std::size_t bytes_transferred, bool again)
150{
151 using beast::detail::clamp;
152 using boost::asio::buffer;
153 enum
154 {
155 do_start = 0,
156 do_read_payload = 1,
157 do_inflate_payload = 30,
158 do_frame_done = 4,
159 do_read_fh = 5,
160 do_control_payload = 8,
161 do_control = 9,
162 do_pong_resume = 10,
163 do_pong = 12,
164 do_close_resume = 14,
165 do_close = 16,
166 do_teardown = 17,
167 do_fail = 19,
168
169 do_call_handler = 99
170 };
171
172 auto& d = *d_;
173 if(! ec)
174 {
175 d.cont = d.cont || again;
176 close_code code = close_code::none;
177 do
178 {
179 switch(d.state)
180 {
181 case do_start:
182 if(d.ws.failed_)
183 {
184 d.state = do_call_handler;
185 d.ws.get_io_service().post(
186 bind_handler(std::move(*this),
187 boost::asio::error::operation_aborted, 0));
188 return;
189 }
190 d.state = do_read_fh;
191 break;
192
193 //------------------------------------------------------------------
194
195 case do_read_payload:
196 if(d.fh.len == 0)
197 {
198 d.state = do_frame_done;
199 break;
200 }
201 // Enforce message size limit
202 if(d.ws.rd_msg_max_ && d.fh.len >
203 d.ws.rd_msg_max_ - d.ws.rd_.size)
204 {
205 code = close_code::too_big;
206 d.state = do_fail;
207 break;
208 }
209 d.ws.rd_.size += d.fh.len;
210 d.remain = d.fh.len;
211 if(d.fh.mask)
212 detail::prepare_key(d.key, d.fh.key);
213 // fall through
214
215 case do_read_payload + 1:
216 d.state = do_read_payload + 2;
217 d.dmb = d.db.prepare(clamp(d.remain));
218 // Read frame payload data
219 d.ws.stream_.async_read_some(
220 *d.dmb, std::move(*this));
221 return;
222
223 case do_read_payload + 2:
224 {
225 d.remain -= bytes_transferred;
226 auto const pb = prepare_buffers(
227 bytes_transferred, *d.dmb);
228 if(d.fh.mask)
229 detail::mask_inplace(pb, d.key);
230 if(d.ws.rd_.op == opcode::text)
231 {
232 if(! d.ws.rd_.utf8.write(pb) ||
233 (d.remain == 0 && d.fh.fin &&
234 ! d.ws.rd_.utf8.finish()))
235 {
236 // invalid utf8
237 code = close_code::bad_payload;
238 d.state = do_fail;
239 break;
240 }
241 }
242 d.db.commit(bytes_transferred);
243 if(d.remain > 0)
244 {
245 d.state = do_read_payload + 1;
246 break;
247 }
248 d.state = do_frame_done;
249 break;
250 }
251
252 //------------------------------------------------------------------
253
254 case do_inflate_payload:
255 d.remain = d.fh.len;
256 if(d.fh.len == 0)
257 {
258 // inflate even if fh.len == 0, otherwise we
259 // never emit the end-of-stream deflate block.
260 bytes_transferred = 0;
261 d.state = do_inflate_payload + 2;
262 break;
263 }
264 if(d.fh.mask)
265 detail::prepare_key(d.key, d.fh.key);
266 // fall through
267
268 case do_inflate_payload + 1:
269 {
270 d.state = do_inflate_payload + 2;
271 // Read compressed frame payload data
272 d.ws.stream_.async_read_some(
273 buffer(d.ws.rd_.buf.get(), clamp(
274 d.remain, d.ws.rd_.buf_size)),
275 std::move(*this));
276 return;
277 }
278
279 case do_inflate_payload + 2:
280 {
281 d.remain -= bytes_transferred;
282 auto const in = buffer(
283 d.ws.rd_.buf.get(), bytes_transferred);
284 if(d.fh.mask)
285 detail::mask_inplace(in, d.key);
286 auto const prev = d.db.size();
287 detail::inflate(d.ws.pmd_->zi, d.db, in, ec);
288 d.ws.failed_ = ec != 0;
289 if(d.ws.failed_)
290 break;
291 if(d.remain == 0 && d.fh.fin)
292 {
293 static std::uint8_t constexpr
294 empty_block[4] = {
295 0x00, 0x00, 0xff, 0xff };
296 detail::inflate(d.ws.pmd_->zi, d.db,
297 buffer(&empty_block[0], 4), ec);
298 d.ws.failed_ = ec != 0;
299 if(d.ws.failed_)
300 break;
301 }
302 if(d.ws.rd_.op == opcode::text)
303 {
304 consuming_buffers<typename
305 DynamicBuffer::const_buffers_type
306 > cb{d.db.data()};
307 cb.consume(prev);
308 if(! d.ws.rd_.utf8.write(cb) ||
309 (d.remain == 0 && d.fh.fin &&
310 ! d.ws.rd_.utf8.finish()))
311 {
312 // invalid utf8
313 code = close_code::bad_payload;
314 d.state = do_fail;
315 break;
316 }
317 }
318 if(d.remain > 0)
319 {
320 d.state = do_inflate_payload + 1;
321 break;
322 }
323 if(d.fh.fin && (
324 (d.ws.role_ == detail::role_type::client &&
325 d.ws.pmd_config_.server_no_context_takeover) ||
326 (d.ws.role_ == detail::role_type::server &&
327 d.ws.pmd_config_.client_no_context_takeover)))
328 d.ws.pmd_->zi.reset();
329 d.state = do_frame_done;
330 break;
331 }
332
333 //------------------------------------------------------------------
334
335 case do_frame_done:
336 // call handler
337 d.fi.op = d.ws.rd_.op;
338 d.fi.fin = d.fh.fin;
339 goto upcall;
340
341 //------------------------------------------------------------------
342
343 case do_read_fh:
344 d.state = do_read_fh + 1;
345 boost::asio::async_read(d.ws.stream_,
346 d.fb.prepare(2), std::move(*this));
347 return;
348
349 case do_read_fh + 1:
350 {
351 d.fb.commit(bytes_transferred);
352 code = close_code::none;
353 auto const n = d.ws.read_fh1(
354 d.fh, d.fb, code);
355 if(code != close_code::none)
356 {
357 // protocol error
358 d.state = do_fail;
359 break;
360 }
361 d.state = do_read_fh + 2;
362 if(n == 0)
363 {
364 bytes_transferred = 0;
365 break;
366 }
367 // read variable header
368 boost::asio::async_read(d.ws.stream_,
369 d.fb.prepare(n), std::move(*this));
370 return;
371 }
372
373 case do_read_fh + 2:
374 d.fb.commit(bytes_transferred);
375 code = close_code::none;
376 d.ws.read_fh2(d.fh, d.fb, code);
377 if(code != close_code::none)
378 {
379 // protocol error
380 d.state = do_fail;
381 break;
382 }
383 if(detail::is_control(d.fh.op))
384 {
385 if(d.fh.len > 0)
386 {
387 // read control payload
388 d.state = do_control_payload;
389 d.fmb = d.fb.prepare(static_cast<
390 std::size_t>(d.fh.len));
391 boost::asio::async_read(d.ws.stream_,
392 *d.fmb, std::move(*this));
393 return;
394 }
395 d.state = do_control;
396 break;
397 }
398 if(d.fh.op == opcode::text ||
399 d.fh.op == opcode::binary)
400 d.ws.rd_begin();
401 if(d.fh.len == 0 && ! d.fh.fin)
402 {
403 // Empty message frame
404 d.state = do_frame_done;
405 break;
406 }
407 if(! d.ws.pmd_ || ! d.ws.pmd_->rd_set)
408 d.state = do_read_payload;
409 else
410 d.state = do_inflate_payload;
411 break;
412
413 //------------------------------------------------------------------
414
415 case do_control_payload:
416 if(d.fh.mask)
417 {
418 detail::prepare_key(d.key, d.fh.key);
419 detail::mask_inplace(*d.fmb, d.key);
420 }
421 d.fb.commit(bytes_transferred);
422 d.state = do_control; // VFALCO fall through?
423 break;
424
425 //------------------------------------------------------------------
426
427 case do_control:
428 if(d.fh.op == opcode::ping)
429 {
430 ping_data payload;
431 detail::read(payload, d.fb.data());
432 d.fb.reset();
433 if(d.ws.ping_cb_)
434 d.ws.ping_cb_(false, payload);
435 if(d.ws.wr_close_)
436 {
437 // ignore ping when closing
438 d.state = do_read_fh;
439 break;
440 }
441 d.ws.template write_ping<static_streambuf>(
442 d.fb, opcode::pong, payload);
443 if(d.ws.wr_block_)
444 {
445 // suspend
446 d.state = do_pong_resume;
447 BOOST_ASSERT(d.ws.wr_block_ != &d);
448 d.ws.rd_op_.template emplace<
449 read_frame_op>(std::move(*this));
450 return;
451 }
452 d.state = do_pong;
453 break;
454 }
455 else if(d.fh.op == opcode::pong)
456 {
457 code = close_code::none;
458 ping_data payload;
459 detail::read(payload, d.fb.data());
460 if(d.ws.ping_cb_)
461 d.ws.ping_cb_(true, payload);
462 d.fb.reset();
463 d.state = do_read_fh;
464 break;
465 }
466 BOOST_ASSERT(d.fh.op == opcode::close);
467 {
468 detail::read(d.ws.cr_, d.fb.data(), code);
469 if(code != close_code::none)
470 {
471 // protocol error
472 d.state = do_fail;
473 break;
474 }
475 if(! d.ws.wr_close_)
476 {
477 auto cr = d.ws.cr_;
478 if(cr.code == close_code::none)
479 cr.code = close_code::normal;
480 cr.reason = "";
481 d.fb.reset();
482 d.ws.template write_close<
483 static_streambuf>(d.fb, cr);
484 if(d.ws.wr_block_)
485 {
486 // suspend
487 d.state = do_close_resume;
488 d.ws.rd_op_.template emplace<
489 read_frame_op>(std::move(*this));
490 return;
491 }
492 d.state = do_close;
493 break;
494 }
495 d.state = do_teardown;
496 break;
497 }
498
499 //------------------------------------------------------------------
500
501 case do_pong_resume:
502 BOOST_ASSERT(! d.ws.wr_block_);
503 d.ws.wr_block_ = &d;
504 d.state = do_pong_resume + 1;
505 d.ws.get_io_service().post(bind_handler(
506 std::move(*this), ec, bytes_transferred));
507 return;
508
509 case do_pong_resume + 1:
510 if(d.ws.failed_)
511 {
512 // call handler
513 ec = boost::asio::error::operation_aborted;
514 goto upcall;
515 }
516 // [[fallthrough]]
517
518 //------------------------------------------------------------------
519
520 case do_pong:
521 if(d.ws.wr_close_)
522 {
523 // ignore ping when closing
524 if(d.ws.wr_block_)
525 {
526 BOOST_ASSERT(d.ws.wr_block_ == &d);
527 d.ws.wr_block_ = nullptr;
528 }
529 d.fb.reset();
530 d.state = do_read_fh;
531 break;
532 }
533 // send pong
534 if(! d.ws.wr_block_)
535 d.ws.wr_block_ = &d;
536 else
537 BOOST_ASSERT(d.ws.wr_block_ == &d);
538 d.state = do_pong + 1;
539 boost::asio::async_write(d.ws.stream_,
540 d.fb.data(), std::move(*this));
541 return;
542
543 case do_pong + 1:
544 d.fb.reset();
545 d.state = do_read_fh;
546 d.ws.wr_block_ = nullptr;
547 break;
548
549 //------------------------------------------------------------------
550
551 case do_close_resume:
552 BOOST_ASSERT(! d.ws.wr_block_);
553 d.ws.wr_block_ = &d;
554 d.state = do_close_resume + 1;
555 // The current context is safe but might not be
556 // the same as the one for this operation (since
557 // we are being called from a write operation).
558 // Call post to make sure we are invoked the same
559 // way as the final handler for this operation.
560 d.ws.get_io_service().post(bind_handler(
561 std::move(*this), ec, bytes_transferred));
562 return;
563
564 case do_close_resume + 1:
565 BOOST_ASSERT(d.ws.wr_block_ == &d);
566 if(d.ws.failed_)
567 {
568 // call handler
569 ec = boost::asio::error::operation_aborted;
570 goto upcall;
571 }
572 if(d.ws.wr_close_)
573 {
574 // call handler
575 ec = error::closed;
576 goto upcall;
577 }
578 d.state = do_close;
579 break;
580
581 //------------------------------------------------------------------
582
583 case do_close:
584 if(! d.ws.wr_block_)
585 d.ws.wr_block_ = &d;
586 else
587 BOOST_ASSERT(d.ws.wr_block_ == &d);
588 d.state = do_teardown;
589 d.ws.wr_close_ = true;
590 boost::asio::async_write(d.ws.stream_,
591 d.fb.data(), std::move(*this));
592 return;
593
594 //------------------------------------------------------------------
595
596 case do_teardown:
597 d.state = do_teardown + 1;
598 websocket_helpers::call_async_teardown(
599 d.ws.next_layer(), std::move(*this));
600 return;
601
602 case do_teardown + 1:
603 // call handler
604 ec = error::closed;
605 goto upcall;
606
607 //------------------------------------------------------------------
608
609 case do_fail:
610 if(d.ws.wr_close_)
611 {
612 d.state = do_fail + 4;
613 break;
614 }
615 d.fb.reset();
616 d.ws.template write_close<
617 static_streambuf>(d.fb, code);
618 if(d.ws.wr_block_)
619 {
620 // suspend
621 d.state = do_fail + 2;
622 d.ws.rd_op_.template emplace<
623 read_frame_op>(std::move(*this));
624 return;
625 }
626 // fall through
627
628 case do_fail + 1:
629 d.ws.failed_ = true;
630 // send close frame
631 d.state = do_fail + 4;
632 d.ws.wr_close_ = true;
633 BOOST_ASSERT(! d.ws.wr_block_);
634 d.ws.wr_block_ = &d;
635 boost::asio::async_write(d.ws.stream_,
636 d.fb.data(), std::move(*this));
637 return;
638
639 case do_fail + 2:
640 d.state = do_fail + 3;
641 d.ws.get_io_service().post(bind_handler(
642 std::move(*this), ec, bytes_transferred));
643 return;
644
645 case do_fail + 3:
646 if(d.ws.failed_)
647 {
648 d.state = do_fail + 5;
649 break;
650 }
651 d.state = do_fail + 1;
652 break;
653
654 case do_fail + 4:
655 d.state = do_fail + 5;
656 websocket_helpers::call_async_teardown(
657 d.ws.next_layer(), std::move(*this));
658 return;
659
660 case do_fail + 5:
661 // call handler
662 ec = error::failed;
663 goto upcall;
664
665 //------------------------------------------------------------------
666
667 case do_call_handler:
668 goto upcall;
669 }
670 }
671 while(! ec);
672 }
673upcall:
674 if(d.ws.wr_block_ == &d)
675 d.ws.wr_block_ = nullptr;
676 d.ws.ping_op_.maybe_invoke() ||
677 d.ws.wr_op_.maybe_invoke();
678 d_.invoke(ec);
679}
680
681template<class NextLayer>
682template<class DynamicBuffer, class ReadHandler>
683typename async_completion<
684 ReadHandler, void(error_code)>::result_type
685stream<NextLayer>::
686async_read_frame(frame_info& fi,
687 DynamicBuffer& dynabuf, ReadHandler&& handler)
688{
689 static_assert(is_AsyncStream<next_layer_type>::value,
690 "AsyncStream requirements requirements not met");
691 static_assert(beast::is_DynamicBuffer<DynamicBuffer>::value,
692 "DynamicBuffer requirements not met");
693 beast::async_completion<
694 ReadHandler, void(error_code)> completion{handler};
695 read_frame_op<DynamicBuffer, decltype(completion.handler)>{
696 completion.handler, *this, fi, dynabuf};
697 return completion.result.get();
698}
699
700template<class NextLayer>
701template<class DynamicBuffer>
702void
703stream<NextLayer>::
704read_frame(frame_info& fi, DynamicBuffer& dynabuf)
705{
706 static_assert(is_SyncStream<next_layer_type>::value,
707 "SyncStream requirements not met");
708 static_assert(beast::is_DynamicBuffer<DynamicBuffer>::value,
709 "DynamicBuffer requirements not met");
710 error_code ec;
711 read_frame(fi, dynabuf, ec);
712 if(ec)
713 throw system_error{ec};
714}
715
716template<class NextLayer>
717template<class DynamicBuffer>
718void
719stream<NextLayer>::
720read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec)
721{
722 static_assert(is_SyncStream<next_layer_type>::value,
723 "SyncStream requirements not met");
724 static_assert(beast::is_DynamicBuffer<DynamicBuffer>::value,
725 "DynamicBuffer requirements not met");
726 using beast::detail::clamp;
727 using boost::asio::buffer;
728 using boost::asio::buffer_cast;
729 using boost::asio::buffer_size;
730 close_code code{};
731 for(;;)
732 {
733 // Read frame header
734 detail::frame_header fh;
735 detail::frame_streambuf fb;
736 {
737 fb.commit(boost::asio::read(
738 stream_, fb.prepare(2), ec));
739 failed_ = ec != 0;
740 if(failed_)
741 return;
742 {
743 auto const n = read_fh1(fh, fb, code);
744 if(code != close_code::none)
745 goto do_close;
746 if(n > 0)
747 {
748 fb.commit(boost::asio::read(
749 stream_, fb.prepare(n), ec));
750 failed_ = ec != 0;
751 if(failed_)
752 return;
753 }
754 }
755 read_fh2(fh, fb, code);
756
757 failed_ = ec != 0;
758 if(failed_)
759 return;
760 if(code != close_code::none)
761 goto do_close;
762 }
763 if(detail::is_control(fh.op))
764 {
765 // Read control frame payload
766 if(fh.len > 0)
767 {
768 auto const mb = fb.prepare(
769 static_cast<std::size_t>(fh.len));
770 fb.commit(boost::asio::read(stream_, mb, ec));
771 failed_ = ec != 0;
772 if(failed_)
773 return;
774 if(fh.mask)
775 {
776 detail::prepared_key key;
777 detail::prepare_key(key, fh.key);
778 detail::mask_inplace(mb, key);
779 }
780 fb.commit(static_cast<std::size_t>(fh.len));
781 }
782 // Process control frame
783 if(fh.op == opcode::ping)
784 {
785 ping_data payload;
786 detail::read(payload, fb.data());
787 fb.reset();
788 if(ping_cb_)
789 ping_cb_(false, payload);
790 write_ping<static_streambuf>(
791 fb, opcode::pong, payload);
792 boost::asio::write(stream_, fb.data(), ec);
793 failed_ = ec != 0;
794 if(failed_)
795 return;
796 continue;
797 }
798 else if(fh.op == opcode::pong)
799 {
800 ping_data payload;
801 detail::read(payload, fb.data());
802 if(ping_cb_)
803 ping_cb_(true, payload);
804 continue;
805 }
806 BOOST_ASSERT(fh.op == opcode::close);
807 {
808 detail::read(cr_, fb.data(), code);
809 if(code != close_code::none)
810 goto do_close;
811 if(! wr_close_)
812 {
813 auto cr = cr_;
814 if(cr.code == close_code::none)
815 cr.code = close_code::normal;
816 cr.reason = "";
817 fb.reset();
818 wr_close_ = true;
819 write_close<static_streambuf>(fb, cr);
820 boost::asio::write(stream_, fb.data(), ec);
821 failed_ = ec != 0;
822 if(failed_)
823 return;
824 }
825 goto do_close;
826 }
827 }
828 if(fh.op != opcode::cont)
829 rd_begin();
830 if(fh.len == 0 && ! fh.fin)
831 {
832 // empty frame
833 continue;
834 }
835 auto remain = fh.len;
836 detail::prepared_key key;
837 if(fh.mask)
838 detail::prepare_key(key, fh.key);
839 if(! pmd_ || ! pmd_->rd_set)
840 {
841 // Enforce message size limit
842 if(rd_msg_max_ && fh.len >
843 rd_msg_max_ - rd_.size)
844 {
845 code = close_code::too_big;
846 goto do_close;
847 }
848 rd_.size += fh.len;
849 // Read message frame payload
850 while(remain > 0)
851 {
852 auto b =
853 dynabuf.prepare(clamp(remain));
854 auto const bytes_transferred =
855 stream_.read_some(b, ec);
856 failed_ = ec != 0;
857 if(failed_)
858 return;
859 BOOST_ASSERT(bytes_transferred > 0);
860 remain -= bytes_transferred;
861 auto const pb = prepare_buffers(
862 bytes_transferred, b);
863 if(fh.mask)
864 detail::mask_inplace(pb, key);
865 if(rd_.op == opcode::text)
866 {
867 if(! rd_.utf8.write(pb) ||
868 (remain == 0 && fh.fin &&
869 ! rd_.utf8.finish()))
870 {
871 code = close_code::bad_payload;
872 goto do_close;
873 }
874 }
875 dynabuf.commit(bytes_transferred);
876 }
877 }
878 else
879 {
880 // Read compressed message frame payload:
881 // inflate even if fh.len == 0, otherwise we
882 // never emit the end-of-stream deflate block.
883 for(;;)
884 {
885 auto const bytes_transferred =
886 stream_.read_some(buffer(rd_.buf.get(),
887 clamp(remain, rd_.buf_size)), ec);
888 failed_ = ec != 0;
889 if(failed_)
890 return;
891 remain -= bytes_transferred;
892 auto const in = buffer(
893 rd_.buf.get(), bytes_transferred);
894 if(fh.mask)
895 detail::mask_inplace(in, key);
896 auto const prev = dynabuf.size();
897 detail::inflate(pmd_->zi, dynabuf, in, ec);
898 failed_ = ec != 0;
899 if(failed_)
900 return;
901 if(remain == 0 && fh.fin)
902 {
903 static std::uint8_t constexpr
904 empty_block[4] = {
905 0x00, 0x00, 0xff, 0xff };
906 detail::inflate(pmd_->zi, dynabuf,
907 buffer(&empty_block[0], 4), ec);
908 failed_ = ec != 0;
909 if(failed_)
910 return;
911 }
912 if(rd_.op == opcode::text)
913 {
914 consuming_buffers<typename
915 DynamicBuffer::const_buffers_type
916 > cb{dynabuf.data()};
917 cb.consume(prev);
918 if(! rd_.utf8.write(cb) || (
919 remain == 0 && fh.fin &&
920 ! rd_.utf8.finish()))
921 {
922 code = close_code::bad_payload;
923 goto do_close;
924 }
925 }
926 if(remain == 0)
927 break;
928 }
929 if(fh.fin && (
930 (role_ == detail::role_type::client &&
931 pmd_config_.server_no_context_takeover) ||
932 (role_ == detail::role_type::server &&
933 pmd_config_.client_no_context_takeover)))
934 pmd_->zi.reset();
935 }
936 fi.op = rd_.op;
937 fi.fin = fh.fin;
938 return;
939 }
940do_close:
941 if(code != close_code::none)
942 {
943 // Fail the connection (per rfc6455)
944 if(! wr_close_)
945 {
946 wr_close_ = true;
947 detail::frame_streambuf fb;
948 write_close<static_streambuf>(fb, code);
949 boost::asio::write(stream_, fb.data(), ec);
950 failed_ = ec != 0;
951 if(failed_)
952 return;
953 }
954 websocket_helpers::call_teardown(next_layer(), ec);
955 failed_ = ec != 0;
956 if(failed_)
957 return;
958 ec = error::failed;
959 failed_ = true;
960 return;
961 }
962 if(! ec)
963 websocket_helpers::call_teardown(next_layer(), ec);
964 if(! ec)
965 ec = error::closed;
966 failed_ = ec != 0;
967}
968
969//------------------------------------------------------------------------------
970
971// read an entire message
972//
973template<class NextLayer>
974template<class DynamicBuffer, class Handler>
975class stream<NextLayer>::read_op
976{
977 struct data
978 {
979 bool cont;
980 stream<NextLayer>& ws;
981 opcode& op;
982 DynamicBuffer& db;
983 frame_info fi;
984 int state = 0;
985
986 data(Handler& handler,
987 stream<NextLayer>& ws_, opcode& op_,
988 DynamicBuffer& sb_)
989 : cont(beast_asio_helpers::
990 is_continuation(handler))
991 , ws(ws_)
992 , op(op_)
993 , db(sb_)
994 {
995 }
996 };
997
998 handler_ptr<data, Handler> d_;
999
1000public:
1001 read_op(read_op&&) = default;
1002 read_op(read_op const&) = default;
1003
1004 template<class DeducedHandler, class... Args>
1005 read_op(DeducedHandler&& h,
1006 stream<NextLayer>& ws, Args&&... args)
1007 : d_(std::forward<DeducedHandler>(h),
1008 ws, std::forward<Args>(args)...)
1009 {
1010 (*this)(error_code{}, false);
1011 }
1012
1013 void operator()(
1014 error_code const& ec, bool again = true);
1015
1016 friend
1017 void* asio_handler_allocate(
1018 std::size_t size, read_op* op)
1019 {
1020 return beast_asio_helpers::
1021 allocate(size, op->d_.handler());
1022 }
1023
1024 friend
1025 void asio_handler_deallocate(
1026 void* p, std::size_t size, read_op* op)
1027 {
1028 return beast_asio_helpers::
1029 deallocate(p, size, op->d_.handler());
1030 }
1031
1032 friend
1033 bool asio_handler_is_continuation(read_op* op)
1034 {
1035 return op->d_->cont;
1036 }
1037
1038 template<class Function>
1039 friend
1040 void asio_handler_invoke(Function&& f, read_op* op)
1041 {
1042 return beast_asio_helpers::
1043 invoke(f, op->d_.handler());
1044 }
1045};
1046
1047template<class NextLayer>
1048template<class DynamicBuffer, class Handler>
1049void
1050stream<NextLayer>::read_op<DynamicBuffer, Handler>::
1051operator()(error_code const& ec, bool again)
1052{
1053 auto& d = *d_;
1054 d.cont = d.cont || again;
1055 while(! ec)
1056 {
1057 switch(d.state)
1058 {
1059 case 0:
1060 // read payload
1061 d.state = 1;
1062 d.ws.async_read_frame(
1063 d.fi, d.db, std::move(*this));
1064 return;
1065
1066 // got payload
1067 case 1:
1068 d.op = d.fi.op;
1069 if(d.fi.fin)
1070 goto upcall;
1071 d.state = 0;
1072 break;
1073 }
1074 }
1075upcall:
1076 d_.invoke(ec);
1077}
1078
1079template<class NextLayer>
1080template<class DynamicBuffer, class ReadHandler>
1081typename async_completion<
1082 ReadHandler, void(error_code)>::result_type
1083stream<NextLayer>::
1084async_read(opcode& op,
1085 DynamicBuffer& dynabuf, ReadHandler&& handler)
1086{
1087 static_assert(is_AsyncStream<next_layer_type>::value,
1088 "AsyncStream requirements requirements not met");
1089 static_assert(beast::is_DynamicBuffer<DynamicBuffer>::value,
1090 "DynamicBuffer requirements not met");
1091 beast::async_completion<
1092 ReadHandler, void(error_code)
1093 > completion{handler};
1094 read_op<DynamicBuffer, decltype(completion.handler)>{
1095 completion.handler, *this, op, dynabuf};
1096 return completion.result.get();
1097}
1098
1099template<class NextLayer>
1100template<class DynamicBuffer>
1101void
1102stream<NextLayer>::
1103read(opcode& op, DynamicBuffer& dynabuf)
1104{
1105 static_assert(is_SyncStream<next_layer_type>::value,
1106 "SyncStream requirements not met");
1107 static_assert(beast::is_DynamicBuffer<DynamicBuffer>::value,
1108 "DynamicBuffer requirements not met");
1109 error_code ec;
1110 read(op, dynabuf, ec);
1111 if(ec)
1112 throw system_error{ec};
1113}
1114
1115template<class NextLayer>
1116template<class DynamicBuffer>
1117void
1118stream<NextLayer>::
1119read(opcode& op, DynamicBuffer& dynabuf, error_code& ec)
1120{
1121 static_assert(is_SyncStream<next_layer_type>::value,
1122 "SyncStream requirements not met");
1123 static_assert(beast::is_DynamicBuffer<DynamicBuffer>::value,
1124 "DynamicBuffer requirements not met");
1125 frame_info fi;
1126 for(;;)
1127 {
1128 read_frame(fi, dynabuf, ec);
1129 if(ec)
1130 break;
1131 op = fi.op;
1132 if(fi.fin)
1133 break;
1134 }
1135}
1136
1137//------------------------------------------------------------------------------
1138
1139} // websocket
1140} // beast
1141
1142#endif