#include <seastar/core/do_with.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/thread.hh>
+#include <seastar/core/print.hh>
#include <boost/iterator/counting_iterator.hpp>
+#include <seastar/testing/thread_test_case.hh>
using namespace seastar;
using namespace std::chrono_literals;
expected_exception() : runtime_error("expected") {}
};
+#ifdef __clang__
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wself-move"
+#endif
+SEASTAR_TEST_CASE(test_self_move) {
+ future_state<std::unique_ptr<int>> s1;
+ s1.set(std::make_unique<int>(42));
+ s1 = std::move(s1); // no crash, but the value of s1 is not defined.
+
+ future_state<std::unique_ptr<int>> s2;
+ s2.set(std::make_unique<int>(42));
+ std::swap(s2, s2);
+ BOOST_REQUIRE_EQUAL(*std::get<0>(std::move(s2).get()), 42);
+
+ promise<std::unique_ptr<int>> p1;
+ p1.set_value(std::make_unique<int>(42));
+ p1 = std::move(p1); // no crash, but the value of p1 is not defined.
+
+ promise<std::unique_ptr<int>> p2;
+ p2.set_value(std::make_unique<int>(42));
+ std::swap(p2, p2);
+ BOOST_REQUIRE_EQUAL(*p2.get_future().get0(), 42);
+
+ auto f1 = make_ready_future<std::unique_ptr<int>>(std::make_unique<int>(42));
+ f1 = std::move(f1); // no crash, but the value of f1 is not defined.
+
+ auto f2 = make_ready_future<std::unique_ptr<int>>(std::make_unique<int>(42));
+ std::swap(f2, f2);
+ BOOST_REQUIRE_EQUAL(*f2.get0(), 42);
+
+ return make_ready_future<>();
+}
+#ifdef __clang__
+#pragma clang diagnostic pop
+#endif
+
+static subscription<int> get_empty_subscription(std::function<future<> (int)> func) {
+ stream<int> s;
+ auto ret = s.listen(func);
+ s.close();
+ return ret;
+}
+
+SEASTAR_TEST_CASE(test_stream) {
+ auto sub = get_empty_subscription([](int x) {
+ return make_ready_future<>();
+ });
+ return sub.done();
+}
+
+SEASTAR_TEST_CASE(test_stream_drop_sub) {
+ auto s = make_lw_shared<stream<int>>();
+ compat::optional<future<>> ret;
+ {
+ auto sub = s->listen([](int x) {
+ return make_ready_future<>();
+ });
+ *ret = sub.done();
+ // It is ok to drop the subscription when we only want the competition future.
+ }
+ return s->produce(42).then([ret = std::move(*ret), s] () mutable {
+ s->close();
+ return std::move(ret);
+ });
+}
+
+SEASTAR_TEST_CASE(test_set_future_state_with_tuple) {
+ future_state<int> s1;
+ promise<int> p1;
+ const std::tuple<int> v1(42);
+ s1.set(v1);
+ p1.set_value(v1);
+
+ future_state<int, int> s2;
+ promise<int, int> p2;
+ const std::tuple<int, int> v2(41, 42);
+ s2.set(v2);
+ p2.set_value(v2);
+
+ return make_ready_future<>();
+}
+
+SEASTAR_TEST_CASE(test_set_value_throw_in_copy) {
+ struct throw_in_copy {
+ throw_in_copy() noexcept = default;
+ throw_in_copy(throw_in_copy&& x) noexcept {
+ }
+ throw_in_copy(const throw_in_copy& x) {
+ throw 42;
+ }
+ };
+ promise<throw_in_copy> p1;
+ throw_in_copy v;
+ BOOST_REQUIRE_THROW(p1.set_value(v), int);
+ return make_ready_future<>();
+}
+
SEASTAR_TEST_CASE(test_finally_is_called_on_success_and_failure) {
auto finally1 = make_shared<bool>();
auto finally2 = make_shared<bool>();
promise<> p1;
promise<> p2;
- auto f = p1.get_future().then([f = std::move(p2.get_future())] () mutable {
+ auto f = p1.get_future().then([f = p2.get_future()] () mutable {
return std::move(f);
}).then([] {
BOOST_REQUIRE(false);
BOOST_REQUIRE_EQUAL(sum, 15);
// throws immediately
- BOOST_CHECK_EXCEPTION(parallel_for_each(range, [&sum] (int) -> future<> {
+ BOOST_CHECK_EXCEPTION(parallel_for_each(range, [] (int) -> future<> {
throw 5;
}).get(), int, [] (int v) { return v == 5; });
// throws after suspension
- BOOST_CHECK_EXCEPTION(parallel_for_each(range, [&sum] (int) {
+ BOOST_CHECK_EXCEPTION(parallel_for_each(range, [] (int) {
return later().then([] {
throw 5;
});
engine().add_high_priority_task(make_task([flag] {
*flag = true;
}));
- make_ready_future().then([flag] {
+ return make_ready_future().then([flag] {
BOOST_REQUIRE(*flag);
});
});
});
}
+SEASTAR_TEST_CASE(test_unused_shared_future_is_not_a_broken_future) {
+ promise<> p;
+ shared_future<> s(p.get_future());
+ return make_ready_future<>();
+}
+
SEASTAR_TEST_CASE(test_shared_future_propagates_value_to_all) {
return seastar::async([] {
promise<shared_ptr<int>> p; // shared_ptr<> to check it deals with emptyable types
});
}
+SEASTAR_TEST_CASE(test_ignored_future_warning) {
+ // This doesn't warn:
+ promise<> p;
+ p.set_exception(expected_exception());
+ future<> f = p.get_future();
+ f.ignore_ready_future();
+
+ // And by analogy, neither should this
+ shared_promise<> p2;
+ p2.set_exception(expected_exception());
+ future<> f2 = p2.get_shared_future();
+ f2.ignore_ready_future();
+ return make_ready_future<>();
+}
+
SEASTAR_TEST_CASE(test_futurize_from_tuple) {
std::tuple<int> v1 = std::make_tuple(3);
std::tuple<> v2 = {};
});
}
+SEASTAR_TEST_CASE(test_repeat_until_value_implicit_future) {
+ // Same as above, but returning compat::optional<int> instead of future<compat::optional<int>>
+ return do_with(int(), [] (int& counter) {
+ return repeat_until_value([&counter] {
+ if (counter == 10000) {
+ return compat::optional<int>(counter);
+ } else {
+ ++counter;
+ return compat::optional<int>(compat::nullopt);
+ }
+ }).then([&counter] (int result) {
+ BOOST_REQUIRE(counter == 10000);
+ BOOST_REQUIRE(result == counter);
+ });
+ });
+}
+
+SEASTAR_TEST_CASE(test_repeat_until_value_exception) {
+ return repeat_until_value([] {
+ throw expected_exception();
+ return compat::optional<int>(43);
+ }).then_wrapped([] (future<int> f) {
+ check_fails_with_expected(std::move(f));
+ });
+}
+
SEASTAR_TEST_CASE(test_when_allx) {
return when_all(later(), later(), make_ready_future()).discard_result();
}
+#if __cplusplus >= 201703L
+
+// A noncopyable and nonmovable struct
+struct non_copy_non_move {
+ non_copy_non_move(non_copy_non_move&&) = delete;
+ non_copy_non_move(const non_copy_non_move&) = delete;
+};
+
+SEASTAR_TEST_CASE(test_when_all_functions) {
+ auto f = [x = non_copy_non_move{}] {
+ (void)x;
+ return make_ready_future<int>(42);
+ };
+ return when_all(f, [] {
+ throw 42;
+ return make_ready_future<>();
+ }, later()).then([] (std::tuple<future<int>, future<>, future<>> res) {
+ BOOST_REQUIRE_EQUAL(std::get<0>(res).get0(), 42);
+
+ BOOST_REQUIRE(std::get<1>(res).available());
+ BOOST_REQUIRE(std::get<1>(res).failed());
+ std::get<1>(res).ignore_ready_future();
+
+ BOOST_REQUIRE(std::get<2>(res).available());
+ BOOST_REQUIRE(!std::get<2>(res).failed());
+ return make_ready_future<>();
+ });
+}
+
+SEASTAR_TEST_CASE(test_when_all_succeed_functions) {
+ auto f = [x = non_copy_non_move{}] {
+ (void)x;
+ return make_ready_future<int>(42);
+ };
+ return when_all_succeed(f, [] {
+ throw 42;
+ return make_ready_future<>();
+ }, later()).then_wrapped([] (future<int> res) {
+ BOOST_REQUIRE(res.available());
+ BOOST_REQUIRE(res.failed());
+ res.ignore_ready_future();
+ return make_ready_future<>();
+ });
+}
+
+#endif
+
template<typename E, typename... T>
static void check_failed_with(future<T...>&& f) {
BOOST_REQUIRE(f.failed());
});
}
+SEASTAR_THREAD_TEST_CASE(test_shared_future_get_future_after_timeout) {
+ // This used to crash because shared_future checked if the list of
+ // pending futures was empty to decide if it had already called
+ // then_wrapped. If all pending futures timed out, it would call
+ // it again.
+ promise<> pr;
+ shared_future<with_clock<manual_clock>> sfut(pr.get_future());
+ future<> fut1 = sfut.get_future(manual_clock::now() + 1s);
+
+ manual_clock::advance(1s);
+
+ check_timed_out(std::move(fut1));
+
+ future<> fut2 = sfut.get_future(manual_clock::now() + 1s);
+ manual_clock::advance(1s);
+ check_timed_out(std::move(fut2));
+
+ future<> fut3 = sfut.get_future(manual_clock::now() + 1s);
+ pr.set_value();
+}
+
SEASTAR_TEST_CASE(test_custom_exception_factory_in_with_timeout) {
return seastar::async([] {
class custom_error : public std::exception {
return seastar::stop_iteration::no;
});
}
+
+SEASTAR_THREAD_TEST_CASE(test_broken_promises) {
+ compat::optional<future<>> f;
+ compat::optional<future<>> f2;
+ { // Broken after attaching a continuation
+ auto p = promise<>();
+ f = p.get_future();
+ f2 = f->then_wrapped([&] (future<> f3) {
+ BOOST_CHECK(f3.failed());
+ BOOST_CHECK_THROW(f3.get(), broken_promise);
+ f = { };
+ });
+ }
+ f2->get();
+ BOOST_CHECK(!f);
+
+ { // Broken before attaching a continuation
+ auto p = promise<>();
+ f = p.get_future();
+ }
+ f->then_wrapped([&] (future<> f3) {
+ BOOST_CHECK(f3.failed());
+ BOOST_CHECK_THROW(f3.get(), broken_promise);
+ f = { };
+ }).get();
+ BOOST_CHECK(!f);
+
+ { // Broken before suspending a thread
+ auto p = promise<>();
+ f = p.get_future();
+ }
+ BOOST_CHECK_THROW(f->get(), broken_promise);
+}
+
+SEASTAR_TEST_CASE(test_warn_on_broken_promise_with_no_future) {
+ // Example code where we expect a "Exceptional future ignored"
+ // warning. We can't directly test that the warning is issued, but
+ // this example functions as documentation.
+ promise<> p;
+ // Intentionally destroy the future
+ (void)p.get_future();
+ p.set_exception(std::runtime_error("foo"));
+ return make_ready_future<>();
+}