2 // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 // Official repository: https://github.com/boostorg/beast
10 #ifndef BOOST_BEAST_TEST_STREAM_HPP
11 #define BOOST_BEAST_TEST_STREAM_HPP
13 #include <boost/beast/core/bind_handler.hpp>
14 #include <boost/beast/core/buffers_prefix.hpp>
15 #include <boost/beast/core/flat_buffer.hpp>
16 #include <boost/beast/core/string.hpp>
17 #include <boost/beast/core/type_traits.hpp>
18 #include <boost/beast/websocket/teardown.hpp>
19 #include <boost/beast/test/fail_counter.hpp>
20 #include <boost/asio/async_result.hpp>
21 #include <boost/asio/buffer.hpp>
22 #include <boost/asio/io_context.hpp>
23 #include <boost/asio/post.hpp>
24 #include <boost/assert.hpp>
25 #include <boost/optional.hpp>
26 #include <boost/throw_exception.hpp>
27 #include <condition_variable>
37 /** A bidirectional in-memory communication channel
39 An instance of this class is one endpoint of an in-memory
40 connection. Two instances are joined with connect(), after
41 which they behave similarly to a pair of connected sockets.
43 Test streams are used to facilitate writing unit tests
44 where the behavior of the transport is tightly controlled
45 to help illuminate all code paths (for code coverage)
// Virtual destructor: a pending read is owned through
// std::unique_ptr<read_op>, so it is destroyed via this base type.
51 virtual ~read_op() = default;
// Pure virtual call operator; read_op_impl supplies the override.
52 virtual void operator()() = 0;
55 template<class Handler, class Buffers>
// Condition variable.
// NOTE(review): the wait/notify call sites are not visible in this
// excerpt — presumably it wakes a blocking read_some; confirm.
71 std::condition_variable cv;
// Pending asynchronous read, if any. async_read_some installs a
// read_op_impl here; the read paths assert it is empty before starting.
72 std::unique_ptr<read_op> op;
// io_context whose executor is used to post completion handlers.
73 boost::asio::io_context& ioc;
// Connection status: starts as ok and is later set to eof or reset.
74 status code = status::ok;
// Optional fail counter (may be null); when set, fail(ec) is consulted
// at the start of the read and write operations.
75 fail_counter* fc = nullptr;
// Operation counters, zero-initialized.
// NOTE(review): the increments are not visible in this excerpt.
76 std::size_t nread = 0;
77 std::size_t nwrite = 0;
// Upper bound on the bytes copied by a single read; it is passed as the
// size limit to buffer_copy. Defaults to the largest size_t, i.e. no cap.
// The parentheses around `max` keep a function-like `max` macro from
// being expanded.
78 std::size_t read_max =
79 (std::numeric_limits<std::size_t>::max)();
// Upper bound on the bytes accepted by a single write (combined with the
// caller's buffer size via std::min). Same default as read_max.
80 std::size_t write_max =
81 (std::numeric_limits<std::size_t>::max)();
90 boost::asio::io_context& ioc_,
102 std::unique_ptr<read_op> op_ = std::move(op);
// State this stream reads from (shared ownership). Reads consume from
// in_->b; the io_context and fail counter are also reached through it.
112 std::shared_ptr<state> in_;
// Non-owning link to the state this stream writes into. The write paths
// call out_.lock() first and then append to out->b.
113 std::weak_ptr<state> out_;
116 using buffer_type = flat_buffer;
118 /// The type of the lowest layer.
119 using lowest_layer_type = stream;
125 std::unique_lock<std::mutex> lock{in_->m};
128 auto out = out_.lock();
131 std::unique_lock<std::mutex> lock{out->m};
132 if(out->code == status::ok)
134 out->code = status::reset;
/// Move constructor.
///
/// Takes over `other`'s input state and its link to the peer's state.
141 stream(stream&& other)
// Build a fresh state on the same io_context and fail counter *before*
// other.in_ is moved from (it is dereferenced here).
// NOTE(review): where `in` is consumed is not visible in this excerpt —
// presumably it becomes other.in_ so the moved-from stream keeps a valid
// state; confirm.
143 auto in = std::make_shared<state>(
144 other.in_->ioc, other.in_->fc);
145 in_ = std::move(other.in_);
146 out_ = std::move(other.out_);
/// Move assignment.
///
/// Replaces this stream's input state and peer link with those of `other`.
152 operator=(stream&& other)
// Fresh state created from other's io_context and fail counter while
// other.in_ is still valid.
// NOTE(review): the statement that uses `in` is not visible in this
// excerpt — presumably it is assigned to other.in_; confirm.
154 auto in = std::make_shared<state>(
155 other.in_->ioc, other.in_->fc);
156 in_ = std::move(other.in_);
157 out_ = std::move(other.out_);
/// Construct a stream on `ioc` without a fail counter.
164 stream(boost::asio::io_context& ioc)
// nullptr: no fail counter is attached to the new state.
165 : in_(std::make_shared<state>(ioc, nullptr))
// Overload taking a fail counter `fc` (its declaration line is not
// visible in this excerpt); the state stores the address of `fc`.
171 boost::asio::io_context& ioc,
173 : in_(std::make_shared<state>(ioc, &fc))
// Overload that seeds the input buffer from `s` (the declaration of `s`
// is not visible in this excerpt); no fail counter is attached.
179 boost::asio::io_context& ioc,
181 : in_(std::make_shared<state>(ioc, nullptr))
183 using boost::asio::buffer;
184 using boost::asio::buffer_copy;
// Reserve s.size() bytes, copy the contents of `s` in, and commit the
// number of bytes actually copied so they become readable input.
185 in_->b.commit(buffer_copy(
186 in_->b.prepare(s.size()),
187 buffer(s.data(), s.size())));
// Same as above, but the state also stores the address of `fc`.
192 boost::asio::io_context& ioc,
195 : in_(std::make_shared<state>(ioc, &fc))
197 using boost::asio::buffer;
198 using boost::asio::buffer_copy;
// Seed the input buffer with the contents of `s`.
199 in_->b.commit(buffer_copy(
200 in_->b.prepare(s.size()),
201 buffer(s.data(), s.size())));
204 /// Establish a connection
///
/// @param remote The stream to pair with this one.
///
/// Precondition (checked with BOOST_ASSERT): neither stream currently
/// refers to a live peer state.
206 connect(stream& remote)
// This stream must not already have a live peer.
208 BOOST_ASSERT(! out_.lock());
// Nor may the remote stream.
// NOTE(review): the statements that link the two streams are not
// visible in this excerpt.
209 BOOST_ASSERT(! remote.out_.lock());
214 /// The type of the executor associated with the object.
215 using executor_type =
216 boost::asio::io_context::executor_type;
218 /// Return the executor associated with the object.
///
/// This is the executor of the io_context held by the stream's input
/// state.
219 boost::asio::io_context::executor_type
220 get_executor() noexcept
222 return in_->ioc.get_executor();
225 /** Get a reference to the lowest layer
227 This function returns a reference to the lowest layer
228 in a stack of stream layers.
230 @return A reference to the lowest layer in the stack of
239 /** Get a reference to the lowest layer
241 This function returns a reference to the lowest layer
242 in a stack of stream layers.
244 @return A reference to the lowest layer in the stack of
245 stream layers. Ownership is not transferred to the caller.
247 lowest_layer_type const&
253 /// Set the maximum number of bytes returned by read_some
255 read_size(std::size_t n)
260 /// Set the maximum number of bytes returned by write_some
262 write_size(std::size_t n)
267 /// Direct input buffer access
274 /// Returns a string view representing the pending input data
278 auto const bs = in_->b.data();
279 if(boost::asio::buffer_size(bs) == 0)
281 auto const b = buffers_front(bs);
282 return {reinterpret_cast<char const*>(b.data()), b.size()};
285 /// Appends a string to the pending input data
///
/// @param s The characters to append; s.size() bytes starting at
/// s.data() are copied into the input buffer.
287 append(string_view s)
289 using boost::asio::buffer;
290 using boost::asio::buffer_copy;
// The input buffer is modified under the state's mutex.
291 std::lock_guard<std::mutex> lock{in_->m};
// prepare() reserves room for s.size() bytes, buffer_copy fills it, and
// commit() makes exactly the copied byte count available to readers.
292 in_->b.commit(buffer_copy(
293 in_->b.prepare(s.size()),
294 buffer(s.data(), s.size())));
297 /// Clear the pending input area
301 std::lock_guard<std::mutex> lock{in_->m};
302 in_->b.consume(in_->b.size());
305 /// Return the number of reads
312 /// Return the number of writes
319 /** Close the stream.
321 The other end of the connection will see
322 `error::eof` after reading all the remaining data.
327 /** Close the other end of the stream.
329 This end of the connection will see
330 `error::eof` after reading all the remaining data.
335 template<class MutableBufferSequence>
337 read_some(MutableBufferSequence const& buffers);
339 template<class MutableBufferSequence>
341 read_some(MutableBufferSequence const& buffers,
/// Start an asynchronous read.
///
/// @param buffers Destination for the data; must satisfy
/// MutableBufferSequence (enforced by a static_assert in the definition).
/// @param handler Completion handler with the signature
/// void(error_code, std::size_t).
/// @return The type computed by BOOST_ASIO_INITFN_RESULT_TYPE for this
/// handler and signature.
344 template<class MutableBufferSequence, class ReadHandler>
345 BOOST_ASIO_INITFN_RESULT_TYPE(
346 ReadHandler, void(error_code, std::size_t))
347 async_read_some(MutableBufferSequence const& buffers,
348 ReadHandler&& handler);
350 template<class ConstBufferSequence>
352 write_some(ConstBufferSequence const& buffers);
354 template<class ConstBufferSequence>
357 ConstBufferSequence const& buffers, error_code&);
/// Start an asynchronous write.
///
/// @param buffers Source of the data; must satisfy ConstBufferSequence
/// (enforced by a static_assert in the definition).
/// @param handler Completion handler with the signature
/// void(error_code, std::size_t).
/// @return The type computed by BOOST_ASIO_INITFN_RESULT_TYPE for this
/// handler and signature.
359 template<class ConstBufferSequence, class WriteHandler>
360 BOOST_ASIO_INITFN_RESULT_TYPE(
361 WriteHandler, void(error_code, std::size_t))
362 async_write_some(ConstBufferSequence const& buffers,
363 WriteHandler&& handler);
367 teardown(websocket::role_type,
368 stream& s, boost::system::error_code& ec);
370 template<class TeardownHandler>
373 async_teardown(websocket::role_type role,
374 stream& s, TeardownHandler&& handler);
377 //------------------------------------------------------------------------------
384 BOOST_ASSERT(! in_->op);
385 auto out = out_.lock();
388 std::lock_guard<std::mutex> lock{out->m};
389 if(out->code == status::ok)
391 out->code = status::eof;
401 std::lock_guard<std::mutex> lock{in_->m};
402 if(in_->code == status::ok)
404 in_->code = status::eof;
/// Read some data, reporting failure by exception.
///
/// Forwards to the error_code overload of read_some and raises a
/// system_error built from the resulting error_code.
409 template<class MutableBufferSequence>
412 read_some(MutableBufferSequence const& buffers)
414 static_assert(boost::asio::is_mutable_buffer_sequence<
415 MutableBufferSequence>::value,
416 "MutableBufferSequence requirements not met");
// All real work happens in the error_code overload.
418 auto const n = read_some(buffers, ec);
// NOTE(review): the condition guarding this throw is not visible in this
// excerpt — presumably `if(ec)`; confirm.
420 BOOST_THROW_EXCEPTION(system_error{ec});
/// Read some data, reporting failure through an error_code.
///
/// When input is pending, at most `read_max` bytes are copied into
/// `buffers` and consumed from the input buffer. When none is pending the
/// result is 0 bytes, with `ec` set to eof or connection_reset according
/// to the recorded status.
424 template<class MutableBufferSequence>
427 read_some(MutableBufferSequence const& buffers,
430 static_assert(boost::asio::is_mutable_buffer_sequence<
431 MutableBufferSequence>::value,
432 "MutableBufferSequence requirements not met");
433 using boost::asio::buffer_copy;
434 using boost::asio::buffer_size;
// Let the optional fail counter inject an error before anything else.
435 if(in_->fc && in_->fc->fail(ec))
// A zero-length destination is special-cased.
// NOTE(review): its handling is not visible in this excerpt.
437 if(buffer_size(buffers) == 0)
// The input state is inspected and modified under its mutex.
442 std::unique_lock<std::mutex> lock{in_->m};
// No asynchronous read may be pending while a synchronous read runs.
443 BOOST_ASSERT(! in_->op);
// NOTE(review): tail of an expression whose beginning is not visible
// here — presumably part of a wait predicate; confirm.
449 in_->code != status::ok;
451 std::size_t bytes_transferred;
// Data available: succeed and hand over up to read_max bytes.
452 if(in_->b.size() > 0)
// Clear the error value while keeping ec's current category.
454 ec.assign(0, ec.category());
455 bytes_transferred = buffer_copy(
456 buffers, in_->b.data(), in_->read_max);
457 in_->b.consume(bytes_transferred);
// No data: the status is expected to be non-ok and is mapped to an error.
461 BOOST_ASSERT(in_->code != status::ok);
462 bytes_transferred = 0;
463 if(in_->code == status::eof)
464 ec = boost::asio::error::eof;
465 else if(in_->code == status::reset)
466 ec = boost::asio::error::connection_reset;
469 return bytes_transferred;
/// Start an asynchronous read.
///
/// Three outcomes are visible below: complete at once when the caller's
/// buffer is empty or input is pending; complete at once with eof or
/// connection_reset when the status is not ok; otherwise store the
/// operation in in_->op. The visible completions are delivered with
/// boost::asio::post on the io_context's executor.
472 template<class MutableBufferSequence, class ReadHandler>
473 BOOST_ASIO_INITFN_RESULT_TYPE(
474 ReadHandler, void(error_code, std::size_t))
477 MutableBufferSequence const& buffers,
478 ReadHandler&& handler)
480 static_assert(boost::asio::is_mutable_buffer_sequence<
481 MutableBufferSequence>::value,
482 "MutableBufferSequence requirements not met");
483 using boost::asio::buffer_copy;
484 using boost::asio::buffer_size;
// Presumably declares `init`; its completion_handler and result members
// are used below.
485 BOOST_BEAST_HANDLER_INIT(
486 ReadHandler, void(error_code, std::size_t));
// Injected failure from the fail counter is posted to the handler.
// NOTE(review): unlike the synchronous read_some, no null check on fc is
// visible on this line — confirm a guard exists on the lines missing
// from this excerpt.
490 if(in_->fc->fail(ec))
491 return boost::asio::post(
492 in_->ioc.get_executor(),
494 std::move(init.completion_handler),
// From here on the input state is accessed under its mutex.
499 std::unique_lock<std::mutex> lock{in_->m};
// Only one asynchronous read may be outstanding.
500 BOOST_ASSERT(! in_->op);
// Immediate completion: empty destination, or input already pending.
501 if(buffer_size(buffers) == 0 ||
502 buffer_size(in_->b.data()) > 0)
// Transfer up to read_max bytes and drop them from the input buffer.
504 auto const bytes_transferred = buffer_copy(
505 buffers, in_->b.data(), in_->read_max);
506 in_->b.consume(bytes_transferred);
510 in_->ioc.get_executor(),
512 std::move(init.completion_handler),
// No input and the status is not ok: complete with the mapped error.
516 else if(in_->code != status::ok)
521 if(in_->code == status::eof)
522 ec = boost::asio::error::eof;
523 else if(in_->code == status::reset)
524 ec = boost::asio::error::connection_reset;
526 in_->ioc.get_executor(),
528 std::move(init.completion_handler),
// Otherwise store the operation as the pending read.
// NOTE(review): presumably it is invoked later when data or a status
// change arrives; the trigger is not visible in this excerpt.
534 in_->op.reset(new read_op_impl<BOOST_ASIO_HANDLER_TYPE(
535 ReadHandler, void(error_code, std::size_t)),
536 MutableBufferSequence>{*in_, buffers,
537 std::move(init.completion_handler)});
540 return init.result.get();
/// Write some data, reporting failure by exception.
///
/// Forwards to the error_code overload of write_some and returns its
/// byte count; a system_error built from the error_code is raised on the
/// failure path.
543 template<class ConstBufferSequence>
546 write_some(ConstBufferSequence const& buffers)
548 static_assert(boost::asio::is_const_buffer_sequence<
549 ConstBufferSequence>::value,
550 "ConstBufferSequence requirements not met");
// All real work happens in the error_code overload.
552 auto const bytes_transferred =
553 write_some(buffers, ec);
// NOTE(review): the condition guarding this throw is not visible in this
// excerpt — presumably `if(ec)`; confirm.
555 BOOST_THROW_EXCEPTION(system_error{ec});
556 return bytes_transferred;
/// Write some data, reporting failure through an error_code.
///
/// Appends at most `write_max` bytes from `buffers` to the peer's input
/// buffer and returns the number of bytes appended.
559 template<class ConstBufferSequence>
563 ConstBufferSequence const& buffers, error_code& ec)
565 static_assert(boost::asio::is_const_buffer_sequence<
566 ConstBufferSequence>::value,
567 "ConstBufferSequence requirements not met");
568 using boost::asio::buffer_copy;
569 using boost::asio::buffer_size;
// Obtain a strong reference to the peer's state for this call.
570 auto out = out_.lock();
// NOTE(review): the condition for this assignment is not visible in this
// excerpt — presumably the peer state no longer exists; confirm.
573 ec = boost::asio::error::connection_reset;
// NOTE(review): out->code is read here before the lock on out->m that is
// visible below — confirm that is intended.
576 BOOST_ASSERT(out->code == status::ok);
// Let the optional fail counter inject an error.
577 if(in_->fc && in_->fc->fail(ec))
// Cap the transfer at write_max; (std::min) is parenthesized so that a
// function-like `min` macro is not expanded.
579 auto const n = (std::min)(
580 buffer_size(buffers), in_->write_max);
// The peer's buffer is modified under the peer's mutex.
581 std::unique_lock<std::mutex> lock{out->m};
// Reserve n bytes, copy, and commit exactly what was copied.
582 auto const bytes_transferred =
583 buffer_copy(out->b.prepare(n), buffers);
584 out->b.commit(bytes_transferred);
// Success: clear the error value while keeping ec's current category.
588 ec.assign(0, ec.category());
589 return bytes_transferred;
/// Start an asynchronous write.
///
/// The data is appended to the peer's input buffer before this function
/// returns (at most `write_max` bytes). The visible completions are
/// delivered with boost::asio::post on the io_context's executor.
592 template<class ConstBufferSequence, class WriteHandler>
593 BOOST_ASIO_INITFN_RESULT_TYPE(
594 WriteHandler, void(error_code, std::size_t))
596 async_write_some(ConstBufferSequence const& buffers,
597 WriteHandler&& handler)
599 static_assert(boost::asio::is_const_buffer_sequence<
600 ConstBufferSequence>::value,
601 "ConstBufferSequence requirements not met");
602 using boost::asio::buffer_copy;
603 using boost::asio::buffer_size;
// Presumably declares `init`; its completion_handler and result members
// are used below.
604 BOOST_BEAST_HANDLER_INIT(
605 WriteHandler, void(error_code, std::size_t));
// Obtain a strong reference to the peer's state for this call.
606 auto out = out_.lock();
// Complete with connection_reset.
// NOTE(review): the guarding condition is not visible in this excerpt —
// presumably the peer state no longer exists; confirm.
608 return boost::asio::post(
609 in_->ioc.get_executor(),
611 std::move(init.completion_handler),
612 boost::asio::error::connection_reset,
// NOTE(review): out->code is read here before the lock on out->m that is
// visible below — confirm that is intended.
614 BOOST_ASSERT(out->code == status::ok);
// Injected failure from the fail counter is posted to the handler.
// NOTE(review): unlike the synchronous write_some, no null check on fc is
// visible on this line — confirm a guard exists on the lines missing
// from this excerpt.
618 if(in_->fc->fail(ec))
619 return boost::asio::post(
620 in_->ioc.get_executor(),
622 std::move(init.completion_handler),
// Transfer size: the caller's buffer size capped at write_max.
627 (std::min)(buffer_size(buffers), in_->write_max);
// The peer's buffer is modified under the peer's mutex.
628 std::unique_lock<std::mutex> lock{out->m};
// Reserve n bytes, copy, and commit exactly what was copied.
629 auto const bytes_transferred =
630 buffer_copy(out->b.prepare(n), buffers);
631 out->b.commit(bytes_transferred);
636 in_->ioc.get_executor(),
638 std::move(init.completion_handler),
641 return init.result.get();
647 websocket::role_type,
649 boost::system::error_code& ec)
659 ec = boost::asio::error::eof;
661 ec.assign(0, ec.category());
664 template<class TeardownHandler>
668 websocket::role_type,
670 TeardownHandler&& handler)
675 return boost::asio::post(
677 bind_handler(std::move(handler), ec));
681 ec = boost::asio::error::eof;
683 ec.assign(0, ec.category());
687 bind_handler(std::move(handler), ec));
690 //------------------------------------------------------------------------------
// Concrete pending-read operation: derives from read_op and wraps a
// `lambda` object built from the state, the destination buffers and the
// completion handler.
692 template<class Handler, class Buffers>
693 class stream::read_op_impl : public stream::read_op
// Work guard on the io_context's executor (the outer wrapper type begins
// on a line not visible in this excerpt).
701 boost::asio::executor_work_guard<
702 boost::asio::io_context::executor_type>> work_;
705 lambda(lambda&&) = default;
706 lambda(lambda const&) = default;
// Stores the handler by perfect forwarding and initializes the work guard
// from the state's io_context executor.
708 template<class DeducedHandler>
709 lambda(state& s, Buffers const& b, DeducedHandler&& h)
712 , h_(std::forward<DeducedHandler>(h))
713 , work_(s_.ioc.get_executor())
721 s_.ioc.get_executor(),
729 using boost::asio::buffer_copy;
730 using boost::asio::buffer_size;
// The state is examined under its mutex.
731 std::unique_lock<std::mutex> lock{s_.m};
// By the time this runs the operation must no longer be stored in the
// state.
732 BOOST_ASSERT(! s_.op);
// Data path: move up to read_max bytes into the caller's buffers.
735 auto const bytes_transferred = buffer_copy(
736 b_, s_.b.data(), s_.read_max);
737 s_.b.consume(bytes_transferred);
// The handler is moved into a local before it is passed on.
739 Handler h{std::move(h_)};
743 s.ioc.get_executor(),
// No-data path: the status is expected to be non-ok.
751 BOOST_ASSERT(s_.code != status::ok);
753 Handler h{std::move(h_)};
// Map the recorded status to the error delivered to the handler.
// NOTE(review): if the lock has been released by this point (the unlock
// is not visible in this excerpt), s.code is read without
// synchronization — confirm.
757 if(s.code == status::eof)
758 ec = boost::asio::error::eof;
759 else if(s.code == status::reset)
760 ec = boost::asio::error::connection_reset;
// Zero bytes are reported together with the mapped error.
762 s.ioc.get_executor(),
763 bind_handler(std::move(h), ec, 0));
// Forwards all arguments to the wrapped lambda member fn_.
771 template<class DeducedHandler>
772 read_op_impl(state& s, Buffers const& b, DeducedHandler&& h)
773 : fn_(s, b, std::forward<DeducedHandler>(h))
// Implements read_op's pure virtual call operator (body not visible in
// this excerpt).
778 operator()() override
784 /// Create and return a connected stream
// The new stream is built on the io_context obtained from `to`'s
// executor.
789 stream from{to.get_executor().context()};
794 /// Create and return a connected stream
///
/// This overload perfectly forwards `arg1` and `argn...` into the
/// initializer of the object being constructed.
/// NOTE(review): the leading initializer argument(s) are on a line not
/// visible in this excerpt.
795 template<class Arg1, class... ArgN>
797 connect(stream& to, Arg1&& arg1, ArgN&&... argn)
800 std::forward<Arg1>(arg1),
801 std::forward<ArgN>(argn)...};