2 // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 // Official repository: https://github.com/boostorg/beast
10 #ifndef BOOST_BEAST_TEST_STREAM_HPP
11 #define BOOST_BEAST_TEST_STREAM_HPP
13 #include <boost/beast/core/bind_handler.hpp>
14 #include <boost/beast/core/buffers_prefix.hpp>
15 #include <boost/beast/core/flat_buffer.hpp>
16 #include <boost/beast/core/string.hpp>
17 #include <boost/beast/core/type_traits.hpp>
18 #include <boost/beast/websocket/teardown.hpp>
19 #include <boost/beast/test/fail_counter.hpp>
20 #include <boost/asio/async_result.hpp>
21 #include <boost/asio/buffer.hpp>
22 #include <boost/asio/io_context.hpp>
23 #include <boost/asio/post.hpp>
24 #include <boost/assert.hpp>
25 #include <boost/optional.hpp>
26 #include <boost/throw_exception.hpp>
27 #include <condition_variable>
37 /** A bidirectional in-memory communication channel
39 An instance of this class is one endpoint of the channel.
40 Two instances are linked to each other with connect(),
41 after which they behave similarly to a connected socket.
43 Test streams are used to facilitate writing unit tests
44 where the behavior of the transport is tightly controlled,
45 to help exercise all code paths (for code coverage).
51 virtual ~read_op() = default;
52 virtual void operator()() = 0;
55 template<class Handler, class Buffers>
/// Condition variable belonging to this side's state.
/// NOTE(review): the wait/notify sites are not visible in this
/// excerpt -- confirm how it pairs with the mutex `m`.
71 std::condition_variable cv;
/// Pending asynchronous read, if any. async_read_some stores a
/// read_op_impl here; several paths assert it is empty first.
72 std::unique_ptr<read_op> op;
/// The io_context whose executor is returned by get_executor()
/// and used when posting completion handlers.
73 boost::asio::io_context& ioc;
/// Status of this side. Starts as status::ok; other code in this
/// file changes it to status::eof or status::reset, which the read
/// paths translate to asio::error::eof / connection_reset.
74 status code = status::ok;
/// Optional fail counter. Null when the stream was constructed
/// without one; when set, operations consult fc->fail(ec).
75 fail_counter* fc = nullptr;
/// Read counter, starting at zero.
/// NOTE(review): presumably the value reported by the "number of
/// reads" accessor -- the increments are not visible here.
76 std::size_t nread = 0;
/// Write counter, starting at zero.
/// NOTE(review): presumably the value reported by the "number of
/// writes" accessor -- the increments are not visible here.
77 std::size_t nwrite = 0;
/// Upper bound handed to buffer_copy on the read paths.
/// Defaults to the largest std::size_t; `max` is parenthesised so
/// a function-like `max` macro cannot expand here.
78 std::size_t read_max =
79 (std::numeric_limits<std::size_t>::max)();
/// Upper bound applied (via std::min) to the number of bytes copied
/// on the write paths. Defaults to the largest std::size_t.
80 std::size_t write_max =
81 (std::numeric_limits<std::size_t>::max)();
90 boost::asio::io_context& ioc_,
102 std::unique_ptr<read_op> op_ = std::move(op);
112 std::shared_ptr<state> in_;
113 std::weak_ptr<state> out_;
/// The buffer type used by the stream (an alias for flat_buffer).
/// NOTE(review): presumably the type exposed by the "direct input
/// buffer access" accessor -- confirm against its declaration.
116 using buffer_type = flat_buffer;
118 /// The type of the lowest layer.
119 using lowest_layer_type = stream;
125 std::unique_lock<std::mutex> lock{in_->m};
128 auto out = out_.lock();
131 std::unique_lock<std::mutex> lock{out->m};
132 if(out->code == status::ok)
134 out->code = status::reset;
141 stream(stream&& other)
143 auto in = std::make_shared<state>(
144 other.in_->ioc, other.in_->fc);
145 in_ = std::move(other.in_);
146 out_ = std::move(other.out_);
152 operator=(stream&& other)
154 auto in = std::make_shared<state>(
155 other.in_->ioc, other.in_->fc);
156 in_ = std::move(other.in_);
157 out_ = std::move(other.out_);
164 stream(boost::asio::io_context& ioc)
165 : in_(std::make_shared<state>(ioc, nullptr))
171 boost::asio::io_context& ioc,
173 : in_(std::make_shared<state>(ioc, &fc))
179 boost::asio::io_context& ioc,
181 : in_(std::make_shared<state>(ioc, nullptr))
183 using boost::asio::buffer;
184 using boost::asio::buffer_copy;
185 in_->b.commit(buffer_copy(
186 in_->b.prepare(s.size()),
187 buffer(s.data(), s.size())));
192 boost::asio::io_context& ioc,
195 : in_(std::make_shared<state>(ioc, &fc))
197 using boost::asio::buffer;
198 using boost::asio::buffer_copy;
199 in_->b.commit(buffer_copy(
200 in_->b.prepare(s.size()),
201 buffer(s.data(), s.size())));
204 /// Establish a connection
206 connect(stream& remote)
208 BOOST_ASSERT(! out_.lock());
209 BOOST_ASSERT(! remote.out_.lock());
214 /// The type of the executor associated with the object.
215 using executor_type =
216 boost::asio::io_context::executor_type;
218 /// Return the executor associated with the object.
219 boost::asio::io_context::executor_type
220 get_executor() noexcept
222 return in_->ioc.get_executor();
225 /** Get a reference to the lowest layer
227 This function returns a reference to the lowest layer
228 in a stack of stream layers.
230 @return A reference to the lowest layer in the stack of
239 /** Get a reference to the lowest layer
241 This function returns a reference to the lowest layer
242 in a stack of stream layers.
244 @return A reference to the lowest layer in the stack of
245 stream layers. Ownership is not transferred to the caller.
247 lowest_layer_type const&
253 /// Set the maximum number of bytes returned by read_some
255 read_size(std::size_t n)
260 /// Set the maximum number of bytes written by write_some
262 write_size(std::size_t n)
267 /// Direct input buffer access
274 /// Returns a string view representing the pending input data
278 auto const bs = in_->b.data();
279 if(boost::asio::buffer_size(bs) == 0)
281 auto const b = buffers_front(bs);
282 return {reinterpret_cast<char const*>(b.data()), b.size()};
285 /// Appends a string to the pending input data
287 append(string_view s)
289 using boost::asio::buffer;
290 using boost::asio::buffer_copy;
291 std::lock_guard<std::mutex> lock{in_->m};
292 in_->b.commit(buffer_copy(
293 in_->b.prepare(s.size()),
294 buffer(s.data(), s.size())));
297 /// Clear the pending input area
301 std::lock_guard<std::mutex> lock{in_->m};
302 in_->b.consume(in_->b.size());
305 /// Return the number of reads
312 /// Return the number of writes
319 /** Close the stream.
321 The other end of the connection will see
322 `error::eof` after reading all the remaining data.
327 /** Close the other end of the stream.
329 This end of the connection will see
330 `error::eof` after reading all the remaining data.
335 template<class MutableBufferSequence>
337 read_some(MutableBufferSequence const& buffers);
339 template<class MutableBufferSequence>
341 read_some(MutableBufferSequence const& buffers,
/// Start an asynchronous read (declaration; the definition appears
/// later in this file).
///
/// The definition posts the completion handler through the
/// io_context's executor when input is already buffered or the
/// stream status is no longer ok; otherwise it stores a pending
/// read operation in the input state.
///
/// @param buffers The mutable buffer sequence that receives the
/// data. The definition statically asserts the
/// MutableBufferSequence requirements and asserts a non-zero size.
///
/// @param handler The completion handler, invoked with the
/// signature `void(error_code, std::size_t)`.
///
/// @return The initiating-function result for `ReadHandler`, as
/// computed by BOOST_ASIO_INITFN_RESULT_TYPE.
344 template<class MutableBufferSequence, class ReadHandler>
345 BOOST_ASIO_INITFN_RESULT_TYPE(
346 ReadHandler, void(error_code, std::size_t))
347 async_read_some(MutableBufferSequence const& buffers,
348 ReadHandler&& handler);
350 template<class ConstBufferSequence>
352 write_some(ConstBufferSequence const& buffers);
354 template<class ConstBufferSequence>
357 ConstBufferSequence const& buffers, error_code&);
/// Start an asynchronous write (declaration; the definition appears
/// later in this file).
///
/// The definition copies at most `write_max` bytes from `buffers`
/// into the connected peer's input buffer and posts the completion
/// handler through the io_context's executor. If the peer state is
/// no longer alive, the handler is posted with
/// boost::asio::error::connection_reset.
///
/// @param buffers The constant buffer sequence to write. The
/// definition statically asserts the ConstBufferSequence
/// requirements.
///
/// @param handler The completion handler, invoked with the
/// signature `void(error_code, std::size_t)`.
///
/// @return The initiating-function result for `WriteHandler`, as
/// computed by BOOST_ASIO_INITFN_RESULT_TYPE.
359 template<class ConstBufferSequence, class WriteHandler>
360 BOOST_ASIO_INITFN_RESULT_TYPE(
361 WriteHandler, void(error_code, std::size_t))
362 async_write_some(ConstBufferSequence const& buffers,
363 WriteHandler&& handler);
367 teardown(websocket::role_type,
368 stream& s, boost::system::error_code& ec);
370 template<class TeardownHandler>
373 async_teardown(websocket::role_type role,
374 stream& s, TeardownHandler&& handler);
377 //------------------------------------------------------------------------------
384 BOOST_ASSERT(! in_->op);
385 auto out = out_.lock();
388 std::lock_guard<std::mutex> lock{out->m};
389 if(out->code == status::ok)
391 out->code = status::eof;
401 std::lock_guard<std::mutex> lock{in_->m};
402 if(in_->code == status::ok)
404 in_->code = status::eof;
409 template<class MutableBufferSequence>
412 read_some(MutableBufferSequence const& buffers)
414 static_assert(boost::asio::is_mutable_buffer_sequence<
415 MutableBufferSequence>::value,
416 "MutableBufferSequence requirements not met");
418 auto const n = read_some(buffers, ec);
420 BOOST_THROW_EXCEPTION(system_error{ec});
424 template<class MutableBufferSequence>
427 read_some(MutableBufferSequence const& buffers,
430 static_assert(boost::asio::is_mutable_buffer_sequence<
431 MutableBufferSequence>::value,
432 "MutableBufferSequence requirements not met");
433 using boost::asio::buffer_copy;
434 using boost::asio::buffer_size;
435 BOOST_ASSERT(buffer_size(buffers) > 0);
436 if(in_->fc && in_->fc->fail(ec))
438 std::unique_lock<std::mutex> lock{in_->m};
439 BOOST_ASSERT(! in_->op);
445 in_->code != status::ok;
447 std::size_t bytes_transferred;
448 if(in_->b.size() > 0)
450 ec.assign(0, ec.category());
451 bytes_transferred = buffer_copy(
452 buffers, in_->b.data(), in_->read_max);
453 in_->b.consume(bytes_transferred);
457 BOOST_ASSERT(in_->code != status::ok);
458 bytes_transferred = 0;
459 if(in_->code == status::eof)
460 ec = boost::asio::error::eof;
461 else if(in_->code == status::reset)
462 ec = boost::asio::error::connection_reset;
465 return bytes_transferred;
468 template<class MutableBufferSequence, class ReadHandler>
469 BOOST_ASIO_INITFN_RESULT_TYPE(
470 ReadHandler, void(error_code, std::size_t))
473 MutableBufferSequence const& buffers,
474 ReadHandler&& handler)
476 static_assert(boost::asio::is_mutable_buffer_sequence<
477 MutableBufferSequence>::value,
478 "MutableBufferSequence requirements not met");
479 using boost::asio::buffer_copy;
480 using boost::asio::buffer_size;
481 BOOST_ASSERT(buffer_size(buffers) > 0);
482 boost::asio::async_completion<ReadHandler,
483 void(error_code, std::size_t)> init{handler};
487 if(in_->fc->fail(ec))
488 return boost::asio::post(
489 in_->ioc.get_executor(),
491 init.completion_handler,
496 std::unique_lock<std::mutex> lock{in_->m};
497 BOOST_ASSERT(! in_->op);
498 if(buffer_size(buffers) == 0 ||
499 buffer_size(in_->b.data()) > 0)
501 auto const bytes_transferred = buffer_copy(
502 buffers, in_->b.data(), in_->read_max);
503 in_->b.consume(bytes_transferred);
507 in_->ioc.get_executor(),
509 init.completion_handler,
513 else if(in_->code != status::ok)
518 if(in_->code == status::eof)
519 ec = boost::asio::error::eof;
520 else if(in_->code == status::reset)
521 ec = boost::asio::error::connection_reset;
523 in_->ioc.get_executor(),
525 init.completion_handler,
531 in_->op.reset(new read_op_impl<BOOST_ASIO_HANDLER_TYPE(
532 ReadHandler, void(error_code, std::size_t)),
533 MutableBufferSequence>{*in_, buffers,
534 init.completion_handler});
537 return init.result.get();
540 template<class ConstBufferSequence>
543 write_some(ConstBufferSequence const& buffers)
545 static_assert(boost::asio::is_const_buffer_sequence<
546 ConstBufferSequence>::value,
547 "ConstBufferSequence requirements not met");
549 auto const bytes_transferred =
550 write_some(buffers, ec);
552 BOOST_THROW_EXCEPTION(system_error{ec});
553 return bytes_transferred;
556 template<class ConstBufferSequence>
560 ConstBufferSequence const& buffers, error_code& ec)
562 static_assert(boost::asio::is_const_buffer_sequence<
563 ConstBufferSequence>::value,
564 "ConstBufferSequence requirements not met");
565 using boost::asio::buffer_copy;
566 using boost::asio::buffer_size;
567 auto out = out_.lock();
570 ec = boost::asio::error::connection_reset;
573 BOOST_ASSERT(out->code == status::ok);
574 if(in_->fc && in_->fc->fail(ec))
576 auto const n = (std::min)(
577 buffer_size(buffers), in_->write_max);
578 std::unique_lock<std::mutex> lock{out->m};
579 auto const bytes_transferred =
580 buffer_copy(out->b.prepare(n), buffers);
581 out->b.commit(bytes_transferred);
585 ec.assign(0, ec.category());
586 return bytes_transferred;
589 template<class ConstBufferSequence, class WriteHandler>
590 BOOST_ASIO_INITFN_RESULT_TYPE(
591 WriteHandler, void(error_code, std::size_t))
593 async_write_some(ConstBufferSequence const& buffers,
594 WriteHandler&& handler)
596 static_assert(boost::asio::is_const_buffer_sequence<
597 ConstBufferSequence>::value,
598 "ConstBufferSequence requirements not met");
599 using boost::asio::buffer_copy;
600 using boost::asio::buffer_size;
601 boost::asio::async_completion<WriteHandler,
602 void(error_code, std::size_t)> init{handler};
603 auto out = out_.lock();
605 return boost::asio::post(
606 in_->ioc.get_executor(),
608 init.completion_handler,
609 boost::asio::error::connection_reset,
611 BOOST_ASSERT(out->code == status::ok);
615 if(in_->fc->fail(ec))
616 return boost::asio::post(
617 in_->ioc.get_executor(),
619 init.completion_handler,
624 (std::min)(buffer_size(buffers), in_->write_max);
625 std::unique_lock<std::mutex> lock{out->m};
626 auto const bytes_transferred =
627 buffer_copy(out->b.prepare(n), buffers);
628 out->b.commit(bytes_transferred);
633 in_->ioc.get_executor(),
635 init.completion_handler,
638 return init.result.get();
644 websocket::role_type,
646 boost::system::error_code& ec)
656 ec = boost::asio::error::eof;
658 ec.assign(0, ec.category());
661 template<class TeardownHandler>
665 websocket::role_type,
667 TeardownHandler&& handler)
672 return boost::asio::post(
674 bind_handler(std::move(handler), ec));
678 ec = boost::asio::error::eof;
680 ec.assign(0, ec.category());
684 bind_handler(std::move(handler), ec));
687 //------------------------------------------------------------------------------
689 template<class Handler, class Buffers>
690 class stream::read_op_impl : public stream::read_op
698 boost::asio::executor_work_guard<
699 boost::asio::io_context::executor_type>> work_;
702 lambda(lambda&&) = default;
703 lambda(lambda const&) = default;
705 lambda(state& s, Buffers const& b, Handler&& h)
709 , work_(s_.ioc.get_executor())
713 lambda(state& s, Buffers const& b, Handler const& h)
717 , work_(s_.ioc.get_executor())
725 s_.ioc.get_executor(),
733 using boost::asio::buffer_copy;
734 using boost::asio::buffer_size;
735 std::unique_lock<std::mutex> lock{s_.m};
736 BOOST_ASSERT(! s_.op);
739 auto const bytes_transferred = buffer_copy(
740 b_, s_.b.data(), s_.read_max);
741 s_.b.consume(bytes_transferred);
743 Handler h{std::move(h_)};
747 s.ioc.get_executor(),
755 BOOST_ASSERT(s_.code != status::ok);
757 Handler h{std::move(h_)};
761 if(s.code == status::eof)
762 ec = boost::asio::error::eof;
763 else if(s.code == status::reset)
764 ec = boost::asio::error::connection_reset;
766 s.ioc.get_executor(),
767 bind_handler(std::move(h), ec, 0));
775 read_op_impl(state& s, Buffers const& b, Handler&& h)
776 : fn_(s, b, std::move(h))
780 read_op_impl(state& s, Buffers const& b, Handler const& h)
786 operator()() override
792 /// Create and return a connected stream
797 stream from{to.get_executor().context()};
802 /// Create and return a connected stream
803 template<class Arg1, class... ArgN>
805 connect(stream& to, Arg1&& arg1, ArgN&&... argn)
808 std::forward<Arg1>(arg1),
809 std::forward<ArgN>(argn)...};