// Extracted from git.proxmox.com, ceph.git (commit "update ceph source to reef 18.1.2"):
// ceph/src/seastar/include/seastar/core/loop.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19 /*
20 * Copyright (C) 2020 ScyllaDB.
21 */
22
23 #pragma once
24
25 #include <cstddef>
26 #include <iterator>
27 #include <memory>
28 #include <type_traits>
29 #include <vector>
30
31 #include <seastar/core/future.hh>
32 #include <seastar/core/task.hh>
33 #include <seastar/util/bool_class.hh>
34 #include <seastar/core/semaphore.hh>
35
36 namespace seastar {
37
38 /// \addtogroup future-util
39 /// @{
40
41 // The AsyncAction concept represents an action which can complete later than
42 // the actual function invocation. It is represented by a function which
43 // returns a future which resolves when the action is done.
44
45 struct stop_iteration_tag { };
46 using stop_iteration = bool_class<stop_iteration_tag>;
47
48 namespace internal {
49
50 template <typename AsyncAction>
51 class repeater final : public continuation_base<stop_iteration> {
52 promise<> _promise;
53 AsyncAction _action;
54 public:
55 explicit repeater(AsyncAction&& action) : _action(std::move(action)) {}
56 future<> get_future() { return _promise.get_future(); }
57 task* waiting_task() noexcept override { return _promise.waiting_task(); }
58 virtual void run_and_dispose() noexcept override {
59 if (_state.failed()) {
60 _promise.set_exception(std::move(_state).get_exception());
61 delete this;
62 return;
63 } else {
64 if (_state.get0() == stop_iteration::yes) {
65 _promise.set_value();
66 delete this;
67 return;
68 }
69 _state = {};
70 }
71 try {
72 do {
73 auto f = futurize_invoke(_action);
74 if (!f.available()) {
75 internal::set_callback(std::move(f), this);
76 return;
77 }
78 if (f.get0() == stop_iteration::yes) {
79 _promise.set_value();
80 delete this;
81 return;
82 }
83 } while (!need_preempt());
84 } catch (...) {
85 _promise.set_exception(std::current_exception());
86 delete this;
87 return;
88 }
89 _state.set(stop_iteration::no);
90 schedule(this);
91 }
92 };
93
94 } // namespace internal
95
96 // Delete these overloads so that the actual implementation can use a
97 // universal reference but still reject lvalue references.
98 template<typename AsyncAction>
99 future<> repeat(const AsyncAction& action) noexcept = delete;
100 template<typename AsyncAction>
101 future<> repeat(AsyncAction& action) noexcept = delete;
102
103 /// Invokes given action until it fails or the function requests iteration to stop by returning
104 /// \c stop_iteration::yes.
105 ///
106 /// \param action a callable taking no arguments, returning a future<stop_iteration>. Will
107 /// be called again as soon as the future resolves, unless the
108 /// future fails, action throws, or it resolves with \c stop_iteration::yes.
109 /// If \c action is an r-value it can be moved in the middle of iteration.
110 /// \return a ready future if we stopped successfully, or a failed future if
111 /// a call to to \c action failed.
112 template<typename AsyncAction>
113 SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, stop_iteration> || seastar::InvokeReturns<AsyncAction, future<stop_iteration>> )
114 inline
115 future<> repeat(AsyncAction&& action) noexcept {
116 using futurator = futurize<std::invoke_result_t<AsyncAction>>;
117 static_assert(std::is_same<future<stop_iteration>, typename futurator::type>::value, "bad AsyncAction signature");
118 for (;;) {
119 // Do not type-erase here in case this is a short repeat()
120 auto f = futurator::invoke(action);
121
122 if (!f.available() || f.failed() || need_preempt()) {
123 return [&] () noexcept {
124 memory::scoped_critical_alloc_section _;
125 auto repeater = new internal::repeater<AsyncAction>(std::move(action));
126 auto ret = repeater->get_future();
127 internal::set_callback(std::move(f), repeater);
128 return ret;
129 }();
130 }
131
132 if (f.get0() == stop_iteration::yes) {
133 return make_ready_future<>();
134 }
135 }
136 }
137
138 /// \cond internal
139
140 template <typename T>
141 struct repeat_until_value_type_helper;
142
143 /// Type helper for repeat_until_value()
144 template <typename T>
145 struct repeat_until_value_type_helper<future<std::optional<T>>> {
146 /// The type of the value we are computing
147 using value_type = T;
148 /// Type used by \c AsyncAction while looping
149 using optional_type = std::optional<T>;
150 /// Return type of repeat_until_value()
151 using future_type = future<value_type>;
152 };
153
154 /// Return value of repeat_until_value()
155 template <typename AsyncAction>
156 using repeat_until_value_return_type
157 = typename repeat_until_value_type_helper<typename futurize<std::invoke_result_t<AsyncAction>>::type>::future_type;
158
159 /// \endcond
160
161 namespace internal {
162
163 template <typename AsyncAction, typename T>
164 class repeat_until_value_state final : public continuation_base<std::optional<T>> {
165 promise<T> _promise;
166 AsyncAction _action;
167 public:
168 explicit repeat_until_value_state(AsyncAction action) : _action(std::move(action)) {}
169 repeat_until_value_state(std::optional<T> st, AsyncAction action) : repeat_until_value_state(std::move(action)) {
170 this->_state.set(std::move(st));
171 }
172 future<T> get_future() { return _promise.get_future(); }
173 task* waiting_task() noexcept override { return _promise.waiting_task(); }
174 virtual void run_and_dispose() noexcept override {
175 if (this->_state.failed()) {
176 _promise.set_exception(std::move(this->_state).get_exception());
177 delete this;
178 return;
179 } else {
180 auto v = std::move(this->_state).get0();
181 if (v) {
182 _promise.set_value(std::move(*v));
183 delete this;
184 return;
185 }
186 this->_state = {};
187 }
188 try {
189 do {
190 auto f = futurize_invoke(_action);
191 if (!f.available()) {
192 internal::set_callback(std::move(f), this);
193 return;
194 }
195 auto ret = f.get0();
196 if (ret) {
197 _promise.set_value(std::move(*ret));
198 delete this;
199 return;
200 }
201 } while (!need_preempt());
202 } catch (...) {
203 _promise.set_exception(std::current_exception());
204 delete this;
205 return;
206 }
207 this->_state.set(std::nullopt);
208 schedule(this);
209 }
210 };
211
212 } // namespace internal
213
214 /// Invokes given action until it fails or the function requests iteration to stop by returning
215 /// an engaged \c future<std::optional<T>> or std::optional<T>. The value is extracted
216 /// from the \c optional, and returned, as a future, from repeat_until_value().
217 ///
218 /// \param action a callable taking no arguments, returning a future<std::optional<T>>
219 /// or std::optional<T>. Will be called again as soon as the future
220 /// resolves, unless the future fails, action throws, or it resolves with
221 /// an engaged \c optional. If \c action is an r-value it can be moved
222 /// in the middle of iteration.
223 /// \return a ready future if we stopped successfully, or a failed future if
224 /// a call to to \c action failed. The \c optional's value is returned.
225 template<typename AsyncAction>
226 SEASTAR_CONCEPT( requires requires (AsyncAction aa) {
227 bool(futurize_invoke(aa).get0());
228 futurize_invoke(aa).get0().value();
229 } )
230 repeat_until_value_return_type<AsyncAction>
231 repeat_until_value(AsyncAction action) noexcept {
232 using futurator = futurize<std::invoke_result_t<AsyncAction>>;
233 using type_helper = repeat_until_value_type_helper<typename futurator::type>;
234 // the "T" in the documentation
235 using value_type = typename type_helper::value_type;
236 using optional_type = typename type_helper::optional_type;
237 do {
238 auto f = futurator::invoke(action);
239
240 if (!f.available()) {
241 return [&] () noexcept {
242 memory::scoped_critical_alloc_section _;
243 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
244 auto ret = state->get_future();
245 internal::set_callback(std::move(f), state);
246 return ret;
247 }();
248 }
249
250 if (f.failed()) {
251 return make_exception_future<value_type>(f.get_exception());
252 }
253
254 optional_type&& optional = std::move(f).get0();
255 if (optional) {
256 return make_ready_future<value_type>(std::move(optional.value()));
257 }
258 } while (!need_preempt());
259
260 try {
261 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::nullopt, std::move(action));
262 auto f = state->get_future();
263 schedule(state);
264 return f;
265 } catch (...) {
266 return make_exception_future<value_type>(std::current_exception());
267 }
268 }
269
270 namespace internal {
271
272 template <typename StopCondition, typename AsyncAction>
273 class do_until_state final : public continuation_base<> {
274 promise<> _promise;
275 StopCondition _stop;
276 AsyncAction _action;
277 public:
278 explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
279 future<> get_future() { return _promise.get_future(); }
280 task* waiting_task() noexcept override { return _promise.waiting_task(); }
281 virtual void run_and_dispose() noexcept override {
282 if (_state.available()) {
283 if (_state.failed()) {
284 _promise.set_urgent_state(std::move(_state));
285 delete this;
286 return;
287 }
288 _state = {}; // allow next cycle to overrun state
289 }
290 try {
291 do {
292 if (_stop()) {
293 _promise.set_value();
294 delete this;
295 return;
296 }
297 auto f = _action();
298 if (!f.available()) {
299 internal::set_callback(std::move(f), this);
300 return;
301 }
302 if (f.failed()) {
303 f.forward_to(std::move(_promise));
304 delete this;
305 return;
306 }
307 } while (!need_preempt());
308 } catch (...) {
309 _promise.set_exception(std::current_exception());
310 delete this;
311 return;
312 }
313 schedule(this);
314 }
315 };
316
317 } // namespace internal
318
319 /// Invokes given action until it fails or given condition evaluates to true or fails.
320 ///
321 /// \param stop_cond a callable taking no arguments, returning a boolean that
322 /// evalutes to true when you don't want to call \c action
323 /// any longer. If \c stop_cond fails, the exception is propagated
324 // in the returned future.
325 /// \param action a callable taking no arguments, returning a future<>. Will
326 /// be called again as soon as the future resolves, unless the
327 /// future fails, or \c stop_cond returns \c true or fails.
328 /// \return a ready future if we stopped successfully, or a failed future if
329 /// a call to to \c action or a call to \c stop_cond failed.
330 template<typename AsyncAction, typename StopCondition>
331 SEASTAR_CONCEPT( requires seastar::InvokeReturns<StopCondition, bool> && seastar::InvokeReturns<AsyncAction, future<>> )
332 inline
333 future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
334 using namespace internal;
335 for (;;) {
336 try {
337 if (stop_cond()) {
338 return make_ready_future<>();
339 }
340 } catch (...) {
341 return current_exception_as_future();
342 }
343 auto f = futurize_invoke(action);
344 if (f.failed()) {
345 return f;
346 }
347 if (!f.available() || need_preempt()) {
348 return [&] () noexcept {
349 memory::scoped_critical_alloc_section _;
350 auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
351 auto ret = task->get_future();
352 internal::set_callback(std::move(f), task);
353 return ret;
354 }();
355 }
356 }
357 }
358
359 /// Invoke given action until it fails.
360 ///
361 /// Calls \c action repeatedly until it returns a failed future.
362 ///
363 /// \param action a callable taking no arguments, returning a \c future<>
364 /// that becomes ready when you wish it to be called again.
365 /// \return a future<> that will resolve to the first failure of \c action
366 template<typename AsyncAction>
367 SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, future<>> )
368 inline
369 future<> keep_doing(AsyncAction action) noexcept {
370 return repeat([action = std::move(action)] () mutable {
371 return action().then([] {
372 return stop_iteration::no;
373 });
374 });
375 }
376
377 namespace internal {
378 template <typename Iterator, typename AsyncAction>
379 class do_for_each_state final : public continuation_base<> {
380 Iterator _begin;
381 Iterator _end;
382 AsyncAction _action;
383 promise<> _pr;
384
385 public:
386 do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<>&& first_unavailable)
387 : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
388 internal::set_callback(std::move(first_unavailable), this);
389 }
390 virtual void run_and_dispose() noexcept override {
391 std::unique_ptr<do_for_each_state> zis(this);
392 if (_state.failed()) {
393 _pr.set_urgent_state(std::move(_state));
394 return;
395 }
396 while (_begin != _end) {
397 auto f = futurize_invoke(_action, *_begin++);
398 if (f.failed()) {
399 f.forward_to(std::move(_pr));
400 return;
401 }
402 if (!f.available() || need_preempt()) {
403 _state = {};
404 internal::set_callback(std::move(f), this);
405 zis.release();
406 return;
407 }
408 }
409 _pr.set_value();
410 }
411 task* waiting_task() noexcept override {
412 return _pr.waiting_task();
413 }
414 future<> get_future() {
415 return _pr.get_future();
416 }
417 };
418
419 template<typename Iterator, typename AsyncAction>
420 inline
421 future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
422 while (begin != end) {
423 auto f = futurize_invoke(action, *begin++);
424 if (f.failed()) {
425 return f;
426 }
427 if (!f.available() || need_preempt()) {
428 auto* s = new internal::do_for_each_state<Iterator, AsyncAction>{
429 std::move(begin), std::move(end), std::move(action), std::move(f)};
430 return s->get_future();
431 }
432 }
433 return make_ready_future<>();
434 }
435 } // namespace internal
436
437 /// \addtogroup future-util
438
439 /// \brief Call a function for each item in a range, sequentially (iterator version).
440 ///
441 /// For each item in a range, call a function, waiting for the previous
442 /// invocation to complete before calling the next one.
443 ///
444 /// \param begin an \c InputIterator designating the beginning of the range
445 /// \param end an \c InputIterator designating the endof the range
446 /// \param action a callable, taking a reference to objects from the range
447 /// as a parameter, and returning a \c future<> that resolves
448 /// when it is acceptable to process the next item.
449 /// \return a ready future on success, or the first failed future if
450 /// \c action failed.
451 template<typename Iterator, typename AsyncAction>
452 SEASTAR_CONCEPT( requires requires (Iterator i, AsyncAction aa) {
453 { futurize_invoke(aa, *i) } -> std::same_as<future<>>;
454 } )
455 inline
456 future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept {
457 try {
458 return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action));
459 } catch (...) {
460 return current_exception_as_future();
461 }
462 }
463
464 /// \brief Call a function for each item in a range, sequentially (range version).
465 ///
466 /// For each item in a range, call a function, waiting for the previous
467 /// invocation to complete before calling the next one.
468 ///
469 /// \param c an \c Container object designating input range
470 /// \param action a callable, taking a reference to objects from the range
471 /// as a parameter, and returning a \c future<> that resolves
472 /// when it is acceptable to process the next item.
473 /// \return a ready future on success, or the first failed future if
474 /// \c action failed.
475 template<typename Container, typename AsyncAction>
476 SEASTAR_CONCEPT( requires requires (Container c, AsyncAction aa) {
477 { futurize_invoke(aa, *std::begin(c)) } -> std::same_as<future<>>;
478 std::end(c);
479 } )
480 inline
481 future<> do_for_each(Container& c, AsyncAction action) noexcept {
482 try {
483 return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action));
484 } catch (...) {
485 return current_exception_as_future();
486 }
487 }
488
namespace internal {

/// Detects whether \c std::iterator_traits<T> has an \c iterator_category
/// member type.
template <typename T, typename = void>
struct has_iterator_category : std::false_type {};

template <typename T>
struct has_iterator_category<T, std::void_t<typename std::iterator_traits<T>::iterator_category >> : std::true_type {};

/// Estimates the number of elements in [\c begin, \c end), for use as a
/// capacity hint.
///
/// \param begin start of the range
/// \param end end of the range
/// \param IteratorCategory (unnamed tag argument) the iterator category of
///        \c Iterator, e.g. \c std::iterator_traits<Iterator>::iterator_category{}
/// \return \c std::distance(begin, end) when the category is
///         \c std::forward_iterator_tag or derived from it and \c Sentinel
///         is the same type as \c Iterator; 0 otherwise.
///
/// The category is inspected with \c std::is_base_of_v rather than through
/// overloading on the tag. A catch-all overload templated on the tag type is
/// an exact match for every tag and therefore beats an overload taking
/// \c std::forward_iterator_tag (which needs a derived-to-base conversion)
/// for bidirectional and random-access iterators, yielding 0 for them.
template <typename Iterator, typename Sentinel, typename IteratorCategory>
inline
size_t
iterator_range_estimate_vector_capacity(Iterator const& begin, Sentinel const& end, IteratorCategory) {
    if constexpr (std::is_same_v<Iterator, Sentinel>
            && std::is_base_of_v<std::forward_iterator_tag, IteratorCategory>) {
        // May be linear time below random_access_iterator_tag, but still better than reallocation
        return static_cast<size_t>(std::distance(begin, end));
    } else {
        // Single-pass iterators cannot be measured without consuming them,
        // and std::distance() needs both ends to have the same type.
        return 0;
    }
}

} // namespace internal
514
515 /// \cond internal
516
517 class parallel_for_each_state final : private continuation_base<> {
518 std::vector<future<>> _incomplete;
519 promise<> _result;
520 std::exception_ptr _ex;
521 private:
522 // Wait for one of the futures in _incomplete to complete, and then
523 // decide what to do: wait for another one, or deliver _result if all
524 // are complete.
525 void wait_for_one() noexcept;
526 virtual void run_and_dispose() noexcept override;
527 task* waiting_task() noexcept override { return _result.waiting_task(); }
528 public:
529 parallel_for_each_state(size_t n);
530 void add_future(future<>&& f);
531 future<> get_future();
532 };
533
534 /// \endcond
535
536 /// \brief Run tasks in parallel (iterator version).
537 ///
538 /// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
539 /// the range, and return a future<> that resolves when all the functions
540 /// complete. \c func should return a future<> that indicates when it is
541 /// complete. All invocations are performed in parallel. This allows the range
542 /// to refer to stack objects, but means that unlike other loops this cannot
543 /// check need_preempt and can only be used with small ranges.
544 ///
545 /// \param begin an \c InputIterator designating the beginning of the range
546 /// \param end an \c InputIterator designating the end of the range
547 /// \param func Function to invoke with each element in the range (returning
548 /// a \c future<>)
549 /// \return a \c future<> that resolves when all the function invocations
550 /// complete. If one or more return an exception, the return value
551 /// contains one of the exceptions.
552 /// \note parallel_for_each() schedules all invocations of \c func on the
553 /// current shard. If you want to run a function on all shards in parallel,
554 /// have a look at \ref smp::invoke_on_all() instead.
555 template <typename Iterator, typename Sentinel, typename Func>
556 SEASTAR_CONCEPT( requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { i++ }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)))
557 // We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
558 // which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
559 // break legacy code, for which it holds that Sentinel equals Iterator.
560 inline
561 future<>
562 parallel_for_each(Iterator begin, Sentinel end, Func&& func) noexcept {
563 parallel_for_each_state* s = nullptr;
564 // Process all elements, giving each future the following treatment:
565 // - available, not failed: do nothing
566 // - available, failed: collect exception in ex
567 // - not available: collect in s (allocating it if needed)
568 while (begin != end) {
569 auto f = futurize_invoke(std::forward<Func>(func), *begin);
570 ++begin;
571 memory::scoped_critical_alloc_section _;
572 if (!f.available() || f.failed()) {
573 if (!s) {
574 using itraits = std::iterator_traits<Iterator>;
575 size_t n{0U};
576 if constexpr (internal::has_iterator_category<Iterator>::value) {
577 // We need if-constexpr here because there exist iterators for which std::iterator_traits
578 // does not have 'iterator_category' as member type
579 n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category{}) + 1);
580 }
581 s = new parallel_for_each_state(n);
582 }
583 s->add_future(std::move(f));
584 }
585 }
586 // If any futures were not available, hand off to parallel_for_each_state::start().
587 // Otherwise we can return a result immediately.
588 if (s) {
589 // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
590 // so this isn't a leak
591 return s->get_future();
592 }
593 return make_ready_future<>();
594 }
595
596 /// \brief Run tasks in parallel (range version).
597 ///
598 /// Given a \c range of objects, invoke \c func with each object
599 /// in the range, and return a future<> that resolves when all
600 /// the functions complete. \c func should return a future<> that indicates
601 /// when it is complete. All invocations are performed in parallel. This allows
602 /// the range to refer to stack objects, but means that unlike other loops this
603 /// cannot check need_preempt and can only be used with small ranges.
604 ///
605 /// \param range A range of objects to iterate run \c func on
606 /// \param func A callable, accepting reference to the range's
607 /// \c value_type, and returning a \c future<>.
608 /// \return a \c future<> that becomes ready when the entire range
609 /// was processed. If one or more of the invocations of
610 /// \c func returned an exceptional future, then the return
611 /// value will contain one of those exceptions.
612 /// \note parallel_for_each() schedules all invocations of \c func on the
613 /// current shard. If you want to run a function on all shards in parallel,
614 /// have a look at \ref smp::invoke_on_all() instead.
615
616 namespace internal {
617
618 template <typename Range, typename Func>
619 inline
620 future<>
621 parallel_for_each_impl(Range&& range, Func&& func) {
622 return parallel_for_each(std::begin(range), std::end(range),
623 std::forward<Func>(func));
624 }
625
626 } // namespace internal
627
628 template <typename Range, typename Func>
629 SEASTAR_CONCEPT( requires requires (Func f, Range r) {
630 { f(*std::begin(r)) } -> std::same_as<future<>>;
631 std::end(r);
632 } )
633 inline
634 future<>
635 parallel_for_each(Range&& range, Func&& func) noexcept {
636 auto impl = internal::parallel_for_each_impl<Range, Func>;
637 return futurize_invoke(impl, std::forward<Range>(range), std::forward<Func>(func));
638 }
639
640 /// Run a maximum of \c max_concurrent tasks in parallel (iterator version).
641 ///
642 /// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
643 /// the range, and return a future<> that resolves when all the functions
644 /// complete. \c func should return a future<> that indicates when it is
645 /// complete. Up to \c max_concurrent invocations are performed in parallel.
646 /// This does not allow the range to refer to stack objects. The caller
647 /// must ensure that the range outlives the call to max_concurrent_for_each
648 /// so it can be iterated in the background.
649 ///
650 /// \param begin an \c InputIterator designating the beginning of the range
651 /// \param end an \c InputIterator designating the end of the range
652 /// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
653 /// \param func Function to invoke with each element in the range (returning
654 /// a \c future<>)
655 /// \return a \c future<> that resolves when all the function invocations
656 /// complete. If one or more return an exception, the return value
657 /// contains one of the exceptions.
658 /// \note max_concurrent_for_each() schedules all invocations of \c func on the
659 /// current shard. If you want to run a function on all shards in parallel,
660 /// have a look at \ref smp::invoke_on_all() instead.
661 template <typename Iterator, typename Sentinel, typename Func>
662 SEASTAR_CONCEPT( requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { ++i }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>) ) )
663 // We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
664 // which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
665 // break legacy code, for which it holds that Sentinel equals Iterator.
666 inline
667 future<>
668 max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func&& func) noexcept {
669 struct state {
670 Iterator begin;
671 Sentinel end;
672 Func func;
673 size_t max_concurrent;
674 semaphore sem;
675 std::exception_ptr err;
676
677 state(Iterator begin_, Sentinel end_, size_t max_concurrent_, Func func_)
678 : begin(std::move(begin_))
679 , end(std::move(end_))
680 , func(std::move(func_))
681 , max_concurrent(max_concurrent_)
682 , sem(max_concurrent_)
683 , err()
684 { }
685 };
686
687 assert(max_concurrent > 0);
688
689 try {
690 return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) {
691 return do_until([&s] { return s.begin == s.end; }, [&s] {
692 return s.sem.wait().then([&s] () mutable noexcept {
693 // Possibly run in background and signal _sem when the task is done.
694 // The background tasks are waited on using _sem.
695 (void)futurize_invoke(s.func, *s.begin).then_wrapped([&s] (future<> fut) {
696 if (fut.failed()) {
697 auto e = fut.get_exception();;
698 if (!s.err) {
699 s.err = std::move(e);
700 }
701 }
702 s.sem.signal();
703 });
704 ++s.begin;
705 });
706 }).then([&s] {
707 // Wait for any background task to finish
708 // and signal and semaphore
709 return s.sem.wait(s.max_concurrent);
710 }).then([&s] {
711 if (!s.err) {
712 return make_ready_future<>();
713 }
714 return seastar::make_exception_future<>(std::move(s.err));
715 });
716 });
717 } catch (...) {
718 return current_exception_as_future();
719 }
720 }
721
722 /// Run a maximum of \c max_concurrent tasks in parallel (range version).
723 ///
724 /// Given a range of objects, run \c func on each \c *i in
725 /// the range, and return a future<> that resolves when all the functions
726 /// complete. \c func should return a future<> that indicates when it is
727 /// complete. Up to \c max_concurrent invocations are performed in parallel.
728 /// This does not allow the range to refer to stack objects. The caller
729 /// must ensure that the range outlives the call to max_concurrent_for_each
730 /// so it can be iterated in the background.
731 ///
732 /// \param range a \c Range to be processed
733 /// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
734 /// \param func Function to invoke with each element in the range (returning
735 /// a \c future<>)
736 /// \return a \c future<> that resolves when all the function invocations
737 /// complete. If one or more return an exception, the return value
738 /// contains one of the exceptions.
739 /// \note max_concurrent_for_each() schedules all invocations of \c func on the
740 /// current shard. If you want to run a function on all shards in parallel,
741 /// have a look at \ref smp::invoke_on_all() instead.
742 template <typename Range, typename Func>
743 SEASTAR_CONCEPT( requires requires (Func f, Range r) {
744 { f(*std::begin(r)) } -> std::same_as<future<>>;
745 std::end(r);
746 } )
747 inline
748 future<>
749 max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept {
750 try {
751 return max_concurrent_for_each(std::begin(range), std::end(range), max_concurrent, std::forward<Func>(func));
752 } catch (...) {
753 return current_exception_as_future();
754 }
755 }
756
757 /// @}
758
759 } // namespace seastar