// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_FIBERS_BOUNDED_CHANNEL_H
#define BOOST_FIBERS_BOUNDED_CHANNEL_H

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <memory>
#include <mutex>
#include <system_error>
#include <utility>

#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/intrusive_ptr.hpp>

#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/condition_variable.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/operations.hpp>
#include <boost/fiber/channel_op_status.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif

namespace boost {
namespace fibers {

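// Bounded multi-producer/multi-consumer FIFO channel for fibers: push()
// blocks while the channel already holds hwm elements and resumes once a
// pop() drains it down to lwm. Values live in intrusively reference-counted
// nodes allocated through a rebound copy of the user-supplied allocator.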
template< typename T,
          typename Allocator = std::allocator< T >
>
class bounded_channel {
public:
    typedef T value_type;

private:
    struct node {
        typedef intrusive_ptr< node > ptr_t;
        typedef typename std::allocator_traits< Allocator >::template rebind_alloc<
            node
        > allocator_t;
        typedef std::allocator_traits< allocator_t > allocator_traits_t;

#if ! defined(BOOST_FIBERS_NO_ATOMICS)
        std::atomic< std::size_t > use_count{ 0 };
#else
        std::size_t use_count{ 0 };
#endif
        allocator_t alloc;
        T va;
        ptr_t nxt{};

        node( T const& t, allocator_t const& alloc_) noexcept :
            alloc{ alloc_ },
            va{ t } {
        }

        node( T && t, allocator_t & alloc_) noexcept :
            alloc{ alloc_ },
            va{ std::move( t) } {
        }

        friend
        void intrusive_ptr_add_ref( node * p) noexcept {
            ++p->use_count;
        }

        friend
        void intrusive_ptr_release( node * p) noexcept {
            if ( 0 == --p->use_count) {
                allocator_t alloc( p->alloc);
                allocator_traits_t::destroy( alloc, p);
                allocator_traits_t::deallocate( alloc, p, 1);
            }
        }
    };

    using ptr_t = typename node::ptr_t;
    using allocator_t = typename node::allocator_t;
    using allocator_traits_t = typename node::allocator_traits_t;

    enum class queue_status {
        open = 0,
        closed
    };

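    // The queue state below (state_, count_, head_, tail_) is guarded by
    // mtx_; hwm_ and lwm_ are fixed at construction. The condition variables
    // are notified only after the lock has been released, so woken fibers do
    // not immediately block on mtx_ again.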
    allocator_t alloc_;
    queue_status state_{ queue_status::open };
    std::size_t count_{ 0 };
    ptr_t head_{};
    ptr_t * tail_;
    mutable mutex mtx_{};
    condition_variable not_empty_cond_{};
    condition_variable not_full_cond_{};
    std::size_t hwm_;
    std::size_t lwm_;

    bool is_closed_() const noexcept {
        return queue_status::closed == state_;
    }

    void close_( std::unique_lock< boost::fibers::mutex > & lk) noexcept {
        state_ = queue_status::closed;
        lk.unlock();
        not_empty_cond_.notify_all();
        not_full_cond_.notify_all();
    }

    std::size_t size_() const noexcept {
        return count_;
    }

    bool is_empty_() const noexcept {
        return ! head_;
    }

    bool is_full_() const noexcept {
        return count_ >= hwm_;
    }

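    // The push_*() helpers are entered with lk holding mtx_; they either fail
    // fast (closed/full/timeout) or hand the new node to push_and_notify_(),
    // which appends it, drops the lock and wakes a single waiting consumer.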
    channel_op_status push_( ptr_t new_node,
                             std::unique_lock< boost::fibers::mutex > & lk) {
        if ( is_closed_() ) {
            return channel_op_status::closed;
        }
        not_full_cond_.wait( lk,
                             [this](){
                                 return ! is_full_();
                             });
        return push_and_notify_( new_node, lk);
    }

    channel_op_status try_push_( ptr_t new_node,
                                 std::unique_lock< boost::fibers::mutex > & lk) noexcept {
        if ( is_closed_() ) {
            return channel_op_status::closed;
        }
        if ( is_full_() ) {
            return channel_op_status::full;
        }
        return push_and_notify_( new_node, lk);
    }

    template< typename Clock, typename Duration >
    channel_op_status push_wait_until_( ptr_t new_node,
                                        std::chrono::time_point< Clock, Duration > const& timeout_time,
                                        std::unique_lock< boost::fibers::mutex > & lk) {
        if ( is_closed_() ) {
            return channel_op_status::closed;
        }
        if ( ! not_full_cond_.wait_until( lk, timeout_time,
                                          [this](){
                                              return ! is_full_();
                                          })) {
            return channel_op_status::timeout;
        }
        return push_and_notify_( new_node, lk);
    }

    channel_op_status push_and_notify_( ptr_t new_node,
                                        std::unique_lock< boost::fibers::mutex > & lk) noexcept {
        push_tail_( new_node);
        lk.unlock();
        not_empty_cond_.notify_one();
        return channel_op_status::success;
    }

    void push_tail_( ptr_t new_node) noexcept {
        * tail_ = new_node;
        tail_ = & new_node->nxt;
        ++count_;
    }

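    // Pops the head node and wakes blocked producers once the queue has
    // drained to the low-watermark: with lwm_ == hwm_ a single producer is
    // woken per pop, otherwise all waiting producers are notified because
    // more than one slot may have become available.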
    value_type value_pop_( std::unique_lock< boost::fibers::mutex > & lk) {
        BOOST_ASSERT( ! is_empty_() );
        auto old_head = pop_head_();
        if ( size_() <= lwm_) {
            if ( lwm_ == hwm_) {
                lk.unlock();
                not_full_cond_.notify_one();
            } else {
                lk.unlock();
                // more than one producer could be waiting
                // to push a value
                not_full_cond_.notify_all();
            }
        }
        return std::move( old_head->va);
    }

    ptr_t pop_head_() noexcept {
        auto old_head = head_;
        head_ = old_head->nxt;
        if ( ! head_) {
            tail_ = & head_;
        }
        old_head->nxt.reset();
        --count_;
        return old_head;
    }

public:
    bounded_channel( std::size_t hwm, std::size_t lwm,
                     Allocator const& alloc = Allocator() ) :
        alloc_{ alloc },
        tail_{ & head_ },
        hwm_{ hwm },
        lwm_{ lwm } {
        if ( hwm_ <= lwm_) {
            throw fiber_error( std::make_error_code( std::errc::invalid_argument),
                               "boost fiber: high-watermark is less than or equal to low-watermark for bounded_channel");
        }
        if ( 0 == hwm) {
            throw fiber_error( std::make_error_code( std::errc::invalid_argument),
                               "boost fiber: high-watermark is zero");
        }
    }

    bounded_channel( std::size_t wm,
                     Allocator const& alloc = Allocator() ) :
        alloc_{ alloc },
        tail_{ & head_ },
        hwm_{ wm },
        lwm_{ wm - 1 } {
        if ( 0 == wm) {
            throw fiber_error( std::make_error_code( std::errc::invalid_argument),
                               "boost fiber: watermark is zero");
        }
    }

    bounded_channel( bounded_channel const&) = delete;
    bounded_channel & operator=( bounded_channel const&) = delete;

    std::size_t upper_bound() const noexcept {
        return hwm_;
    }

    std::size_t lower_bound() const noexcept {
        return lwm_;
    }

    void close() noexcept {
        std::unique_lock< mutex > lk( mtx_);
        close_( lk);
    }

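    // Each push() variant allocates a node through the rebound allocator,
    // constructs the value in place (deallocating again if construction
    // throws) and only then takes the lock to queue it.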
    channel_op_status push( value_type const& va) {
        typename allocator_traits_t::pointer ptr{
            allocator_traits_t::allocate( alloc_, 1) };
        try {
            allocator_traits_t::construct( alloc_, ptr, va, alloc_);
        } catch (...) {
            allocator_traits_t::deallocate( alloc_, ptr, 1);
            throw;
        }
        std::unique_lock< mutex > lk( mtx_);
        return push_( { detail::convert( ptr) }, lk);
    }

    channel_op_status push( value_type && va) {
        typename allocator_traits_t::pointer ptr{
            allocator_traits_t::allocate( alloc_, 1) };
        try {
            allocator_traits_t::construct(
                alloc_, ptr, std::move( va), alloc_);
        } catch (...) {
            allocator_traits_t::deallocate( alloc_, ptr, 1);
            throw;
        }
        std::unique_lock< mutex > lk( mtx_);
        return push_( { detail::convert( ptr) }, lk);
    }

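    // The *_wait_for() overloads convert the relative timeout into an
    // absolute deadline on std::chrono::steady_clock and forward to the
    // corresponding *_wait_until() overload.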
    template< typename Rep, typename Period >
    channel_op_status push_wait_for( value_type const& va,
                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
        return push_wait_until( va,
                                std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Rep, typename Period >
    channel_op_status push_wait_for( value_type && va,
                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
        return push_wait_until( std::forward< value_type >( va),
                                std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Clock, typename Duration >
    channel_op_status push_wait_until( value_type const& va,
                                       std::chrono::time_point< Clock, Duration > const& timeout_time) {
        typename allocator_traits_t::pointer ptr{
            allocator_traits_t::allocate( alloc_, 1) };
        try {
            allocator_traits_t::construct( alloc_, ptr, va, alloc_);
        } catch (...) {
            allocator_traits_t::deallocate( alloc_, ptr, 1);
            throw;
        }
        std::unique_lock< mutex > lk( mtx_);
        return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk);
    }

    template< typename Clock, typename Duration >
    channel_op_status push_wait_until( value_type && va,
                                       std::chrono::time_point< Clock, Duration > const& timeout_time) {
        typename allocator_traits_t::pointer ptr{
            allocator_traits_t::allocate( alloc_, 1) };
        try {
            allocator_traits_t::construct(
                alloc_, ptr, std::move( va), alloc_);
        } catch (...) {
            allocator_traits_t::deallocate( alloc_, ptr, 1);
            throw;
        }
        std::unique_lock< mutex > lk( mtx_);
        return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk);
    }

    channel_op_status try_push( value_type const& va) {
        typename allocator_traits_t::pointer ptr{
            allocator_traits_t::allocate( alloc_, 1) };
        try {
            allocator_traits_t::construct( alloc_, ptr, va, alloc_);
        } catch (...) {
            allocator_traits_t::deallocate( alloc_, ptr, 1);
            throw;
        }
        std::unique_lock< mutex > lk( mtx_);
        return try_push_( { detail::convert( ptr) }, lk);
    }

    channel_op_status try_push( value_type && va) {
        typename allocator_traits_t::pointer ptr{
            allocator_traits_t::allocate( alloc_, 1) };
        try {
            allocator_traits_t::construct(
                alloc_, ptr, std::move( va), alloc_);
        } catch (...) {
            allocator_traits_t::deallocate( alloc_, ptr, 1);
            throw;
        }
        std::unique_lock< mutex > lk( mtx_);
        return try_push_( { detail::convert( ptr) }, lk);
    }

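    // pop()/value_pop() block until a value is available or the channel is
    // closed; a closed channel still yields buffered values until it is
    // drained, and only then reports channel_op_status::closed (or throws
    // from value_pop()).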
    channel_op_status pop( value_type & va) {
        std::unique_lock< mutex > lk( mtx_);
        not_empty_cond_.wait( lk,
                              [this](){
                                  return is_closed_() || ! is_empty_();
                              });
        if ( is_closed_() && is_empty_() ) {
            return channel_op_status::closed;
        }
        va = value_pop_( lk);
        return channel_op_status::success;
    }

    value_type value_pop() {
        std::unique_lock< mutex > lk( mtx_);
        not_empty_cond_.wait( lk,
                              [this](){
                                  return is_closed_() || ! is_empty_();
                              });
        if ( is_closed_() && is_empty_() ) {
            throw fiber_error(
                    std::make_error_code( std::errc::operation_not_permitted),
                    "boost fiber: queue is closed");
        }
        return value_pop_( lk);
    }

    channel_op_status try_pop( value_type & va) {
        std::unique_lock< mutex > lk( mtx_);
        if ( is_closed_() && is_empty_() ) {
            // let other fibers run
            lk.unlock();
            this_fiber::yield();
            return channel_op_status::closed;
        }
        if ( is_empty_() ) {
            // let other fibers run
            lk.unlock();
            this_fiber::yield();
            return channel_op_status::empty;
        }
        va = value_pop_( lk);
        return channel_op_status::success;
    }

    template< typename Rep, typename Period >
    channel_op_status pop_wait_for( value_type & va,
                                    std::chrono::duration< Rep, Period > const& timeout_duration) {
        return pop_wait_until( va,
                               std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Clock, typename Duration >
    channel_op_status pop_wait_until( value_type & va,
                                      std::chrono::time_point< Clock, Duration > const& timeout_time) {
        std::unique_lock< mutex > lk( mtx_);
        if ( ! not_empty_cond_.wait_until( lk,
                                           timeout_time,
                                           [this](){
                                               return is_closed_() || ! is_empty_();
                                           })) {
            return channel_op_status::timeout;
        }
        if ( is_closed_() && is_empty_() ) {
            return channel_op_status::closed;
        }
        va = value_pop_( lk);
        return channel_op_status::success;
    }
};

}}
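
// Usage sketch (illustrative only): a producer fiber pushes into a
// bounded_channel< int > with high-watermark 4 (low-watermark 3) while the
// calling context pops until the producer closes the channel.
//
//   boost::fibers::bounded_channel< int > chan{ 4 };
//   boost::fibers::fiber producer{ [&chan]{
//       for ( int i = 0; i < 10; ++i) {
//           chan.push( i);       // blocks while 4 values are queued
//       }
//       chan.close();            // unblocks pending pop()/push() calls
//   } };
//   int value;
//   while ( boost::fibers::channel_op_status::success == chan.pop( value) ) {
//       // consume value
//   }
//   producer.join();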

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif

#endif // BOOST_FIBERS_BOUNDED_CHANNEL_H