]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/common/errorator-loop.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / common / errorator-loop.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab expandtab
3
4 #pragma once
5
6 #include <seastar/core/future.hh>
7
8 #include "crimson/common/errorator.h"
9
10
11 namespace crimson {
12 template <class... AllowedErrors>
13 class parallel_for_each_state final : private seastar::continuation_base<> {
14 using future_t = typename errorator<AllowedErrors...>::template future<>;
15 std::vector<future_t> _incomplete;
16 seastar::promise<> _result;
17 std::exception_ptr _ex;
18 private:
19 void wait_for_one() noexcept {
20 while (!_incomplete.empty() && _incomplete.back().available()) {
21 if (_incomplete.back().failed()) {
22 _ex = _incomplete.back().get_exception();
23 }
24 _incomplete.pop_back();
25 }
26 if (!_incomplete.empty()) {
27 seastar::internal::set_callback(std::move(_incomplete.back()),
28 static_cast<continuation_base<>*>(this));
29 _incomplete.pop_back();
30 return;
31 }
32 if (__builtin_expect(bool(_ex), false)) {
33 _result.set_exception(std::move(_ex));
34 } else {
35 _result.set_value();
36 }
37 delete this;
38 }
39 virtual void run_and_dispose() noexcept override {
40 if (_state.failed()) {
41 _ex = std::move(_state).get_exception();
42 }
43 _state = {};
44 wait_for_one();
45 }
46 task* waiting_task() noexcept override { return _result.waiting_task(); }
47 public:
48 parallel_for_each_state(size_t n) {
49 _incomplete.reserve(n);
50 }
51 void add_future(future_t&& f) {
52 _incomplete.push_back(std::move(f));
53 }
54 future_t get_future() {
55 auto ret = _result.get_future();
56 wait_for_one();
57 return ret;
58 }
59 };
60
61 template <typename Iterator, typename Func, typename... AllowedErrors>
62 static inline typename errorator<AllowedErrors...>::template future<>
63 parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept {
64 parallel_for_each_state<AllowedErrors...>* s = nullptr;
65 // Process all elements, giving each future the following treatment:
66 // - available, not failed: do nothing
67 // - available, failed: collect exception in ex
68 // - not available: collect in s (allocating it if needed)
69 for (;first != last; ++first) {
70 auto f = seastar::futurize_invoke(std::forward<Func>(func), *first);
71 if (!f.available() || f.failed()) {
72 if (!s) {
73 using itraits = std::iterator_traits<Iterator>;
74 auto n = (seastar::internal::iterator_range_estimate_vector_capacity(
75 first, last, typename itraits::iterator_category()) + 1);
76 s = new parallel_for_each_state<AllowedErrors...>(n);
77 }
78 s->add_future(std::move(f));
79 }
80 }
81 // If any futures were not available, hand off to parallel_for_each_state::start().
82 // Otherwise we can return a result immediately.
83 if (s) {
84 // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
85 // so this isn't a leak
86 return s->get_future();
87 }
88 return seastar::make_ready_future<>();
89 }
90
91 } // namespace crimson