]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/fiber/include/boost/fiber/unbounded_channel.hpp
bump version to 12.2.2-pve1
[ceph.git] / ceph / src / boost / libs / fiber / include / boost / fiber / unbounded_channel.hpp
CommitLineData
7c673cae
FG
1
// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
//
7
8#ifndef BOOST_FIBERS_UNBOUNDED_CHANNEL_H
9#define BOOST_FIBERS_UNBOUNDED_CHANNEL_H
10
11#include <atomic>
12#include <algorithm>
13#include <chrono>
14#include <cstddef>
15#include <deque>
16#include <memory>
17#include <mutex>
18#include <utility>
19
20#include <boost/config.hpp>
21#include <boost/intrusive_ptr.hpp>
22
23#include <boost/fiber/detail/config.hpp>
24#include <boost/fiber/channel_op_status.hpp>
25#include <boost/fiber/condition_variable.hpp>
26#include <boost/fiber/detail/convert.hpp>
27#include <boost/fiber/exceptions.hpp>
28#include <boost/fiber/mutex.hpp>
29#include <boost/fiber/operations.hpp>
30
31#ifdef BOOST_HAS_ABI_HEADERS
32# include BOOST_ABI_PREFIX
33#endif
34
35namespace boost {
36namespace fibers {
37
38template< typename T,
39 typename Allocator = std::allocator< T >
40>
41class unbounded_channel {
42public:
43 typedef T value_type;
44
45private:
46 struct node {
47 typedef intrusive_ptr< node > ptr_t;
48 typedef typename std::allocator_traits< Allocator >::template rebind_alloc<
49 node
50 > allocator_t;
51 typedef std::allocator_traits< allocator_t > allocator_traits_t;
52
53#if ! defined(BOOST_FIBERS_NO_ATOMICS)
54 std::atomic< std::size_t > use_count{ 0 };
55#else
56 std::size_t use_count{ 0 };
57#endif
58 allocator_t alloc;
59 T va;
60 ptr_t nxt{};
61
62 node( T const& t, allocator_t const& alloc_) noexcept :
63 alloc{ alloc_ },
64 va{ t } {
65 }
66
67 node( T && t, allocator_t const& alloc_) noexcept :
68 alloc{ alloc_ },
69 va{ std::move( t) } {
70 }
71
72 friend
73 void intrusive_ptr_add_ref( node * p) noexcept {
74 ++p->use_count;
75 }
76
77 friend
78 void intrusive_ptr_release( node * p) noexcept {
79 if ( 0 == --p->use_count) {
80 allocator_t alloc( p->alloc);
81 allocator_traits_t::destroy( alloc, p);
82 allocator_traits_t::deallocate( alloc, p, 1);
83 }
84 }
85 };
86
87 using ptr_t = typename node::ptr_t;
88 using allocator_t = typename node::allocator_t;
89 using allocator_traits_t = typename node::allocator_traits_t;
90
91 enum class queue_status {
92 open = 0,
93 closed
94 };
95
96 allocator_t alloc_;
97 queue_status state_{ queue_status::open };
98 ptr_t head_{};
99 ptr_t * tail_;
100 mutable mutex mtx_{};
101 condition_variable not_empty_cond_{};
102
103 bool is_closed_() const noexcept {
104 return queue_status::closed == state_;
105 }
106
107 void close_( std::unique_lock< mutex > & lk) noexcept {
108 state_ = queue_status::closed;
109 lk.unlock();
110 not_empty_cond_.notify_all();
111 }
112
113 bool is_empty_() const noexcept {
114 return ! head_;
115 }
116
117 channel_op_status push_( ptr_t new_node,
118 std::unique_lock< mutex > & lk) noexcept {
119 if ( is_closed_() ) {
120 return channel_op_status::closed;
121 }
122 return push_and_notify_( new_node, lk);
123 }
124
125 channel_op_status push_and_notify_( ptr_t new_node,
126 std::unique_lock< mutex > & lk) noexcept {
127 push_tail_( new_node);
128 lk.unlock();
129 not_empty_cond_.notify_one();
130 return channel_op_status::success;
131 }
132
133 void push_tail_( ptr_t new_node) noexcept {
134 * tail_ = new_node;
135 tail_ = & new_node->nxt;
136 }
137
138 value_type value_pop_( std::unique_lock< mutex > & lk) {
139 BOOST_ASSERT( ! is_empty_() );
140 auto old_head = pop_head_();
141 return std::move( old_head->va);
142 }
143
144 ptr_t pop_head_() noexcept {
145 auto old_head = head_;
146 head_ = old_head->nxt;
147 if ( ! head_) {
148 tail_ = & head_;
149 }
150 old_head->nxt.reset();
151 return old_head;
152 }
153
154public:
155 explicit unbounded_channel( Allocator const& alloc = Allocator() ) noexcept :
156 alloc_{ alloc },
157 tail_{ & head_ } {
158 }
159
160 unbounded_channel( unbounded_channel const&) = delete;
161 unbounded_channel & operator=( unbounded_channel const&) = delete;
162
163 void close() noexcept {
164 std::unique_lock< mutex > lk( mtx_);
165 close_( lk);
166 }
167
168 channel_op_status push( value_type const& va) {
169 typename allocator_traits_t::pointer ptr{
170 allocator_traits_t::allocate( alloc_, 1) };
171 try {
172 allocator_traits_t::construct( alloc_, ptr, va, alloc_);
173 } catch (...) {
174 allocator_traits_t::deallocate( alloc_, ptr, 1);
175 throw;
176 }
177 std::unique_lock< mutex > lk( mtx_);
178 return push_( { detail::convert( ptr) }, lk);
179 }
180
181 channel_op_status push( value_type && va) {
182 typename allocator_traits_t::pointer ptr{
183 allocator_traits_t::allocate( alloc_, 1) };
184 try {
185 allocator_traits_t::construct(
186 alloc_, ptr, std::move( va), alloc_);
187 } catch (...) {
188 allocator_traits_t::deallocate( alloc_, ptr, 1);
189 throw;
190 }
191 std::unique_lock< mutex > lk( mtx_);
192 return push_( { detail::convert( ptr) }, lk);
193 }
194
195 channel_op_status pop( value_type & va) {
196 std::unique_lock< mutex > lk( mtx_);
197 not_empty_cond_.wait( lk,
198 [this](){
199 return is_closed_() || ! is_empty_();
200 });
201 if ( is_closed_() && is_empty_() ) {
202 return channel_op_status::closed;
203 }
204 va = value_pop_( lk);
205 return channel_op_status::success;
206 }
207
208 value_type value_pop() {
209 std::unique_lock< mutex > lk( mtx_);
210 not_empty_cond_.wait( lk,
211 [this](){
212 return is_closed_() || ! is_empty_();
213 });
214 if ( is_closed_() && is_empty_() ) {
215 throw fiber_error(
216 std::make_error_code( std::errc::operation_not_permitted),
217 "boost fiber: queue is closed");
218 }
219 return value_pop_( lk);
220 }
221
222 channel_op_status try_pop( value_type & va) {
223 std::unique_lock< mutex > lk( mtx_);
224 if ( is_closed_() && is_empty_() ) {
225 // let other fibers run
226 lk.unlock();
227 this_fiber::yield();
228 return channel_op_status::closed;
229 }
230 if ( is_empty_() ) {
231 // let other fibers run
232 lk.unlock();
233 this_fiber::yield();
234 return channel_op_status::empty;
235 }
236 va = value_pop_( lk);
237 return channel_op_status::success;
238 }
239
240 template< typename Rep, typename Period >
241 channel_op_status pop_wait_for( value_type & va,
242 std::chrono::duration< Rep, Period > const& timeout_duration) {
243 return pop_wait_until( va, std::chrono::steady_clock::now() + timeout_duration);
244 }
245
246 template< typename Clock, typename Duration >
247 channel_op_status pop_wait_until( value_type & va,
248 std::chrono::time_point< Clock, Duration > const& timeout_time) {
249 std::unique_lock< mutex > lk( mtx_);
250 if ( ! not_empty_cond_.wait_until( lk, timeout_time,
251 [this](){
252 return is_closed_() || ! is_empty_();
253 })) {
254 return channel_op_status::timeout;
255 }
256 if ( is_closed_() && is_empty_() ) {
257 return channel_op_status::closed;
258 }
259 va = value_pop_( lk);
260 return channel_op_status::success;
261 }
262};
263
264}}
265
266#ifdef BOOST_HAS_ABI_HEADERS
267# include BOOST_ABI_SUFFIX
268#endif
269
270#endif // BOOST_FIBERS_UNBOUNDED_CHANNEL_H