]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/queue.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / core / queue.hh
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/circular_buffer.hh>
25#include <seastar/core/future.hh>
26#include <queue>
27#include <seastar/util/std-compat.hh>
28
29namespace seastar {
30
9f95a23c
TL
31/// Asynchronous single-producer single-consumer queue with limited capacity.
32/// There can be at most one producer-side and at most one consumer-side operation active at any time.
33/// Operations returning a future are considered to be active until the future resolves.
20effc67
TL
34///
35/// Note: queue requires the data type T to be nothrow move constructible as it's
36/// returned as future<T> by \ref pop_eventually and seastar futurized data type
37/// are required to be nothrow move-constructible.
11fdf7f2 38template <typename T>
20effc67 39SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
11fdf7f2
TL
40class queue {
41 std::queue<T, circular_buffer<T>> _q;
42 size_t _max;
f67539c2
TL
43 std::optional<promise<>> _not_empty;
44 std::optional<promise<>> _not_full;
11fdf7f2
TL
45 std::exception_ptr _ex = nullptr;
46private:
20effc67
TL
47 void notify_not_empty() noexcept;
48 void notify_not_full() noexcept;
11fdf7f2
TL
49public:
50 explicit queue(size_t size);
51
52 /// \brief Push an item.
53 ///
54 /// Returns false if the queue was full and the item was not pushed.
55 bool push(T&& a);
56
57 /// \brief Pop an item.
58 ///
59 /// Popping from an empty queue will result in undefined behavior.
20effc67
TL
60 T pop() noexcept;
61
62 /// \brief access the front element in the queue
63 ///
64 /// Accessing the front of an empty or aborted queue will result in undefined
65 /// behaviour.
66 T& front() noexcept;
11fdf7f2 67
f67539c2 68 /// Consumes items from the queue, passing them to \c func, until \c func
11fdf7f2
TL
69 /// returns false or the queue it empty
70 ///
71 /// Returns false if func returned false.
72 template <typename Func>
73 bool consume(Func&& func);
74
75 /// Returns true when the queue is empty.
20effc67 76 bool empty() const noexcept;
11fdf7f2
TL
77
78 /// Returns true when the queue is full.
20effc67 79 bool full() const noexcept;
11fdf7f2
TL
80
81 /// Returns a future<> that becomes available when pop() or consume()
82 /// can be called.
9f95a23c 83 /// A consumer-side operation. Cannot be called concurrently with other consumer-side operations.
20effc67 84 future<> not_empty() noexcept;
11fdf7f2
TL
85
86 /// Returns a future<> that becomes available when push() can be called.
9f95a23c 87 /// A producer-side operation. Cannot be called concurrently with other producer-side operations.
20effc67 88 future<> not_full() noexcept;
11fdf7f2
TL
89
90 /// Pops element now or when there is some. Returns a future that becomes
91 /// available when some element is available.
92 /// If the queue is, or already was, abort()ed, the future resolves with
93 /// the exception provided to abort().
9f95a23c 94 /// A consumer-side operation. Cannot be called concurrently with other consumer-side operations.
20effc67 95 future<T> pop_eventually() noexcept;
11fdf7f2
TL
96
97 /// Pushes the element now or when there is room. Returns a future<> which
98 /// resolves when data was pushed.
99 /// If the queue is, or already was, abort()ed, the future resolves with
100 /// the exception provided to abort().
9f95a23c 101 /// A producer-side operation. Cannot be called concurrently with other producer-side operations.
20effc67 102 future<> push_eventually(T&& data) noexcept;
11fdf7f2
TL
103
104 /// Returns the number of items currently in the queue.
20effc67
TL
105 size_t size() const noexcept {
106 // std::queue::size() has no reason to throw
107 return _q.size();
108 }
11fdf7f2
TL
109
110 /// Returns the size limit imposed on the queue during its construction
111 /// or by a call to set_max_size(). If the queue contains max_size()
112 /// items (or more), further items cannot be pushed until some are popped.
20effc67 113 size_t max_size() const noexcept { return _max; }
11fdf7f2
TL
114
115 /// Set the maximum size to a new value. If the queue's max size is reduced,
116 /// items already in the queue will not be expunged and the queue will be temporarily
117 /// bigger than its max_size.
20effc67 118 void set_max_size(size_t max) noexcept {
11fdf7f2
TL
119 _max = max;
120 if (!full()) {
121 notify_not_full();
122 }
123 }
124
125 /// Destroy any items in the queue, and pass the provided exception to any
126 /// waiting readers or writers - or to any later read or write attempts.
20effc67
TL
127 void abort(std::exception_ptr ex) noexcept {
128 // std::queue::empty() and pop() doesn't throw
129 // since it just calls seastar::circular_buffer::pop_front
130 // that is specified as noexcept.
11fdf7f2
TL
131 while (!_q.empty()) {
132 _q.pop();
133 }
134 _ex = ex;
135 if (_not_full) {
136 _not_full->set_exception(ex);
f67539c2 137 _not_full= std::nullopt;
11fdf7f2
TL
138 }
139 if (_not_empty) {
140 _not_empty->set_exception(std::move(ex));
f67539c2 141 _not_empty = std::nullopt;
11fdf7f2
TL
142 }
143 }
f67539c2
TL
144
145 /// \brief Check if there is an active consumer
146 ///
147 /// Returns true if another fiber waits for an item to be pushed into the queue
20effc67 148 bool has_blocked_consumer() const noexcept {
f67539c2
TL
149 return bool(_not_empty);
150 }
11fdf7f2
TL
151};
152
153template <typename T>
20effc67 154SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
11fdf7f2
TL
155inline
156queue<T>::queue(size_t size)
157 : _max(size) {
158}
159
160template <typename T>
20effc67 161SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
11fdf7f2 162inline
20effc67 163void queue<T>::notify_not_empty() noexcept {
11fdf7f2
TL
164 if (_not_empty) {
165 _not_empty->set_value();
f67539c2 166 _not_empty = std::optional<promise<>>();
11fdf7f2
TL
167 }
168}
169
170template <typename T>
20effc67 171SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
11fdf7f2 172inline
20effc67 173void queue<T>::notify_not_full() noexcept {
11fdf7f2
TL
174 if (_not_full) {
175 _not_full->set_value();
f67539c2 176 _not_full = std::optional<promise<>>();
11fdf7f2
TL
177 }
178}
179
180template <typename T>
20effc67 181SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
11fdf7f2
TL
182inline
183bool queue<T>::push(T&& data) {
184 if (_q.size() < _max) {
185 _q.push(std::move(data));
186 notify_not_empty();
187 return true;
188 } else {
189 return false;
190 }
191}
192
193template <typename T>
20effc67
TL
194SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
195inline
196T& queue<T>::front() noexcept {
197 // std::queue::front() has no reason to throw
198 return _q.front();
199}
200
201template <typename T>
202SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
11fdf7f2 203inline
20effc67 204T queue<T>::pop() noexcept {
11fdf7f2
TL
205 if (_q.size() == _max) {
206 notify_not_full();
207 }
20effc67
TL
208 // popping the front element must not throw
209 // as T is required to be nothrow_move_constructible
210 // and std::queue::pop won't throw since it uses
211 // seastar::circular_beffer::pop_front.
1e59de90 212 assert(!_q.empty());
11fdf7f2
TL
213 T data = std::move(_q.front());
214 _q.pop();
215 return data;
216}
217
218template <typename T>
20effc67 219SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
11fdf7f2 220inline
20effc67
TL
221future<T> queue<T>::pop_eventually() noexcept {
222 // seastar allows only nothrow_move_constructible types
223 // to be returned as future<T>
224 static_assert(std::is_nothrow_move_constructible_v<T>,
225 "Queue element type must be no-throw move constructible");
226
11fdf7f2
TL
227 if (_ex) {
228 return make_exception_future<T>(_ex);
229 }
230 if (empty()) {
231 return not_empty().then([this] {
232 if (_ex) {
233 return make_exception_future<T>(_ex);
234 } else {
235 return make_ready_future<T>(pop());
236 }
237 });
238 } else {
239 return make_ready_future<T>(pop());
240 }
241}
242
243template <typename T>
20effc67 244SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
11fdf7f2 245inline
20effc67 246future<> queue<T>::push_eventually(T&& data) noexcept {
11fdf7f2
TL
247 if (_ex) {
248 return make_exception_future<>(_ex);
249 }
250 if (full()) {
251 return not_full().then([this, data = std::move(data)] () mutable {
252 _q.push(std::move(data));
253 notify_not_empty();
254 });
255 } else {
20effc67 256 try {
11fdf7f2
TL
257 _q.push(std::move(data));
258 notify_not_empty();
259 return make_ready_future<>();
20effc67
TL
260 } catch (...) {
261 return current_exception_as_future();
262 }
11fdf7f2
TL
263 }
264}
265
266template <typename T>
20effc67 267SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
11fdf7f2
TL
268template <typename Func>
269inline
270bool queue<T>::consume(Func&& func) {
271 if (_ex) {
272 std::rethrow_exception(_ex);
273 }
274 bool running = true;
275 while (!_q.empty() && running) {
276 running = func(std::move(_q.front()));
277 _q.pop();
278 }
279 if (!full()) {
280 notify_not_full();
281 }
282 return running;
283}
284
285template <typename T>
20effc67 286SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
11fdf7f2 287inline
20effc67
TL
288bool queue<T>::empty() const noexcept {
289 // std::queue::empty() has no reason to throw
11fdf7f2
TL
290 return _q.empty();
291}
292
293template <typename T>
20effc67 294SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
11fdf7f2 295inline
20effc67
TL
296bool queue<T>::full() const noexcept {
297 // std::queue::size() has no reason to throw
11fdf7f2
TL
298 return _q.size() >= _max;
299}
300
301template <typename T>
20effc67 302SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
11fdf7f2 303inline
20effc67 304future<> queue<T>::not_empty() noexcept {
11fdf7f2
TL
305 if (_ex) {
306 return make_exception_future<>(_ex);
307 }
308 if (!empty()) {
309 return make_ready_future<>();
310 } else {
311 _not_empty = promise<>();
312 return _not_empty->get_future();
313 }
314}
315
316template <typename T>
20effc67 317SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
11fdf7f2 318inline
20effc67 319future<> queue<T>::not_full() noexcept {
11fdf7f2
TL
320 if (_ex) {
321 return make_exception_future<>(_ex);
322 }
323 if (!full()) {
324 return make_ready_future<>();
325 } else {
326 _not_full = promise<>();
327 return _not_full->get_future();
328 }
329}
330
331}
332