]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/src/core/future-util.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / seastar / src / core / future-util.cc
index bd36cac5c3d6b7bafd08578c8aab98ce38a55bff..3e2c0781b1bfa0d0e135f8f445d675bc0c548e51 100644 (file)
  * Copyright (C) 2017 ScyllaDB
  */
 
+#include <seastar/core/future-util.hh>
+#include <seastar/core/reactor.hh>
 #include <seastar/core/sleep.hh>
+#include <seastar/core/print.hh>
+#include <seastar/core/semaphore.hh>
 
 namespace seastar {
 
+parallel_for_each_state::parallel_for_each_state(size_t n) {
+    _incomplete.reserve(n);
+}
+
+future<> parallel_for_each_state::get_future() {
+    auto ret = _result.get_future();
+    wait_for_one();
+    return ret;
+}
+
+void parallel_for_each_state::add_future(future<>&& f) {
+    _incomplete.push_back(std::move(f));
+}
+
+void parallel_for_each_state::wait_for_one() noexcept {
+    // Process from back to front, on the assumption that the front
+    // futures are likely to complete earlier than the back futures.
+    // If that's indeed the case, then the front futures will be
+    // available and we won't have to wait for them.
+
+    // Skip over futures that happen to be complete already.
+    while (!_incomplete.empty() && _incomplete.back().available()) {
+        if (_incomplete.back().failed()) {
+            _ex = _incomplete.back().get_exception();
+        }
+        _incomplete.pop_back();
+    }
+
+    // If there's an incompelete future, wait for it.
+    if (!_incomplete.empty()) {
+        internal::set_callback(_incomplete.back(), static_cast<continuation_base<>*>(this));
+        // This future's state will be collected in run_and_dispose(), so we can drop it.
+        _incomplete.pop_back();
+        return;
+    }
+
+    // Everything completed, report a result.
+    if (__builtin_expect(bool(_ex), false)) {
+        _result.set_exception(std::move(_ex));
+    } else {
+        _result.set_value();
+    }
+    delete this;
+}
+
+void parallel_for_each_state::run_and_dispose() noexcept {
+    if (_state.failed()) {
+        _ex = std::move(_state).get_exception();
+    }
+    _state = {};
+    wait_for_one();
+}
+
 template <typename Clock>
 future<> sleep_abortable(typename Clock::duration dur) {
     return engine().wait_for_stop(dur).then([] {
@@ -68,4 +125,10 @@ future<> sleep_abortable(typename Clock::duration dur, abort_source& as) {
 template future<> sleep_abortable<steady_clock_type>(typename steady_clock_type::duration, abort_source&);
 template future<> sleep_abortable<lowres_clock>(typename lowres_clock::duration, abort_source&);
 
+named_semaphore_timed_out::named_semaphore_timed_out(compat::string_view msg) : _msg(format("Semaphore timed out: {}", msg)) {
+}
+
+broken_named_semaphore::broken_named_semaphore(compat::string_view msg) : _msg(format("Semaphore broken: {}", msg)) {
+}
+
 }