]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/boost/asio/experimental/detail/channel_service.hpp
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / boost / boost / asio / experimental / detail / channel_service.hpp
CommitLineData
1e59de90
TL
1//
2// experimental/detail/channel_service.hpp
3// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4//
5// Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6//
7// Distributed under the Boost Software License, Version 1.0. (See accompanying
8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9//
10
11#ifndef BOOST_ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP
12#define BOOST_ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP
13
14#if defined(_MSC_VER) && (_MSC_VER >= 1200)
15# pragma once
16#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17
18#include <boost/asio/detail/config.hpp>
19#include <boost/asio/associated_cancellation_slot.hpp>
20#include <boost/asio/cancellation_type.hpp>
21#include <boost/asio/detail/mutex.hpp>
22#include <boost/asio/detail/op_queue.hpp>
23#include <boost/asio/execution_context.hpp>
24#include <boost/asio/experimental/detail/channel_message.hpp>
25#include <boost/asio/experimental/detail/channel_receive_op.hpp>
26#include <boost/asio/experimental/detail/channel_send_op.hpp>
27#include <boost/asio/experimental/detail/has_signature.hpp>
28
29#include <boost/asio/detail/push_options.hpp>
30
31namespace boost {
32namespace asio {
33namespace experimental {
34namespace detail {
35
36template <typename Mutex>
37class channel_service
38 : public boost::asio::detail::execution_context_service_base<
39 channel_service<Mutex> >
40{
41public:
42 // Possible states for a channel end.
43 enum state
44 {
45 buffer = 0,
46 waiter = 1,
47 block = 2,
48 closed = 3
49 };
50
51 // The base implementation type of all channels.
52 struct base_implementation_type
53 {
54 // Default constructor.
55 base_implementation_type()
56 : receive_state_(block),
57 send_state_(block),
58 max_buffer_size_(0),
59 next_(0),
60 prev_(0)
61 {
62 }
63
64 // The current state of the channel.
65 state receive_state_ : 16;
66 state send_state_ : 16;
67
68 // The maximum number of elements that may be buffered in the channel.
69 std::size_t max_buffer_size_;
70
71 // The operations that are waiting on the channel.
72 boost::asio::detail::op_queue<channel_operation> waiters_;
73
74 // Pointers to adjacent channel implementations in linked list.
75 base_implementation_type* next_;
76 base_implementation_type* prev_;
77
78 // The mutex type to protect the internal implementation.
79 mutable Mutex mutex_;
80 };
81
82 // The implementation for a specific value type.
83 template <typename Traits, typename... Signatures>
84 struct implementation_type;
85
86 // Constructor.
87 channel_service(execution_context& ctx);
88
89 // Destroy all user-defined handler objects owned by the service.
90 void shutdown();
91
92 // Construct a new channel implementation.
93 void construct(base_implementation_type& impl, std::size_t max_buffer_size);
94
95 // Destroy a channel implementation.
96 template <typename Traits, typename... Signatures>
97 void destroy(implementation_type<Traits, Signatures...>& impl);
98
99 // Move-construct a new channel implementation.
100 template <typename Traits, typename... Signatures>
101 void move_construct(implementation_type<Traits, Signatures...>& impl,
102 implementation_type<Traits, Signatures...>& other_impl);
103
104 // Move-assign from another channel implementation.
105 template <typename Traits, typename... Signatures>
106 void move_assign(implementation_type<Traits, Signatures...>& impl,
107 channel_service& other_service,
108 implementation_type<Traits, Signatures...>& other_impl);
109
110 // Get the capacity of the channel.
111 std::size_t capacity(
112 const base_implementation_type& impl) const BOOST_ASIO_NOEXCEPT;
113
114 // Determine whether the channel is open.
115 bool is_open(const base_implementation_type& impl) const BOOST_ASIO_NOEXCEPT;
116
117 // Reset the channel to its initial state.
118 template <typename Traits, typename... Signatures>
119 void reset(implementation_type<Traits, Signatures...>& impl);
120
121 // Close the channel.
122 template <typename Traits, typename... Signatures>
123 void close(implementation_type<Traits, Signatures...>& impl);
124
125 // Cancel all operations associated with the channel.
126 template <typename Traits, typename... Signatures>
127 void cancel(implementation_type<Traits, Signatures...>& impl);
128
129 // Cancel the operation associated with the channel that has the given key.
130 template <typename Traits, typename... Signatures>
131 void cancel_by_key(implementation_type<Traits, Signatures...>& impl,
132 void* cancellation_key);
133
134 // Determine whether a value can be read from the channel without blocking.
135 bool ready(const base_implementation_type& impl) const BOOST_ASIO_NOEXCEPT;
136
137 // Synchronously send a new value into the channel.
138 template <typename Message, typename Traits,
139 typename... Signatures, typename... Args>
140 bool try_send(implementation_type<Traits, Signatures...>& impl,
141 BOOST_ASIO_MOVE_ARG(Args)... args);
142
143 // Synchronously send a number of new values into the channel.
144 template <typename Message, typename Traits,
145 typename... Signatures, typename... Args>
146 std::size_t try_send_n(implementation_type<Traits, Signatures...>& impl,
147 std::size_t count, BOOST_ASIO_MOVE_ARG(Args)... args);
148
149 // Asynchronously send a new value into the channel.
150 template <typename Traits, typename... Signatures,
151 typename Handler, typename IoExecutor>
152 void async_send(implementation_type<Traits, Signatures...>& impl,
153 BOOST_ASIO_MOVE_ARG2(typename implementation_type<
154 Traits, Signatures...>::payload_type) payload,
155 Handler& handler, const IoExecutor& io_ex)
156 {
157 typename associated_cancellation_slot<Handler>::type slot
158 = boost::asio::get_associated_cancellation_slot(handler);
159
160 // Allocate and construct an operation to wrap the handler.
161 typedef channel_send_op<
162 typename implementation_type<Traits, Signatures...>::payload_type,
163 Handler, IoExecutor> op;
164 typename op::ptr p = { boost::asio::detail::addressof(handler),
165 op::ptr::allocate(handler), 0 };
166 p.p = new (p.v) op(BOOST_ASIO_MOVE_CAST2(typename implementation_type<
167 Traits, Signatures...>::payload_type)(payload), handler, io_ex);
168
169 // Optionally register for per-operation cancellation.
170 if (slot.is_connected())
171 {
172 p.p->cancellation_key_ =
173 &slot.template emplace<op_cancellation<Traits, Signatures...> >(
174 this, &impl);
175 }
176
177 BOOST_ASIO_HANDLER_CREATION((this->context(), *p.p,
178 "channel", &impl, 0, "async_send"));
179
180 start_send_op(impl, p.p);
181 p.v = p.p = 0;
182 }
183
184 // Synchronously receive a value from the channel.
185 template <typename Traits, typename... Signatures, typename Handler>
186 bool try_receive(implementation_type<Traits, Signatures...>& impl,
187 BOOST_ASIO_MOVE_ARG(Handler) handler);
188
189 // Asynchronously send a new value into the channel.
190 // Asynchronously receive a value from the channel.
191 template <typename Traits, typename... Signatures,
192 typename Handler, typename IoExecutor>
193 void async_receive(implementation_type<Traits, Signatures...>& impl,
194 Handler& handler, const IoExecutor& io_ex)
195 {
196 typename associated_cancellation_slot<Handler>::type slot
197 = boost::asio::get_associated_cancellation_slot(handler);
198
199 // Allocate and construct an operation to wrap the handler.
200 typedef channel_receive_op<
201 typename implementation_type<Traits, Signatures...>::payload_type,
202 Handler, IoExecutor> op;
203 typename op::ptr p = { boost::asio::detail::addressof(handler),
204 op::ptr::allocate(handler), 0 };
205 p.p = new (p.v) op(handler, io_ex);
206
207 // Optionally register for per-operation cancellation.
208 if (slot.is_connected())
209 {
210 p.p->cancellation_key_ =
211 &slot.template emplace<op_cancellation<Traits, Signatures...> >(
212 this, &impl);
213 }
214
215 BOOST_ASIO_HANDLER_CREATION((this->context(), *p.p,
216 "channel", &impl, 0, "async_receive"));
217
218 start_receive_op(impl, p.p);
219 p.v = p.p = 0;
220 }
221
222private:
223 // Helper function object to handle a closed notification.
224 template <typename Payload, typename Signature>
225 struct complete_receive
226 {
227 explicit complete_receive(channel_receive<Payload>* op)
228 : op_(op)
229 {
230 }
231
232 template <typename... Args>
233 void operator()(BOOST_ASIO_MOVE_ARG(Args)... args)
234 {
235 op_->complete(
236 channel_message<Signature>(0,
237 BOOST_ASIO_MOVE_CAST(Args)(args)...));
238 }
239
240 channel_receive<Payload>* op_;
241 };
242
243 // Destroy a base channel implementation.
244 void base_destroy(base_implementation_type& impl);
245
246 // Helper function to start an asynchronous put operation.
247 template <typename Traits, typename... Signatures>
248 void start_send_op(implementation_type<Traits, Signatures...>& impl,
249 channel_send<typename implementation_type<
250 Traits, Signatures...>::payload_type>* send_op);
251
252 // Helper function to start an asynchronous get operation.
253 template <typename Traits, typename... Signatures>
254 void start_receive_op(implementation_type<Traits, Signatures...>& impl,
255 channel_receive<typename implementation_type<
256 Traits, Signatures...>::payload_type>* receive_op);
257
258 // Helper class used to implement per-operation cancellation.
259 template <typename Traits, typename... Signatures>
260 class op_cancellation
261 {
262 public:
263 op_cancellation(channel_service* s,
264 implementation_type<Traits, Signatures...>* impl)
265 : service_(s),
266 impl_(impl)
267 {
268 }
269
270 void operator()(cancellation_type_t type)
271 {
272 if (!!(type &
273 (cancellation_type::terminal
274 | cancellation_type::partial
275 | cancellation_type::total)))
276 {
277 service_->cancel_by_key(*impl_, this);
278 }
279 }
280
281 private:
282 channel_service* service_;
283 implementation_type<Traits, Signatures...>* impl_;
284 };
285
286 // Mutex to protect access to the linked list of implementations.
287 boost::asio::detail::mutex mutex_;
288
289 // The head of a linked list of all implementations.
290 base_implementation_type* impl_list_;
291};
292
293// The implementation for a specific value type.
294template <typename Mutex>
295template <typename Traits, typename... Signatures>
296struct channel_service<Mutex>::implementation_type : base_implementation_type
297{
298 // The traits type associated with the channel.
299 typedef typename Traits::template rebind<Signatures...>::other traits_type;
300
301 // Type of an element stored in the buffer.
302 typedef typename conditional<
303 has_signature<
304 typename traits_type::receive_cancelled_signature,
305 Signatures...
306 >::value,
307 typename conditional<
308 has_signature<
309 typename traits_type::receive_closed_signature,
310 Signatures...
311 >::value,
312 channel_payload<Signatures...>,
313 channel_payload<
314 Signatures...,
315 typename traits_type::receive_closed_signature
316 >
317 >::type,
318 typename conditional<
319 has_signature<
320 typename traits_type::receive_closed_signature,
321 Signatures...,
322 typename traits_type::receive_cancelled_signature
323 >::value,
324 channel_payload<
325 Signatures...,
326 typename traits_type::receive_cancelled_signature
327 >,
328 channel_payload<
329 Signatures...,
330 typename traits_type::receive_cancelled_signature,
331 typename traits_type::receive_closed_signature
332 >
333 >::type
334 >::type payload_type;
335
336 // Move from another buffer.
337 void buffer_move_from(implementation_type& other)
338 {
339 buffer_ = BOOST_ASIO_MOVE_CAST(
340 typename traits_type::template container<
341 payload_type>::type)(other.buffer_);
342 other.buffer_clear();
343 }
344
345 // Get number of buffered elements.
346 std::size_t buffer_size() const
347 {
348 return buffer_.size();
349 }
350
351 // Push a new value to the back of the buffer.
352 void buffer_push(payload_type payload)
353 {
354 buffer_.push_back(BOOST_ASIO_MOVE_CAST(payload_type)(payload));
355 }
356
357 // Push new values to the back of the buffer.
358 std::size_t buffer_push_n(std::size_t count, payload_type payload)
359 {
360 std::size_t i = 0;
361 for (; i < count && buffer_.size() < this->max_buffer_size_; ++i)
362 buffer_.push_back(payload);
363 return i;
364 }
365
366 // Get the element at the front of the buffer.
367 payload_type buffer_front()
368 {
369 return BOOST_ASIO_MOVE_CAST(payload_type)(buffer_.front());
370 }
371
372 // Pop a value from the front of the buffer.
373 void buffer_pop()
374 {
375 buffer_.pop_front();
376 }
377
378 // Clear all buffered values.
379 void buffer_clear()
380 {
381 buffer_.clear();
382 }
383
384private:
385 // Buffered values.
386 typename traits_type::template container<payload_type>::type buffer_;
387};
388
389// The implementation for a void value type.
390template <typename Mutex>
391template <typename Traits, typename R>
392struct channel_service<Mutex>::implementation_type<Traits, R()>
393 : channel_service::base_implementation_type
394{
395 // The traits type associated with the channel.
396 typedef typename Traits::template rebind<R()>::other traits_type;
397
398 // Type of an element stored in the buffer.
399 typedef typename conditional<
400 has_signature<
401 typename traits_type::receive_cancelled_signature,
402 R()
403 >::value,
404 typename conditional<
405 has_signature<
406 typename traits_type::receive_closed_signature,
407 R()
408 >::value,
409 channel_payload<R()>,
410 channel_payload<
411 R(),
412 typename traits_type::receive_closed_signature
413 >
414 >::type,
415 typename conditional<
416 has_signature<
417 typename traits_type::receive_closed_signature,
418 R(),
419 typename traits_type::receive_cancelled_signature
420 >::value,
421 channel_payload<
422 R(),
423 typename traits_type::receive_cancelled_signature
424 >,
425 channel_payload<
426 R(),
427 typename traits_type::receive_cancelled_signature,
428 typename traits_type::receive_closed_signature
429 >
430 >::type
431 >::type payload_type;
432
433 // Construct with empty buffer.
434 implementation_type()
435 : buffer_(0)
436 {
437 }
438
439 // Move from another buffer.
440 void buffer_move_from(implementation_type& other)
441 {
442 buffer_ = other.buffer_;
443 other.buffer_ = 0;
444 }
445
446 // Get number of buffered elements.
447 std::size_t buffer_size() const
448 {
449 return buffer_;
450 }
451
452 // Push a new value to the back of the buffer.
453 void buffer_push(payload_type)
454 {
455 ++buffer_;
456 }
457
458 // Push new values to the back of the buffer.
459 std::size_t buffer_push_n(std::size_t count, payload_type)
460 {
461 std::size_t available = this->max_buffer_size_ - buffer_;
462 count = (count < available) ? count : available;
463 buffer_ += count;
464 return count;
465 }
466
467 // Get the element at the front of the buffer.
468 payload_type buffer_front()
469 {
470 return payload_type(channel_message<R()>(0));
471 }
472
473 // Pop a value from the front of the buffer.
474 void buffer_pop()
475 {
476 --buffer_;
477 }
478
479 // Clear all values from the buffer.
480 void buffer_clear()
481 {
482 buffer_ = 0;
483 }
484
485private:
486 // Number of buffered "values".
487 std::size_t buffer_;
488};
489
490} // namespace detail
491} // namespace experimental
492} // namespace asio
493} // namespace boost
494
495#include <boost/asio/detail/pop_options.hpp>
496
497#include <boost/asio/experimental/detail/impl/channel_service.hpp>
498
499#endif // BOOST_ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP