]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/boost/fiber/buffered_channel.hpp
import quincy beta 17.1.0
[ceph.git] / ceph / src / boost / boost / fiber / buffered_channel.hpp
1
2 // Copyright Oliver Kowalke 2016.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 //
7
8 #ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
9 #define BOOST_FIBERS_BUFFERED_CHANNEL_H
10
11 #include <atomic>
12 #include <chrono>
13 #include <cstddef>
14 #include <cstdint>
15 #include <memory>
16 #include <type_traits>
17
18 #include <boost/config.hpp>
19
20 #include <boost/fiber/channel_op_status.hpp>
21 #include <boost/fiber/context.hpp>
22 #include <boost/fiber/waker.hpp>
23 #include <boost/fiber/detail/config.hpp>
24 #include <boost/fiber/detail/convert.hpp>
25 #include <boost/fiber/detail/spinlock.hpp>
26 #include <boost/fiber/exceptions.hpp>
27
28 #ifdef BOOST_HAS_ABI_HEADERS
29 # include BOOST_ABI_PREFIX
30 #endif
31
32 namespace boost {
33 namespace fibers {
34
35 template< typename T >
36 class buffered_channel {
37 public:
38 using value_type = typename std::remove_reference<T>::type;
39
40 private:
41 using slot_type = value_type;
42
43 mutable detail::spinlock splk_{};
44 wait_queue waiting_producers_{};
45 wait_queue waiting_consumers_{};
46 slot_type * slots_;
47 std::size_t pidx_{ 0 };
48 std::size_t cidx_{ 0 };
49 std::size_t capacity_;
50 bool closed_{ false };
51
52 bool is_full_() const noexcept {
53 return cidx_ == ((pidx_ + 1) % capacity_);
54 }
55
56 bool is_empty_() const noexcept {
57 return cidx_ == pidx_;
58 }
59
60 bool is_closed_() const noexcept {
61 return closed_;
62 }
63
64 public:
65 explicit buffered_channel( std::size_t capacity) :
66 capacity_{ capacity } {
67 if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
68 throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
69 "boost fiber: buffer capacity is invalid" };
70 }
71 slots_ = new slot_type[capacity_];
72 }
73
74 ~buffered_channel() {
75 close();
76 delete [] slots_;
77 }
78
79 buffered_channel( buffered_channel const&) = delete;
80 buffered_channel & operator=( buffered_channel const&) = delete;
81
82 bool is_closed() const noexcept {
83 detail::spinlock_lock lk{ splk_ };
84 return is_closed_();
85 }
86
87 void close() noexcept {
88 detail::spinlock_lock lk{ splk_ };
89 if ( ! closed_) {
90 closed_ = true;
91 waiting_producers_.notify_all();
92 waiting_consumers_.notify_all();
93 }
94 }
95
96 channel_op_status try_push( value_type const& value) {
97 detail::spinlock_lock lk{ splk_ };
98 if ( BOOST_UNLIKELY( is_closed_() ) ) {
99 return channel_op_status::closed;
100 }
101 if ( is_full_() ) {
102 return channel_op_status::full;
103 }
104 slots_[pidx_] = value;
105 pidx_ = (pidx_ + 1) % capacity_;
106 waiting_consumers_.notify_one();
107 return channel_op_status::success;
108 }
109
110 channel_op_status try_push( value_type && value) {
111 detail::spinlock_lock lk{ splk_ };
112 if ( BOOST_UNLIKELY( is_closed_() ) ) {
113 return channel_op_status::closed;
114 }
115 if ( is_full_() ) {
116 return channel_op_status::full;
117 }
118 slots_[pidx_] = std::move( value);
119 pidx_ = (pidx_ + 1) % capacity_;
120 waiting_consumers_.notify_one();
121 return channel_op_status::success;
122 }
123
124 channel_op_status push( value_type const& value) {
125 context * active_ctx = context::active();
126 for (;;) {
127 detail::spinlock_lock lk{ splk_ };
128 if ( BOOST_UNLIKELY( is_closed_() ) ) {
129 return channel_op_status::closed;
130 }
131 if ( is_full_() ) {
132 waiting_producers_.suspend_and_wait( lk, active_ctx);
133 } else {
134 slots_[pidx_] = value;
135 pidx_ = (pidx_ + 1) % capacity_;
136 waiting_consumers_.notify_one();
137 return channel_op_status::success;
138 }
139 }
140 }
141
142 channel_op_status push( value_type && value) {
143 context * active_ctx = context::active();
144 for (;;) {
145 detail::spinlock_lock lk{ splk_ };
146 if ( BOOST_UNLIKELY( is_closed_() ) ) {
147 return channel_op_status::closed;
148 }
149 if ( is_full_() ) {
150 waiting_producers_.suspend_and_wait( lk, active_ctx);
151 } else {
152 slots_[pidx_] = std::move( value);
153 pidx_ = (pidx_ + 1) % capacity_;
154
155 waiting_consumers_.notify_one();
156 return channel_op_status::success;
157 }
158 }
159 }
160
161 template< typename Rep, typename Period >
162 channel_op_status push_wait_for( value_type const& value,
163 std::chrono::duration< Rep, Period > const& timeout_duration) {
164 return push_wait_until( value,
165 std::chrono::steady_clock::now() + timeout_duration);
166 }
167
168 template< typename Rep, typename Period >
169 channel_op_status push_wait_for( value_type && value,
170 std::chrono::duration< Rep, Period > const& timeout_duration) {
171 return push_wait_until( std::forward< value_type >( value),
172 std::chrono::steady_clock::now() + timeout_duration);
173 }
174
175 template< typename Clock, typename Duration >
176 channel_op_status push_wait_until( value_type const& value,
177 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
178 context * active_ctx = context::active();
179 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
180 for (;;) {
181 detail::spinlock_lock lk{ splk_ };
182 if ( BOOST_UNLIKELY( is_closed_() ) ) {
183 return channel_op_status::closed;
184 }
185 if ( is_full_() ) {
186 if ( ! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
187 return channel_op_status::timeout;
188 }
189 } else {
190 slots_[pidx_] = value;
191 pidx_ = (pidx_ + 1) % capacity_;
192 waiting_consumers_.notify_one();
193 return channel_op_status::success;
194 }
195 }
196 }
197
198 template< typename Clock, typename Duration >
199 channel_op_status push_wait_until( value_type && value,
200 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
201 context * active_ctx = context::active();
202 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
203 for (;;) {
204 detail::spinlock_lock lk{ splk_ };
205 if ( BOOST_UNLIKELY( is_closed_() ) ) {
206 return channel_op_status::closed;
207 }
208 if ( is_full_() ) {
209 if ( ! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
210 return channel_op_status::timeout;
211 }
212 } else {
213 slots_[pidx_] = std::move( value);
214 pidx_ = (pidx_ + 1) % capacity_;
215 // notify one waiting consumer
216 waiting_consumers_.notify_one();
217 return channel_op_status::success;
218 }
219 }
220 }
221
222 channel_op_status try_pop( value_type & value) {
223 detail::spinlock_lock lk{ splk_ };
224 if ( is_empty_() ) {
225 return is_closed_()
226 ? channel_op_status::closed
227 : channel_op_status::empty;
228 }
229 value = std::move( slots_[cidx_]);
230 cidx_ = (cidx_ + 1) % capacity_;
231 waiting_producers_.notify_one();
232 return channel_op_status::success;
233 }
234
235 channel_op_status pop( value_type & value) {
236 context * active_ctx = context::active();
237 for (;;) {
238 detail::spinlock_lock lk{ splk_ };
239 if ( is_empty_() ) {
240 if ( BOOST_UNLIKELY( is_closed_() ) ) {
241 return channel_op_status::closed;
242 }
243 waiting_consumers_.suspend_and_wait( lk, active_ctx);
244 } else {
245 value = std::move( slots_[cidx_]);
246 cidx_ = (cidx_ + 1) % capacity_;
247 waiting_producers_.notify_one();
248 return channel_op_status::success;
249 }
250 }
251 }
252
253 value_type value_pop() {
254 context * active_ctx = context::active();
255 for (;;) {
256 detail::spinlock_lock lk{ splk_ };
257 if ( is_empty_() ) {
258 if ( BOOST_UNLIKELY( is_closed_() ) ) {
259 throw fiber_error{
260 std::make_error_code( std::errc::operation_not_permitted),
261 "boost fiber: channel is closed" };
262 }
263 waiting_consumers_.suspend_and_wait( lk, active_ctx);
264 } else {
265 value_type value = std::move( slots_[cidx_]);
266 cidx_ = (cidx_ + 1) % capacity_;
267 waiting_producers_.notify_one();
268 return value;
269 }
270 }
271 }
272
273 template< typename Rep, typename Period >
274 channel_op_status pop_wait_for( value_type & value,
275 std::chrono::duration< Rep, Period > const& timeout_duration) {
276 return pop_wait_until( value,
277 std::chrono::steady_clock::now() + timeout_duration);
278 }
279
280 template< typename Clock, typename Duration >
281 channel_op_status pop_wait_until( value_type & value,
282 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
283 context * active_ctx = context::active();
284 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
285 for (;;) {
286 detail::spinlock_lock lk{ splk_ };
287 if ( is_empty_() ) {
288 if ( BOOST_UNLIKELY( is_closed_() ) ) {
289 return channel_op_status::closed;
290 }
291 if ( ! waiting_consumers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
292 return channel_op_status::timeout;
293 }
294 } else {
295 value = std::move( slots_[cidx_]);
296 cidx_ = (cidx_ + 1) % capacity_;
297 waiting_producers_.notify_one();
298 return channel_op_status::success;
299 }
300 }
301 }
302
303 class iterator {
304 private:
305 typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
306
307 buffered_channel * chan_{ nullptr };
308 storage_type storage_;
309
310 void increment_( bool initial = false) {
311 BOOST_ASSERT( nullptr != chan_);
312 try {
313 if ( ! initial) {
314 reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
315 }
316 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
317 } catch ( fiber_error const&) {
318 chan_ = nullptr;
319 }
320 }
321
322 public:
323 using iterator_category = std::input_iterator_tag;
324 using difference_type = std::ptrdiff_t;
325 using pointer = value_type *;
326 using reference = value_type &;
327
328 using pointer_t = pointer;
329 using reference_t = reference;
330
331 iterator() noexcept = default;
332
333 explicit iterator( buffered_channel< T > * chan) noexcept :
334 chan_{ chan } {
335 increment_( true);
336 }
337
338 iterator( iterator const& other) noexcept :
339 chan_{ other.chan_ } {
340 }
341
342 iterator & operator=( iterator const& other) noexcept {
343 if ( BOOST_LIKELY( this != & other) ) {
344 chan_ = other.chan_;
345 }
346 return * this;
347 }
348
349 bool operator==( iterator const& other) const noexcept {
350 return other.chan_ == chan_;
351 }
352
353 bool operator!=( iterator const& other) const noexcept {
354 return other.chan_ != chan_;
355 }
356
357 iterator & operator++() {
358 reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
359 increment_();
360 return * this;
361 }
362
363 const iterator operator++( int) = delete;
364
365 reference_t operator*() noexcept {
366 return * reinterpret_cast< value_type * >( std::addressof( storage_) );
367 }
368
369 pointer_t operator->() noexcept {
370 return reinterpret_cast< value_type * >( std::addressof( storage_) );
371 }
372 };
373
374 friend class iterator;
375 };
376
377 template< typename T >
378 typename buffered_channel< T >::iterator
379 begin( buffered_channel< T > & chan) {
380 return typename buffered_channel< T >::iterator( & chan);
381 }
382
383 template< typename T >
384 typename buffered_channel< T >::iterator
385 end( buffered_channel< T > &) {
386 return typename buffered_channel< T >::iterator();
387 }
388
389 }}
390
391 #ifdef BOOST_HAS_ABI_HEADERS
392 # include BOOST_ABI_SUFFIX
393 #endif
394
395 #endif // BOOST_FIBERS_BUFFERED_CHANNEL_H