]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2014 Cloudius Systems, Ltd. | |
20 | */ | |
21 | ||
22 | #pragma once | |
23 | ||
24 | #include <seastar/core/circular_buffer.hh> | |
25 | #include <seastar/core/future.hh> | |
26 | #include <queue> | |
27 | #include <seastar/util/std-compat.hh> | |
28 | ||
29 | namespace seastar { | |
30 | ||
9f95a23c TL |
31 | /// Asynchronous single-producer single-consumer queue with limited capacity. |
32 | /// There can be at most one producer-side and at most one consumer-side operation active at any time. | |
33 | /// Operations returning a future are considered to be active until the future resolves. | |
20effc67 TL |
34 | /// |
35 | /// Note: queue requires the data type T to be nothrow move constructible as it's | |
36 | /// returned as future<T> by \ref pop_eventually, and Seastar futurized data types | |
37 | /// are required to be nothrow move-constructible. | |
11fdf7f2 | 38 | template <typename T> |
20effc67 | 39 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) |
11fdf7f2 TL |
40 | class queue { |
41 | std::queue<T, circular_buffer<T>> _q; | |
42 | size_t _max; | |
f67539c2 TL |
43 | std::optional<promise<>> _not_empty; |
44 | std::optional<promise<>> _not_full; | |
11fdf7f2 TL |
45 | std::exception_ptr _ex = nullptr; |
46 | private: | |
20effc67 TL |
47 | void notify_not_empty() noexcept; |
48 | void notify_not_full() noexcept; | |
11fdf7f2 TL |
49 | public: |
50 | explicit queue(size_t size); | |
51 | ||
52 | /// \brief Push an item. | |
53 | /// | |
54 | /// Returns false if the queue was full and the item was not pushed. | |
55 | bool push(T&& a); | |
56 | ||
57 | /// \brief Pop an item. | |
58 | /// | |
59 | /// Popping from an empty queue will result in undefined behavior. | |
20effc67 TL |
60 | T pop() noexcept; |
61 | ||
62 | /// \brief access the front element in the queue | |
63 | /// | |
64 | /// Accessing the front of an empty or aborted queue will result in undefined | |
65 | /// behaviour. | |
66 | T& front() noexcept; | |
11fdf7f2 | 67 | |
f67539c2 | 68 | /// Consumes items from the queue, passing them to \c func, until \c func |
11fdf7f2 TL |
69 | /// returns false or the queue it empty |
70 | /// | |
71 | /// Returns false if func returned false. | |
72 | template <typename Func> | |
73 | bool consume(Func&& func); | |
74 | ||
75 | /// Returns true when the queue is empty. | |
20effc67 | 76 | bool empty() const noexcept; |
11fdf7f2 TL |
77 | |
78 | /// Returns true when the queue is full. | |
20effc67 | 79 | bool full() const noexcept; |
11fdf7f2 TL |
80 | |
81 | /// Returns a future<> that becomes available when pop() or consume() | |
82 | /// can be called. | |
9f95a23c | 83 | /// A consumer-side operation. Cannot be called concurrently with other consumer-side operations. |
20effc67 | 84 | future<> not_empty() noexcept; |
11fdf7f2 TL |
85 | |
86 | /// Returns a future<> that becomes available when push() can be called. | |
9f95a23c | 87 | /// A producer-side operation. Cannot be called concurrently with other producer-side operations. |
20effc67 | 88 | future<> not_full() noexcept; |
11fdf7f2 TL |
89 | |
90 | /// Pops element now or when there is some. Returns a future that becomes | |
91 | /// available when some element is available. | |
92 | /// If the queue is, or already was, abort()ed, the future resolves with | |
93 | /// the exception provided to abort(). | |
9f95a23c | 94 | /// A consumer-side operation. Cannot be called concurrently with other consumer-side operations. |
20effc67 | 95 | future<T> pop_eventually() noexcept; |
11fdf7f2 TL |
96 | |
97 | /// Pushes the element now or when there is room. Returns a future<> which | |
98 | /// resolves when data was pushed. | |
99 | /// If the queue is, or already was, abort()ed, the future resolves with | |
100 | /// the exception provided to abort(). | |
9f95a23c | 101 | /// A producer-side operation. Cannot be called concurrently with other producer-side operations. |
20effc67 | 102 | future<> push_eventually(T&& data) noexcept; |
11fdf7f2 TL |
103 | |
104 | /// Returns the number of items currently in the queue. | |
20effc67 TL |
105 | size_t size() const noexcept { |
106 | // std::queue::size() has no reason to throw | |
107 | return _q.size(); | |
108 | } | |
11fdf7f2 TL |
109 | |
110 | /// Returns the size limit imposed on the queue during its construction | |
111 | /// or by a call to set_max_size(). If the queue contains max_size() | |
112 | /// items (or more), further items cannot be pushed until some are popped. | |
20effc67 | 113 | size_t max_size() const noexcept { return _max; } |
11fdf7f2 TL |
114 | |
115 | /// Set the maximum size to a new value. If the queue's max size is reduced, | |
116 | /// items already in the queue will not be expunged and the queue will be temporarily | |
117 | /// bigger than its max_size. | |
20effc67 | 118 | void set_max_size(size_t max) noexcept { |
11fdf7f2 TL |
119 | _max = max; |
120 | if (!full()) { | |
121 | notify_not_full(); | |
122 | } | |
123 | } | |
124 | ||
125 | /// Destroy any items in the queue, and pass the provided exception to any | |
126 | /// waiting readers or writers - or to any later read or write attempts. | |
20effc67 TL |
127 | void abort(std::exception_ptr ex) noexcept { |
128 | // std::queue::empty() and pop() doesn't throw | |
129 | // since it just calls seastar::circular_buffer::pop_front | |
130 | // that is specified as noexcept. | |
11fdf7f2 TL |
131 | while (!_q.empty()) { |
132 | _q.pop(); | |
133 | } | |
134 | _ex = ex; | |
135 | if (_not_full) { | |
136 | _not_full->set_exception(ex); | |
f67539c2 | 137 | _not_full= std::nullopt; |
11fdf7f2 TL |
138 | } |
139 | if (_not_empty) { | |
140 | _not_empty->set_exception(std::move(ex)); | |
f67539c2 | 141 | _not_empty = std::nullopt; |
11fdf7f2 TL |
142 | } |
143 | } | |
f67539c2 TL |
144 | |
145 | /// \brief Check if there is an active consumer | |
146 | /// | |
147 | /// Returns true if another fiber waits for an item to be pushed into the queue | |
20effc67 | 148 | bool has_blocked_consumer() const noexcept { |
f67539c2 TL |
149 | return bool(_not_empty); |
150 | } | |
11fdf7f2 TL |
151 | }; |
152 | ||
153 | template <typename T> | |
20effc67 | 154 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) |
11fdf7f2 TL |
155 | inline |
156 | queue<T>::queue(size_t size) | |
157 | : _max(size) { | |
158 | } | |
159 | ||
160 | template <typename T> | |
20effc67 | 161 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) |
11fdf7f2 | 162 | inline |
20effc67 | 163 | void queue<T>::notify_not_empty() noexcept { |
11fdf7f2 TL |
164 | if (_not_empty) { |
165 | _not_empty->set_value(); | |
f67539c2 | 166 | _not_empty = std::optional<promise<>>(); |
11fdf7f2 TL |
167 | } |
168 | } | |
169 | ||
170 | template <typename T> | |
20effc67 | 171 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) |
11fdf7f2 | 172 | inline |
20effc67 | 173 | void queue<T>::notify_not_full() noexcept { |
11fdf7f2 TL |
174 | if (_not_full) { |
175 | _not_full->set_value(); | |
f67539c2 | 176 | _not_full = std::optional<promise<>>(); |
11fdf7f2 TL |
177 | } |
178 | } | |
179 | ||
180 | template <typename T> | |
20effc67 | 181 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) |
11fdf7f2 TL |
182 | inline |
183 | bool queue<T>::push(T&& data) { | |
184 | if (_q.size() < _max) { | |
185 | _q.push(std::move(data)); | |
186 | notify_not_empty(); | |
187 | return true; | |
188 | } else { | |
189 | return false; | |
190 | } | |
191 | } | |
192 | ||
193 | template <typename T> | |
20effc67 TL |
194 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) |
195 | inline | |
196 | T& queue<T>::front() noexcept { | |
197 | // std::queue::front() has no reason to throw | |
198 | return _q.front(); | |
199 | } | |
200 | ||
201 | template <typename T> | |
202 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) | |
11fdf7f2 | 203 | inline |
20effc67 | 204 | T queue<T>::pop() noexcept { |
11fdf7f2 TL |
205 | if (_q.size() == _max) { |
206 | notify_not_full(); | |
207 | } | |
20effc67 TL |
208 | // popping the front element must not throw |
209 | // as T is required to be nothrow_move_constructible | |
210 | // and std::queue::pop won't throw since it uses | |
211 | // seastar::circular_beffer::pop_front. | |
1e59de90 | 212 | assert(!_q.empty()); |
11fdf7f2 TL |
213 | T data = std::move(_q.front()); |
214 | _q.pop(); | |
215 | return data; | |
216 | } | |
217 | ||
218 | template <typename T> | |
20effc67 | 219 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) |
11fdf7f2 | 220 | inline |
20effc67 TL |
221 | future<T> queue<T>::pop_eventually() noexcept { |
222 | // seastar allows only nothrow_move_constructible types | |
223 | // to be returned as future<T> | |
224 | static_assert(std::is_nothrow_move_constructible_v<T>, | |
225 | "Queue element type must be no-throw move constructible"); | |
226 | ||
11fdf7f2 TL |
227 | if (_ex) { |
228 | return make_exception_future<T>(_ex); | |
229 | } | |
230 | if (empty()) { | |
231 | return not_empty().then([this] { | |
232 | if (_ex) { | |
233 | return make_exception_future<T>(_ex); | |
234 | } else { | |
235 | return make_ready_future<T>(pop()); | |
236 | } | |
237 | }); | |
238 | } else { | |
239 | return make_ready_future<T>(pop()); | |
240 | } | |
241 | } | |
242 | ||
243 | template <typename T> | |
20effc67 | 244 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) |
11fdf7f2 | 245 | inline |
20effc67 | 246 | future<> queue<T>::push_eventually(T&& data) noexcept { |
11fdf7f2 TL |
247 | if (_ex) { |
248 | return make_exception_future<>(_ex); | |
249 | } | |
250 | if (full()) { | |
251 | return not_full().then([this, data = std::move(data)] () mutable { | |
252 | _q.push(std::move(data)); | |
253 | notify_not_empty(); | |
254 | }); | |
255 | } else { | |
20effc67 | 256 | try { |
11fdf7f2 TL |
257 | _q.push(std::move(data)); |
258 | notify_not_empty(); | |
259 | return make_ready_future<>(); | |
20effc67 TL |
260 | } catch (...) { |
261 | return current_exception_as_future(); | |
262 | } | |
11fdf7f2 TL |
263 | } |
264 | } | |
265 | ||
266 | template <typename T> | |
20effc67 | 267 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) |
11fdf7f2 TL |
268 | template <typename Func> |
269 | inline | |
270 | bool queue<T>::consume(Func&& func) { | |
271 | if (_ex) { | |
272 | std::rethrow_exception(_ex); | |
273 | } | |
274 | bool running = true; | |
275 | while (!_q.empty() && running) { | |
276 | running = func(std::move(_q.front())); | |
277 | _q.pop(); | |
278 | } | |
279 | if (!full()) { | |
280 | notify_not_full(); | |
281 | } | |
282 | return running; | |
283 | } | |
284 | ||
285 | template <typename T> | |
20effc67 | 286 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) |
11fdf7f2 | 287 | inline |
20effc67 TL |
288 | bool queue<T>::empty() const noexcept { |
289 | // std::queue::empty() has no reason to throw | |
11fdf7f2 TL |
290 | return _q.empty(); |
291 | } | |
292 | ||
293 | template <typename T> | |
20effc67 | 294 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) |
11fdf7f2 | 295 | inline |
20effc67 TL |
296 | bool queue<T>::full() const noexcept { |
297 | // std::queue::size() has no reason to throw | |
11fdf7f2 TL |
298 | return _q.size() >= _max; |
299 | } | |
300 | ||
301 | template <typename T> | |
20effc67 | 302 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) |
11fdf7f2 | 303 | inline |
20effc67 | 304 | future<> queue<T>::not_empty() noexcept { |
11fdf7f2 TL |
305 | if (_ex) { |
306 | return make_exception_future<>(_ex); | |
307 | } | |
308 | if (!empty()) { | |
309 | return make_ready_future<>(); | |
310 | } else { | |
311 | _not_empty = promise<>(); | |
312 | return _not_empty->get_future(); | |
313 | } | |
314 | } | |
315 | ||
316 | template <typename T> | |
20effc67 | 317 | SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>) |
11fdf7f2 | 318 | inline |
20effc67 | 319 | future<> queue<T>::not_full() noexcept { |
11fdf7f2 TL |
320 | if (_ex) { |
321 | return make_exception_future<>(_ex); | |
322 | } | |
323 | if (!full()) { | |
324 | return make_ready_future<>(); | |
325 | } else { | |
326 | _not_full = promise<>(); | |
327 | return _not_full->get_future(); | |
328 | } | |
329 | } | |
330 | ||
331 | } | |
332 |