]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/beast/test/extras/include/boost/beast/test/stream.hpp
Add patch for failing prerm scripts
[ceph.git] / ceph / src / boost / libs / beast / test / extras / include / boost / beast / test / stream.hpp
CommitLineData
b32b8144
FG
1//
2// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7// Official repository: https://github.com/boostorg/beast
8//
9
10#ifndef BOOST_BEAST_TEST_STREAM_HPP
11#define BOOST_BEAST_TEST_STREAM_HPP
12
13#include <boost/beast/core/bind_handler.hpp>
14#include <boost/beast/core/buffers_prefix.hpp>
15#include <boost/beast/core/flat_buffer.hpp>
16#include <boost/beast/core/string.hpp>
17#include <boost/beast/core/type_traits.hpp>
18#include <boost/beast/websocket/teardown.hpp>
19#include <boost/beast/test/fail_counter.hpp>
20#include <boost/asio/async_result.hpp>
21#include <boost/asio/buffer.hpp>
22#include <boost/asio/io_context.hpp>
23#include <boost/asio/post.hpp>
24#include <boost/assert.hpp>
25#include <boost/optional.hpp>
26#include <boost/throw_exception.hpp>
27#include <condition_variable>
28#include <limits>
29#include <memory>
30#include <mutex>
31#include <utility>
32
33namespace boost {
34namespace beast {
35namespace test {
36
37/** A bidirectional in-memory communication channel
38
39 An instance of this class provides a client and server
40 endpoint that are automatically connected to each other
41 similarly to a connected socket.
42
43 Test pipes are used to facilitate writing unit tests
44 where the behavior of the transport is tightly controlled
45 to help illuminate all code paths (for code coverage)
46*/
47class stream
48{
49 struct read_op
50 {
51 virtual ~read_op() = default;
52 virtual void operator()() = 0;
53 };
54
55 template<class Handler, class Buffers>
56 class read_op_impl;
57
58 enum class status
59 {
60 ok,
61 eof,
62 reset
63 };
64
65 struct state
66 {
67 friend class stream;
68
69 std::mutex m;
70 flat_buffer b;
71 std::condition_variable cv;
72 std::unique_ptr<read_op> op;
73 boost::asio::io_context& ioc;
74 status code = status::ok;
75 fail_counter* fc = nullptr;
76 std::size_t nread = 0;
77 std::size_t nwrite = 0;
78 std::size_t read_max =
79 (std::numeric_limits<std::size_t>::max)();
80 std::size_t write_max =
81 (std::numeric_limits<std::size_t>::max)();
82
83 ~state()
84 {
85 BOOST_ASSERT(! op);
86 }
87
88 explicit
89 state(
90 boost::asio::io_context& ioc_,
91 fail_counter* fc_)
92 : ioc(ioc_)
93 , fc(fc_)
94 {
95 }
96
97 void
98 on_write()
99 {
100 if(op)
101 {
102 std::unique_ptr<read_op> op_ = std::move(op);
103 op_->operator()();
104 }
105 else
106 {
107 cv.notify_all();
108 }
109 }
110 };
111
112 std::shared_ptr<state> in_;
113 std::weak_ptr<state> out_;
114
115public:
116 using buffer_type = flat_buffer;
117
118 /// The type of the lowest layer.
119 using lowest_layer_type = stream;
120
121 /// Destructor
122 ~stream()
123 {
124 {
125 std::unique_lock<std::mutex> lock{in_->m};
126 in_->op.reset();
127 }
128 auto out = out_.lock();
129 if(out)
130 {
131 std::unique_lock<std::mutex> lock{out->m};
132 if(out->code == status::ok)
133 {
134 out->code = status::reset;
135 out->on_write();
136 }
137 }
138 }
139
140 /// Constructor
141 stream(stream&& other)
142 {
143 auto in = std::make_shared<state>(
144 other.in_->ioc, other.in_->fc);
145 in_ = std::move(other.in_);
146 out_ = std::move(other.out_);
147 other.in_ = in;
148 }
149
150 /// Assignment
151 stream&
152 operator=(stream&& other)
153 {
154 auto in = std::make_shared<state>(
155 other.in_->ioc, other.in_->fc);
156 in_ = std::move(other.in_);
157 out_ = std::move(other.out_);
158 other.in_ = in;
159 return *this;
160 }
161
162 /// Constructor
163 explicit
164 stream(boost::asio::io_context& ioc)
165 : in_(std::make_shared<state>(ioc, nullptr))
166 {
167 }
168
169 /// Constructor
170 stream(
171 boost::asio::io_context& ioc,
172 fail_counter& fc)
173 : in_(std::make_shared<state>(ioc, &fc))
174 {
175 }
176
177 /// Constructor
178 stream(
179 boost::asio::io_context& ioc,
180 string_view s)
181 : in_(std::make_shared<state>(ioc, nullptr))
182 {
183 using boost::asio::buffer;
184 using boost::asio::buffer_copy;
185 in_->b.commit(buffer_copy(
186 in_->b.prepare(s.size()),
187 buffer(s.data(), s.size())));
188 }
189
190 /// Constructor
191 stream(
192 boost::asio::io_context& ioc,
193 fail_counter& fc,
194 string_view s)
195 : in_(std::make_shared<state>(ioc, &fc))
196 {
197 using boost::asio::buffer;
198 using boost::asio::buffer_copy;
199 in_->b.commit(buffer_copy(
200 in_->b.prepare(s.size()),
201 buffer(s.data(), s.size())));
202 }
203
204 /// Establish a connection
205 void
206 connect(stream& remote)
207 {
208 BOOST_ASSERT(! out_.lock());
209 BOOST_ASSERT(! remote.out_.lock());
210 out_ = remote.in_;
211 remote.out_ = in_;
212 }
213
214 /// The type of the executor associated with the object.
215 using executor_type =
216 boost::asio::io_context::executor_type;
217
218 /// Return the executor associated with the object.
219 boost::asio::io_context::executor_type
220 get_executor() noexcept
221 {
222 return in_->ioc.get_executor();
223 };
224
225 /** Get a reference to the lowest layer
226
227 This function returns a reference to the lowest layer
228 in a stack of stream layers.
229
230 @return A reference to the lowest layer in the stack of
231 stream layers.
232 */
233 lowest_layer_type&
234 lowest_layer()
235 {
236 return *this;
237 }
238
239 /** Get a reference to the lowest layer
240
241 This function returns a reference to the lowest layer
242 in a stack of stream layers.
243
244 @return A reference to the lowest layer in the stack of
245 stream layers. Ownership is not transferred to the caller.
246 */
247 lowest_layer_type const&
248 lowest_layer() const
249 {
250 return *this;
251 }
252
253 /// Set the maximum number of bytes returned by read_some
254 void
255 read_size(std::size_t n)
256 {
257 in_->read_max = n;
258 }
259
260 /// Set the maximum number of bytes returned by write_some
261 void
262 write_size(std::size_t n)
263 {
264 in_->write_max = n;
265 }
266
267 /// Direct input buffer access
268 buffer_type&
269 buffer()
270 {
271 return in_->b;
272 }
273
274 /// Returns a string view representing the pending input data
275 string_view
276 str() const
277 {
278 auto const bs = in_->b.data();
279 if(boost::asio::buffer_size(bs) == 0)
280 return {};
281 auto const b = buffers_front(bs);
282 return {reinterpret_cast<char const*>(b.data()), b.size()};
283 }
284
285 /// Appends a string to the pending input data
286 void
287 append(string_view s)
288 {
289 using boost::asio::buffer;
290 using boost::asio::buffer_copy;
291 std::lock_guard<std::mutex> lock{in_->m};
292 in_->b.commit(buffer_copy(
293 in_->b.prepare(s.size()),
294 buffer(s.data(), s.size())));
295 }
296
297 /// Clear the pending input area
298 void
299 clear()
300 {
301 std::lock_guard<std::mutex> lock{in_->m};
302 in_->b.consume(in_->b.size());
303 }
304
305 /// Return the number of reads
306 std::size_t
307 nread() const
308 {
309 return in_->nread;
310 }
311
312 /// Return the number of writes
313 std::size_t
314 nwrite() const
315 {
316 return in_->nwrite;
317 }
318
319 /** Close the stream.
320
321 The other end of the connection will see
322 `error::eof` after reading all the remaining data.
323 */
324 void
325 close();
326
327 /** Close the other end of the stream.
328
329 This end of the connection will see
330 `error::eof` after reading all the remaining data.
331 */
332 void
333 close_remote();
334
335 template<class MutableBufferSequence>
336 std::size_t
337 read_some(MutableBufferSequence const& buffers);
338
339 template<class MutableBufferSequence>
340 std::size_t
341 read_some(MutableBufferSequence const& buffers,
342 error_code& ec);
343
344 template<class MutableBufferSequence, class ReadHandler>
345 BOOST_ASIO_INITFN_RESULT_TYPE(
346 ReadHandler, void(error_code, std::size_t))
347 async_read_some(MutableBufferSequence const& buffers,
348 ReadHandler&& handler);
349
350 template<class ConstBufferSequence>
351 std::size_t
352 write_some(ConstBufferSequence const& buffers);
353
354 template<class ConstBufferSequence>
355 std::size_t
356 write_some(
357 ConstBufferSequence const& buffers, error_code&);
358
359 template<class ConstBufferSequence, class WriteHandler>
360 BOOST_ASIO_INITFN_RESULT_TYPE(
361 WriteHandler, void(error_code, std::size_t))
362 async_write_some(ConstBufferSequence const& buffers,
363 WriteHandler&& handler);
364
365 friend
366 void
367 teardown(websocket::role_type,
368 stream& s, boost::system::error_code& ec);
369
370 template<class TeardownHandler>
371 friend
372 void
373 async_teardown(websocket::role_type role,
374 stream& s, TeardownHandler&& handler);
375};
376
377//------------------------------------------------------------------------------
378
379inline
380void
381stream::
382close()
383{
384 BOOST_ASSERT(! in_->op);
385 auto out = out_.lock();
386 if(! out)
387 return;
388 std::lock_guard<std::mutex> lock{out->m};
389 if(out->code == status::ok)
390 {
391 out->code = status::eof;
392 out->on_write();
393 }
394}
395
396inline
397void
398stream::
399close_remote()
400{
401 std::lock_guard<std::mutex> lock{in_->m};
402 if(in_->code == status::ok)
403 {
404 in_->code = status::eof;
405 in_->on_write();
406 }
407}
408
409template<class MutableBufferSequence>
410std::size_t
411stream::
412read_some(MutableBufferSequence const& buffers)
413{
414 static_assert(boost::asio::is_mutable_buffer_sequence<
415 MutableBufferSequence>::value,
416 "MutableBufferSequence requirements not met");
417 error_code ec;
418 auto const n = read_some(buffers, ec);
419 if(ec)
420 BOOST_THROW_EXCEPTION(system_error{ec});
421 return n;
422}
423
424template<class MutableBufferSequence>
425std::size_t
426stream::
427read_some(MutableBufferSequence const& buffers,
428 error_code& ec)
429{
430 static_assert(boost::asio::is_mutable_buffer_sequence<
431 MutableBufferSequence>::value,
432 "MutableBufferSequence requirements not met");
433 using boost::asio::buffer_copy;
434 using boost::asio::buffer_size;
b32b8144
FG
435 if(in_->fc && in_->fc->fail(ec))
436 return 0;
11fdf7f2
TL
437 if(buffer_size(buffers) == 0)
438 {
439 ec.clear();
440 return 0;
441 }
b32b8144
FG
442 std::unique_lock<std::mutex> lock{in_->m};
443 BOOST_ASSERT(! in_->op);
444 in_->cv.wait(lock,
445 [&]()
446 {
447 return
448 in_->b.size() > 0 ||
449 in_->code != status::ok;
450 });
451 std::size_t bytes_transferred;
452 if(in_->b.size() > 0)
453 {
454 ec.assign(0, ec.category());
455 bytes_transferred = buffer_copy(
456 buffers, in_->b.data(), in_->read_max);
457 in_->b.consume(bytes_transferred);
458 }
459 else
460 {
461 BOOST_ASSERT(in_->code != status::ok);
462 bytes_transferred = 0;
463 if(in_->code == status::eof)
464 ec = boost::asio::error::eof;
465 else if(in_->code == status::reset)
466 ec = boost::asio::error::connection_reset;
467 }
468 ++in_->nread;
469 return bytes_transferred;
470}
471
472template<class MutableBufferSequence, class ReadHandler>
473BOOST_ASIO_INITFN_RESULT_TYPE(
474 ReadHandler, void(error_code, std::size_t))
475stream::
476async_read_some(
477 MutableBufferSequence const& buffers,
478 ReadHandler&& handler)
479{
480 static_assert(boost::asio::is_mutable_buffer_sequence<
481 MutableBufferSequence>::value,
482 "MutableBufferSequence requirements not met");
483 using boost::asio::buffer_copy;
484 using boost::asio::buffer_size;
11fdf7f2
TL
485 BOOST_BEAST_HANDLER_INIT(
486 ReadHandler, void(error_code, std::size_t));
b32b8144
FG
487 if(in_->fc)
488 {
489 error_code ec;
490 if(in_->fc->fail(ec))
491 return boost::asio::post(
492 in_->ioc.get_executor(),
493 bind_handler(
11fdf7f2 494 std::move(init.completion_handler),
b32b8144
FG
495 ec,
496 0));
497 }
498 {
499 std::unique_lock<std::mutex> lock{in_->m};
500 BOOST_ASSERT(! in_->op);
501 if(buffer_size(buffers) == 0 ||
502 buffer_size(in_->b.data()) > 0)
503 {
504 auto const bytes_transferred = buffer_copy(
505 buffers, in_->b.data(), in_->read_max);
506 in_->b.consume(bytes_transferred);
507 lock.unlock();
508 ++in_->nread;
509 boost::asio::post(
510 in_->ioc.get_executor(),
511 bind_handler(
11fdf7f2 512 std::move(init.completion_handler),
b32b8144
FG
513 error_code{},
514 bytes_transferred));
515 }
516 else if(in_->code != status::ok)
517 {
518 lock.unlock();
519 ++in_->nread;
520 error_code ec;
521 if(in_->code == status::eof)
522 ec = boost::asio::error::eof;
523 else if(in_->code == status::reset)
524 ec = boost::asio::error::connection_reset;
525 boost::asio::post(
526 in_->ioc.get_executor(),
527 bind_handler(
11fdf7f2 528 std::move(init.completion_handler),
b32b8144
FG
529 ec,
530 0));
531 }
532 else
533 {
534 in_->op.reset(new read_op_impl<BOOST_ASIO_HANDLER_TYPE(
535 ReadHandler, void(error_code, std::size_t)),
536 MutableBufferSequence>{*in_, buffers,
11fdf7f2 537 std::move(init.completion_handler)});
b32b8144
FG
538 }
539 }
540 return init.result.get();
541}
542
543template<class ConstBufferSequence>
544std::size_t
545stream::
546write_some(ConstBufferSequence const& buffers)
547{
548 static_assert(boost::asio::is_const_buffer_sequence<
549 ConstBufferSequence>::value,
550 "ConstBufferSequence requirements not met");
551 error_code ec;
552 auto const bytes_transferred =
553 write_some(buffers, ec);
554 if(ec)
555 BOOST_THROW_EXCEPTION(system_error{ec});
556 return bytes_transferred;
557}
558
559template<class ConstBufferSequence>
560std::size_t
561stream::
562write_some(
563 ConstBufferSequence const& buffers, error_code& ec)
564{
565 static_assert(boost::asio::is_const_buffer_sequence<
566 ConstBufferSequence>::value,
567 "ConstBufferSequence requirements not met");
568 using boost::asio::buffer_copy;
569 using boost::asio::buffer_size;
570 auto out = out_.lock();
571 if(! out)
572 {
573 ec = boost::asio::error::connection_reset;
574 return 0;
575 }
576 BOOST_ASSERT(out->code == status::ok);
577 if(in_->fc && in_->fc->fail(ec))
578 return 0;
579 auto const n = (std::min)(
580 buffer_size(buffers), in_->write_max);
581 std::unique_lock<std::mutex> lock{out->m};
582 auto const bytes_transferred =
583 buffer_copy(out->b.prepare(n), buffers);
584 out->b.commit(bytes_transferred);
585 out->on_write();
586 lock.unlock();
587 ++in_->nwrite;
588 ec.assign(0, ec.category());
589 return bytes_transferred;
590}
591
592template<class ConstBufferSequence, class WriteHandler>
593BOOST_ASIO_INITFN_RESULT_TYPE(
594 WriteHandler, void(error_code, std::size_t))
595stream::
596async_write_some(ConstBufferSequence const& buffers,
597 WriteHandler&& handler)
598{
599 static_assert(boost::asio::is_const_buffer_sequence<
600 ConstBufferSequence>::value,
601 "ConstBufferSequence requirements not met");
602 using boost::asio::buffer_copy;
603 using boost::asio::buffer_size;
11fdf7f2
TL
604 BOOST_BEAST_HANDLER_INIT(
605 WriteHandler, void(error_code, std::size_t));
b32b8144
FG
606 auto out = out_.lock();
607 if(! out)
608 return boost::asio::post(
609 in_->ioc.get_executor(),
610 bind_handler(
11fdf7f2 611 std::move(init.completion_handler),
b32b8144
FG
612 boost::asio::error::connection_reset,
613 0));
614 BOOST_ASSERT(out->code == status::ok);
615 if(in_->fc)
616 {
617 error_code ec;
618 if(in_->fc->fail(ec))
619 return boost::asio::post(
620 in_->ioc.get_executor(),
621 bind_handler(
11fdf7f2 622 std::move(init.completion_handler),
b32b8144
FG
623 ec,
624 0));
625 }
626 auto const n =
627 (std::min)(buffer_size(buffers), in_->write_max);
628 std::unique_lock<std::mutex> lock{out->m};
629 auto const bytes_transferred =
630 buffer_copy(out->b.prepare(n), buffers);
631 out->b.commit(bytes_transferred);
632 out->on_write();
633 lock.unlock();
634 ++in_->nwrite;
635 boost::asio::post(
636 in_->ioc.get_executor(),
637 bind_handler(
11fdf7f2 638 std::move(init.completion_handler),
b32b8144
FG
639 error_code{},
640 bytes_transferred));
641 return init.result.get();
642}
643
644inline
645void
646teardown(
647 websocket::role_type,
648 stream& s,
649 boost::system::error_code& ec)
650{
651 if( s.in_->fc &&
652 s.in_->fc->fail(ec))
653 return;
654
655 s.close();
656
657 if( s.in_->fc &&
658 s.in_->fc->fail(ec))
659 ec = boost::asio::error::eof;
660 else
661 ec.assign(0, ec.category());
662}
663
664template<class TeardownHandler>
665inline
666void
667async_teardown(
668 websocket::role_type,
669 stream& s,
670 TeardownHandler&& handler)
671{
672 error_code ec;
673 if( s.in_->fc &&
674 s.in_->fc->fail(ec))
675 return boost::asio::post(
676 s.get_executor(),
677 bind_handler(std::move(handler), ec));
678 s.close();
679 if( s.in_->fc &&
680 s.in_->fc->fail(ec))
681 ec = boost::asio::error::eof;
682 else
683 ec.assign(0, ec.category());
684
685 boost::asio::post(
686 s.get_executor(),
687 bind_handler(std::move(handler), ec));
688}
689
690//------------------------------------------------------------------------------
691
692template<class Handler, class Buffers>
693class stream::read_op_impl : public stream::read_op
694{
695 class lambda
696 {
697 state& s_;
698 Buffers b_;
699 Handler h_;
700 boost::optional<
701 boost::asio::executor_work_guard<
702 boost::asio::io_context::executor_type>> work_;
703
704 public:
705 lambda(lambda&&) = default;
706 lambda(lambda const&) = default;
707
11fdf7f2
TL
708 template<class DeducedHandler>
709 lambda(state& s, Buffers const& b, DeducedHandler&& h)
b32b8144
FG
710 : s_(s)
711 , b_(b)
11fdf7f2 712 , h_(std::forward<DeducedHandler>(h))
b32b8144
FG
713 , work_(s_.ioc.get_executor())
714 {
715 }
716
717 void
718 post()
719 {
720 boost::asio::post(
721 s_.ioc.get_executor(),
722 std::move(*this));
723 work_ = boost::none;
724 }
725
726 void
727 operator()()
728 {
729 using boost::asio::buffer_copy;
730 using boost::asio::buffer_size;
731 std::unique_lock<std::mutex> lock{s_.m};
732 BOOST_ASSERT(! s_.op);
733 if(s_.b.size() > 0)
734 {
735 auto const bytes_transferred = buffer_copy(
736 b_, s_.b.data(), s_.read_max);
737 s_.b.consume(bytes_transferred);
738 auto& s = s_;
739 Handler h{std::move(h_)};
740 lock.unlock();
741 ++s.nread;
742 boost::asio::post(
743 s.ioc.get_executor(),
744 bind_handler(
745 std::move(h),
746 error_code{},
747 bytes_transferred));
748 }
749 else
750 {
751 BOOST_ASSERT(s_.code != status::ok);
752 auto& s = s_;
753 Handler h{std::move(h_)};
754 lock.unlock();
755 ++s.nread;
756 error_code ec;
757 if(s.code == status::eof)
758 ec = boost::asio::error::eof;
759 else if(s.code == status::reset)
760 ec = boost::asio::error::connection_reset;
761 boost::asio::post(
762 s.ioc.get_executor(),
763 bind_handler(std::move(h), ec, 0));
764 }
765 }
766 };
767
768 lambda fn_;
769
770public:
11fdf7f2
TL
771 template<class DeducedHandler>
772 read_op_impl(state& s, Buffers const& b, DeducedHandler&& h)
773 : fn_(s, b, std::forward<DeducedHandler>(h))
b32b8144
FG
774 {
775 }
776
777 void
778 operator()() override
779 {
780 fn_.post();
781 }
782};
783
784/// Create and return a connected stream
785inline
786stream
787connect(stream& to)
788{
789 stream from{to.get_executor().context()};
790 from.connect(to);
791 return from;
792}
793
794/// Create and return a connected stream
795template<class Arg1, class... ArgN>
796stream
797connect(stream& to, Arg1&& arg1, ArgN&&... argn)
798{
799 stream from{
800 std::forward<Arg1>(arg1),
801 std::forward<ArgN>(argn)...};
802 from.connect(to);
803 return from;
804}
805
806} // test
807} // beast
808} // boost
809
810#endif