]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/boost/beast/_experimental/test/impl/stream.ipp
import quincy beta 17.1.0
[ceph.git] / ceph / src / boost / boost / beast / _experimental / test / impl / stream.ipp
1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 #ifndef BOOST_BEAST_TEST_IMPL_STREAM_IPP
11 #define BOOST_BEAST_TEST_IMPL_STREAM_IPP
12
13 #include <boost/beast/_experimental/test/stream.hpp>
14 #include <boost/beast/core/bind_handler.hpp>
15 #include <boost/beast/core/buffer_traits.hpp>
16 #include <boost/make_shared.hpp>
17 #include <stdexcept>
18 #include <vector>
19
20 namespace boost {
21 namespace beast {
22 namespace test {
23
24 //------------------------------------------------------------------------------
25
26 stream::
27 service::
28 service(net::execution_context& ctx)
29 : beast::detail::service_base<service>(ctx)
30 , sp_(boost::make_shared<service_impl>())
31 {
32 }
33
34 void
35 stream::
36 service::
37 shutdown()
38 {
39 std::vector<std::unique_ptr<read_op_base>> v;
40 std::lock_guard<std::mutex> g1(sp_->m_);
41 v.reserve(sp_->v_.size());
42 for(auto p : sp_->v_)
43 {
44 std::lock_guard<std::mutex> g2(p->m);
45 v.emplace_back(std::move(p->op));
46 p->code = status::eof;
47 }
48 }
49
50 auto
51 stream::
52 service::
53 make_impl(
54 net::io_context& ctx,
55 test::fail_count* fc) ->
56 boost::shared_ptr<state>
57 {
58 auto& svc = net::use_service<service>(ctx);
59 auto sp = boost::make_shared<state>(ctx, svc.sp_, fc);
60 std::lock_guard<std::mutex> g(svc.sp_->m_);
61 svc.sp_->v_.push_back(sp.get());
62 return sp;
63 }
64
65 void
66 stream::
67 service_impl::
68 remove(state& impl)
69 {
70 std::lock_guard<std::mutex> g(m_);
71 *std::find(
72 v_.begin(), v_.end(),
73 &impl) = std::move(v_.back());
74 v_.pop_back();
75 }
76
77 //------------------------------------------------------------------------------
78
79 void stream::initiate_read(
80 boost::shared_ptr<state> const& in_,
81 std::unique_ptr<stream::read_op_base>&& op,
82 std::size_t buf_size)
83 {
84 std::unique_lock<std::mutex> lock(in_->m);
85
86 ++in_->nread;
87 if(in_->op != nullptr)
88 BOOST_THROW_EXCEPTION(
89 std::logic_error{"in_->op != nullptr"});
90
91 // test failure
92 error_code ec;
93 if(in_->fc && in_->fc->fail(ec))
94 {
95 lock.unlock();
96 (*op)(ec);
97 return;
98 }
99
100 // A request to read 0 bytes from a stream is a no-op.
101 if(buf_size == 0 || buffer_bytes(in_->b.data()) > 0)
102 {
103 lock.unlock();
104 (*op)(ec);
105 return;
106 }
107
108 // deliver error
109 if(in_->code != status::ok)
110 {
111 lock.unlock();
112 (*op)(net::error::eof);
113 return;
114 }
115
116 // complete when bytes available or closed
117 in_->op = std::move(op);
118 }
119
120 stream::
121 state::
122 state(
123 net::io_context& ioc_,
124 boost::weak_ptr<service_impl> wp_,
125 fail_count* fc_)
126 : ioc(ioc_)
127 , wp(std::move(wp_))
128 , fc(fc_)
129 {
130 }
131
132 stream::
133 state::
134 ~state()
135 {
136 // cancel outstanding read
137 if(op != nullptr)
138 (*op)(net::error::operation_aborted);
139 }
140
141 void
142 stream::
143 state::
144 remove() noexcept
145 {
146 auto sp = wp.lock();
147
148 // If this goes off, it means the lifetime of a test::stream object
149 // extended beyond the lifetime of the associated execution context.
150 BOOST_ASSERT(sp);
151
152 sp->remove(*this);
153 }
154
155 void
156 stream::
157 state::
158 notify_read()
159 {
160 if(op)
161 {
162 auto op_ = std::move(op);
163 op_->operator()(error_code{});
164 }
165 else
166 {
167 cv.notify_all();
168 }
169 }
170
171 void
172 stream::
173 state::
174 cancel_read()
175 {
176 std::unique_ptr<read_op_base> p;
177 {
178 std::lock_guard<std::mutex> lock(m);
179 code = status::eof;
180 p = std::move(op);
181 }
182 if(p != nullptr)
183 (*p)(net::error::operation_aborted);
184 }
185
186 //------------------------------------------------------------------------------
187
188 stream::
189 ~stream()
190 {
191 close();
192 in_->remove();
193 }
194
195 stream::
196 stream(stream&& other)
197 {
198 auto in = service::make_impl(
199 other.in_->ioc, other.in_->fc);
200 in_ = std::move(other.in_);
201 out_ = std::move(other.out_);
202 other.in_ = in;
203 }
204
205 stream&
206 stream::
207 operator=(stream&& other)
208 {
209 close();
210 auto in = service::make_impl(
211 other.in_->ioc, other.in_->fc);
212 in_->remove();
213 in_ = std::move(other.in_);
214 out_ = std::move(other.out_);
215 other.in_ = in;
216 return *this;
217 }
218
219 //------------------------------------------------------------------------------
220
221 stream::
222 stream(net::io_context& ioc)
223 : in_(service::make_impl(ioc, nullptr))
224 {
225 }
226
227 stream::
228 stream(
229 net::io_context& ioc,
230 fail_count& fc)
231 : in_(service::make_impl(ioc, &fc))
232 {
233 }
234
235 stream::
236 stream(
237 net::io_context& ioc,
238 string_view s)
239 : in_(service::make_impl(ioc, nullptr))
240 {
241 in_->b.commit(net::buffer_copy(
242 in_->b.prepare(s.size()),
243 net::buffer(s.data(), s.size())));
244 }
245
246 stream::
247 stream(
248 net::io_context& ioc,
249 fail_count& fc,
250 string_view s)
251 : in_(service::make_impl(ioc, &fc))
252 {
253 in_->b.commit(net::buffer_copy(
254 in_->b.prepare(s.size()),
255 net::buffer(s.data(), s.size())));
256 }
257
258 void
259 stream::
260 connect(stream& remote)
261 {
262 BOOST_ASSERT(! out_.lock());
263 BOOST_ASSERT(! remote.out_.lock());
264 std::lock(in_->m, remote.in_->m);
265 std::lock_guard<std::mutex> guard1{in_->m, std::adopt_lock};
266 std::lock_guard<std::mutex> guard2{remote.in_->m, std::adopt_lock};
267 out_ = remote.in_;
268 remote.out_ = in_;
269 in_->code = status::ok;
270 remote.in_->code = status::ok;
271 }
272
273 string_view
274 stream::
275 str() const
276 {
277 auto const bs = in_->b.data();
278 if(buffer_bytes(bs) == 0)
279 return {};
280 net::const_buffer const b = *net::buffer_sequence_begin(bs);
281 return {static_cast<char const*>(b.data()), b.size()};
282 }
283
284 void
285 stream::
286 append(string_view s)
287 {
288 std::lock_guard<std::mutex> lock{in_->m};
289 in_->b.commit(net::buffer_copy(
290 in_->b.prepare(s.size()),
291 net::buffer(s.data(), s.size())));
292 }
293
294 void
295 stream::
296 clear()
297 {
298 std::lock_guard<std::mutex> lock{in_->m};
299 in_->b.consume(in_->b.size());
300 }
301
302 void
303 stream::
304 close()
305 {
306 in_->cancel_read();
307
308 // disconnect
309 {
310 auto out = out_.lock();
311 out_.reset();
312
313 // notify peer
314 if(out)
315 {
316 std::lock_guard<std::mutex> lock(out->m);
317 if(out->code == status::ok)
318 {
319 out->code = status::eof;
320 out->notify_read();
321 }
322 }
323 }
324 }
325
326 void
327 stream::
328 close_remote()
329 {
330 std::lock_guard<std::mutex> lock{in_->m};
331 if(in_->code == status::ok)
332 {
333 in_->code = status::eof;
334 in_->notify_read();
335 }
336 }
337
338 void
339 teardown(
340 role_type,
341 stream& s,
342 boost::system::error_code& ec)
343 {
344 if( s.in_->fc &&
345 s.in_->fc->fail(ec))
346 return;
347
348 s.close();
349
350 if( s.in_->fc &&
351 s.in_->fc->fail(ec))
352 ec = net::error::eof;
353 else
354 ec = {};
355 }
356
357 //------------------------------------------------------------------------------
358
359 stream
360 connect(stream& to)
361 {
362 #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
363 stream from{net::query(to.get_executor(), net::execution::context)};
364 #else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
365 stream from{to.get_executor().context()};
366 #endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
367 from.connect(to);
368 return from;
369 }
370
371 void
372 connect(stream& s1, stream& s2)
373 {
374 s1.connect(s2);
375 }
376
377 } // test
378 } // beast
379 } // boost
380
381 #endif