]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/boost/beast/_experimental/test/impl/stream.ipp
import quincy beta 17.1.0
[ceph.git] / ceph / src / boost / boost / beast / _experimental / test / impl / stream.ipp
CommitLineData
92f5a8d4
TL
1//
2// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7// Official repository: https://github.com/boostorg/beast
8//
9
10#ifndef BOOST_BEAST_TEST_IMPL_STREAM_IPP
11#define BOOST_BEAST_TEST_IMPL_STREAM_IPP
12
13#include <boost/beast/_experimental/test/stream.hpp>
14#include <boost/beast/core/bind_handler.hpp>
15#include <boost/beast/core/buffer_traits.hpp>
16#include <boost/make_shared.hpp>
17#include <stdexcept>
18#include <vector>
19
20namespace boost {
21namespace beast {
22namespace test {
23
24//------------------------------------------------------------------------------
25
26stream::
27service::
28service(net::execution_context& ctx)
29 : beast::detail::service_base<service>(ctx)
30 , sp_(boost::make_shared<service_impl>())
31{
32}
33
34void
35stream::
36service::
37shutdown()
38{
39 std::vector<std::unique_ptr<read_op_base>> v;
40 std::lock_guard<std::mutex> g1(sp_->m_);
41 v.reserve(sp_->v_.size());
42 for(auto p : sp_->v_)
43 {
44 std::lock_guard<std::mutex> g2(p->m);
45 v.emplace_back(std::move(p->op));
46 p->code = status::eof;
47 }
48}
49
50auto
51stream::
52service::
53make_impl(
54 net::io_context& ctx,
55 test::fail_count* fc) ->
56 boost::shared_ptr<state>
57{
58 auto& svc = net::use_service<service>(ctx);
59 auto sp = boost::make_shared<state>(ctx, svc.sp_, fc);
60 std::lock_guard<std::mutex> g(svc.sp_->m_);
61 svc.sp_->v_.push_back(sp.get());
62 return sp;
63}
64
65void
66stream::
67service_impl::
68remove(state& impl)
69{
70 std::lock_guard<std::mutex> g(m_);
71 *std::find(
72 v_.begin(), v_.end(),
73 &impl) = std::move(v_.back());
74 v_.pop_back();
75}
76
77//------------------------------------------------------------------------------
78
79void stream::initiate_read(
80 boost::shared_ptr<state> const& in_,
81 std::unique_ptr<stream::read_op_base>&& op,
82 std::size_t buf_size)
83{
84 std::unique_lock<std::mutex> lock(in_->m);
85
86 ++in_->nread;
87 if(in_->op != nullptr)
88 BOOST_THROW_EXCEPTION(
89 std::logic_error{"in_->op != nullptr"});
90
91 // test failure
92 error_code ec;
93 if(in_->fc && in_->fc->fail(ec))
94 {
95 lock.unlock();
96 (*op)(ec);
97 return;
98 }
99
100 // A request to read 0 bytes from a stream is a no-op.
101 if(buf_size == 0 || buffer_bytes(in_->b.data()) > 0)
102 {
103 lock.unlock();
104 (*op)(ec);
105 return;
106 }
107
108 // deliver error
109 if(in_->code != status::ok)
110 {
111 lock.unlock();
112 (*op)(net::error::eof);
113 return;
114 }
115
116 // complete when bytes available or closed
117 in_->op = std::move(op);
118}
119
120stream::
121state::
122state(
123 net::io_context& ioc_,
124 boost::weak_ptr<service_impl> wp_,
125 fail_count* fc_)
126 : ioc(ioc_)
127 , wp(std::move(wp_))
128 , fc(fc_)
129{
130}
131
132stream::
133state::
134~state()
135{
136 // cancel outstanding read
137 if(op != nullptr)
138 (*op)(net::error::operation_aborted);
139}
140
141void
142stream::
143state::
144remove() noexcept
145{
146 auto sp = wp.lock();
147
148 // If this goes off, it means the lifetime of a test::stream object
149 // extended beyond the lifetime of the associated execution context.
150 BOOST_ASSERT(sp);
151
152 sp->remove(*this);
153}
154
155void
156stream::
157state::
158notify_read()
159{
160 if(op)
161 {
162 auto op_ = std::move(op);
163 op_->operator()(error_code{});
164 }
165 else
166 {
167 cv.notify_all();
168 }
169}
170
171void
172stream::
173state::
174cancel_read()
175{
176 std::unique_ptr<read_op_base> p;
177 {
178 std::lock_guard<std::mutex> lock(m);
179 code = status::eof;
180 p = std::move(op);
181 }
182 if(p != nullptr)
183 (*p)(net::error::operation_aborted);
184}
185
186//------------------------------------------------------------------------------
187
188stream::
189~stream()
190{
191 close();
192 in_->remove();
193}
194
195stream::
196stream(stream&& other)
197{
198 auto in = service::make_impl(
199 other.in_->ioc, other.in_->fc);
200 in_ = std::move(other.in_);
201 out_ = std::move(other.out_);
202 other.in_ = in;
203}
204
205stream&
206stream::
207operator=(stream&& other)
208{
209 close();
210 auto in = service::make_impl(
211 other.in_->ioc, other.in_->fc);
212 in_->remove();
213 in_ = std::move(other.in_);
214 out_ = std::move(other.out_);
215 other.in_ = in;
216 return *this;
217}
218
219//------------------------------------------------------------------------------
220
221stream::
222stream(net::io_context& ioc)
223 : in_(service::make_impl(ioc, nullptr))
224{
225}
226
227stream::
228stream(
229 net::io_context& ioc,
230 fail_count& fc)
231 : in_(service::make_impl(ioc, &fc))
232{
233}
234
235stream::
236stream(
237 net::io_context& ioc,
238 string_view s)
239 : in_(service::make_impl(ioc, nullptr))
240{
241 in_->b.commit(net::buffer_copy(
242 in_->b.prepare(s.size()),
243 net::buffer(s.data(), s.size())));
244}
245
246stream::
247stream(
248 net::io_context& ioc,
249 fail_count& fc,
250 string_view s)
251 : in_(service::make_impl(ioc, &fc))
252{
253 in_->b.commit(net::buffer_copy(
254 in_->b.prepare(s.size()),
255 net::buffer(s.data(), s.size())));
256}
257
258void
259stream::
260connect(stream& remote)
261{
262 BOOST_ASSERT(! out_.lock());
263 BOOST_ASSERT(! remote.out_.lock());
264 std::lock(in_->m, remote.in_->m);
265 std::lock_guard<std::mutex> guard1{in_->m, std::adopt_lock};
266 std::lock_guard<std::mutex> guard2{remote.in_->m, std::adopt_lock};
267 out_ = remote.in_;
268 remote.out_ = in_;
269 in_->code = status::ok;
270 remote.in_->code = status::ok;
271}
272
273string_view
274stream::
275str() const
276{
277 auto const bs = in_->b.data();
278 if(buffer_bytes(bs) == 0)
279 return {};
280 net::const_buffer const b = *net::buffer_sequence_begin(bs);
281 return {static_cast<char const*>(b.data()), b.size()};
282}
283
284void
285stream::
286append(string_view s)
287{
288 std::lock_guard<std::mutex> lock{in_->m};
289 in_->b.commit(net::buffer_copy(
290 in_->b.prepare(s.size()),
291 net::buffer(s.data(), s.size())));
292}
293
294void
295stream::
296clear()
297{
298 std::lock_guard<std::mutex> lock{in_->m};
299 in_->b.consume(in_->b.size());
300}
301
302void
303stream::
304close()
305{
306 in_->cancel_read();
307
308 // disconnect
309 {
310 auto out = out_.lock();
311 out_.reset();
312
313 // notify peer
314 if(out)
315 {
316 std::lock_guard<std::mutex> lock(out->m);
317 if(out->code == status::ok)
318 {
319 out->code = status::eof;
320 out->notify_read();
321 }
322 }
323 }
324}
325
326void
327stream::
328close_remote()
329{
330 std::lock_guard<std::mutex> lock{in_->m};
331 if(in_->code == status::ok)
332 {
333 in_->code = status::eof;
334 in_->notify_read();
335 }
336}
337
338void
339teardown(
340 role_type,
341 stream& s,
342 boost::system::error_code& ec)
343{
344 if( s.in_->fc &&
345 s.in_->fc->fail(ec))
346 return;
347
348 s.close();
349
350 if( s.in_->fc &&
351 s.in_->fc->fail(ec))
352 ec = net::error::eof;
353 else
354 ec = {};
355}
356
357//------------------------------------------------------------------------------
358
359stream
360connect(stream& to)
361{
20effc67
TL
362#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
363 stream from{net::query(to.get_executor(), net::execution::context)};
364#else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
92f5a8d4 365 stream from{to.get_executor().context()};
20effc67 366#endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
92f5a8d4
TL
367 from.connect(to);
368 return from;
369}
370
371void
372connect(stream& s1, stream& s2)
373{
374 s1.connect(s2);
375}
376
377} // test
378} // beast
379} // boost
380
381#endif