]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/loop.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / core / loop.hh
CommitLineData
f67539c2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2020 ScyllaDB.
21 */
22
23#pragma once
24
#include <iterator>
#include <memory>
#include <type_traits>
#include <vector>

#include <seastar/core/future.hh>
#include <seastar/core/task.hh>
#include <seastar/util/bool_class.hh>
#include <seastar/core/semaphore.hh>
33
34namespace seastar {
35
36/// \addtogroup future-util
37/// @{
38
39// The AsyncAction concept represents an action which can complete later than
40// the actual function invocation. It is represented by a function which
41// returns a future which resolves when the action is done.
42
43struct stop_iteration_tag { };
44using stop_iteration = bool_class<stop_iteration_tag>;
45
46namespace internal {
47
48template <typename AsyncAction>
49class repeater final : public continuation_base<stop_iteration> {
50 promise<> _promise;
51 AsyncAction _action;
52public:
53 explicit repeater(AsyncAction&& action) : _action(std::move(action)) {}
54 future<> get_future() { return _promise.get_future(); }
55 task* waiting_task() noexcept override { return _promise.waiting_task(); }
56 virtual void run_and_dispose() noexcept override {
57 if (_state.failed()) {
58 _promise.set_exception(std::move(_state).get_exception());
59 delete this;
60 return;
61 } else {
62 if (_state.get0() == stop_iteration::yes) {
63 _promise.set_value();
64 delete this;
65 return;
66 }
67 _state = {};
68 }
69 try {
70 do {
71 auto f = futurize_invoke(_action);
72 if (!f.available()) {
73 internal::set_callback(f, this);
74 return;
75 }
76 if (f.get0() == stop_iteration::yes) {
77 _promise.set_value();
78 delete this;
79 return;
80 }
81 } while (!need_preempt());
82 } catch (...) {
83 _promise.set_exception(std::current_exception());
84 delete this;
85 return;
86 }
87 _state.set(stop_iteration::no);
88 schedule(this);
89 }
90};
91
92} // namespace internal
93
94// Delete these overloads so that the actual implementation can use a
95// universal reference but still reject lvalue references.
96template<typename AsyncAction>
97future<> repeat(const AsyncAction& action) noexcept = delete;
98template<typename AsyncAction>
99future<> repeat(AsyncAction& action) noexcept = delete;
100
101/// Invokes given action until it fails or the function requests iteration to stop by returning
102/// \c stop_iteration::yes.
103///
104/// \param action a callable taking no arguments, returning a future<stop_iteration>. Will
105/// be called again as soon as the future resolves, unless the
106/// future fails, action throws, or it resolves with \c stop_iteration::yes.
107/// If \c action is an r-value it can be moved in the middle of iteration.
108/// \return a ready future if we stopped successfully, or a failed future if
109/// a call to \c action failed.
110template<typename AsyncAction>
111SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, stop_iteration> || seastar::InvokeReturns<AsyncAction, future<stop_iteration>> )
112inline
113future<> repeat(AsyncAction&& action) noexcept {
20effc67 114 using futurator = futurize<std::invoke_result_t<AsyncAction>>;
f67539c2
TL
115 static_assert(std::is_same<future<stop_iteration>, typename futurator::type>::value, "bad AsyncAction signature");
116 for (;;) {
117 // Do not type-erase here in case this is a short repeat()
118 auto f = futurator::invoke(action);
119
120 if (!f.available() || f.failed() || need_preempt()) {
121 return [&] () noexcept {
122 memory::scoped_critical_alloc_section _;
123 auto repeater = new internal::repeater<AsyncAction>(std::move(action));
124 auto ret = repeater->get_future();
125 internal::set_callback(f, repeater);
126 return ret;
127 }();
128 }
129
130 if (f.get0() == stop_iteration::yes) {
131 return make_ready_future<>();
132 }
133 }
134}
135
136/// \cond internal
137
138template <typename T>
139struct repeat_until_value_type_helper;
140
141/// Type helper for repeat_until_value()
142template <typename T>
143struct repeat_until_value_type_helper<future<std::optional<T>>> {
144 /// The type of the value we are computing
145 using value_type = T;
146 /// Type used by \c AsyncAction while looping
147 using optional_type = std::optional<T>;
148 /// Return type of repeat_until_value()
149 using future_type = future<value_type>;
150};
151
152/// Return value of repeat_until_value()
153template <typename AsyncAction>
154using repeat_until_value_return_type
20effc67 155 = typename repeat_until_value_type_helper<typename futurize<std::invoke_result_t<AsyncAction>>::type>::future_type;
f67539c2
TL
156
157/// \endcond
158
159namespace internal {
160
161template <typename AsyncAction, typename T>
162class repeat_until_value_state final : public continuation_base<std::optional<T>> {
163 promise<T> _promise;
164 AsyncAction _action;
165public:
166 explicit repeat_until_value_state(AsyncAction action) : _action(std::move(action)) {}
167 repeat_until_value_state(std::optional<T> st, AsyncAction action) : repeat_until_value_state(std::move(action)) {
168 this->_state.set(std::move(st));
169 }
170 future<T> get_future() { return _promise.get_future(); }
171 task* waiting_task() noexcept override { return _promise.waiting_task(); }
172 virtual void run_and_dispose() noexcept override {
173 if (this->_state.failed()) {
174 _promise.set_exception(std::move(this->_state).get_exception());
175 delete this;
176 return;
177 } else {
178 auto v = std::move(this->_state).get0();
179 if (v) {
180 _promise.set_value(std::move(*v));
181 delete this;
182 return;
183 }
184 this->_state = {};
185 }
186 try {
187 do {
188 auto f = futurize_invoke(_action);
189 if (!f.available()) {
190 internal::set_callback(f, this);
191 return;
192 }
193 auto ret = f.get0();
194 if (ret) {
195 _promise.set_value(std::move(*ret));
196 delete this;
197 return;
198 }
199 } while (!need_preempt());
200 } catch (...) {
201 _promise.set_exception(std::current_exception());
202 delete this;
203 return;
204 }
205 this->_state.set(std::nullopt);
206 schedule(this);
207 }
208};
209
210} // namespace internal
211
212/// Invokes given action until it fails or the function requests iteration to stop by returning
213/// an engaged \c future<std::optional<T>> or std::optional<T>. The value is extracted
214/// from the \c optional, and returned, as a future, from repeat_until_value().
215///
216/// \param action a callable taking no arguments, returning a future<std::optional<T>>
217/// or std::optional<T>. Will be called again as soon as the future
218/// resolves, unless the future fails, action throws, or it resolves with
219/// an engaged \c optional. If \c action is an r-value it can be moved
220/// in the middle of iteration.
221/// \return a ready future if we stopped successfully, or a failed future if
222/// a call to \c action failed. The \c optional's value is returned.
223template<typename AsyncAction>
224SEASTAR_CONCEPT( requires requires (AsyncAction aa) {
225 bool(futurize_invoke(aa).get0());
226 futurize_invoke(aa).get0().value();
227} )
228repeat_until_value_return_type<AsyncAction>
229repeat_until_value(AsyncAction action) noexcept {
20effc67 230 using futurator = futurize<std::invoke_result_t<AsyncAction>>;
f67539c2
TL
231 using type_helper = repeat_until_value_type_helper<typename futurator::type>;
232 // the "T" in the documentation
233 using value_type = typename type_helper::value_type;
234 using optional_type = typename type_helper::optional_type;
235 do {
236 auto f = futurator::invoke(action);
237
238 if (!f.available()) {
239 return [&] () noexcept {
240 memory::scoped_critical_alloc_section _;
241 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
242 auto ret = state->get_future();
243 internal::set_callback(f, state);
244 return ret;
245 }();
246 }
247
248 if (f.failed()) {
249 return make_exception_future<value_type>(f.get_exception());
250 }
251
252 optional_type&& optional = std::move(f).get0();
253 if (optional) {
254 return make_ready_future<value_type>(std::move(optional.value()));
255 }
256 } while (!need_preempt());
257
258 try {
259 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::nullopt, std::move(action));
260 auto f = state->get_future();
261 schedule(state);
262 return f;
263 } catch (...) {
264 return make_exception_future<value_type>(std::current_exception());
265 }
266}
267
268namespace internal {
269
270template <typename StopCondition, typename AsyncAction>
271class do_until_state final : public continuation_base<> {
272 promise<> _promise;
273 StopCondition _stop;
274 AsyncAction _action;
275public:
276 explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
277 future<> get_future() { return _promise.get_future(); }
278 task* waiting_task() noexcept override { return _promise.waiting_task(); }
279 virtual void run_and_dispose() noexcept override {
280 if (_state.available()) {
281 if (_state.failed()) {
282 _promise.set_urgent_state(std::move(_state));
283 delete this;
284 return;
285 }
286 _state = {}; // allow next cycle to overrun state
287 }
288 try {
289 do {
290 if (_stop()) {
291 _promise.set_value();
292 delete this;
293 return;
294 }
295 auto f = _action();
296 if (!f.available()) {
297 internal::set_callback(f, this);
298 return;
299 }
300 if (f.failed()) {
301 f.forward_to(std::move(_promise));
302 delete this;
303 return;
304 }
305 } while (!need_preempt());
306 } catch (...) {
307 _promise.set_exception(std::current_exception());
308 delete this;
309 return;
310 }
311 schedule(this);
312 }
313};
314
315} // namespace internal
316
20effc67 317/// Invokes given action until it fails or given condition evaluates to true or fails.
f67539c2
TL
318///
319/// \param stop_cond a callable taking no arguments, returning a boolean that
320/// evalutes to true when you don't want to call \c action
20effc67
TL
321/// any longer. If \c stop_cond fails, the exception is propagated
322// in the returned future.
f67539c2
TL
323/// \param action a callable taking no arguments, returning a future<>. Will
324/// be called again as soon as the future resolves, unless the
20effc67 325/// future fails, or \c stop_cond returns \c true or fails.
f67539c2 326/// \return a ready future if we stopped successfully, or a failed future if
20effc67 327/// a call to to \c action or a call to \c stop_cond failed.
f67539c2
TL
328template<typename AsyncAction, typename StopCondition>
329SEASTAR_CONCEPT( requires seastar::InvokeReturns<StopCondition, bool> && seastar::InvokeReturns<AsyncAction, future<>> )
330inline
331future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
332 using namespace internal;
333 for (;;) {
20effc67 334 try {
f67539c2
TL
335 if (stop_cond()) {
336 return make_ready_future<>();
337 }
20effc67
TL
338 } catch (...) {
339 return current_exception_as_future();
340 }
f67539c2
TL
341 auto f = futurize_invoke(action);
342 if (f.failed()) {
343 return f;
344 }
345 if (!f.available() || need_preempt()) {
346 return [&] () noexcept {
347 memory::scoped_critical_alloc_section _;
348 auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
349 auto ret = task->get_future();
350 internal::set_callback(f, task);
351 return ret;
352 }();
353 }
354 }
355}
356
357/// Invoke given action until it fails.
358///
359/// Calls \c action repeatedly until it returns a failed future.
360///
361/// \param action a callable taking no arguments, returning a \c future<>
362/// that becomes ready when you wish it to be called again.
363/// \return a future<> that will resolve to the first failure of \c action
364template<typename AsyncAction>
365SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, future<>> )
366inline
367future<> keep_doing(AsyncAction action) noexcept {
368 return repeat([action = std::move(action)] () mutable {
369 return action().then([] {
370 return stop_iteration::no;
371 });
372 });
373}
374
375namespace internal {
376template <typename Iterator, typename AsyncAction>
377class do_for_each_state final : public continuation_base<> {
378 Iterator _begin;
379 Iterator _end;
380 AsyncAction _action;
381 promise<> _pr;
382
383public:
384 do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<> first_unavailable)
385 : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
386 internal::set_callback(first_unavailable, this);
387 }
388 virtual void run_and_dispose() noexcept override {
389 std::unique_ptr<do_for_each_state> zis(this);
390 if (_state.failed()) {
391 _pr.set_urgent_state(std::move(_state));
392 return;
393 }
394 while (_begin != _end) {
395 auto f = futurize_invoke(_action, *_begin++);
396 if (f.failed()) {
397 f.forward_to(std::move(_pr));
398 return;
399 }
400 if (!f.available() || need_preempt()) {
401 _state = {};
402 internal::set_callback(f, this);
403 zis.release();
404 return;
405 }
406 }
407 _pr.set_value();
408 }
409 task* waiting_task() noexcept override {
410 return _pr.waiting_task();
411 }
412 future<> get_future() {
413 return _pr.get_future();
414 }
415};
416
417template<typename Iterator, typename AsyncAction>
418inline
419future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
420 while (begin != end) {
421 auto f = futurize_invoke(action, *begin++);
422 if (f.failed()) {
423 return f;
424 }
425 if (!f.available() || need_preempt()) {
426 auto* s = new internal::do_for_each_state<Iterator, AsyncAction>{
427 std::move(begin), std::move(end), std::move(action), std::move(f)};
428 return s->get_future();
429 }
430 }
431 return make_ready_future<>();
432}
433} // namespace internal
434
435/// \addtogroup future-util
436
437/// \brief Call a function for each item in a range, sequentially (iterator version).
438///
439/// For each item in a range, call a function, waiting for the previous
440/// invocation to complete before calling the next one.
441///
442/// \param begin an \c InputIterator designating the beginning of the range
443/// \param end an \c InputIterator designating the end of the range
444/// \param action a callable, taking a reference to objects from the range
445/// as a parameter, and returning a \c future<> that resolves
446/// when it is acceptable to process the next item.
447/// \return a ready future on success, or the first failed future if
448/// \c action failed.
449template<typename Iterator, typename AsyncAction>
450SEASTAR_CONCEPT( requires requires (Iterator i, AsyncAction aa) {
451 { futurize_invoke(aa, *i) } -> std::same_as<future<>>;
452} )
453inline
454future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept {
455 try {
456 return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action));
457 } catch (...) {
458 return current_exception_as_future();
459 }
460}
461
462/// \brief Call a function for each item in a range, sequentially (range version).
463///
464/// For each item in a range, call a function, waiting for the previous
465/// invocation to complete before calling the next one.
466///
467/// \param c a \c Container object designating the input range
468/// \param action a callable, taking a reference to objects from the range
469/// as a parameter, and returning a \c future<> that resolves
470/// when it is acceptable to process the next item.
471/// \return a ready future on success, or the first failed future if
472/// \c action failed.
473template<typename Container, typename AsyncAction>
474SEASTAR_CONCEPT( requires requires (Container c, AsyncAction aa) {
475 { futurize_invoke(aa, *c.begin()) } -> std::same_as<future<>>;
476} )
477inline
478future<> do_for_each(Container& c, AsyncAction action) noexcept {
479 try {
480 return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action));
481 } catch (...) {
482 return current_exception_as_future();
483 }
484}
485
486namespace internal {
487
/// Estimates the number of elements in [\c begin, \c end) so a vector can be
/// sized up front (generic overload).
///
/// The category tag is a deduced template parameter here, which makes this
/// overload an exact match for every tag type. For bidirectional and
/// random-access tags it therefore wins overload resolution against the
/// \c std::forward_iterator_tag overload below, which would need a
/// derived-to-base conversion. This overload has to check for multi-pass
/// iterators itself, otherwise those ranges would always be estimated as 0.
///
/// \param begin start of the range
/// \param end end of the range
/// \return the distance between \c begin and \c end when the category is
///         derived from \c std::forward_iterator_tag; 0 for any other
///         category, where the range is not measured.
template <typename Iterator, typename IteratorCategory>
inline
size_t
iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, IteratorCategory /*category*/) {
    if constexpr (std::is_base_of_v<std::forward_iterator_tag, IteratorCategory>) {
        // May be linear time below random_access_iterator_tag, but still better than reallocation
        return std::distance(begin, end);
    } else {
        // For InputIterators we can't estimate needed capacity
        return 0;
    }
}

/// Overload chosen when the tag is exactly \c std::forward_iterator_tag.
///
/// \param begin start of the range
/// \param end end of the range
/// \return the distance between \c begin and \c end
template <typename Iterator>
inline
size_t
iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, std::forward_iterator_tag /*category*/) {
    // May be linear time below random_access_iterator_tag, but still better than reallocation
    return std::distance(begin, end);
}
503
504} // namespace internal
505
506/// \cond internal
507
508class parallel_for_each_state final : private continuation_base<> {
509 std::vector<future<>> _incomplete;
510 promise<> _result;
511 std::exception_ptr _ex;
512private:
513 // Wait for one of the futures in _incomplete to complete, and then
514 // decide what to do: wait for another one, or deliver _result if all
515 // are complete.
516 void wait_for_one() noexcept;
517 virtual void run_and_dispose() noexcept override;
518 task* waiting_task() noexcept override { return _result.waiting_task(); }
519public:
520 parallel_for_each_state(size_t n);
521 void add_future(future<>&& f);
522 future<> get_future();
523};
524
525/// \endcond
526
527/// \brief Run tasks in parallel (iterator version).
528///
529/// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
530/// the range, and return a future<> that resolves when all the functions
531/// complete. \c func should return a future<> that indicates when it is
532/// complete. All invocations are performed in parallel. This allows the range
533/// to refer to stack objects, but means that unlike other loops this cannot
534/// check need_preempt and can only be used with small ranges.
535///
536/// \param begin an \c InputIterator designating the beginning of the range
537/// \param end an \c InputIterator designating the end of the range
538/// \param func Function to invoke with each element in the range (returning
539/// a \c future<>)
540/// \return a \c future<> that resolves when all the function invocations
541/// complete. If one or more return an exception, the return value
542/// contains one of the exceptions.
543template <typename Iterator, typename Func>
544SEASTAR_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<future<>>; } )
545inline
546future<>
547parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept {
548 parallel_for_each_state* s = nullptr;
549 // Process all elements, giving each future the following treatment:
550 // - available, not failed: do nothing
551 // - available, failed: collect exception in ex
552 // - not available: collect in s (allocating it if needed)
553 while (begin != end) {
554 auto f = futurize_invoke(std::forward<Func>(func), *begin++);
20effc67 555 memory::scoped_critical_alloc_section _;
f67539c2
TL
556 if (!f.available() || f.failed()) {
557 if (!s) {
f67539c2
TL
558 using itraits = std::iterator_traits<Iterator>;
559 auto n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category()) + 1);
560 s = new parallel_for_each_state(n);
561 }
562 s->add_future(std::move(f));
563 }
564 }
565 // If any futures were not available, hand off to parallel_for_each_state::start().
566 // Otherwise we can return a result immediately.
567 if (s) {
568 // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
569 // so this isn't a leak
570 return s->get_future();
571 }
572 return make_ready_future<>();
573}
574
575/// \brief Run tasks in parallel (range version).
576///
577/// Given a \c range of objects, invoke \c func with each object
578/// in the range, and return a future<> that resolves when all
579/// the functions complete. \c func should return a future<> that indicates
580/// when it is complete. All invocations are performed in parallel. This allows
581/// the range to refer to stack objects, but means that unlike other loops this
582/// cannot check need_preempt and can only be used with small ranges.
583///
584/// \param range A range of objects to run \c func on
585/// \param func A callable, accepting reference to the range's
586/// \c value_type, and returning a \c future<>.
587/// \return a \c future<> that becomes ready when the entire range
588/// was processed. If one or more of the invocations of
589/// \c func returned an exceptional future, then the return
590/// value will contain one of those exceptions.
591
592namespace internal {
593
594template <typename Range, typename Func>
595inline
596future<>
597parallel_for_each_impl(Range&& range, Func&& func) {
598 return parallel_for_each(std::begin(range), std::end(range),
599 std::forward<Func>(func));
600}
601
602} // namespace internal
603
604template <typename Range, typename Func>
605SEASTAR_CONCEPT( requires requires (Func f, Range r) { { f(*r.begin()) } -> std::same_as<future<>>; } )
606inline
607future<>
608parallel_for_each(Range&& range, Func&& func) noexcept {
609 auto impl = internal::parallel_for_each_impl<Range, Func>;
610 return futurize_invoke(impl, std::forward<Range>(range), std::forward<Func>(func));
611}
612
613/// Run a maximum of \c max_concurrent tasks in parallel (iterator version).
614///
615/// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
616/// the range, and return a future<> that resolves when all the functions
617/// complete. \c func should return a future<> that indicates when it is
618/// complete. Up to \c max_concurrent invocations are performed in parallel.
619/// This does not allow the range to refer to stack objects. The caller
620/// must ensure that the range outlives the call to max_concurrent_for_each
621/// so it can be iterated in the background.
622///
623/// \param begin an \c InputIterator designating the beginning of the range
624/// \param end an \c InputIterator designating the end of the range
625/// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
626/// \param func Function to invoke with each element in the range (returning
627/// a \c future<>)
628/// \return a \c future<> that resolves when all the function invocations
629/// complete. If one or more return an exception, the return value
630/// contains one of the exceptions.
631template <typename Iterator, typename Func>
632SEASTAR_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<future<>>; } )
633inline
634future<>
635max_concurrent_for_each(Iterator begin, Iterator end, size_t max_concurrent, Func&& func) noexcept {
636 struct state {
637 Iterator begin;
638 Iterator end;
639 Func func;
640 size_t max_concurrent;
641 semaphore sem;
642 std::exception_ptr err;
643
644 state(Iterator begin_, Iterator end_, size_t max_concurrent_, Func func_)
645 : begin(std::move(begin_))
646 , end(std::move(end_))
647 , func(std::move(func_))
648 , max_concurrent(max_concurrent_)
649 , sem(max_concurrent_)
650 , err()
651 { }
652 };
653
654 assert(max_concurrent > 0);
655
656 try {
657 return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) {
658 return do_until([&s] { return s.begin == s.end; }, [&s] {
659 return s.sem.wait().then([&s] () mutable noexcept {
660 // Possibly run in background and signal _sem when the task is done.
661 // The background tasks are waited on using _sem.
662 (void)futurize_invoke(s.func, *s.begin++).then_wrapped([&s] (future<> fut) {
663 if (fut.failed()) {
664 auto e = fut.get_exception();;
665 if (!s.err) {
666 s.err = std::move(e);
667 }
668 }
669 s.sem.signal();
670 });
671 });
672 }).then([&s] {
673 // Wait for any background task to finish
674 // and signal and semaphore
675 return s.sem.wait(s.max_concurrent);
676 }).then([&s] {
677 if (!s.err) {
678 return make_ready_future<>();
679 }
680 return seastar::make_exception_future<>(std::move(s.err));
681 });
682 });
683 } catch (...) {
684 return current_exception_as_future();
685 }
686}
687
688/// Run a maximum of \c max_concurrent tasks in parallel (range version).
689///
20effc67 690/// Given a range of objects, run \c func on each \c *i in
f67539c2
TL
691/// the range, and return a future<> that resolves when all the functions
692/// complete. \c func should return a future<> that indicates when it is
693/// complete. Up to \c max_concurrent invocations are performed in parallel.
694/// This does not allow the range to refer to stack objects. The caller
695/// must ensure that the range outlives the call to max_concurrent_for_each
696/// so it can be iterated in the background.
697///
20effc67 698/// \param range a \c Range to be processed
f67539c2
TL
699/// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
700/// \param func Function to invoke with each element in the range (returning
701/// a \c future<>)
702/// \return a \c future<> that resolves when all the function invocations
703/// complete. If one or more return an exception, the return value
704/// contains one of the exceptions.
705template <typename Range, typename Func>
706SEASTAR_CONCEPT( requires std::ranges::range<Range> && requires (Func f, Range r) { { f(*r.begin()) } -> std::same_as<future<>>; } )
707inline
708future<>
709max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept {
710 try {
711 return max_concurrent_for_each(std::begin(range), std::end(range), max_concurrent, std::forward<Func>(func));
712 } catch (...) {
713 return current_exception_as_future();
714 }
715}
716
717/// @}
718
719} // namespace seastar