2 // Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 #ifndef BEAST_WEBSOCKET_IMPL_WRITE_IPP
9 #define BEAST_WEBSOCKET_IMPL_WRITE_IPP
11 #include <beast/core/bind_handler.hpp>
12 #include <beast/core/buffer_cat.hpp>
13 #include <beast/core/buffer_concepts.hpp>
14 #include <beast/core/consuming_buffers.hpp>
15 #include <beast/core/handler_helpers.hpp>
16 #include <beast/core/handler_ptr.hpp>
17 #include <beast/core/prepare_buffers.hpp>
18 #include <beast/core/static_streambuf.hpp>
19 #include <beast/core/stream_concepts.hpp>
20 #include <beast/core/detail/clamp.hpp>
21 #include <beast/websocket/detail/frame.hpp>
22 #include <boost/assert.hpp>
// Composed asynchronous operation which writes a single WebSocket
// frame (header plus payload) on the stream. The operation is a
// stateful function object re-invoked as each intermediate
// async_write completes.
//
// NOTE(review): this extract is missing interior lines (the embedded
// original line numbers are discontinuous), so member declarations,
// braces, and several function bodies shown here are not complete.
// Comments below describe only what is visible.
29 template<class NextLayer>
30 template<class Buffers, class Handler>
31 class stream<NextLayer>::write_frame_op
// Shared state for the operation, owned through handler_ptr so its
// storage is allocated/deallocated via the final handler's allocator.
37 stream<NextLayer>& ws;
// Caller's buffer sequence, consumed incrementally as frames are sent.
38 consuming_buffers<Buffers> cb;
// Frame header fields and its serialized form for the next write.
40 detail::frame_header fh;
41 detail::fh_streambuf fh_buf;
// Prepared masking key (client role masks outgoing payloads).
42 detail::prepared_key key;
47 data(Handler& handler_, stream<NextLayer>& ws_,
48 bool fin_, Buffers const& bs)
// cont records whether we are already on a continuation of the
// caller's handler, used by asio_handler_is_continuation.
50 , cont(beast_asio_helpers::
51 is_continuation(handler))
59 handler_ptr<data, Handler> d_;
62 write_frame_op(write_frame_op&&) = default;
63 write_frame_op(write_frame_op const&) = default;
65 template<class DeducedHandler, class... Args>
66 write_frame_op(DeducedHandler&& h,
67 stream<NextLayer>& ws, Args&&... args)
68 : d_(std::forward<DeducedHandler>(h),
69 ws, std::forward<Args>(args)...)
// Kick off the state machine; 'false' marks the initial invocation
// as not-a-continuation (see the three-argument operator() below).
71 (*this)(error_code{}, 0, false);
76 (*this)(error_code{}, 0, true);
79 void operator()(error_code const& ec)
// Completion hook for intermediate async_write calls.
84 void operator()(error_code ec,
85 std::size_t bytes_transferred);
// Main state-machine entry; 'again' distinguishes re-invocations.
87 void operator()(error_code ec,
88 std::size_t bytes_transferred, bool again);
// Asio handler hooks: forward allocation, deallocation, continuation
// detection, and invocation to the wrapped final handler so its
// customizations apply to the whole composed operation.
91 void* asio_handler_allocate(
92 std::size_t size, write_frame_op* op)
94 return beast_asio_helpers::
95 allocate(size, op->d_.handler());
99 void asio_handler_deallocate(
100 void* p, std::size_t size, write_frame_op* op)
102 return beast_asio_helpers::
103 deallocate(p, size, op->d_.handler());
107 bool asio_handler_is_continuation(write_frame_op* op)
112 template<class Function>
114 void asio_handler_invoke(Function&& f, write_frame_op* op)
116 return beast_asio_helpers::
117 invoke(f, op->d_.handler());
// Intermediate completion handler: forwards to the three-argument
// overload with again=true, marking this call as a continuation of
// the composed operation (lines between the signature and the body
// are missing from this extract).
121 template<class NextLayer>
122 template<class Buffers, class Handler>
125 write_frame_op<Buffers, Handler>::
126 operator()(error_code ec, std::size_t bytes_transferred)
131 (*this)(ec, bytes_transferred, true);
// Main state machine for sending one frame. Four send algorithms are
// selected up front (entry_state) based on the stream's role and
// options visible below: deflate (permessage-deflate compression),
// mask/no-mask (client role masks, per fh.mask assignment), and
// frag/no-frag (autofrag splits payloads larger than wr_.buf_size).
// NOTE(review): many interior lines (case labels, braces, returns)
// are missing from this extract; comments describe the visible flow.
134 template<class NextLayer>
135 template<class Buffers, class Handler>
138 write_frame_op<Buffers, Handler>::
139 operator()(error_code ec,
140 std::size_t bytes_transferred, bool again)
142 using beast::detail::clamp;
143 using boost::asio::buffer;
144 using boost::asio::buffer_copy;
145 using boost::asio::buffer_size;
// State constants (enumerators partially elided in this extract).
149 do_nomask_nofrag = 20,
154 do_maybe_suspend = 80,
// Once invoked from a completion handler we are a continuation.
158 d.cont = d.cont || again;
// RSV1 signals a compressed message per permessage-deflate.
169 d.fh.rsv1 = d.ws.wr_.compress;
// First frame of a message carries the message opcode; subsequent
// frames of a fragmented message use opcode::cont.
177 d.fh.op = d.ws.wr_.cont ?
178 opcode::cont : d.ws.wr_opcode_;
// Clients mask outgoing frames (assignment target elided here,
// presumably d.fh.mask — confirm against the full source).
180 d.ws.role_ == detail::role_type::client;
182 // entry_state determines which algorithm
183 // we will use to send. If we suspend, we
184 // will transition to entry_state + 1 on
186 if(d.ws.wr_.compress)
188 d.entry_state = do_deflate;
192 if(! d.ws.wr_.autofrag)
194 d.entry_state = do_nomask_nofrag;
198 BOOST_ASSERT(d.ws.wr_.buf_size != 0);
199 d.remain = buffer_size(d.cb);
// Autofrag: only fragment when the payload exceeds the write buffer.
200 if(d.remain > d.ws.wr_.buf_size)
201 d.entry_state = do_nomask_frag;
203 d.entry_state = do_nomask_nofrag;
208 if(! d.ws.wr_.autofrag)
210 d.entry_state = do_mask_nofrag;
214 BOOST_ASSERT(d.ws.wr_.buf_size != 0);
215 d.remain = buffer_size(d.cb);
216 if(d.remain > d.ws.wr_.buf_size)
217 d.entry_state = do_mask_frag;
219 d.entry_state = do_mask_nofrag;
// All paths first pass through do_maybe_suspend to acquire the
// write block (wr_block_) before touching the stream.
222 d.state = do_maybe_suspend;
225 //----------------------------------------------------------------------
// Send the entire message in one frame, unmasked (server role).
227 case do_nomask_nofrag:
228 BOOST_ASSERT(! d.ws.wr_block_);
232 case do_nomask_nofrag + 1:
234 BOOST_ASSERT(d.ws.wr_block_ == &d);
236 d.fh.len = buffer_size(d.cb);
237 detail::write<static_streambuf>(
239 d.ws.wr_.cont = ! d.fin;
// Header and caller's buffers are written together in one call.
242 boost::asio::async_write(d.ws.stream_,
243 buffer_cat(d.fh_buf.data(), d.cb),
248 //----------------------------------------------------------------------
// Send the message as multiple unmasked frames (autofrag).
251 BOOST_ASSERT(! d.ws.wr_block_);
255 case do_nomask_frag + 1:
257 BOOST_ASSERT(d.ws.wr_block_ == &d);
258 auto const n = clamp(
259 d.remain, d.ws.wr_.buf_size);
// FIN is set only on the last fragment of the final frame.
262 d.fh.fin = d.fin ? d.remain == 0 : false;
263 detail::write<static_streambuf>(
265 d.ws.wr_.cont = ! d.fin;
267 d.state = d.remain == 0 ?
268 do_upcall : do_nomask_frag + 2;
269 boost::asio::async_write(d.ws.stream_,
270 buffer_cat(d.fh_buf.data(),
271 prepare_buffers(n, d.cb)),
276 case do_nomask_frag + 2:
// Consume only the payload portion of what was written.
278 bytes_transferred - d.fh_buf.size());
280 d.fh.op = opcode::cont;
281 if(d.ws.wr_block_ == &d)
282 d.ws.wr_block_ = nullptr;
283 // Allow outgoing control frames to
284 // be sent in between message frames:
285 if(d.ws.rd_op_.maybe_invoke() ||
286 d.ws.ping_op_.maybe_invoke())
// A control frame took the write block; re-acquire it via suspend.
288 d.state = do_maybe_suspend;
289 d.ws.get_io_service().post(
293 d.state = d.entry_state;
296 //----------------------------------------------------------------------
// Send one masked frame, copying/masking through wr_.buf in chunks.
299 BOOST_ASSERT(! d.ws.wr_block_);
303 case do_mask_nofrag + 1:
305 BOOST_ASSERT(d.ws.wr_block_ == &d);
306 d.remain = buffer_size(d.cb);
309 d.fh.key = d.ws.maskgen_();
310 detail::prepare_key(d.key, d.fh.key);
311 detail::write<static_streambuf>(
314 clamp(d.remain, d.ws.wr_.buf_size);
316 buffer(d.ws.wr_.buf.get(), n);
// Payload must be copied so it can be masked in place.
317 buffer_copy(b, d.cb);
318 detail::mask_inplace(b, d.key);
320 d.ws.wr_.cont = ! d.fin;
321 // Send frame header and partial payload
322 d.state = d.remain == 0 ?
323 do_upcall : do_mask_nofrag + 2;
324 boost::asio::async_write(d.ws.stream_,
325 buffer_cat(d.fh_buf.data(), b),
// Continue sending the remaining payload of the same frame; the
// masking key state (d.key) carries across chunks.
330 case do_mask_nofrag + 2:
332 d.cb.consume(d.ws.wr_.buf_size);
334 clamp(d.remain, d.ws.wr_.buf_size);
336 buffer(d.ws.wr_.buf.get(), n);
337 buffer_copy(b, d.cb);
338 detail::mask_inplace(b, d.key);
340 // Send parial payload
343 boost::asio::async_write(
344 d.ws.stream_, b, std::move(*this));
348 //----------------------------------------------------------------------
// Send the message as multiple masked frames (client + autofrag).
// Each fragment gets a fresh masking key.
351 BOOST_ASSERT(! d.ws.wr_block_);
355 case do_mask_frag + 1:
357 BOOST_ASSERT(d.ws.wr_block_ == &d);
358 auto const n = clamp(
359 d.remain, d.ws.wr_.buf_size);
362 d.fh.key = d.ws.maskgen_();
363 d.fh.fin = d.fin ? d.remain == 0 : false;
364 detail::prepare_key(d.key, d.fh.key);
365 auto const b = buffer(
366 d.ws.wr_.buf.get(), n);
367 buffer_copy(b, d.cb);
368 detail::mask_inplace(b, d.key);
369 detail::write<static_streambuf>(
371 d.ws.wr_.cont = ! d.fin;
373 d.state = d.remain == 0 ?
374 do_upcall : do_mask_frag + 2;
375 boost::asio::async_write(d.ws.stream_,
376 buffer_cat(d.fh_buf.data(), b),
381 case do_mask_frag + 2:
383 bytes_transferred - d.fh_buf.size());
385 d.fh.op = opcode::cont;
386 BOOST_ASSERT(d.ws.wr_block_ == &d);
387 d.ws.wr_block_ = nullptr;
388 // Allow outgoing control frames to
389 // be sent in between message frames:
390 if(d.ws.rd_op_.maybe_invoke() ||
391 d.ws.ping_op_.maybe_invoke())
393 d.state = do_maybe_suspend;
394 d.ws.get_io_service().post(
398 d.state = d.entry_state;
401 //----------------------------------------------------------------------
// Compress the payload through the permessage-deflate stream, then
// send (masking first when the role requires it).
404 BOOST_ASSERT(! d.ws.wr_block_);
410 BOOST_ASSERT(d.ws.wr_block_ == &d);
411 auto b = buffer(d.ws.wr_.buf.get(),
413 auto const more = detail::deflate(
414 d.ws.pmd_->zo, b, d.cb, d.fin, ec);
415 d.ws.failed_ = ec != 0;
418 auto const n = buffer_size(b);
421 // The input was consumed, but there
422 // is no output due to compression
424 BOOST_ASSERT(! d.fin);
425 BOOST_ASSERT(buffer_size(d.cb) == 0);
427 // We can skip the dispatch if the
428 // asynchronous initiation function is
429 // not on call stack but its hard to
430 // figure out so be safe and dispatch.
432 d.ws.get_io_service().post(std::move(*this));
437 d.fh.key = d.ws.maskgen_();
438 detail::prepared_key key;
439 detail::prepare_key(key, d.fh.key);
440 detail::mask_inplace(b, key);
444 detail::fh_streambuf fh_buf;
445 detail::write<static_streambuf>(fh_buf, d.fh);
446 d.ws.wr_.cont = ! d.fin;
449 do_deflate + 2 : do_deflate + 3;
450 boost::asio::async_write(d.ws.stream_,
451 buffer_cat(fh_buf.data(), b),
457 d.fh.op = opcode::cont;
459 BOOST_ASSERT(d.ws.wr_block_ == &d);
460 d.ws.wr_block_ = nullptr;
461 // Allow outgoing control frames to
462 // be sent in between message frames:
463 if(d.ws.rd_op_.maybe_invoke() ||
464 d.ws.ping_op_.maybe_invoke())
466 d.state = do_maybe_suspend;
467 d.ws.get_io_service().post(
471 d.state = d.entry_state;
// After the final frame, reset the deflate context when the
// negotiated no_context_takeover option applies to our role.
476 (d.ws.role_ == detail::role_type::client &&
477 d.ws.pmd_config_.client_no_context_takeover) ||
478 (d.ws.role_ == detail::role_type::server &&
479 d.ws.pmd_config_.server_no_context_takeover)))
480 d.ws.pmd_->zo.reset();
483 //----------------------------------------------------------------------
// Acquire the write block; if another write owns it, park this
// operation in wr_op_ until it can be resumed.
485 case do_maybe_suspend:
490 d.state = do_maybe_suspend + 1;
491 d.ws.wr_op_.template emplace<
492 write_frame_op>(std::move(*this));
// If the stream already failed or a close was sent, complete with
// operation_aborted instead of writing.
495 if(d.ws.failed_ || d.ws.wr_close_)
499 d.ws.get_io_service().post(
500 bind_handler(std::move(*this),
501 boost::asio::error::operation_aborted));
504 d.state = d.entry_state;
508 case do_maybe_suspend + 1:
509 BOOST_ASSERT(! d.ws.wr_block_);
511 d.state = do_maybe_suspend + 2;
512 // The current context is safe but might not be
513 // the same as the one for this operation (since
514 // we are being called from a write operation).
515 // Call post to make sure we are invoked the same
516 // way as the final handler for this operation.
517 d.ws.get_io_service().post(bind_handler(
518 std::move(*this), ec));
521 case do_maybe_suspend + 2:
522 BOOST_ASSERT(d.ws.wr_block_ == &d);
523 if(d.ws.failed_ || d.ws.wr_close_)
526 ec = boost::asio::error::operation_aborted;
// Resume at entry_state + 1: we now hold the write block.
529 d.state = d.entry_state + 1;
532 //----------------------------------------------------------------------
// Upcall: release the write block and let any suspended read/ping
// operations run before invoking the final handler.
539 if(d.ws.wr_block_ == &d)
540 d.ws.wr_block_ = nullptr;
541 d.ws.rd_op_.maybe_invoke() ||
542 d.ws.ping_op_.maybe_invoke();
// Initiating function: start an asynchronous write of one frame with
// the given FIN flag and buffer sequence. The handler is invoked with
// a single error_code when the frame has been written (or aborted).
// NOTE(review): some interior lines are elided in this extract.
546 template<class NextLayer>
547 template<class ConstBufferSequence, class WriteHandler>
548 typename async_completion<
549 WriteHandler, void(error_code)>::result_type
551 async_write_frame(bool fin,
552 ConstBufferSequence const& bs, WriteHandler&& handler)
// Compile-time checks on the stream and buffer concepts.
554 static_assert(is_AsyncStream<next_layer_type>::value,
555 "AsyncStream requirements not met");
556 static_assert(beast::is_ConstBufferSequence<
557 ConstBufferSequence>::value,
558 "ConstBufferSequence requirements not met");
559 beast::async_completion<
560 WriteHandler, void(error_code)
561 > completion{handler};
// Constructing the operation starts it (see write_frame_op ctor).
562 write_frame_op<ConstBufferSequence, decltype(
563 completion.handler)>{completion.handler,
565 return completion.result.get();
// Synchronous frame write, throwing overload: delegates to the
// error_code overload and throws system_error on failure.
568 template<class NextLayer>
569 template<class ConstBufferSequence>
572 write_frame(bool fin, ConstBufferSequence const& buffers)
574 static_assert(is_SyncStream<next_layer_type>::value,
575 "SyncStream requirements not met");
576 static_assert(beast::is_ConstBufferSequence<
577 ConstBufferSequence>::value,
578 "ConstBufferSequence requirements not met");
580 write_frame(fin, buffers, ec);
582 throw system_error{ec};
// Synchronous frame write. Mirrors the asynchronous state machine:
// the same four algorithms (deflate, no-mask/no-frag, no-mask/frag,
// mask/no-frag, mask/frag) expressed as straight-line code using
// boost::asio::write. Errors are reported through ec.
// NOTE(review): branch headers and several lines (loops, early
// returns) are missing from this extract; comments cover only what
// is visible.
585 template<class NextLayer>
586 template<class ConstBufferSequence>
589 write_frame(bool fin,
590 ConstBufferSequence const& buffers, error_code& ec)
592 static_assert(is_SyncStream<next_layer_type>::value,
593 "SyncStream requirements not met");
594 static_assert(beast::is_ConstBufferSequence<
595 ConstBufferSequence>::value,
596 "ConstBufferSequence requirements not met");
597 using beast::detail::clamp;
598 using boost::asio::buffer;
599 using boost::asio::buffer_copy;
600 using boost::asio::buffer_size;
601 detail::frame_header fh;
// RSV1 marks a compressed (permessage-deflate) message.
605 fh.rsv1 = wr_.compress;
// Continuation frames use opcode::cont; clients mask payloads.
613 fh.op = wr_.cont ? opcode::cont : wr_opcode_;
614 fh.mask = role_ == detail::role_type::client;
615 auto remain = buffer_size(buffers);
// --- deflate path: compress through wr_.buf, then mask if needed ---
619 ConstBufferSequence> cb{buffers};
623 wr_.buf.get(), wr_.buf_size);
624 auto const more = detail::deflate(
625 pmd_->zo, b, cb, fin, ec);
629 auto const n = buffer_size(b);
632 // The input was consumed, but there
633 // is no output due to compression
636 BOOST_ASSERT(buffer_size(cb) == 0);
643 detail::prepared_key key;
644 detail::prepare_key(key, fh.key);
645 detail::mask_inplace(b, key);
649 detail::fh_streambuf fh_buf;
650 detail::write<static_streambuf>(fh_buf, fh);
652 boost::asio::write(stream_,
653 buffer_cat(fh_buf.data(), b), ec);
659 fh.op = opcode::cont;
// Reset the deflate context when no_context_takeover was negotiated
// for our role.
663 (role_ == detail::role_type::client &&
664 pmd_config_.client_no_context_takeover) ||
665 (role_ == detail::role_type::server &&
666 pmd_config_.server_no_context_takeover)))
674 // no mask, no autofrag
// Single unmasked frame: header and caller's buffers in one write.
677 detail::fh_streambuf fh_buf;
678 detail::write<static_streambuf>(fh_buf, fh);
680 boost::asio::write(stream_,
681 buffer_cat(fh_buf.data(), buffers), ec);
// --- no mask, autofrag: split into frames of at most buf_size ---
689 BOOST_ASSERT(wr_.buf_size != 0);
691 ConstBufferSequence> cb{buffers};
694 auto const n = clamp(remain, wr_.buf_size);
// FIN only on the last fragment of the final frame.
697 fh.fin = fin ? remain == 0 : false;
698 detail::fh_streambuf fh_buf;
699 detail::write<static_streambuf>(fh_buf, fh);
701 boost::asio::write(stream_,
702 buffer_cat(fh_buf.data(),
703 prepare_buffers(n, cb)), ec);
709 fh.op = opcode::cont;
// --- mask, no autofrag: one frame, payload masked chunk by chunk
// through wr_.buf; the key state carries across chunks ---
721 detail::prepared_key key;
722 detail::prepare_key(key, fh.key);
723 detail::fh_streambuf fh_buf;
724 detail::write<static_streambuf>(fh_buf, fh);
726 ConstBufferSequence> cb{buffers};
728 auto const n = clamp(remain, wr_.buf_size);
729 auto const b = buffer(wr_.buf.get(), n);
733 detail::mask_inplace(b, key);
735 boost::asio::write(stream_,
736 buffer_cat(fh_buf.data(), b), ec);
// Remaining chunks of the same frame: payload only, no header.
743 auto const n = clamp(remain, wr_.buf_size);
744 auto const b = buffer(wr_.buf.get(), n);
748 detail::mask_inplace(b, key);
749 boost::asio::write(stream_, b, ec);
// --- mask, autofrag: one masked frame per chunk, fresh key each ---
758 BOOST_ASSERT(wr_.buf_size != 0);
760 ConstBufferSequence> cb{buffers};
764 detail::prepared_key key;
765 detail::prepare_key(key, fh.key);
766 auto const n = clamp(remain, wr_.buf_size);
767 auto const b = buffer(wr_.buf.get(), n);
769 detail::mask_inplace(b, key);
772 fh.fin = fin ? remain == 0 : false;
774 detail::fh_streambuf fh_buf;
775 detail::write<static_streambuf>(fh_buf, fh);
776 boost::asio::write(stream_,
777 buffer_cat(fh_buf.data(), b), ec);
783 fh.op = opcode::cont;
790 //------------------------------------------------------------------------------
// Composed asynchronous operation which writes a complete message by
// issuing a sequence of async_write_frame calls. Mirrors the handler
// hook forwarding pattern of write_frame_op above.
// NOTE(review): interior lines (members, braces) are missing from
// this extract; comments describe only the visible declarations.
792 template<class NextLayer>
793 template<class Buffers, class Handler>
794 class stream<NextLayer>::write_op
// Shared state owned through handler_ptr.
799 stream<NextLayer>& ws;
// Message payload, consumed one frame-sized chunk at a time.
800 consuming_buffers<Buffers> cb;
804 data(Handler& handler, stream<NextLayer>& ws_,
806 : cont(beast_asio_helpers::
807 is_continuation(handler))
// remain tracks unsent payload bytes, initialized to total size.
810 , remain(boost::asio::buffer_size(cb))
815 handler_ptr<data, Handler> d_;
818 write_op(write_op&&) = default;
819 write_op(write_op const&) = default;
821 template<class DeducedHandler, class... Args>
823 write_op(DeducedHandler&& h,
824 stream<NextLayer>& ws, Args&&... args)
825 : d_(std::forward<DeducedHandler>(h),
826 ws, std::forward<Args>(args)...)
// Start immediately; 'false' marks the initiating invocation.
828 (*this)(error_code{}, false);
831 void operator()(error_code ec, bool again = true);
// Asio handler hooks forwarded to the final handler (see
// write_frame_op for the rationale).
834 void* asio_handler_allocate(
835 std::size_t size, write_op* op)
837 return beast_asio_helpers::
838 allocate(size, op->d_.handler());
842 void asio_handler_deallocate(
843 void* p, std::size_t size, write_op* op)
845 return beast_asio_helpers::
846 deallocate(p, size, op->d_.handler());
850 bool asio_handler_is_continuation(write_op* op)
855 template<class Function>
857 void asio_handler_invoke(Function&& f, write_op* op)
859 return beast_asio_helpers::
860 invoke(f, op->d_.handler());
// Issue the next frame: take up to d.remain bytes from the buffer
// sequence and write them via async_write_frame, setting FIN when no
// payload remains afterwards. NOTE(review): lines updating d.remain
// and the surrounding control flow are elided in this extract.
864 template<class NextLayer>
865 template<class Buffers, class Handler>
868 write_op<Buffers, Handler>::
869 operator()(error_code ec, bool again)
872 d.cont = d.cont || again;
879 auto const n = d.remain;
// fin is true when this chunk exhausts the message payload.
881 auto const fin = d.remain <= 0;
884 auto const pb = prepare_buffers(n, d.cb);
886 d.ws.async_write_frame(fin, pb, std::move(*this));
// Initiating function: asynchronously write a complete message from
// the given buffer sequence; the handler receives one error_code.
897 template<class NextLayer>
898 template<class ConstBufferSequence, class WriteHandler>
899 typename async_completion<
900 WriteHandler, void(error_code)>::result_type
902 async_write(ConstBufferSequence const& bs, WriteHandler&& handler)
904 static_assert(is_AsyncStream<next_layer_type>::value,
905 "AsyncStream requirements not met");
906 static_assert(beast::is_ConstBufferSequence<
907 ConstBufferSequence>::value,
908 "ConstBufferSequence requirements not met");
909 beast::async_completion<
910 WriteHandler, void(error_code)> completion{handler};
// Constructing the operation starts it (see write_op ctor).
911 write_op<ConstBufferSequence, decltype(completion.handler)>{
912 completion.handler, *this, bs};
913 return completion.result.get();
// Synchronous message write, throwing overload: delegates to the
// error_code overload (elided call between the asserts and the
// throw in this extract) and throws system_error on failure.
916 template<class NextLayer>
917 template<class ConstBufferSequence>
920 write(ConstBufferSequence const& buffers)
922 static_assert(is_SyncStream<next_layer_type>::value,
923 "SyncStream requirements not met");
924 static_assert(beast::is_ConstBufferSequence<
925 ConstBufferSequence>::value,
926 "ConstBufferSequence requirements not met");
930 throw system_error{ec};
// Synchronous message write: a complete message is a single frame
// with FIN set, so this simply forwards to write_frame(true, ...).
933 template<class NextLayer>
934 template<class ConstBufferSequence>
937 write(ConstBufferSequence const& buffers, error_code& ec)
939 static_assert(is_SyncStream<next_layer_type>::value,
940 "SyncStream requirements not met");
941 static_assert(beast::is_ConstBufferSequence<
942 ConstBufferSequence>::value,
943 "ConstBufferSequence requirements not met");
944 write_frame(true, buffers, ec);