]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/include/seastar/core/loop.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / core / loop.hh
index 29751e84b954c3008788685946ca9ffe3742bcf3..35d808b95ec96fe4d5b05d7acec4ede2602e5313 100644 (file)
 
 #pragma once
 
+#include <cstddef>
 #include <iterator>
 #include <memory>
+#include <type_traits>
 #include <vector>
 
 #include <seastar/core/future.hh>
@@ -70,7 +72,7 @@ public:
             do {
                 auto f = futurize_invoke(_action);
                 if (!f.available()) {
-                    internal::set_callback(f, this);
+                    internal::set_callback(std::move(f), this);
                     return;
                 }
                 if (f.get0() == stop_iteration::yes) {
@@ -122,7 +124,7 @@ future<> repeat(AsyncAction&& action) noexcept {
                 memory::scoped_critical_alloc_section _;
                 auto repeater = new internal::repeater<AsyncAction>(std::move(action));
                 auto ret = repeater->get_future();
-                internal::set_callback(f, repeater);
+                internal::set_callback(std::move(f), repeater);
                 return ret;
             }();
         }
@@ -187,7 +189,7 @@ public:
             do {
                 auto f = futurize_invoke(_action);
                 if (!f.available()) {
-                    internal::set_callback(f, this);
+                    internal::set_callback(std::move(f), this);
                     return;
                 }
                 auto ret = f.get0();
@@ -240,7 +242,7 @@ repeat_until_value(AsyncAction action) noexcept {
             memory::scoped_critical_alloc_section _;
             auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
             auto ret = state->get_future();
-            internal::set_callback(f, state);
+            internal::set_callback(std::move(f), state);
             return ret;
           }();
         }
@@ -294,7 +296,7 @@ public:
                 }
                 auto f = _action();
                 if (!f.available()) {
-                    internal::set_callback(f, this);
+                    internal::set_callback(std::move(f), this);
                     return;
                 }
                 if (f.failed()) {
@@ -347,7 +349,7 @@ future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
                 memory::scoped_critical_alloc_section _;
                 auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
                 auto ret = task->get_future();
-                internal::set_callback(f, task);
+                internal::set_callback(std::move(f), task);
                 return ret;
             }();
         }
@@ -381,9 +383,9 @@ class do_for_each_state final : public continuation_base<> {
     promise<> _pr;
 
 public:
-    do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<> first_unavailable)
+    do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<>&& first_unavailable)
         : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
-        internal::set_callback(first_unavailable, this);
+        internal::set_callback(std::move(first_unavailable), this);
     }
     virtual void run_and_dispose() noexcept override {
         std::unique_ptr<do_for_each_state> zis(this);
@@ -399,7 +401,7 @@ public:
             }
             if (!f.available() || need_preempt()) {
                 _state = {};
-                internal::set_callback(f, this);
+                internal::set_callback(std::move(f), this);
                 zis.release();
                 return;
             }
@@ -472,7 +474,8 @@ future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept
 ///         \c action failed.
 template<typename Container, typename AsyncAction>
 SEASTAR_CONCEPT( requires requires (Container c, AsyncAction aa) {
-    { futurize_invoke(aa, *c.begin()) } -> std::same_as<future<>>;
+    { futurize_invoke(aa, *std::begin(c)) } -> std::same_as<future<>>;
+    std::end(c);
 } )
 inline
 future<> do_for_each(Container& c, AsyncAction action) noexcept {
@@ -485,18 +488,24 @@ future<> do_for_each(Container& c, AsyncAction action) noexcept {
 
 namespace internal {
 
-template <typename Iterator, typename IteratorCategory>
+template <typename T, typename = void>
+struct has_iterator_category : std::false_type {};
+
+template <typename T>
+struct has_iterator_category<T, std::void_t<typename std::iterator_traits<T>::iterator_category >> : std::true_type {};
+
+template <typename Iterator, typename Sentinel, typename IteratorCategory>
 inline
 size_t
-iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, IteratorCategory category) {
+iterator_range_estimate_vector_capacity(Iterator const&, Sentinel const&, IteratorCategory) {
     // For InputIterators we can't estimate needed capacity
     return 0;
 }
 
-template <typename Iterator>
+template <typename Iterator, typename Sentinel>
 inline
 size_t
-iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, std::forward_iterator_tag category) {
+iterator_range_estimate_vector_capacity(Iterator begin, Sentinel end, std::forward_iterator_tag) {
     // May be linear time below random_access_iterator_tag, but still better than reallocation
     return std::distance(begin, end);
 }
@@ -540,23 +549,35 @@ public:
 /// \return a \c future<> that resolves when all the function invocations
 ///         complete.  If one or more return an exception, the return value
 ///         contains one of the exceptions.
-template <typename Iterator, typename Func>
-SEASTAR_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<future<>>; } )
+/// \note parallel_for_each() schedules all invocations of \c func on the
+///       current shard. If you want to run a function on all shards in parallel,
+///       have a look at \ref smp::invoke_on_all() instead.
+template <typename Iterator, typename Sentinel, typename Func>
+SEASTAR_CONCEPT( requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { i++ }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)))
+// We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
+// which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
+// break legacy code, for which it holds that Sentinel equals Iterator.
 inline
 future<>
-parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept {
+parallel_for_each(Iterator begin, Sentinel end, Func&& func) noexcept {
     parallel_for_each_state* s = nullptr;
     // Process all elements, giving each future the following treatment:
     //   - available, not failed: do nothing
     //   - available, failed: collect exception in ex
     //   - not available: collect in s (allocating it if needed)
     while (begin != end) {
-        auto f = futurize_invoke(std::forward<Func>(func), *begin++);
+        auto f = futurize_invoke(std::forward<Func>(func), *begin);
+        ++begin;
         memory::scoped_critical_alloc_section _;
         if (!f.available() || f.failed()) {
             if (!s) {
                 using itraits = std::iterator_traits<Iterator>;
-                auto n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category()) + 1);
+                size_t n{0U};
+                if constexpr (internal::has_iterator_category<Iterator>::value) {
+                    // We need if-constexpr here because there exist iterators for which std::iterator_traits
+                    // does not have 'iterator_category' as member type
+                    n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category{}) + 1);
+                }
                 s = new parallel_for_each_state(n);
             }
             s->add_future(std::move(f));
@@ -588,6 +609,9 @@ parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept {
 ///         was processed.  If one or more of the invocations of
 ///         \c func returned an exceptional future, then the return
 ///         value will contain one of those exceptions.
+/// \note parallel_for_each() schedules all invocations of \c func on the
+///       current shard. If you want to run a function on all shards in parallel,
+///       have a look at \ref smp::invoke_on_all() instead.
 
 namespace internal {
 
@@ -602,7 +626,10 @@ parallel_for_each_impl(Range&& range, Func&& func) {
 } // namespace internal
 
 template <typename Range, typename Func>
-SEASTAR_CONCEPT( requires requires (Func f, Range r) { { f(*r.begin()) } -> std::same_as<future<>>; } )
+SEASTAR_CONCEPT( requires requires (Func f, Range r) {
+    { f(*std::begin(r)) } -> std::same_as<future<>>;
+    std::end(r);
+} )
 inline
 future<>
 parallel_for_each(Range&& range, Func&& func) noexcept {
@@ -628,20 +655,26 @@ parallel_for_each(Range&& range, Func&& func) noexcept {
 /// \return a \c future<> that resolves when all the function invocations
 ///         complete.  If one or more return an exception, the return value
 ///         contains one of the exceptions.
-template <typename Iterator, typename Func>
-SEASTAR_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<future<>>; } )
+/// \note max_concurrent_for_each() schedules all invocations of \c func on the
+///       current shard. If you want to run a function on all shards in parallel,
+///       have a look at \ref smp::invoke_on_all() instead.
+template <typename Iterator, typename Sentinel, typename Func>
+SEASTAR_CONCEPT( requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { ++i }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>) ) )
+// We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
+// which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
+// break legacy code, for which it holds that Sentinel equals Iterator.
 inline
 future<>
-max_concurrent_for_each(Iterator begin, Iterator end, size_t max_concurrent, Func&& func) noexcept {
+max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func&& func) noexcept {
     struct state {
         Iterator begin;
-        Iterator end;
+        Sentinel end;
         Func func;
         size_t max_concurrent;
         semaphore sem;
         std::exception_ptr err;
 
-        state(Iterator begin_, Iterator end_, size_t max_concurrent_, Func func_)
+        state(Iterator begin_, Sentinel end_, size_t max_concurrent_, Func func_)
             : begin(std::move(begin_))
             , end(std::move(end_))
             , func(std::move(func_))
@@ -659,7 +692,7 @@ max_concurrent_for_each(Iterator begin, Iterator end, size_t max_concurrent, Fun
                 return s.sem.wait().then([&s] () mutable noexcept {
                     // Possibly run in background and signal _sem when the task is done.
                     // The background tasks are waited on using _sem.
-                    (void)futurize_invoke(s.func, *s.begin++).then_wrapped([&s] (future<> fut) {
+                    (void)futurize_invoke(s.func, *s.begin).then_wrapped([&s] (future<> fut) {
                         if (fut.failed()) {
                             auto e = fut.get_exception();;
                             if (!s.err) {
@@ -668,6 +701,7 @@ max_concurrent_for_each(Iterator begin, Iterator end, size_t max_concurrent, Fun
                         }
                         s.sem.signal();
                     });
+                    ++s.begin;
                 });
             }).then([&s] {
                 // Wait for any background task to finish
@@ -702,8 +736,14 @@ max_concurrent_for_each(Iterator begin, Iterator end, size_t max_concurrent, Fun
 /// \return a \c future<> that resolves when all the function invocations
 ///         complete.  If one or more return an exception, the return value
 ///         contains one of the exceptions.
+/// \note max_concurrent_for_each() schedules all invocations of \c func on the
+///       current shard. If you want to run a function on all shards in parallel,
+///       have a look at \ref smp::invoke_on_all() instead.
 template <typename Range, typename Func>
-SEASTAR_CONCEPT( requires std::ranges::range<Range> && requires (Func f, Range r) { { f(*r.begin()) } -> std::same_as<future<>>; } )
+SEASTAR_CONCEPT( requires requires (Func f, Range r) {
+    { f(*std::begin(r)) } -> std::same_as<future<>>;
+    std::end(r);
+} )
 inline
 future<>
 max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept {