]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/beast/test/extras/include/boost/beast/test/stream.hpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / beast / test / extras / include / boost / beast / test / stream.hpp
1 //
2 // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 #ifndef BOOST_BEAST_TEST_STREAM_HPP
11 #define BOOST_BEAST_TEST_STREAM_HPP
12
13 #include <boost/beast/core/bind_handler.hpp>
14 #include <boost/beast/core/buffers_prefix.hpp>
15 #include <boost/beast/core/flat_buffer.hpp>
16 #include <boost/beast/core/string.hpp>
17 #include <boost/beast/core/type_traits.hpp>
18 #include <boost/beast/websocket/teardown.hpp>
19 #include <boost/beast/test/fail_counter.hpp>
20 #include <boost/asio/async_result.hpp>
21 #include <boost/asio/buffer.hpp>
22 #include <boost/asio/io_context.hpp>
23 #include <boost/asio/post.hpp>
24 #include <boost/assert.hpp>
25 #include <boost/optional.hpp>
26 #include <boost/throw_exception.hpp>
27 #include <condition_variable>
28 #include <limits>
29 #include <memory>
30 #include <mutex>
31 #include <utility>
32
33 namespace boost {
34 namespace beast {
35 namespace test {
36
37 /** A bidirectional in-memory communication channel
38
39 An instance of this class provides a client and server
40 endpoint that are automatically connected to each other
41 similarly to a connected socket.
42
43 Test pipes are used to facilitate writing unit tests
44 where the behavior of the transport is tightly controlled
45 to help illuminate all code paths (for code coverage)
46 */
47 class stream
48 {
49 struct read_op
50 {
51 virtual ~read_op() = default;
52 virtual void operator()() = 0;
53 };
54
55 template<class Handler, class Buffers>
56 class read_op_impl;
57
58 enum class status
59 {
60 ok,
61 eof,
62 reset
63 };
64
65 struct state
66 {
67 friend class stream;
68
69 std::mutex m;
70 flat_buffer b;
71 std::condition_variable cv;
72 std::unique_ptr<read_op> op;
73 boost::asio::io_context& ioc;
74 status code = status::ok;
75 fail_counter* fc = nullptr;
76 std::size_t nread = 0;
77 std::size_t nwrite = 0;
78 std::size_t read_max =
79 (std::numeric_limits<std::size_t>::max)();
80 std::size_t write_max =
81 (std::numeric_limits<std::size_t>::max)();
82
83 ~state()
84 {
85 BOOST_ASSERT(! op);
86 }
87
88 explicit
89 state(
90 boost::asio::io_context& ioc_,
91 fail_counter* fc_)
92 : ioc(ioc_)
93 , fc(fc_)
94 {
95 }
96
97 void
98 on_write()
99 {
100 if(op)
101 {
102 std::unique_ptr<read_op> op_ = std::move(op);
103 op_->operator()();
104 }
105 else
106 {
107 cv.notify_all();
108 }
109 }
110 };
111
112 std::shared_ptr<state> in_;
113 std::weak_ptr<state> out_;
114
115 public:
116 using buffer_type = flat_buffer;
117
118 /// The type of the lowest layer.
119 using lowest_layer_type = stream;
120
121 /// Destructor
122 ~stream()
123 {
124 {
125 std::unique_lock<std::mutex> lock{in_->m};
126 in_->op.reset();
127 }
128 auto out = out_.lock();
129 if(out)
130 {
131 std::unique_lock<std::mutex> lock{out->m};
132 if(out->code == status::ok)
133 {
134 out->code = status::reset;
135 out->on_write();
136 }
137 }
138 }
139
140 /// Constructor
141 stream(stream&& other)
142 {
143 auto in = std::make_shared<state>(
144 other.in_->ioc, other.in_->fc);
145 in_ = std::move(other.in_);
146 out_ = std::move(other.out_);
147 other.in_ = in;
148 }
149
150 /// Assignment
151 stream&
152 operator=(stream&& other)
153 {
154 auto in = std::make_shared<state>(
155 other.in_->ioc, other.in_->fc);
156 in_ = std::move(other.in_);
157 out_ = std::move(other.out_);
158 other.in_ = in;
159 return *this;
160 }
161
162 /// Constructor
163 explicit
164 stream(boost::asio::io_context& ioc)
165 : in_(std::make_shared<state>(ioc, nullptr))
166 {
167 }
168
169 /// Constructor
170 stream(
171 boost::asio::io_context& ioc,
172 fail_counter& fc)
173 : in_(std::make_shared<state>(ioc, &fc))
174 {
175 }
176
177 /// Constructor
178 stream(
179 boost::asio::io_context& ioc,
180 string_view s)
181 : in_(std::make_shared<state>(ioc, nullptr))
182 {
183 using boost::asio::buffer;
184 using boost::asio::buffer_copy;
185 in_->b.commit(buffer_copy(
186 in_->b.prepare(s.size()),
187 buffer(s.data(), s.size())));
188 }
189
190 /// Constructor
191 stream(
192 boost::asio::io_context& ioc,
193 fail_counter& fc,
194 string_view s)
195 : in_(std::make_shared<state>(ioc, &fc))
196 {
197 using boost::asio::buffer;
198 using boost::asio::buffer_copy;
199 in_->b.commit(buffer_copy(
200 in_->b.prepare(s.size()),
201 buffer(s.data(), s.size())));
202 }
203
204 /// Establish a connection
205 void
206 connect(stream& remote)
207 {
208 BOOST_ASSERT(! out_.lock());
209 BOOST_ASSERT(! remote.out_.lock());
210 out_ = remote.in_;
211 remote.out_ = in_;
212 }
213
214 /// The type of the executor associated with the object.
215 using executor_type =
216 boost::asio::io_context::executor_type;
217
218 /// Return the executor associated with the object.
219 boost::asio::io_context::executor_type
220 get_executor() noexcept
221 {
222 return in_->ioc.get_executor();
223 };
224
225 /** Get a reference to the lowest layer
226
227 This function returns a reference to the lowest layer
228 in a stack of stream layers.
229
230 @return A reference to the lowest layer in the stack of
231 stream layers.
232 */
233 lowest_layer_type&
234 lowest_layer()
235 {
236 return *this;
237 }
238
239 /** Get a reference to the lowest layer
240
241 This function returns a reference to the lowest layer
242 in a stack of stream layers.
243
244 @return A reference to the lowest layer in the stack of
245 stream layers. Ownership is not transferred to the caller.
246 */
247 lowest_layer_type const&
248 lowest_layer() const
249 {
250 return *this;
251 }
252
253 /// Set the maximum number of bytes returned by read_some
254 void
255 read_size(std::size_t n)
256 {
257 in_->read_max = n;
258 }
259
260 /// Set the maximum number of bytes returned by write_some
261 void
262 write_size(std::size_t n)
263 {
264 in_->write_max = n;
265 }
266
267 /// Direct input buffer access
268 buffer_type&
269 buffer()
270 {
271 return in_->b;
272 }
273
274 /// Returns a string view representing the pending input data
275 string_view
276 str() const
277 {
278 auto const bs = in_->b.data();
279 if(boost::asio::buffer_size(bs) == 0)
280 return {};
281 auto const b = buffers_front(bs);
282 return {reinterpret_cast<char const*>(b.data()), b.size()};
283 }
284
285 /// Appends a string to the pending input data
286 void
287 append(string_view s)
288 {
289 using boost::asio::buffer;
290 using boost::asio::buffer_copy;
291 std::lock_guard<std::mutex> lock{in_->m};
292 in_->b.commit(buffer_copy(
293 in_->b.prepare(s.size()),
294 buffer(s.data(), s.size())));
295 }
296
297 /// Clear the pending input area
298 void
299 clear()
300 {
301 std::lock_guard<std::mutex> lock{in_->m};
302 in_->b.consume(in_->b.size());
303 }
304
305 /// Return the number of reads
306 std::size_t
307 nread() const
308 {
309 return in_->nread;
310 }
311
312 /// Return the number of writes
313 std::size_t
314 nwrite() const
315 {
316 return in_->nwrite;
317 }
318
319 /** Close the stream.
320
321 The other end of the connection will see
322 `error::eof` after reading all the remaining data.
323 */
324 void
325 close();
326
327 /** Close the other end of the stream.
328
329 This end of the connection will see
330 `error::eof` after reading all the remaining data.
331 */
332 void
333 close_remote();
334
335 template<class MutableBufferSequence>
336 std::size_t
337 read_some(MutableBufferSequence const& buffers);
338
339 template<class MutableBufferSequence>
340 std::size_t
341 read_some(MutableBufferSequence const& buffers,
342 error_code& ec);
343
344 template<class MutableBufferSequence, class ReadHandler>
345 BOOST_ASIO_INITFN_RESULT_TYPE(
346 ReadHandler, void(error_code, std::size_t))
347 async_read_some(MutableBufferSequence const& buffers,
348 ReadHandler&& handler);
349
350 template<class ConstBufferSequence>
351 std::size_t
352 write_some(ConstBufferSequence const& buffers);
353
354 template<class ConstBufferSequence>
355 std::size_t
356 write_some(
357 ConstBufferSequence const& buffers, error_code&);
358
359 template<class ConstBufferSequence, class WriteHandler>
360 BOOST_ASIO_INITFN_RESULT_TYPE(
361 WriteHandler, void(error_code, std::size_t))
362 async_write_some(ConstBufferSequence const& buffers,
363 WriteHandler&& handler);
364
365 friend
366 void
367 teardown(websocket::role_type,
368 stream& s, boost::system::error_code& ec);
369
370 template<class TeardownHandler>
371 friend
372 void
373 async_teardown(websocket::role_type role,
374 stream& s, TeardownHandler&& handler);
375 };
376
377 //------------------------------------------------------------------------------
378
379 inline
380 void
381 stream::
382 close()
383 {
384 BOOST_ASSERT(! in_->op);
385 auto out = out_.lock();
386 if(! out)
387 return;
388 std::lock_guard<std::mutex> lock{out->m};
389 if(out->code == status::ok)
390 {
391 out->code = status::eof;
392 out->on_write();
393 }
394 }
395
396 inline
397 void
398 stream::
399 close_remote()
400 {
401 std::lock_guard<std::mutex> lock{in_->m};
402 if(in_->code == status::ok)
403 {
404 in_->code = status::eof;
405 in_->on_write();
406 }
407 }
408
409 template<class MutableBufferSequence>
410 std::size_t
411 stream::
412 read_some(MutableBufferSequence const& buffers)
413 {
414 static_assert(boost::asio::is_mutable_buffer_sequence<
415 MutableBufferSequence>::value,
416 "MutableBufferSequence requirements not met");
417 error_code ec;
418 auto const n = read_some(buffers, ec);
419 if(ec)
420 BOOST_THROW_EXCEPTION(system_error{ec});
421 return n;
422 }
423
424 template<class MutableBufferSequence>
425 std::size_t
426 stream::
427 read_some(MutableBufferSequence const& buffers,
428 error_code& ec)
429 {
430 static_assert(boost::asio::is_mutable_buffer_sequence<
431 MutableBufferSequence>::value,
432 "MutableBufferSequence requirements not met");
433 using boost::asio::buffer_copy;
434 using boost::asio::buffer_size;
435 BOOST_ASSERT(buffer_size(buffers) > 0);
436 if(in_->fc && in_->fc->fail(ec))
437 return 0;
438 std::unique_lock<std::mutex> lock{in_->m};
439 BOOST_ASSERT(! in_->op);
440 in_->cv.wait(lock,
441 [&]()
442 {
443 return
444 in_->b.size() > 0 ||
445 in_->code != status::ok;
446 });
447 std::size_t bytes_transferred;
448 if(in_->b.size() > 0)
449 {
450 ec.assign(0, ec.category());
451 bytes_transferred = buffer_copy(
452 buffers, in_->b.data(), in_->read_max);
453 in_->b.consume(bytes_transferred);
454 }
455 else
456 {
457 BOOST_ASSERT(in_->code != status::ok);
458 bytes_transferred = 0;
459 if(in_->code == status::eof)
460 ec = boost::asio::error::eof;
461 else if(in_->code == status::reset)
462 ec = boost::asio::error::connection_reset;
463 }
464 ++in_->nread;
465 return bytes_transferred;
466 }
467
468 template<class MutableBufferSequence, class ReadHandler>
469 BOOST_ASIO_INITFN_RESULT_TYPE(
470 ReadHandler, void(error_code, std::size_t))
471 stream::
472 async_read_some(
473 MutableBufferSequence const& buffers,
474 ReadHandler&& handler)
475 {
476 static_assert(boost::asio::is_mutable_buffer_sequence<
477 MutableBufferSequence>::value,
478 "MutableBufferSequence requirements not met");
479 using boost::asio::buffer_copy;
480 using boost::asio::buffer_size;
481 BOOST_ASSERT(buffer_size(buffers) > 0);
482 boost::asio::async_completion<ReadHandler,
483 void(error_code, std::size_t)> init{handler};
484 if(in_->fc)
485 {
486 error_code ec;
487 if(in_->fc->fail(ec))
488 return boost::asio::post(
489 in_->ioc.get_executor(),
490 bind_handler(
491 init.completion_handler,
492 ec,
493 0));
494 }
495 {
496 std::unique_lock<std::mutex> lock{in_->m};
497 BOOST_ASSERT(! in_->op);
498 if(buffer_size(buffers) == 0 ||
499 buffer_size(in_->b.data()) > 0)
500 {
501 auto const bytes_transferred = buffer_copy(
502 buffers, in_->b.data(), in_->read_max);
503 in_->b.consume(bytes_transferred);
504 lock.unlock();
505 ++in_->nread;
506 boost::asio::post(
507 in_->ioc.get_executor(),
508 bind_handler(
509 init.completion_handler,
510 error_code{},
511 bytes_transferred));
512 }
513 else if(in_->code != status::ok)
514 {
515 lock.unlock();
516 ++in_->nread;
517 error_code ec;
518 if(in_->code == status::eof)
519 ec = boost::asio::error::eof;
520 else if(in_->code == status::reset)
521 ec = boost::asio::error::connection_reset;
522 boost::asio::post(
523 in_->ioc.get_executor(),
524 bind_handler(
525 init.completion_handler,
526 ec,
527 0));
528 }
529 else
530 {
531 in_->op.reset(new read_op_impl<BOOST_ASIO_HANDLER_TYPE(
532 ReadHandler, void(error_code, std::size_t)),
533 MutableBufferSequence>{*in_, buffers,
534 init.completion_handler});
535 }
536 }
537 return init.result.get();
538 }
539
540 template<class ConstBufferSequence>
541 std::size_t
542 stream::
543 write_some(ConstBufferSequence const& buffers)
544 {
545 static_assert(boost::asio::is_const_buffer_sequence<
546 ConstBufferSequence>::value,
547 "ConstBufferSequence requirements not met");
548 error_code ec;
549 auto const bytes_transferred =
550 write_some(buffers, ec);
551 if(ec)
552 BOOST_THROW_EXCEPTION(system_error{ec});
553 return bytes_transferred;
554 }
555
556 template<class ConstBufferSequence>
557 std::size_t
558 stream::
559 write_some(
560 ConstBufferSequence const& buffers, error_code& ec)
561 {
562 static_assert(boost::asio::is_const_buffer_sequence<
563 ConstBufferSequence>::value,
564 "ConstBufferSequence requirements not met");
565 using boost::asio::buffer_copy;
566 using boost::asio::buffer_size;
567 auto out = out_.lock();
568 if(! out)
569 {
570 ec = boost::asio::error::connection_reset;
571 return 0;
572 }
573 BOOST_ASSERT(out->code == status::ok);
574 if(in_->fc && in_->fc->fail(ec))
575 return 0;
576 auto const n = (std::min)(
577 buffer_size(buffers), in_->write_max);
578 std::unique_lock<std::mutex> lock{out->m};
579 auto const bytes_transferred =
580 buffer_copy(out->b.prepare(n), buffers);
581 out->b.commit(bytes_transferred);
582 out->on_write();
583 lock.unlock();
584 ++in_->nwrite;
585 ec.assign(0, ec.category());
586 return bytes_transferred;
587 }
588
589 template<class ConstBufferSequence, class WriteHandler>
590 BOOST_ASIO_INITFN_RESULT_TYPE(
591 WriteHandler, void(error_code, std::size_t))
592 stream::
593 async_write_some(ConstBufferSequence const& buffers,
594 WriteHandler&& handler)
595 {
596 static_assert(boost::asio::is_const_buffer_sequence<
597 ConstBufferSequence>::value,
598 "ConstBufferSequence requirements not met");
599 using boost::asio::buffer_copy;
600 using boost::asio::buffer_size;
601 boost::asio::async_completion<WriteHandler,
602 void(error_code, std::size_t)> init{handler};
603 auto out = out_.lock();
604 if(! out)
605 return boost::asio::post(
606 in_->ioc.get_executor(),
607 bind_handler(
608 init.completion_handler,
609 boost::asio::error::connection_reset,
610 0));
611 BOOST_ASSERT(out->code == status::ok);
612 if(in_->fc)
613 {
614 error_code ec;
615 if(in_->fc->fail(ec))
616 return boost::asio::post(
617 in_->ioc.get_executor(),
618 bind_handler(
619 init.completion_handler,
620 ec,
621 0));
622 }
623 auto const n =
624 (std::min)(buffer_size(buffers), in_->write_max);
625 std::unique_lock<std::mutex> lock{out->m};
626 auto const bytes_transferred =
627 buffer_copy(out->b.prepare(n), buffers);
628 out->b.commit(bytes_transferred);
629 out->on_write();
630 lock.unlock();
631 ++in_->nwrite;
632 boost::asio::post(
633 in_->ioc.get_executor(),
634 bind_handler(
635 init.completion_handler,
636 error_code{},
637 bytes_transferred));
638 return init.result.get();
639 }
640
641 inline
642 void
643 teardown(
644 websocket::role_type,
645 stream& s,
646 boost::system::error_code& ec)
647 {
648 if( s.in_->fc &&
649 s.in_->fc->fail(ec))
650 return;
651
652 s.close();
653
654 if( s.in_->fc &&
655 s.in_->fc->fail(ec))
656 ec = boost::asio::error::eof;
657 else
658 ec.assign(0, ec.category());
659 }
660
661 template<class TeardownHandler>
662 inline
663 void
664 async_teardown(
665 websocket::role_type,
666 stream& s,
667 TeardownHandler&& handler)
668 {
669 error_code ec;
670 if( s.in_->fc &&
671 s.in_->fc->fail(ec))
672 return boost::asio::post(
673 s.get_executor(),
674 bind_handler(std::move(handler), ec));
675 s.close();
676 if( s.in_->fc &&
677 s.in_->fc->fail(ec))
678 ec = boost::asio::error::eof;
679 else
680 ec.assign(0, ec.category());
681
682 boost::asio::post(
683 s.get_executor(),
684 bind_handler(std::move(handler), ec));
685 }
686
687 //------------------------------------------------------------------------------
688
689 template<class Handler, class Buffers>
690 class stream::read_op_impl : public stream::read_op
691 {
692 class lambda
693 {
694 state& s_;
695 Buffers b_;
696 Handler h_;
697 boost::optional<
698 boost::asio::executor_work_guard<
699 boost::asio::io_context::executor_type>> work_;
700
701 public:
702 lambda(lambda&&) = default;
703 lambda(lambda const&) = default;
704
705 lambda(state& s, Buffers const& b, Handler&& h)
706 : s_(s)
707 , b_(b)
708 , h_(std::move(h))
709 , work_(s_.ioc.get_executor())
710 {
711 }
712
713 lambda(state& s, Buffers const& b, Handler const& h)
714 : s_(s)
715 , b_(b)
716 , h_(h)
717 , work_(s_.ioc.get_executor())
718 {
719 }
720
721 void
722 post()
723 {
724 boost::asio::post(
725 s_.ioc.get_executor(),
726 std::move(*this));
727 work_ = boost::none;
728 }
729
730 void
731 operator()()
732 {
733 using boost::asio::buffer_copy;
734 using boost::asio::buffer_size;
735 std::unique_lock<std::mutex> lock{s_.m};
736 BOOST_ASSERT(! s_.op);
737 if(s_.b.size() > 0)
738 {
739 auto const bytes_transferred = buffer_copy(
740 b_, s_.b.data(), s_.read_max);
741 s_.b.consume(bytes_transferred);
742 auto& s = s_;
743 Handler h{std::move(h_)};
744 lock.unlock();
745 ++s.nread;
746 boost::asio::post(
747 s.ioc.get_executor(),
748 bind_handler(
749 std::move(h),
750 error_code{},
751 bytes_transferred));
752 }
753 else
754 {
755 BOOST_ASSERT(s_.code != status::ok);
756 auto& s = s_;
757 Handler h{std::move(h_)};
758 lock.unlock();
759 ++s.nread;
760 error_code ec;
761 if(s.code == status::eof)
762 ec = boost::asio::error::eof;
763 else if(s.code == status::reset)
764 ec = boost::asio::error::connection_reset;
765 boost::asio::post(
766 s.ioc.get_executor(),
767 bind_handler(std::move(h), ec, 0));
768 }
769 }
770 };
771
772 lambda fn_;
773
774 public:
775 read_op_impl(state& s, Buffers const& b, Handler&& h)
776 : fn_(s, b, std::move(h))
777 {
778 }
779
780 read_op_impl(state& s, Buffers const& b, Handler const& h)
781 : fn_(s, b, h)
782 {
783 }
784
785 void
786 operator()() override
787 {
788 fn_.post();
789 }
790 };
791
792 /// Create and return a connected stream
793 inline
794 stream
795 connect(stream& to)
796 {
797 stream from{to.get_executor().context()};
798 from.connect(to);
799 return from;
800 }
801
802 /// Create and return a connected stream
803 template<class Arg1, class... ArgN>
804 stream
805 connect(stream& to, Arg1&& arg1, ArgN&&... argn)
806 {
807 stream from{
808 std::forward<Arg1>(arg1),
809 std::forward<ArgN>(argn)...};
810 from.connect(to);
811 return from;
812 }
813
814 } // test
815 } // beast
816 } // boost
817
818 #endif