]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/loop.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / core / loop.hh
CommitLineData
f67539c2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2020 ScyllaDB.
21 */
22
23#pragma once
24
1e59de90 25#include <cstddef>
f67539c2
TL
26#include <iterator>
27#include <memory>
1e59de90 28#include <type_traits>
f67539c2
TL
29#include <vector>
30
31#include <seastar/core/future.hh>
32#include <seastar/core/task.hh>
33#include <seastar/util/bool_class.hh>
34#include <seastar/core/semaphore.hh>
35
36namespace seastar {
37
38/// \addtogroup future-util
39/// @{
40
41// The AsyncAction concept represents an action which can complete later than
42// the actual function invocation. It is represented by a function which
43// returns a future which resolves when the action is done.
44
45struct stop_iteration_tag { };
46using stop_iteration = bool_class<stop_iteration_tag>;
47
48namespace internal {
49
50template <typename AsyncAction>
51class repeater final : public continuation_base<stop_iteration> {
52 promise<> _promise;
53 AsyncAction _action;
54public:
55 explicit repeater(AsyncAction&& action) : _action(std::move(action)) {}
56 future<> get_future() { return _promise.get_future(); }
57 task* waiting_task() noexcept override { return _promise.waiting_task(); }
58 virtual void run_and_dispose() noexcept override {
59 if (_state.failed()) {
60 _promise.set_exception(std::move(_state).get_exception());
61 delete this;
62 return;
63 } else {
64 if (_state.get0() == stop_iteration::yes) {
65 _promise.set_value();
66 delete this;
67 return;
68 }
69 _state = {};
70 }
71 try {
72 do {
73 auto f = futurize_invoke(_action);
74 if (!f.available()) {
1e59de90 75 internal::set_callback(std::move(f), this);
f67539c2
TL
76 return;
77 }
78 if (f.get0() == stop_iteration::yes) {
79 _promise.set_value();
80 delete this;
81 return;
82 }
83 } while (!need_preempt());
84 } catch (...) {
85 _promise.set_exception(std::current_exception());
86 delete this;
87 return;
88 }
89 _state.set(stop_iteration::no);
90 schedule(this);
91 }
92};
93
94} // namespace internal
95
96// Delete these overloads so that the actual implementation can use a
97// universal reference but still reject lvalue references.
98template<typename AsyncAction>
99future<> repeat(const AsyncAction& action) noexcept = delete;
100template<typename AsyncAction>
101future<> repeat(AsyncAction& action) noexcept = delete;
102
103/// Invokes given action until it fails or the function requests iteration to stop by returning
104/// \c stop_iteration::yes.
105///
106/// \param action a callable taking no arguments, returning a future<stop_iteration>. Will
107/// be called again as soon as the future resolves, unless the
108/// future fails, action throws, or it resolves with \c stop_iteration::yes.
109/// If \c action is an r-value it can be moved in the middle of iteration.
110/// \return a ready future if we stopped successfully, or a failed future if
111/// a call to to \c action failed.
112template<typename AsyncAction>
113SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, stop_iteration> || seastar::InvokeReturns<AsyncAction, future<stop_iteration>> )
114inline
115future<> repeat(AsyncAction&& action) noexcept {
20effc67 116 using futurator = futurize<std::invoke_result_t<AsyncAction>>;
f67539c2
TL
117 static_assert(std::is_same<future<stop_iteration>, typename futurator::type>::value, "bad AsyncAction signature");
118 for (;;) {
119 // Do not type-erase here in case this is a short repeat()
120 auto f = futurator::invoke(action);
121
122 if (!f.available() || f.failed() || need_preempt()) {
123 return [&] () noexcept {
124 memory::scoped_critical_alloc_section _;
125 auto repeater = new internal::repeater<AsyncAction>(std::move(action));
126 auto ret = repeater->get_future();
1e59de90 127 internal::set_callback(std::move(f), repeater);
f67539c2
TL
128 return ret;
129 }();
130 }
131
132 if (f.get0() == stop_iteration::yes) {
133 return make_ready_future<>();
134 }
135 }
136}
137
138/// \cond internal
139
140template <typename T>
141struct repeat_until_value_type_helper;
142
143/// Type helper for repeat_until_value()
144template <typename T>
145struct repeat_until_value_type_helper<future<std::optional<T>>> {
146 /// The type of the value we are computing
147 using value_type = T;
148 /// Type used by \c AsyncAction while looping
149 using optional_type = std::optional<T>;
150 /// Return type of repeat_until_value()
151 using future_type = future<value_type>;
152};
153
154/// Return value of repeat_until_value()
155template <typename AsyncAction>
156using repeat_until_value_return_type
20effc67 157 = typename repeat_until_value_type_helper<typename futurize<std::invoke_result_t<AsyncAction>>::type>::future_type;
f67539c2
TL
158
159/// \endcond
160
161namespace internal {
162
163template <typename AsyncAction, typename T>
164class repeat_until_value_state final : public continuation_base<std::optional<T>> {
165 promise<T> _promise;
166 AsyncAction _action;
167public:
168 explicit repeat_until_value_state(AsyncAction action) : _action(std::move(action)) {}
169 repeat_until_value_state(std::optional<T> st, AsyncAction action) : repeat_until_value_state(std::move(action)) {
170 this->_state.set(std::move(st));
171 }
172 future<T> get_future() { return _promise.get_future(); }
173 task* waiting_task() noexcept override { return _promise.waiting_task(); }
174 virtual void run_and_dispose() noexcept override {
175 if (this->_state.failed()) {
176 _promise.set_exception(std::move(this->_state).get_exception());
177 delete this;
178 return;
179 } else {
180 auto v = std::move(this->_state).get0();
181 if (v) {
182 _promise.set_value(std::move(*v));
183 delete this;
184 return;
185 }
186 this->_state = {};
187 }
188 try {
189 do {
190 auto f = futurize_invoke(_action);
191 if (!f.available()) {
1e59de90 192 internal::set_callback(std::move(f), this);
f67539c2
TL
193 return;
194 }
195 auto ret = f.get0();
196 if (ret) {
197 _promise.set_value(std::move(*ret));
198 delete this;
199 return;
200 }
201 } while (!need_preempt());
202 } catch (...) {
203 _promise.set_exception(std::current_exception());
204 delete this;
205 return;
206 }
207 this->_state.set(std::nullopt);
208 schedule(this);
209 }
210};
211
212} // namespace internal
213
214/// Invokes given action until it fails or the function requests iteration to stop by returning
215/// an engaged \c future<std::optional<T>> or std::optional<T>. The value is extracted
216/// from the \c optional, and returned, as a future, from repeat_until_value().
217///
218/// \param action a callable taking no arguments, returning a future<std::optional<T>>
219/// or std::optional<T>. Will be called again as soon as the future
220/// resolves, unless the future fails, action throws, or it resolves with
221/// an engaged \c optional. If \c action is an r-value it can be moved
222/// in the middle of iteration.
223/// \return a ready future if we stopped successfully, or a failed future if
224/// a call to to \c action failed. The \c optional's value is returned.
225template<typename AsyncAction>
226SEASTAR_CONCEPT( requires requires (AsyncAction aa) {
227 bool(futurize_invoke(aa).get0());
228 futurize_invoke(aa).get0().value();
229} )
230repeat_until_value_return_type<AsyncAction>
231repeat_until_value(AsyncAction action) noexcept {
20effc67 232 using futurator = futurize<std::invoke_result_t<AsyncAction>>;
f67539c2
TL
233 using type_helper = repeat_until_value_type_helper<typename futurator::type>;
234 // the "T" in the documentation
235 using value_type = typename type_helper::value_type;
236 using optional_type = typename type_helper::optional_type;
237 do {
238 auto f = futurator::invoke(action);
239
240 if (!f.available()) {
241 return [&] () noexcept {
242 memory::scoped_critical_alloc_section _;
243 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
244 auto ret = state->get_future();
1e59de90 245 internal::set_callback(std::move(f), state);
f67539c2
TL
246 return ret;
247 }();
248 }
249
250 if (f.failed()) {
251 return make_exception_future<value_type>(f.get_exception());
252 }
253
254 optional_type&& optional = std::move(f).get0();
255 if (optional) {
256 return make_ready_future<value_type>(std::move(optional.value()));
257 }
258 } while (!need_preempt());
259
260 try {
261 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::nullopt, std::move(action));
262 auto f = state->get_future();
263 schedule(state);
264 return f;
265 } catch (...) {
266 return make_exception_future<value_type>(std::current_exception());
267 }
268}
269
270namespace internal {
271
272template <typename StopCondition, typename AsyncAction>
273class do_until_state final : public continuation_base<> {
274 promise<> _promise;
275 StopCondition _stop;
276 AsyncAction _action;
277public:
278 explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
279 future<> get_future() { return _promise.get_future(); }
280 task* waiting_task() noexcept override { return _promise.waiting_task(); }
281 virtual void run_and_dispose() noexcept override {
282 if (_state.available()) {
283 if (_state.failed()) {
284 _promise.set_urgent_state(std::move(_state));
285 delete this;
286 return;
287 }
288 _state = {}; // allow next cycle to overrun state
289 }
290 try {
291 do {
292 if (_stop()) {
293 _promise.set_value();
294 delete this;
295 return;
296 }
297 auto f = _action();
298 if (!f.available()) {
1e59de90 299 internal::set_callback(std::move(f), this);
f67539c2
TL
300 return;
301 }
302 if (f.failed()) {
303 f.forward_to(std::move(_promise));
304 delete this;
305 return;
306 }
307 } while (!need_preempt());
308 } catch (...) {
309 _promise.set_exception(std::current_exception());
310 delete this;
311 return;
312 }
313 schedule(this);
314 }
315};
316
317} // namespace internal
318
20effc67 319/// Invokes given action until it fails or given condition evaluates to true or fails.
f67539c2
TL
320///
321/// \param stop_cond a callable taking no arguments, returning a boolean that
322/// evalutes to true when you don't want to call \c action
20effc67
TL
323/// any longer. If \c stop_cond fails, the exception is propagated
324// in the returned future.
f67539c2
TL
325/// \param action a callable taking no arguments, returning a future<>. Will
326/// be called again as soon as the future resolves, unless the
20effc67 327/// future fails, or \c stop_cond returns \c true or fails.
f67539c2 328/// \return a ready future if we stopped successfully, or a failed future if
20effc67 329/// a call to to \c action or a call to \c stop_cond failed.
f67539c2
TL
330template<typename AsyncAction, typename StopCondition>
331SEASTAR_CONCEPT( requires seastar::InvokeReturns<StopCondition, bool> && seastar::InvokeReturns<AsyncAction, future<>> )
332inline
333future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
334 using namespace internal;
335 for (;;) {
20effc67 336 try {
f67539c2
TL
337 if (stop_cond()) {
338 return make_ready_future<>();
339 }
20effc67
TL
340 } catch (...) {
341 return current_exception_as_future();
342 }
f67539c2
TL
343 auto f = futurize_invoke(action);
344 if (f.failed()) {
345 return f;
346 }
347 if (!f.available() || need_preempt()) {
348 return [&] () noexcept {
349 memory::scoped_critical_alloc_section _;
350 auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
351 auto ret = task->get_future();
1e59de90 352 internal::set_callback(std::move(f), task);
f67539c2
TL
353 return ret;
354 }();
355 }
356 }
357}
358
359/// Invoke given action until it fails.
360///
361/// Calls \c action repeatedly until it returns a failed future.
362///
363/// \param action a callable taking no arguments, returning a \c future<>
364/// that becomes ready when you wish it to be called again.
365/// \return a future<> that will resolve to the first failure of \c action
366template<typename AsyncAction>
367SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, future<>> )
368inline
369future<> keep_doing(AsyncAction action) noexcept {
370 return repeat([action = std::move(action)] () mutable {
371 return action().then([] {
372 return stop_iteration::no;
373 });
374 });
375}
376
377namespace internal {
378template <typename Iterator, typename AsyncAction>
379class do_for_each_state final : public continuation_base<> {
380 Iterator _begin;
381 Iterator _end;
382 AsyncAction _action;
383 promise<> _pr;
384
385public:
1e59de90 386 do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<>&& first_unavailable)
f67539c2 387 : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
1e59de90 388 internal::set_callback(std::move(first_unavailable), this);
f67539c2
TL
389 }
390 virtual void run_and_dispose() noexcept override {
391 std::unique_ptr<do_for_each_state> zis(this);
392 if (_state.failed()) {
393 _pr.set_urgent_state(std::move(_state));
394 return;
395 }
396 while (_begin != _end) {
397 auto f = futurize_invoke(_action, *_begin++);
398 if (f.failed()) {
399 f.forward_to(std::move(_pr));
400 return;
401 }
402 if (!f.available() || need_preempt()) {
403 _state = {};
1e59de90 404 internal::set_callback(std::move(f), this);
f67539c2
TL
405 zis.release();
406 return;
407 }
408 }
409 _pr.set_value();
410 }
411 task* waiting_task() noexcept override {
412 return _pr.waiting_task();
413 }
414 future<> get_future() {
415 return _pr.get_future();
416 }
417};
418
419template<typename Iterator, typename AsyncAction>
420inline
421future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
422 while (begin != end) {
423 auto f = futurize_invoke(action, *begin++);
424 if (f.failed()) {
425 return f;
426 }
427 if (!f.available() || need_preempt()) {
428 auto* s = new internal::do_for_each_state<Iterator, AsyncAction>{
429 std::move(begin), std::move(end), std::move(action), std::move(f)};
430 return s->get_future();
431 }
432 }
433 return make_ready_future<>();
434}
435} // namespace internal
436
437/// \addtogroup future-util
438
439/// \brief Call a function for each item in a range, sequentially (iterator version).
440///
441/// For each item in a range, call a function, waiting for the previous
442/// invocation to complete before calling the next one.
443///
444/// \param begin an \c InputIterator designating the beginning of the range
445/// \param end an \c InputIterator designating the endof the range
446/// \param action a callable, taking a reference to objects from the range
447/// as a parameter, and returning a \c future<> that resolves
448/// when it is acceptable to process the next item.
449/// \return a ready future on success, or the first failed future if
450/// \c action failed.
451template<typename Iterator, typename AsyncAction>
452SEASTAR_CONCEPT( requires requires (Iterator i, AsyncAction aa) {
453 { futurize_invoke(aa, *i) } -> std::same_as<future<>>;
454} )
455inline
456future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept {
457 try {
458 return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action));
459 } catch (...) {
460 return current_exception_as_future();
461 }
462}
463
464/// \brief Call a function for each item in a range, sequentially (range version).
465///
466/// For each item in a range, call a function, waiting for the previous
467/// invocation to complete before calling the next one.
468///
469/// \param c an \c Container object designating input range
470/// \param action a callable, taking a reference to objects from the range
471/// as a parameter, and returning a \c future<> that resolves
472/// when it is acceptable to process the next item.
473/// \return a ready future on success, or the first failed future if
474/// \c action failed.
475template<typename Container, typename AsyncAction>
476SEASTAR_CONCEPT( requires requires (Container c, AsyncAction aa) {
1e59de90
TL
477 { futurize_invoke(aa, *std::begin(c)) } -> std::same_as<future<>>;
478 std::end(c);
f67539c2
TL
479} )
480inline
481future<> do_for_each(Container& c, AsyncAction action) noexcept {
482 try {
483 return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action));
484 } catch (...) {
485 return current_exception_as_future();
486 }
487}
488
489namespace internal {
490
1e59de90
TL
491template <typename T, typename = void>
492struct has_iterator_category : std::false_type {};
493
494template <typename T>
495struct has_iterator_category<T, std::void_t<typename std::iterator_traits<T>::iterator_category >> : std::true_type {};
496
497template <typename Iterator, typename Sentinel, typename IteratorCategory>
f67539c2
TL
498inline
499size_t
1e59de90 500iterator_range_estimate_vector_capacity(Iterator const&, Sentinel const&, IteratorCategory) {
f67539c2
TL
501 // For InputIterators we can't estimate needed capacity
502 return 0;
503}
504
1e59de90 505template <typename Iterator, typename Sentinel>
f67539c2
TL
506inline
507size_t
1e59de90 508iterator_range_estimate_vector_capacity(Iterator begin, Sentinel end, std::forward_iterator_tag) {
f67539c2
TL
509 // May be linear time below random_access_iterator_tag, but still better than reallocation
510 return std::distance(begin, end);
511}
512
513} // namespace internal
514
515/// \cond internal
516
517class parallel_for_each_state final : private continuation_base<> {
518 std::vector<future<>> _incomplete;
519 promise<> _result;
520 std::exception_ptr _ex;
521private:
522 // Wait for one of the futures in _incomplete to complete, and then
523 // decide what to do: wait for another one, or deliver _result if all
524 // are complete.
525 void wait_for_one() noexcept;
526 virtual void run_and_dispose() noexcept override;
527 task* waiting_task() noexcept override { return _result.waiting_task(); }
528public:
529 parallel_for_each_state(size_t n);
530 void add_future(future<>&& f);
531 future<> get_future();
532};
533
534/// \endcond
535
536/// \brief Run tasks in parallel (iterator version).
537///
538/// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
539/// the range, and return a future<> that resolves when all the functions
540/// complete. \c func should return a future<> that indicates when it is
541/// complete. All invocations are performed in parallel. This allows the range
542/// to refer to stack objects, but means that unlike other loops this cannot
543/// check need_preempt and can only be used with small ranges.
544///
545/// \param begin an \c InputIterator designating the beginning of the range
546/// \param end an \c InputIterator designating the end of the range
547/// \param func Function to invoke with each element in the range (returning
548/// a \c future<>)
549/// \return a \c future<> that resolves when all the function invocations
550/// complete. If one or more return an exception, the return value
551/// contains one of the exceptions.
1e59de90
TL
552/// \note parallel_for_each() schedules all invocations of \c func on the
553/// current shard. If you want to run a function on all shards in parallel,
554/// have a look at \ref smp::invoke_on_all() instead.
555template <typename Iterator, typename Sentinel, typename Func>
556SEASTAR_CONCEPT( requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { i++ }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)))
557// We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
558// which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
559// break legacy code, for which it holds that Sentinel equals Iterator.
f67539c2
TL
560inline
561future<>
1e59de90 562parallel_for_each(Iterator begin, Sentinel end, Func&& func) noexcept {
f67539c2
TL
563 parallel_for_each_state* s = nullptr;
564 // Process all elements, giving each future the following treatment:
565 // - available, not failed: do nothing
566 // - available, failed: collect exception in ex
567 // - not available: collect in s (allocating it if needed)
568 while (begin != end) {
1e59de90
TL
569 auto f = futurize_invoke(std::forward<Func>(func), *begin);
570 ++begin;
20effc67 571 memory::scoped_critical_alloc_section _;
f67539c2
TL
572 if (!f.available() || f.failed()) {
573 if (!s) {
f67539c2 574 using itraits = std::iterator_traits<Iterator>;
1e59de90
TL
575 size_t n{0U};
576 if constexpr (internal::has_iterator_category<Iterator>::value) {
577 // We need if-constexpr here because there exist iterators for which std::iterator_traits
578 // does not have 'iterator_category' as member type
579 n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category{}) + 1);
580 }
f67539c2
TL
581 s = new parallel_for_each_state(n);
582 }
583 s->add_future(std::move(f));
584 }
585 }
586 // If any futures were not available, hand off to parallel_for_each_state::start().
587 // Otherwise we can return a result immediately.
588 if (s) {
589 // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
590 // so this isn't a leak
591 return s->get_future();
592 }
593 return make_ready_future<>();
594}
595
596/// \brief Run tasks in parallel (range version).
597///
598/// Given a \c range of objects, invoke \c func with each object
599/// in the range, and return a future<> that resolves when all
600/// the functions complete. \c func should return a future<> that indicates
601/// when it is complete. All invocations are performed in parallel. This allows
602/// the range to refer to stack objects, but means that unlike other loops this
603/// cannot check need_preempt and can only be used with small ranges.
604///
605/// \param range A range of objects to iterate run \c func on
606/// \param func A callable, accepting reference to the range's
607/// \c value_type, and returning a \c future<>.
608/// \return a \c future<> that becomes ready when the entire range
609/// was processed. If one or more of the invocations of
610/// \c func returned an exceptional future, then the return
611/// value will contain one of those exceptions.
1e59de90
TL
612/// \note parallel_for_each() schedules all invocations of \c func on the
613/// current shard. If you want to run a function on all shards in parallel,
614/// have a look at \ref smp::invoke_on_all() instead.
f67539c2
TL
615
616namespace internal {
617
618template <typename Range, typename Func>
619inline
620future<>
621parallel_for_each_impl(Range&& range, Func&& func) {
622 return parallel_for_each(std::begin(range), std::end(range),
623 std::forward<Func>(func));
624}
625
626} // namespace internal
627
628template <typename Range, typename Func>
1e59de90
TL
629SEASTAR_CONCEPT( requires requires (Func f, Range r) {
630 { f(*std::begin(r)) } -> std::same_as<future<>>;
631 std::end(r);
632} )
f67539c2
TL
633inline
634future<>
635parallel_for_each(Range&& range, Func&& func) noexcept {
636 auto impl = internal::parallel_for_each_impl<Range, Func>;
637 return futurize_invoke(impl, std::forward<Range>(range), std::forward<Func>(func));
638}
639
640/// Run a maximum of \c max_concurrent tasks in parallel (iterator version).
641///
642/// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
643/// the range, and return a future<> that resolves when all the functions
644/// complete. \c func should return a future<> that indicates when it is
645/// complete. Up to \c max_concurrent invocations are performed in parallel.
646/// This does not allow the range to refer to stack objects. The caller
647/// must ensure that the range outlives the call to max_concurrent_for_each
648/// so it can be iterated in the background.
649///
650/// \param begin an \c InputIterator designating the beginning of the range
651/// \param end an \c InputIterator designating the end of the range
652/// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
653/// \param func Function to invoke with each element in the range (returning
654/// a \c future<>)
655/// \return a \c future<> that resolves when all the function invocations
656/// complete. If one or more return an exception, the return value
657/// contains one of the exceptions.
1e59de90
TL
658/// \note max_concurrent_for_each() schedules all invocations of \c func on the
659/// current shard. If you want to run a function on all shards in parallel,
660/// have a look at \ref smp::invoke_on_all() instead.
661template <typename Iterator, typename Sentinel, typename Func>
662SEASTAR_CONCEPT( requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { ++i }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>) ) )
663// We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
664// which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
665// break legacy code, for which it holds that Sentinel equals Iterator.
f67539c2
TL
666inline
667future<>
1e59de90 668max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func&& func) noexcept {
f67539c2
TL
669 struct state {
670 Iterator begin;
1e59de90 671 Sentinel end;
f67539c2
TL
672 Func func;
673 size_t max_concurrent;
674 semaphore sem;
675 std::exception_ptr err;
676
1e59de90 677 state(Iterator begin_, Sentinel end_, size_t max_concurrent_, Func func_)
f67539c2
TL
678 : begin(std::move(begin_))
679 , end(std::move(end_))
680 , func(std::move(func_))
681 , max_concurrent(max_concurrent_)
682 , sem(max_concurrent_)
683 , err()
684 { }
685 };
686
687 assert(max_concurrent > 0);
688
689 try {
690 return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) {
691 return do_until([&s] { return s.begin == s.end; }, [&s] {
692 return s.sem.wait().then([&s] () mutable noexcept {
693 // Possibly run in background and signal _sem when the task is done.
694 // The background tasks are waited on using _sem.
1e59de90 695 (void)futurize_invoke(s.func, *s.begin).then_wrapped([&s] (future<> fut) {
f67539c2
TL
696 if (fut.failed()) {
697 auto e = fut.get_exception();;
698 if (!s.err) {
699 s.err = std::move(e);
700 }
701 }
702 s.sem.signal();
703 });
1e59de90 704 ++s.begin;
f67539c2
TL
705 });
706 }).then([&s] {
707 // Wait for any background task to finish
708 // and signal and semaphore
709 return s.sem.wait(s.max_concurrent);
710 }).then([&s] {
711 if (!s.err) {
712 return make_ready_future<>();
713 }
714 return seastar::make_exception_future<>(std::move(s.err));
715 });
716 });
717 } catch (...) {
718 return current_exception_as_future();
719 }
720}
721
722/// Run a maximum of \c max_concurrent tasks in parallel (range version).
723///
20effc67 724/// Given a range of objects, run \c func on each \c *i in
f67539c2
TL
725/// the range, and return a future<> that resolves when all the functions
726/// complete. \c func should return a future<> that indicates when it is
727/// complete. Up to \c max_concurrent invocations are performed in parallel.
728/// This does not allow the range to refer to stack objects. The caller
729/// must ensure that the range outlives the call to max_concurrent_for_each
730/// so it can be iterated in the background.
731///
20effc67 732/// \param range a \c Range to be processed
f67539c2
TL
733/// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
734/// \param func Function to invoke with each element in the range (returning
735/// a \c future<>)
736/// \return a \c future<> that resolves when all the function invocations
737/// complete. If one or more return an exception, the return value
738/// contains one of the exceptions.
1e59de90
TL
739/// \note max_concurrent_for_each() schedules all invocations of \c func on the
740/// current shard. If you want to run a function on all shards in parallel,
741/// have a look at \ref smp::invoke_on_all() instead.
f67539c2 742template <typename Range, typename Func>
1e59de90
TL
743SEASTAR_CONCEPT( requires requires (Func f, Range r) {
744 { f(*std::begin(r)) } -> std::same_as<future<>>;
745 std::end(r);
746} )
f67539c2
TL
747inline
748future<>
749max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept {
750 try {
751 return max_concurrent_for_each(std::begin(range), std::end(range), max_concurrent, std::forward<Func>(func));
752 } catch (...) {
753 return current_exception_as_future();
754 }
755}
756
757/// @}
758
759} // namespace seastar