2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 // Official repository: https://github.com/boostorg/beast
10 #ifndef BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
11 #define BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
13 #include <boost/beast/core/async_base.hpp>
14 #include <boost/beast/core/buffer_traits.hpp>
15 #include <boost/beast/core/buffers_prefix.hpp>
16 #include <boost/beast/websocket/teardown.hpp>
17 #include <boost/asio/coroutine.hpp>
18 #include <boost/assert.hpp>
19 #include <boost/make_shared.hpp>
20 #include <boost/core/exchange.hpp>
22 #include <type_traits>
28 //------------------------------------------------------------------------------
// impl_type constructor selected by the std::false_type tag:
// every remaining argument is perfectly forwarded to the socket.
30 template<class Protocol, class Executor, class RatePolicy>
31 template<class... Args>
32 basic_stream<Protocol, Executor, RatePolicy>::
34 impl_type(std::false_type, Args&&... args)
35 : socket(std::forward<Args>(args)...)
// impl_type constructor selected by the std::true_type tag:
// `policy` initializes the RatePolicy held in the boost::empty_value
// base, and the remaining arguments are forwarded to the socket.
43 template<class Protocol, class Executor, class RatePolicy>
44 template<class RatePolicy_, class... Args>
45 basic_stream<Protocol, Executor, RatePolicy>::
47 impl_type(std::true_type,
48 RatePolicy_&& policy, Args&&... args)
49 : boost::empty_value<RatePolicy>(
50 boost::empty_init_t{},
51 std::forward<RatePolicy_>(policy))
52 , socket(std::forward<Args>(args)...)
// Handles a tick of the impl's timer: re-arms the timer one second
// ahead, notifies the rate policy, then waits on the timer again with
// a local completion handler that carries the executor `ex2`.
60 template<class Protocol, class Executor, class RatePolicy>
61 template<class Executor2>
63 basic_stream<Protocol, Executor, RatePolicy>::
65 on_timer(Executor2 const& ex2)
// expected to run only while at least one operation is waiting
67 BOOST_ASSERT(waiting > 0);
69 // the last waiter starts the new slice
73 // update the expiration time
// expires_after() reports how many pending waits it cancelled;
// the check expects that number to be zero
74 BOOST_VERIFY(timer.expires_after(
75 std::chrono::seconds(1)) == 0);
// give the rate policy its timer notification
77 rate_policy_access::on_timer(policy());
// completion handler for the next wait; the executor is stored in
// the boost::empty_value base and read back with get()
79 struct handler : boost::empty_value<Executor2>
// the impl is referenced weakly
// NOTE(review): presumably locked into `sp` before on_timer is
// called again below -- confirm
81 boost::weak_ptr<impl_type> wp;
83 using executor_type = Executor2;
86 get_executor() const noexcept
93 boost::shared_ptr<impl_type> const& sp)
94 : boost::empty_value<Executor2>(
95 boost::empty_init_t{}, ex2)
101 operator()(error_code ec)
// operation_aborted: the wait was cancelled
106 if(ec == net::error::operation_aborted)
// process the next tick with the stored executor
111 sp->on_timer(this->get());
115 // wait on the timer again
117 timer.async_wait(handler(ex2, this->shared_from_this()));
// Sets the read timer and the write timer to never(); each
// expires_at() call is checked against 0.
120 template<class Protocol, class Executor, class RatePolicy>
122 basic_stream<Protocol, Executor, RatePolicy>::
126 // If assert goes off, it means that there are
127 // already read or write (or connect) operations
128 // outstanding, so there is nothing to apply
129 // the expiration time to!
// as written, the assertion fails only when BOTH the read state
// and the write state are pending
131 BOOST_ASSERT(! read.pending || ! write.pending);
135 read.timer.expires_at(never()) == 0);
139 write.timer.expires_at(never()) == 0);
142 template<class Protocol, class Executor, class RatePolicy>
144 basic_stream<Protocol, Executor, RatePolicy>::
161 //------------------------------------------------------------------------------
// Completion handler for a timer wait; Executor2 is the executor type
// it exposes through get_executor().
// NOTE(review): presumably this is the `timeout_handler` that the
// operations pass to timer.async_wait -- confirm against the declaration.
163 template<class Protocol, class Executor, class RatePolicy>
164 template<class Executor2>
165 struct basic_stream<Protocol, Executor, RatePolicy>::
168 using executor_type = Executor2;
// the stream implementation is referenced weakly
171 boost::weak_ptr<impl_type> wp;
175 executor_type get_executor() const noexcept
181 operator()(error_code ec)
// operation_aborted: the wait was cancelled
184 if(ec == net::error::operation_aborted)
// NOTE(review): a handler tick lower than the state's tick presumably
// means the handler belongs to an earlier operation -- confirm
195 if(tick < state.tick)
197 BOOST_ASSERT(tick == state.tick);
// the timeout flag must not already be set
200 BOOST_ASSERT(! state.timeout);
// flag the timeout; the operations test this flag and reset it to
// false when they report beast::error::timeout
202 state.timeout = true;
206 //------------------------------------------------------------------------------
// `ops` groups the composed operations of basic_stream together with
// the initiation function objects (run_read_op, run_connect_op, ...)
// that the async_* member functions hand to net::async_initiate.
208 template<class Protocol, class Executor, class RatePolicy>
209 struct basic_stream<Protocol, Executor, RatePolicy>::ops
// Composed read-or-write operation: isRead selects between the
// socket's async_read_some and async_write_some. It derives from
// async_base (handler and executor) and from asio::coroutine, whose
// state drives the BOOST_ASIO_CORO_REENTER body further down.
212 template<bool isRead, class Buffers, class Handler>
214 : public async_base<Handler, Executor>
215 , public boost::asio::coroutine
// the operation holds a shared_ptr to the stream implementation
217 boost::shared_ptr<impl_type> impl_;
// tag passed to async_perform to pick the read or write overload
221 using is_read = std::integral_constant<bool, isRead>;
// rate policy query: bytes currently available for reading ...
236 return rate_policy_access::
237 available_read_bytes(impl_->policy());
// ... or for writing
239 return rate_policy_access::
240 available_write_bytes(impl_->policy());
// reports n transferred bytes to the rate policy (read or write side)
244 transfer_bytes(std::size_t n)
248 transfer_read_bytes(impl_->policy(), n);
251 transfer_write_bytes(impl_->policy(), n);
// std::true_type overload: read at most `amount` bytes of b_
256 std::size_t amount, std::true_type)
258 impl_->socket.async_read_some(
259 beast::buffers_prefix(amount, b_),
// std::false_type overload: write at most `amount` bytes of b_
265 std::size_t amount, std::false_type)
267 impl_->socket.async_write_some(
268 beast::buffers_prefix(amount, b_),
272 static bool never_pending_;
// constructor: the handler is forwarded to async_base together with
// the stream's executor
275 template<class Handler_>
280 : async_base<Handler, Executor>(
281 std::forward<Handler_>(h), s.get_executor())
// zero-byte buffers while the state is already marked pending
// NOTE(review): state() is presumably the read or the write state
// chosen by isRead -- confirm
286 if (buffer_bytes(b_) == 0 && state().pending)
289 // Corner case discovered in https://github.com/boostorg/beast/issues/2065
290 // Enclosing SSL stream wishes to complete a 0-length write early by
291 // executing a 0-length read against the underlying stream.
292 // This can occur even if an existing async_read is in progress.
293 // In this specific case, we will complete the async op with no error
294 // in order to prevent assertions and/or internal corruption of the basic_stream.
295 this->complete(false, error_code(), 0);
// regular path: attach the guard to the state's pending flag
299 pg_.assign(state().pending);
// coroutine body; it is re-entered each time an intermediate
// asynchronous step completes
307 std::size_t bytes_transferred = 0)
309 BOOST_ASIO_CORO_REENTER(*this)
311 // handle empty buffers
312 if(detail::buffers_empty(b_))
314 // make sure we perform the no-op
315 BOOST_ASIO_CORO_YIELD
317 BOOST_ASIO_HANDLER_LOCATION((
319 (isRead ? "basic_stream::async_read_some"
320 : "basic_stream::async_write_some")));
// zero-byte transfer on the socket
322 async_perform(0, is_read{});
324 // apply the timeout manually, otherwise
325 // behavior varies across platforms.
326 if(state().timer.expiry() <= clock_type::now())
329 ec = beast::error::timeout;
334 // if a timeout is active, wait on the timer
335 if(state().timer.expiry() != never())
337 BOOST_ASIO_HANDLER_LOCATION((
339 (isRead ? "basic_stream::async_read_some"
340 : "basic_stream::async_write_some")));
342 state().timer.async_wait(
343 timeout_handler<decltype(this->get_executor())>{
347 this->get_executor()});
350 // check rate limit, maybe wait
352 amount = available_bytes();
356 BOOST_ASIO_CORO_YIELD
358 BOOST_ASIO_HANDLER_LOCATION((
360 (isRead ? "basic_stream::async_read_some"
361 : "basic_stream::async_write_some")));
// the operation itself is the completion handler of this wait
363 impl_->timer.async_wait(std::move(*this));
367 // socket was closed, or a timeout
369 net::error::operation_aborted);
370 // timeout handler invoked?
373 // yes, socket already closed
374 ec = beast::error::timeout;
375 state().timeout = false;
// run the impl's timer handling with this operation's executor
379 impl_->on_timer(this->get_executor());
381 // Allow at least one byte, otherwise
382 // bytes_transferred could be 0.
383 amount = std::max<std::size_t>(
384 available_bytes(), 1);
387 BOOST_ASIO_CORO_YIELD
389 BOOST_ASIO_HANDLER_LOCATION((
391 (isRead ? "basic_stream::async_read_some"
392 : "basic_stream::async_write_some")));
// the actual transfer, limited to `amount` bytes
394 async_perform(amount, is_read{});
// a timeout was active for this operation
397 if(state().timer.expiry() != never())
401 // try cancelling timer
403 state().timer.cancel();
406 // timeout handler invoked?
409 // yes, socket already closed
410 ec = beast::error::timeout;
411 state().timeout = false;
// NOTE(review): `n` is presumably the count returned by
// timer.cancel() above (one wait cancelled) -- confirm
416 BOOST_ASSERT(n == 1);
417 BOOST_ASSERT(! state().timeout);
// account for the transferred bytes in the rate policy, then
// invoke the final handler
423 transfer_bytes(bytes_transferred);
424 this->complete_now(ec, bytes_transferred);
// Composed connect operation. Three constructors start the connect:
// a single endpoint (socket.async_connect), an endpoint sequence and
// an iterator range (both net::async_connect). Each passes the moved
// operation object as the completion handler, so operator() below
// runs when the connect finishes.
429 template<class Handler>
431 : public async_base<Handler, Executor>
// the operation holds a shared_ptr to the stream implementation
433 boost::shared_ptr<impl_type> impl_;
// --- single endpoint ---
444 template<class Handler_>
449 : async_base<Handler, Executor>(
450 std::forward<Handler_>(h), s.get_executor())
// guards are attached to both the read and the write pending flags
452 , pg0_(impl_->read.pending)
453 , pg1_(impl_->write.pending)
// a timeout is active: wait on the write timer with a timeout_handler
455 if(state().timer.expiry() != stream_base::never())
457 BOOST_ASIO_HANDLER_LOCATION((
459 "basic_stream::async_connect"));
461 impl_->write.timer.async_wait(
462 timeout_handler<decltype(this->get_executor())>{
466 this->get_executor()});
469 BOOST_ASIO_HANDLER_LOCATION((
471 "basic_stream::async_connect"));
473 impl_->socket.async_connect(
474 ep, std::move(*this));
475 // *this is now moved-from
// --- endpoint sequence with connect condition ---
479 class Endpoints, class Condition,
484 Endpoints const& eps,
485 Condition const& cond)
486 : async_base<Handler, Executor>(
487 std::forward<Handler_>(h), s.get_executor())
489 , pg0_(impl_->read.pending)
490 , pg1_(impl_->write.pending)
// a timeout is active: wait on the write timer with a timeout_handler
492 if(state().timer.expiry() != stream_base::never())
494 BOOST_ASIO_HANDLER_LOCATION((
496 "basic_stream::async_connect"));
498 impl_->write.timer.async_wait(
499 timeout_handler<decltype(this->get_executor())>{
503 this->get_executor()});
506 BOOST_ASIO_HANDLER_LOCATION((
508 "basic_stream::async_connect"));
510 net::async_connect(impl_->socket,
511 eps, cond, std::move(*this));
512 // *this is now moved-from
// --- iterator range with connect condition ---
516 class Iterator, class Condition,
521 Iterator begin, Iterator end,
522 Condition const& cond)
523 : async_base<Handler, Executor>(
524 std::forward<Handler_>(h), s.get_executor())
526 , pg0_(impl_->read.pending)
527 , pg1_(impl_->write.pending)
// a timeout is active: wait on the write timer with a timeout_handler
529 if(state().timer.expiry() != stream_base::never())
531 BOOST_ASIO_HANDLER_LOCATION((
533 "basic_stream::async_connect"));
535 impl_->write.timer.async_wait(
536 timeout_handler<decltype(this->get_executor())>{
540 this->get_executor()});
543 BOOST_ASIO_HANDLER_LOCATION((
545 "basic_stream::async_connect"));
547 net::async_connect(impl_->socket,
548 begin, end, cond, std::move(*this));
549 // *this is now moved-from
// completion handler of the connect; any extra results (`args`)
// are forwarded unchanged to the final handler
552 template<class... Args>
554 operator()(error_code ec, Args&&... args)
// a timeout was active for this connect
556 if(state().timer.expiry() != stream_base::never())
560 // try cancelling timer
562 impl_->write.timer.cancel();
565 // timeout handler invoked?
568 // yes, socket already closed
569 ec = beast::error::timeout;
570 state().timeout = false;
// NOTE(review): `n` is presumably the count returned by
// timer.cancel() above (one wait cancelled) -- confirm
575 BOOST_ASSERT(n == 1);
576 BOOST_ASSERT(! state().timeout);
582 this->complete_now(ec, std::forward<Args>(args)...);
// Initiation step for a read: checks at compile time that the handler
// is invocable as void(error_code, std::size_t), then builds the
// operation from the handler, the stream `*s` and the buffers `b`.
// NOTE(review): presumably the call operator of ops::run_read_op,
// which async_read_some passes to net::async_initiate -- confirm.
588 template<class ReadHandler, class Buffers>
595 // If you get an error on the following line it means
596 // that your handler does not meet the documented type
597 // requirements for the handler.
600 detail::is_invocable<ReadHandler,
601 void(error_code, std::size_t)>::value,
602 "ReadHandler type requirements not met");
// the operation is instantiated on the decayed handler type
607 typename std::decay<ReadHandler>::type>(
608 std::forward<ReadHandler>(h), *s, b);
// Initiation step for a write: checks at compile time that the handler
// is invocable as void(error_code, std::size_t), then builds the
// operation from the handler, the stream `*s` and the buffers `b`.
// NOTE(review): presumably the call operator of ops::run_write_op,
// which async_write_some passes to net::async_initiate -- confirm.
614 template<class WriteHandler, class Buffers>
621 // If you get an error on the following line it means
622 // that your handler does not meet the documented type
623 // requirements for the handler.
626 detail::is_invocable<WriteHandler,
627 void(error_code, std::size_t)>::value,
628 "WriteHandler type requirements not met");
// the operation is instantiated on the decayed handler type
633 typename std::decay<WriteHandler>::type>(
634 std::forward<WriteHandler>(h), *s, b);
// Initiation function object for the single-endpoint connect: checks
// that the handler is invocable as void(error_code), then constructs
// a connect_op from the handler, the stream `*s` and the endpoint.
638 struct run_connect_op
640 template<class ConnectHandler>
645 endpoint_type const& ep)
647 // If you get an error on the following line it means
648 // that your handler does not meet the documented type
649 // requirements for the handler.
652 detail::is_invocable<ConnectHandler,
653 void(error_code)>::value,
654 "ConnectHandler type requirements not met");
// the connect starts inside the connect_op constructor
656 connect_op<typename std::decay<ConnectHandler>::type>(
657 std::forward<ConnectHandler>(h), *s, ep);
// Initiation function object for the endpoint-sequence connect: checks
// that the handler is invocable as
// void(error_code, typename Protocol::endpoint), then constructs a
// connect_op from the handler, the stream `*s`, the endpoint sequence
// and the connect condition.
661 struct run_connect_range_op
664 class RangeConnectHandler,
665 class EndpointSequence,
669 RangeConnectHandler&& h,
671 EndpointSequence const& eps,
672 Condition const& cond)
674 // If you get an error on the following line it means
675 // that your handler does not meet the documented type
676 // requirements for the handler.
679 detail::is_invocable<RangeConnectHandler,
680 void(error_code, typename Protocol::endpoint)>::value,
681 "RangeConnectHandler type requirements not met");
// the connect starts inside the connect_op constructor
683 connect_op<typename std::decay<RangeConnectHandler>::type>(
684 std::forward<RangeConnectHandler>(h), *s, eps, cond);
// Initiation function object for the iterator-range connect: checks
// that the handler is invocable as void(error_code, Iterator), then
// constructs a connect_op from the handler, the stream `*s`, the
// range [begin, end) and the connect condition.
688 struct run_connect_iter_op
691 class IteratorConnectHandler,
696 IteratorConnectHandler&& h,
698 Iterator begin, Iterator end,
699 Condition const& cond)
701 // If you get an error on the following line it means
702 // that your handler does not meet the documented type
703 // requirements for the handler.
706 detail::is_invocable<IteratorConnectHandler,
707 void(error_code, Iterator)>::value,
708 "IteratorConnectHandler type requirements not met");
// the connect starts inside the connect_op constructor
710 connect_op<typename std::decay<IteratorConnectHandler>::type>(
711 std::forward<IteratorConnectHandler>(h), *s, begin, end, cond);
717 //------------------------------------------------------------------------------
719 template<class Protocol, class Executor, class RatePolicy>
720 basic_stream<Protocol, Executor, RatePolicy>::
723 // The shared object can outlive *this;
724 // cancel any outstanding operations so that
725 // it is destroyed as soon as possible.
// Forwarding constructor: allocates the shared impl_type with
// boost::make_shared and forwards arg0 and args to it.
// NOTE(review): the unnamed trailing template parameter is presumably
// a constraint defaulted at the declaration -- confirm.
729 template<class Protocol, class Executor, class RatePolicy>
730 template<class Arg0, class... Args, class>
731 basic_stream<Protocol, Executor, RatePolicy>::
732 basic_stream(Arg0&& arg0, Args&&... args)
733 : impl_(boost::make_shared<impl_type>(
735 std::forward<Arg0>(arg0),
736 std::forward<Args>(args)...))
// Constructor taking a rate policy: allocates the shared impl_type
// with boost::make_shared, forwarding the policy first and then
// arg0 and args.
// NOTE(review): the unnamed trailing template parameter is presumably
// a constraint defaulted at the declaration -- confirm.
740 template<class Protocol, class Executor, class RatePolicy>
741 template<class RatePolicy_, class Arg0, class... Args, class>
742 basic_stream<Protocol, Executor, RatePolicy>::
744 RatePolicy_&& policy, Arg0&& arg0, Args&&... args)
745 : impl_(boost::make_shared<impl_type>(
747 std::forward<RatePolicy_>(policy),
748 std::forward<Arg0>(arg0),
749 std::forward<Args>(args)...))
// Move constructor: a new impl_type is allocated and move-constructed
// from the object that other.impl_ points to; other.impl_ itself is
// not reassigned here, so `other` keeps its own (moved-from) impl.
753 template<class Protocol, class Executor, class RatePolicy>
754 basic_stream<Protocol, Executor, RatePolicy>::
755 basic_stream(basic_stream&& other)
756 : impl_(boost::make_shared<impl_type>(
757 std::move(*other.impl_)))
759 // Explainer: Asio's sockets provide the guarantee that a moved-from socket
760 // will be in a state as-if newly created. i.e.:
761 // * having the same (valid) executor
762 // * the socket shall not be open
763 // We provide the same guarantee by moving the impl rather than the pointer
764 // controlling its lifetime.
767 //------------------------------------------------------------------------------
// Hands the impl's socket to the caller by moving it out of the impl.
769 template<class Protocol, class Executor, class RatePolicy>
771 basic_stream<Protocol, Executor, RatePolicy>::
776 return std::move(impl_->socket);
// Applies a relative expiry to the read timer and to the write timer,
// each one only when its state is not marked pending.
779 template<class Protocol, class Executor, class RatePolicy>
781 basic_stream<Protocol, Executor, RatePolicy>::
782 expires_after(net::steady_timer::duration expiry_time)
784 // If assert goes off, it means that there are
785 // already read or write (or connect) operations
786 // outstanding, so there is nothing to apply
787 // the expiration time to!
// as written, the condition is false only when BOTH the read state
// and the write state are pending
790 ! impl_->read.pending ||
791 ! impl_->write.pending);
// a pending read keeps the expiry it already has
793 if(! impl_->read.pending)
795 impl_->read.timer.expires_after(
// a pending write keeps the expiry it already has
798 if(! impl_->write.pending)
800 impl_->write.timer.expires_after(
// Applies an absolute expiry (a steady_timer time_point) to the read
// timer and to the write timer, each one only when its state is not
// marked pending.
804 template<class Protocol, class Executor, class RatePolicy>
806 basic_stream<Protocol, Executor, RatePolicy>::
808 net::steady_timer::time_point expiry_time)
810 // If assert goes off, it means that there are
811 // already read or write (or connect) operations
812 // outstanding, so there is nothing to apply
813 // the expiration time to!
// as written, the condition is false only when BOTH the read state
// and the write state are pending
816 ! impl_->read.pending ||
817 ! impl_->write.pending);
// a pending read keeps the expiry it already has
819 if(! impl_->read.pending)
821 impl_->read.timer.expires_at(
// a pending write keeps the expiry it already has
824 if(! impl_->write.pending)
826 impl_->write.timer.expires_at(
830 template<class Protocol, class Executor, class RatePolicy>
832 basic_stream<Protocol, Executor, RatePolicy>::
// Cancels operations on the impl's socket (the error code form of
// cancel is used) and cancels the wait on the impl's timer.
838 template<class Protocol, class Executor, class RatePolicy>
840 basic_stream<Protocol, Executor, RatePolicy>::
844 impl_->socket.cancel(ec);
845 impl_->timer.cancel();
848 template<class Protocol, class Executor, class RatePolicy>
850 basic_stream<Protocol, Executor, RatePolicy>::
856 //------------------------------------------------------------------------------
// Starts an asynchronous connect to the single endpoint `ep`.
// The work is handed to net::async_initiate with ops::run_connect_op
// as the initiation function object.
858 template<class Protocol, class Executor, class RatePolicy>
859 template<BOOST_BEAST_ASYNC_TPARAM1 ConnectHandler>
860 BOOST_BEAST_ASYNC_RESULT1(ConnectHandler)
861 basic_stream<Protocol, Executor, RatePolicy>::
863 endpoint_type const& ep,
864 ConnectHandler&& handler)
866 return net::async_initiate<
869 typename ops::run_connect_op{},
// Starts an asynchronous connect over an endpoint sequence without a
// caller-supplied condition: detail::any_endpoint{} is passed as the
// last initiation argument of ops::run_connect_range_op.
// Completion signature: void(error_code, typename Protocol::endpoint).
875 template<class Protocol, class Executor, class RatePolicy>
877 class EndpointSequence,
878 BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, typename Protocol::endpoint)) RangeConnectHandler,
880 BOOST_ASIO_INITFN_RESULT_TYPE(RangeConnectHandler,void(error_code, typename Protocol::endpoint))
881 basic_stream<Protocol, Executor, RatePolicy>::
883 EndpointSequence const& endpoints,
884 RangeConnectHandler&& handler)
886 return net::async_initiate<
888 void(error_code, typename Protocol::endpoint)>(
889 typename ops::run_connect_range_op{},
893 detail::any_endpoint{});
// Starts an asynchronous connect over an endpoint sequence with a
// caller-supplied connect condition, using ops::run_connect_range_op
// as the initiation function object.
// Completion signature: void(error_code, typename Protocol::endpoint).
896 template<class Protocol, class Executor, class RatePolicy>
898 class EndpointSequence,
899 class ConnectCondition,
900 BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, typename Protocol::endpoint)) RangeConnectHandler,
902 BOOST_ASIO_INITFN_RESULT_TYPE(RangeConnectHandler,void (error_code, typename Protocol::endpoint))
903 basic_stream<Protocol, Executor, RatePolicy>::
905 EndpointSequence const& endpoints,
906 ConnectCondition connect_condition,
907 RangeConnectHandler&& handler)
909 return net::async_initiate<
911 void(error_code, typename Protocol::endpoint)>(
912 typename ops::run_connect_range_op{},
// Starts an asynchronous connect over the iterator range [begin, end)
// without a caller-supplied condition: detail::any_endpoint{} is
// passed as the last initiation argument of ops::run_connect_iter_op.
// Completion signature: void(error_code, Iterator).
919 template<class Protocol, class Executor, class RatePolicy>
922 BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, Iterator)) IteratorConnectHandler>
923 BOOST_ASIO_INITFN_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
924 basic_stream<Protocol, Executor, RatePolicy>::
926 Iterator begin, Iterator end,
927 IteratorConnectHandler&& handler)
929 return net::async_initiate<
930 IteratorConnectHandler,
931 void(error_code, Iterator)>(
932 typename ops::run_connect_iter_op{},
936 detail::any_endpoint{});
// Starts an asynchronous connect over the iterator range [begin, end)
// with a caller-supplied connect condition, using
// ops::run_connect_iter_op as the initiation function object.
// Completion signature: void(error_code, Iterator).
939 template<class Protocol, class Executor, class RatePolicy>
942 class ConnectCondition,
943 BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, Iterator)) IteratorConnectHandler>
944 BOOST_ASIO_INITFN_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
945 basic_stream<Protocol, Executor, RatePolicy>::
947 Iterator begin, Iterator end,
948 ConnectCondition connect_condition,
949 IteratorConnectHandler&& handler)
951 return net::async_initiate<
952 IteratorConnectHandler,
953 void(error_code, Iterator)>(
954 typename ops::run_connect_iter_op{},
961 //------------------------------------------------------------------------------
// Starts an asynchronous read into `buffers`. The buffer type is
// checked at compile time against net::is_mutable_buffer_sequence,
// then the work is handed to net::async_initiate with
// ops::run_read_op as the initiation function object.
// Completion signature: void(error_code, std::size_t).
963 template<class Protocol, class Executor, class RatePolicy>
964 template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
965 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
966 basic_stream<Protocol, Executor, RatePolicy>::
968 MutableBufferSequence const& buffers,
969 ReadHandler&& handler)
971 static_assert(net::is_mutable_buffer_sequence<
972 MutableBufferSequence>::value,
973 "MutableBufferSequence type requirements not met");
974 return net::async_initiate<
976 void(error_code, std::size_t)>(
977 typename ops::run_read_op{},
// Starts an asynchronous write of `buffers`. The buffer type is
// checked at compile time against net::is_const_buffer_sequence,
// then the work is handed to net::async_initiate with
// ops::run_write_op as the initiation function object.
// Completion signature: void(error_code, std::size_t).
983 template<class Protocol, class Executor, class RatePolicy>
984 template<class ConstBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 WriteHandler>
985 BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
986 basic_stream<Protocol, Executor, RatePolicy>::
988 ConstBufferSequence const& buffers,
989 WriteHandler&& handler)
991 static_assert(net::is_const_buffer_sequence<
992 ConstBufferSequence>::value,
993 "ConstBufferSequence type requirements not met");
994 return net::async_initiate<
996 void(error_code, std::size_t)>(
997 typename ops::run_write_op{},
1003 //------------------------------------------------------------------------------
1005 // Customization points
1008 #if ! BOOST_BEAST_DOXYGEN
// Customization point: closes the socket of the given basic_stream,
// using the error code form of close().
1011 class Protocol, class Executor, class RatePolicy>
1014 basic_stream<Protocol, Executor, RatePolicy>& stream)
1017 stream.socket().close(ec);
// Customization point: synchronous teardown of a basic_stream.
// Forwards `role` and `ec` to the teardown overload for the stream's
// underlying socket; the using-declaration makes
// beast::websocket::teardown a candidate for the unqualified call.
1021 class Protocol, class Executor, class RatePolicy>
1025 basic_stream<Protocol, Executor, RatePolicy>& stream,
1028 using beast::websocket::teardown;
1029 teardown(role, stream.socket(), ec);
// Customization point: asynchronous teardown of a basic_stream.
// Forwards `role` and the handler to the async_teardown overload for
// the stream's underlying socket; the using-declaration makes
// beast::websocket::async_teardown a candidate for the unqualified call.
1033 class Protocol, class Executor, class RatePolicy,
1034 class TeardownHandler>
1038 basic_stream<Protocol, Executor, RatePolicy>& stream,
1039 TeardownHandler&& handler)
1041 using beast::websocket::async_teardown;
1042 async_teardown(role, stream.socket(),
1043 std::forward<TeardownHandler>(handler));