2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 // Official repository: https://github.com/boostorg/beast
10 #ifndef BOOST_BEAST_TEST_IMPL_STREAM_IPP
11 #define BOOST_BEAST_TEST_IMPL_STREAM_IPP
13 #include <boost/beast/_experimental/test/stream.hpp>
14 #include <boost/beast/core/bind_handler.hpp>
15 #include <boost/beast/core/buffer_traits.hpp>
16 #include <boost/make_shared.hpp>
24 //------------------------------------------------------------------------------
28 service(net::execution_context& ctx)
29 : beast::detail::service_base<service>(ctx)
30 , sp_(boost::make_shared<service_impl>())
39 std::vector<std::unique_ptr<read_op_base>> v;
40 std::lock_guard<std::mutex> g1(sp_->m_);
41 v.reserve(sp_->v_.size());
44 std::lock_guard<std::mutex> g2(p->m);
45 v.emplace_back(std::move(p->op));
46 p->code = status::eof;
55 test::fail_count* fc) ->
56 boost::shared_ptr<state>
58 auto& svc = net::use_service<service>(ctx);
59 auto sp = boost::make_shared<state>(ctx, svc.sp_, fc);
60 std::lock_guard<std::mutex> g(svc.sp_->m_);
61 svc.sp_->v_.push_back(sp.get());
70 std::lock_guard<std::mutex> g(m_);
73 &impl) = std::move(v_.back());
77 //------------------------------------------------------------------------------
79 void stream::initiate_read(
80 boost::shared_ptr<state> const& in_,
81 std::unique_ptr<stream::read_op_base>&& op,
84 std::unique_lock<std::mutex> lock(in_->m);
87 if(in_->op != nullptr)
88 BOOST_THROW_EXCEPTION(
89 std::logic_error{"in_->op != nullptr"});
93 if(in_->fc && in_->fc->fail(ec))
100 // A request to read 0 bytes from a stream is a no-op.
101 if(buf_size == 0 || buffer_bytes(in_->b.data()) > 0)
109 if(in_->code != status::ok)
112 (*op)(net::error::eof);
116 // complete when bytes available or closed
117 in_->op = std::move(op);
123 net::io_context& ioc_,
124 boost::weak_ptr<service_impl> wp_,
136 // cancel outstanding read
138 (*op)(net::error::operation_aborted);
148 // If this goes off, it means the lifetime of a test::stream object
149 // extended beyond the lifetime of the associated execution context.
162 auto op_ = std::move(op);
163 op_->operator()(error_code{});
176 std::unique_ptr<read_op_base> p;
178 std::lock_guard<std::mutex> lock(m);
183 (*p)(net::error::operation_aborted);
186 //------------------------------------------------------------------------------
196 stream(stream&& other)
198 auto in = service::make_impl(
199 other.in_->ioc, other.in_->fc);
200 in_ = std::move(other.in_);
201 out_ = std::move(other.out_);
207 operator=(stream&& other)
210 auto in = service::make_impl(
211 other.in_->ioc, other.in_->fc);
213 in_ = std::move(other.in_);
214 out_ = std::move(other.out_);
219 //------------------------------------------------------------------------------
222 stream(net::io_context& ioc)
223 : in_(service::make_impl(ioc, nullptr))
229 net::io_context& ioc,
231 : in_(service::make_impl(ioc, &fc))
237 net::io_context& ioc,
239 : in_(service::make_impl(ioc, nullptr))
241 in_->b.commit(net::buffer_copy(
242 in_->b.prepare(s.size()),
243 net::buffer(s.data(), s.size())));
248 net::io_context& ioc,
251 : in_(service::make_impl(ioc, &fc))
253 in_->b.commit(net::buffer_copy(
254 in_->b.prepare(s.size()),
255 net::buffer(s.data(), s.size())));
260 connect(stream& remote)
262 BOOST_ASSERT(! out_.lock());
263 BOOST_ASSERT(! remote.out_.lock());
264 std::lock(in_->m, remote.in_->m);
265 std::lock_guard<std::mutex> guard1{in_->m, std::adopt_lock};
266 std::lock_guard<std::mutex> guard2{remote.in_->m, std::adopt_lock};
269 in_->code = status::ok;
270 remote.in_->code = status::ok;
277 auto const bs = in_->b.data();
278 if(buffer_bytes(bs) == 0)
280 net::const_buffer const b = *net::buffer_sequence_begin(bs);
281 return {static_cast<char const*>(b.data()), b.size()};
286 append(string_view s)
288 std::lock_guard<std::mutex> lock{in_->m};
289 in_->b.commit(net::buffer_copy(
290 in_->b.prepare(s.size()),
291 net::buffer(s.data(), s.size())));
298 std::lock_guard<std::mutex> lock{in_->m};
299 in_->b.consume(in_->b.size());
310 auto out = out_.lock();
316 std::lock_guard<std::mutex> lock(out->m);
317 if(out->code == status::ok)
319 out->code = status::eof;
330 std::lock_guard<std::mutex> lock{in_->m};
331 if(in_->code == status::ok)
333 in_->code = status::eof;
342 boost::system::error_code& ec)
352 ec = net::error::eof;
357 //------------------------------------------------------------------------------
362 #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
363 stream from{net::query(to.get_executor(), net::execution::context)};
364 #else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
365 stream from{to.get_executor().context()};
366 #endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
372 connect(stream& s1, stream& s2)