]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/core/loop.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / core / loop.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19 /*
20 * Copyright (C) 2020 ScyllaDB.
21 */
22
23 #pragma once
24
25 #include <iterator>
26 #include <memory>
27 #include <vector>
28
29 #include <seastar/core/future.hh>
30 #include <seastar/core/task.hh>
31 #include <seastar/util/bool_class.hh>
32 #include <seastar/core/semaphore.hh>
33
34 namespace seastar {
35
36 /// \addtogroup future-util
37 /// @{
38
39 // The AsyncAction concept represents an action which can complete later than
40 // the actual function invocation. It is represented by a function which
41 // returns a future which resolves when the action is done.
42
43 struct stop_iteration_tag { };
44 using stop_iteration = bool_class<stop_iteration_tag>;
45
46 namespace internal {
47
48 template <typename AsyncAction>
49 class repeater final : public continuation_base<stop_iteration> {
50 promise<> _promise;
51 AsyncAction _action;
52 public:
53 explicit repeater(AsyncAction&& action) : _action(std::move(action)) {}
54 future<> get_future() { return _promise.get_future(); }
55 task* waiting_task() noexcept override { return _promise.waiting_task(); }
56 virtual void run_and_dispose() noexcept override {
57 if (_state.failed()) {
58 _promise.set_exception(std::move(_state).get_exception());
59 delete this;
60 return;
61 } else {
62 if (_state.get0() == stop_iteration::yes) {
63 _promise.set_value();
64 delete this;
65 return;
66 }
67 _state = {};
68 }
69 try {
70 do {
71 auto f = futurize_invoke(_action);
72 if (!f.available()) {
73 internal::set_callback(f, this);
74 return;
75 }
76 if (f.get0() == stop_iteration::yes) {
77 _promise.set_value();
78 delete this;
79 return;
80 }
81 } while (!need_preempt());
82 } catch (...) {
83 _promise.set_exception(std::current_exception());
84 delete this;
85 return;
86 }
87 _state.set(stop_iteration::no);
88 schedule(this);
89 }
90 };
91
92 } // namespace internal
93
94 // Delete these overloads so that the actual implementation can use a
95 // universal reference but still reject lvalue references.
96 template<typename AsyncAction>
97 future<> repeat(const AsyncAction& action) noexcept = delete;
98 template<typename AsyncAction>
99 future<> repeat(AsyncAction& action) noexcept = delete;
100
101 /// Invokes given action until it fails or the function requests iteration to stop by returning
102 /// \c stop_iteration::yes.
103 ///
104 /// \param action a callable taking no arguments, returning a future<stop_iteration>. Will
105 /// be called again as soon as the future resolves, unless the
106 /// future fails, action throws, or it resolves with \c stop_iteration::yes.
107 /// If \c action is an r-value it can be moved in the middle of iteration.
108 /// \return a ready future if we stopped successfully, or a failed future if
109 /// a call to to \c action failed.
110 template<typename AsyncAction>
111 SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, stop_iteration> || seastar::InvokeReturns<AsyncAction, future<stop_iteration>> )
112 inline
113 future<> repeat(AsyncAction&& action) noexcept {
114 using futurator = futurize<std::invoke_result_t<AsyncAction>>;
115 static_assert(std::is_same<future<stop_iteration>, typename futurator::type>::value, "bad AsyncAction signature");
116 for (;;) {
117 // Do not type-erase here in case this is a short repeat()
118 auto f = futurator::invoke(action);
119
120 if (!f.available() || f.failed() || need_preempt()) {
121 return [&] () noexcept {
122 memory::scoped_critical_alloc_section _;
123 auto repeater = new internal::repeater<AsyncAction>(std::move(action));
124 auto ret = repeater->get_future();
125 internal::set_callback(f, repeater);
126 return ret;
127 }();
128 }
129
130 if (f.get0() == stop_iteration::yes) {
131 return make_ready_future<>();
132 }
133 }
134 }
135
136 /// \cond internal
137
138 template <typename T>
139 struct repeat_until_value_type_helper;
140
141 /// Type helper for repeat_until_value()
142 template <typename T>
143 struct repeat_until_value_type_helper<future<std::optional<T>>> {
144 /// The type of the value we are computing
145 using value_type = T;
146 /// Type used by \c AsyncAction while looping
147 using optional_type = std::optional<T>;
148 /// Return type of repeat_until_value()
149 using future_type = future<value_type>;
150 };
151
152 /// Return value of repeat_until_value()
153 template <typename AsyncAction>
154 using repeat_until_value_return_type
155 = typename repeat_until_value_type_helper<typename futurize<std::invoke_result_t<AsyncAction>>::type>::future_type;
156
157 /// \endcond
158
159 namespace internal {
160
161 template <typename AsyncAction, typename T>
162 class repeat_until_value_state final : public continuation_base<std::optional<T>> {
163 promise<T> _promise;
164 AsyncAction _action;
165 public:
166 explicit repeat_until_value_state(AsyncAction action) : _action(std::move(action)) {}
167 repeat_until_value_state(std::optional<T> st, AsyncAction action) : repeat_until_value_state(std::move(action)) {
168 this->_state.set(std::move(st));
169 }
170 future<T> get_future() { return _promise.get_future(); }
171 task* waiting_task() noexcept override { return _promise.waiting_task(); }
172 virtual void run_and_dispose() noexcept override {
173 if (this->_state.failed()) {
174 _promise.set_exception(std::move(this->_state).get_exception());
175 delete this;
176 return;
177 } else {
178 auto v = std::move(this->_state).get0();
179 if (v) {
180 _promise.set_value(std::move(*v));
181 delete this;
182 return;
183 }
184 this->_state = {};
185 }
186 try {
187 do {
188 auto f = futurize_invoke(_action);
189 if (!f.available()) {
190 internal::set_callback(f, this);
191 return;
192 }
193 auto ret = f.get0();
194 if (ret) {
195 _promise.set_value(std::move(*ret));
196 delete this;
197 return;
198 }
199 } while (!need_preempt());
200 } catch (...) {
201 _promise.set_exception(std::current_exception());
202 delete this;
203 return;
204 }
205 this->_state.set(std::nullopt);
206 schedule(this);
207 }
208 };
209
210 } // namespace internal
211
212 /// Invokes given action until it fails or the function requests iteration to stop by returning
213 /// an engaged \c future<std::optional<T>> or std::optional<T>. The value is extracted
214 /// from the \c optional, and returned, as a future, from repeat_until_value().
215 ///
216 /// \param action a callable taking no arguments, returning a future<std::optional<T>>
217 /// or std::optional<T>. Will be called again as soon as the future
218 /// resolves, unless the future fails, action throws, or it resolves with
219 /// an engaged \c optional. If \c action is an r-value it can be moved
220 /// in the middle of iteration.
221 /// \return a ready future if we stopped successfully, or a failed future if
222 /// a call to to \c action failed. The \c optional's value is returned.
223 template<typename AsyncAction>
224 SEASTAR_CONCEPT( requires requires (AsyncAction aa) {
225 bool(futurize_invoke(aa).get0());
226 futurize_invoke(aa).get0().value();
227 } )
228 repeat_until_value_return_type<AsyncAction>
229 repeat_until_value(AsyncAction action) noexcept {
230 using futurator = futurize<std::invoke_result_t<AsyncAction>>;
231 using type_helper = repeat_until_value_type_helper<typename futurator::type>;
232 // the "T" in the documentation
233 using value_type = typename type_helper::value_type;
234 using optional_type = typename type_helper::optional_type;
235 do {
236 auto f = futurator::invoke(action);
237
238 if (!f.available()) {
239 return [&] () noexcept {
240 memory::scoped_critical_alloc_section _;
241 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
242 auto ret = state->get_future();
243 internal::set_callback(f, state);
244 return ret;
245 }();
246 }
247
248 if (f.failed()) {
249 return make_exception_future<value_type>(f.get_exception());
250 }
251
252 optional_type&& optional = std::move(f).get0();
253 if (optional) {
254 return make_ready_future<value_type>(std::move(optional.value()));
255 }
256 } while (!need_preempt());
257
258 try {
259 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::nullopt, std::move(action));
260 auto f = state->get_future();
261 schedule(state);
262 return f;
263 } catch (...) {
264 return make_exception_future<value_type>(std::current_exception());
265 }
266 }
267
268 namespace internal {
269
270 template <typename StopCondition, typename AsyncAction>
271 class do_until_state final : public continuation_base<> {
272 promise<> _promise;
273 StopCondition _stop;
274 AsyncAction _action;
275 public:
276 explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
277 future<> get_future() { return _promise.get_future(); }
278 task* waiting_task() noexcept override { return _promise.waiting_task(); }
279 virtual void run_and_dispose() noexcept override {
280 if (_state.available()) {
281 if (_state.failed()) {
282 _promise.set_urgent_state(std::move(_state));
283 delete this;
284 return;
285 }
286 _state = {}; // allow next cycle to overrun state
287 }
288 try {
289 do {
290 if (_stop()) {
291 _promise.set_value();
292 delete this;
293 return;
294 }
295 auto f = _action();
296 if (!f.available()) {
297 internal::set_callback(f, this);
298 return;
299 }
300 if (f.failed()) {
301 f.forward_to(std::move(_promise));
302 delete this;
303 return;
304 }
305 } while (!need_preempt());
306 } catch (...) {
307 _promise.set_exception(std::current_exception());
308 delete this;
309 return;
310 }
311 schedule(this);
312 }
313 };
314
315 } // namespace internal
316
317 /// Invokes given action until it fails or given condition evaluates to true or fails.
318 ///
319 /// \param stop_cond a callable taking no arguments, returning a boolean that
320 /// evalutes to true when you don't want to call \c action
321 /// any longer. If \c stop_cond fails, the exception is propagated
322 // in the returned future.
323 /// \param action a callable taking no arguments, returning a future<>. Will
324 /// be called again as soon as the future resolves, unless the
325 /// future fails, or \c stop_cond returns \c true or fails.
326 /// \return a ready future if we stopped successfully, or a failed future if
327 /// a call to to \c action or a call to \c stop_cond failed.
328 template<typename AsyncAction, typename StopCondition>
329 SEASTAR_CONCEPT( requires seastar::InvokeReturns<StopCondition, bool> && seastar::InvokeReturns<AsyncAction, future<>> )
330 inline
331 future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
332 using namespace internal;
333 for (;;) {
334 try {
335 if (stop_cond()) {
336 return make_ready_future<>();
337 }
338 } catch (...) {
339 return current_exception_as_future();
340 }
341 auto f = futurize_invoke(action);
342 if (f.failed()) {
343 return f;
344 }
345 if (!f.available() || need_preempt()) {
346 return [&] () noexcept {
347 memory::scoped_critical_alloc_section _;
348 auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
349 auto ret = task->get_future();
350 internal::set_callback(f, task);
351 return ret;
352 }();
353 }
354 }
355 }
356
357 /// Invoke given action until it fails.
358 ///
359 /// Calls \c action repeatedly until it returns a failed future.
360 ///
361 /// \param action a callable taking no arguments, returning a \c future<>
362 /// that becomes ready when you wish it to be called again.
363 /// \return a future<> that will resolve to the first failure of \c action
364 template<typename AsyncAction>
365 SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, future<>> )
366 inline
367 future<> keep_doing(AsyncAction action) noexcept {
368 return repeat([action = std::move(action)] () mutable {
369 return action().then([] {
370 return stop_iteration::no;
371 });
372 });
373 }
374
375 namespace internal {
376 template <typename Iterator, typename AsyncAction>
377 class do_for_each_state final : public continuation_base<> {
378 Iterator _begin;
379 Iterator _end;
380 AsyncAction _action;
381 promise<> _pr;
382
383 public:
384 do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<> first_unavailable)
385 : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
386 internal::set_callback(first_unavailable, this);
387 }
388 virtual void run_and_dispose() noexcept override {
389 std::unique_ptr<do_for_each_state> zis(this);
390 if (_state.failed()) {
391 _pr.set_urgent_state(std::move(_state));
392 return;
393 }
394 while (_begin != _end) {
395 auto f = futurize_invoke(_action, *_begin++);
396 if (f.failed()) {
397 f.forward_to(std::move(_pr));
398 return;
399 }
400 if (!f.available() || need_preempt()) {
401 _state = {};
402 internal::set_callback(f, this);
403 zis.release();
404 return;
405 }
406 }
407 _pr.set_value();
408 }
409 task* waiting_task() noexcept override {
410 return _pr.waiting_task();
411 }
412 future<> get_future() {
413 return _pr.get_future();
414 }
415 };
416
417 template<typename Iterator, typename AsyncAction>
418 inline
419 future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
420 while (begin != end) {
421 auto f = futurize_invoke(action, *begin++);
422 if (f.failed()) {
423 return f;
424 }
425 if (!f.available() || need_preempt()) {
426 auto* s = new internal::do_for_each_state<Iterator, AsyncAction>{
427 std::move(begin), std::move(end), std::move(action), std::move(f)};
428 return s->get_future();
429 }
430 }
431 return make_ready_future<>();
432 }
433 } // namespace internal
434
435 /// \addtogroup future-util
436
437 /// \brief Call a function for each item in a range, sequentially (iterator version).
438 ///
439 /// For each item in a range, call a function, waiting for the previous
440 /// invocation to complete before calling the next one.
441 ///
442 /// \param begin an \c InputIterator designating the beginning of the range
443 /// \param end an \c InputIterator designating the endof the range
444 /// \param action a callable, taking a reference to objects from the range
445 /// as a parameter, and returning a \c future<> that resolves
446 /// when it is acceptable to process the next item.
447 /// \return a ready future on success, or the first failed future if
448 /// \c action failed.
449 template<typename Iterator, typename AsyncAction>
450 SEASTAR_CONCEPT( requires requires (Iterator i, AsyncAction aa) {
451 { futurize_invoke(aa, *i) } -> std::same_as<future<>>;
452 } )
453 inline
454 future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept {
455 try {
456 return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action));
457 } catch (...) {
458 return current_exception_as_future();
459 }
460 }
461
462 /// \brief Call a function for each item in a range, sequentially (range version).
463 ///
464 /// For each item in a range, call a function, waiting for the previous
465 /// invocation to complete before calling the next one.
466 ///
467 /// \param c an \c Container object designating input range
468 /// \param action a callable, taking a reference to objects from the range
469 /// as a parameter, and returning a \c future<> that resolves
470 /// when it is acceptable to process the next item.
471 /// \return a ready future on success, or the first failed future if
472 /// \c action failed.
473 template<typename Container, typename AsyncAction>
474 SEASTAR_CONCEPT( requires requires (Container c, AsyncAction aa) {
475 { futurize_invoke(aa, *c.begin()) } -> std::same_as<future<>>;
476 } )
477 inline
478 future<> do_for_each(Container& c, AsyncAction action) noexcept {
479 try {
480 return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action));
481 } catch (...) {
482 return current_exception_as_future();
483 }
484 }
485
486 namespace internal {
487
488 template <typename Iterator, typename IteratorCategory>
489 inline
490 size_t
491 iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, IteratorCategory category) {
492 // For InputIterators we can't estimate needed capacity
493 return 0;
494 }
495
496 template <typename Iterator>
497 inline
498 size_t
499 iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, std::forward_iterator_tag category) {
500 // May be linear time below random_access_iterator_tag, but still better than reallocation
501 return std::distance(begin, end);
502 }
503
504 } // namespace internal
505
506 /// \cond internal
507
508 class parallel_for_each_state final : private continuation_base<> {
509 std::vector<future<>> _incomplete;
510 promise<> _result;
511 std::exception_ptr _ex;
512 private:
513 // Wait for one of the futures in _incomplete to complete, and then
514 // decide what to do: wait for another one, or deliver _result if all
515 // are complete.
516 void wait_for_one() noexcept;
517 virtual void run_and_dispose() noexcept override;
518 task* waiting_task() noexcept override { return _result.waiting_task(); }
519 public:
520 parallel_for_each_state(size_t n);
521 void add_future(future<>&& f);
522 future<> get_future();
523 };
524
525 /// \endcond
526
527 /// \brief Run tasks in parallel (iterator version).
528 ///
529 /// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
530 /// the range, and return a future<> that resolves when all the functions
531 /// complete. \c func should return a future<> that indicates when it is
532 /// complete. All invocations are performed in parallel. This allows the range
533 /// to refer to stack objects, but means that unlike other loops this cannot
534 /// check need_preempt and can only be used with small ranges.
535 ///
536 /// \param begin an \c InputIterator designating the beginning of the range
537 /// \param end an \c InputIterator designating the end of the range
538 /// \param func Function to invoke with each element in the range (returning
539 /// a \c future<>)
540 /// \return a \c future<> that resolves when all the function invocations
541 /// complete. If one or more return an exception, the return value
542 /// contains one of the exceptions.
543 template <typename Iterator, typename Func>
544 SEASTAR_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<future<>>; } )
545 inline
546 future<>
547 parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept {
548 parallel_for_each_state* s = nullptr;
549 // Process all elements, giving each future the following treatment:
550 // - available, not failed: do nothing
551 // - available, failed: collect exception in ex
552 // - not available: collect in s (allocating it if needed)
553 while (begin != end) {
554 auto f = futurize_invoke(std::forward<Func>(func), *begin++);
555 memory::scoped_critical_alloc_section _;
556 if (!f.available() || f.failed()) {
557 if (!s) {
558 using itraits = std::iterator_traits<Iterator>;
559 auto n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category()) + 1);
560 s = new parallel_for_each_state(n);
561 }
562 s->add_future(std::move(f));
563 }
564 }
565 // If any futures were not available, hand off to parallel_for_each_state::start().
566 // Otherwise we can return a result immediately.
567 if (s) {
568 // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
569 // so this isn't a leak
570 return s->get_future();
571 }
572 return make_ready_future<>();
573 }
574
575 /// \brief Run tasks in parallel (range version).
576 ///
577 /// Given a \c range of objects, invoke \c func with each object
578 /// in the range, and return a future<> that resolves when all
579 /// the functions complete. \c func should return a future<> that indicates
580 /// when it is complete. All invocations are performed in parallel. This allows
581 /// the range to refer to stack objects, but means that unlike other loops this
582 /// cannot check need_preempt and can only be used with small ranges.
583 ///
584 /// \param range A range of objects to iterate run \c func on
585 /// \param func A callable, accepting reference to the range's
586 /// \c value_type, and returning a \c future<>.
587 /// \return a \c future<> that becomes ready when the entire range
588 /// was processed. If one or more of the invocations of
589 /// \c func returned an exceptional future, then the return
590 /// value will contain one of those exceptions.
591
592 namespace internal {
593
594 template <typename Range, typename Func>
595 inline
596 future<>
597 parallel_for_each_impl(Range&& range, Func&& func) {
598 return parallel_for_each(std::begin(range), std::end(range),
599 std::forward<Func>(func));
600 }
601
602 } // namespace internal
603
604 template <typename Range, typename Func>
605 SEASTAR_CONCEPT( requires requires (Func f, Range r) { { f(*r.begin()) } -> std::same_as<future<>>; } )
606 inline
607 future<>
608 parallel_for_each(Range&& range, Func&& func) noexcept {
609 auto impl = internal::parallel_for_each_impl<Range, Func>;
610 return futurize_invoke(impl, std::forward<Range>(range), std::forward<Func>(func));
611 }
612
613 /// Run a maximum of \c max_concurrent tasks in parallel (iterator version).
614 ///
615 /// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
616 /// the range, and return a future<> that resolves when all the functions
617 /// complete. \c func should return a future<> that indicates when it is
618 /// complete. Up to \c max_concurrent invocations are performed in parallel.
619 /// This does not allow the range to refer to stack objects. The caller
620 /// must ensure that the range outlives the call to max_concurrent_for_each
621 /// so it can be iterated in the background.
622 ///
623 /// \param begin an \c InputIterator designating the beginning of the range
624 /// \param end an \c InputIterator designating the end of the range
625 /// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
626 /// \param func Function to invoke with each element in the range (returning
627 /// a \c future<>)
628 /// \return a \c future<> that resolves when all the function invocations
629 /// complete. If one or more return an exception, the return value
630 /// contains one of the exceptions.
631 template <typename Iterator, typename Func>
632 SEASTAR_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<future<>>; } )
633 inline
634 future<>
635 max_concurrent_for_each(Iterator begin, Iterator end, size_t max_concurrent, Func&& func) noexcept {
636 struct state {
637 Iterator begin;
638 Iterator end;
639 Func func;
640 size_t max_concurrent;
641 semaphore sem;
642 std::exception_ptr err;
643
644 state(Iterator begin_, Iterator end_, size_t max_concurrent_, Func func_)
645 : begin(std::move(begin_))
646 , end(std::move(end_))
647 , func(std::move(func_))
648 , max_concurrent(max_concurrent_)
649 , sem(max_concurrent_)
650 , err()
651 { }
652 };
653
654 assert(max_concurrent > 0);
655
656 try {
657 return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) {
658 return do_until([&s] { return s.begin == s.end; }, [&s] {
659 return s.sem.wait().then([&s] () mutable noexcept {
660 // Possibly run in background and signal _sem when the task is done.
661 // The background tasks are waited on using _sem.
662 (void)futurize_invoke(s.func, *s.begin++).then_wrapped([&s] (future<> fut) {
663 if (fut.failed()) {
664 auto e = fut.get_exception();;
665 if (!s.err) {
666 s.err = std::move(e);
667 }
668 }
669 s.sem.signal();
670 });
671 });
672 }).then([&s] {
673 // Wait for any background task to finish
674 // and signal and semaphore
675 return s.sem.wait(s.max_concurrent);
676 }).then([&s] {
677 if (!s.err) {
678 return make_ready_future<>();
679 }
680 return seastar::make_exception_future<>(std::move(s.err));
681 });
682 });
683 } catch (...) {
684 return current_exception_as_future();
685 }
686 }
687
688 /// Run a maximum of \c max_concurrent tasks in parallel (range version).
689 ///
690 /// Given a range of objects, run \c func on each \c *i in
691 /// the range, and return a future<> that resolves when all the functions
692 /// complete. \c func should return a future<> that indicates when it is
693 /// complete. Up to \c max_concurrent invocations are performed in parallel.
694 /// This does not allow the range to refer to stack objects. The caller
695 /// must ensure that the range outlives the call to max_concurrent_for_each
696 /// so it can be iterated in the background.
697 ///
698 /// \param range a \c Range to be processed
699 /// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
700 /// \param func Function to invoke with each element in the range (returning
701 /// a \c future<>)
702 /// \return a \c future<> that resolves when all the function invocations
703 /// complete. If one or more return an exception, the return value
704 /// contains one of the exceptions.
705 template <typename Range, typename Func>
706 SEASTAR_CONCEPT( requires std::ranges::range<Range> && requires (Func f, Range r) { { f(*r.begin()) } -> std::same_as<future<>>; } )
707 inline
708 future<>
709 max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept {
710 try {
711 return max_concurrent_for_each(std::begin(range), std::end(range), max_concurrent, std::forward<Func>(func));
712 } catch (...) {
713 return current_exception_as_future();
714 }
715 }
716
717 /// @}
718
719 } // namespace seastar