2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright (C) 2019 ScyllaDB Ltd.
26 #include <seastar/core/circular_buffer.hh>
27 #include <seastar/core/future-util.hh>
28 #include <seastar/testing/test_case.hh>
29 #include <seastar/core/sleep.hh>
30 #include <seastar/util/later.hh>
31 #include <seastar/core/thread.hh>
32 #include <seastar/testing/random.hh>
34 using namespace seastar
;
35 using namespace std::chrono_literals
;
37 #ifndef SEASTAR_COROUTINES_ENABLED
// Placeholder test for builds where SEASTAR_COROUTINES_ENABLED is not defined
// (see the #ifndef above): it only returns an already-resolved future.
39 SEASTAR_TEST_CASE(test_coroutines_not_compiled_in
) {
40 return make_ready_future
<>();
45 #include <seastar/core/coroutine.hh>
46 #include <seastar/coroutine/all.hh>
47 #include <seastar/coroutine/maybe_yield.hh>
48 #include <seastar/coroutine/switch_to.hh>
49 #include <seastar/coroutine/parallel_for_each.hh>
50 #include <seastar/coroutine/as_future.hh>
51 #include <seastar/coroutine/exception.hh>
52 #include <seastar/coroutine/generator.hh>
// Continuation-style (non-coroutine) helper returning future<int>: it chains
// a callback onto yield(). The callback body is not visible in this extract;
// test_simple_coroutines expects the resulting value to be 42.
56 future
<int> old_fashioned_continuations() {
57 return yield().then([] {
// Helper returning future<int>. Its body is not visible in this extract;
// test_simple_coroutines expects the awaited result to be 53.
62 future
<int> simple_coroutine() {
// Helper returning future<int>. Its body is not visible in this extract;
// test_simple_coroutines reads the result with get0() and expects 64.
67 future
<int> ready_coroutine() {
// Coroutine whose result is a std::tuple<int, double>; it co_returns (1, 2.).
71 future
<std::tuple
<int, double>> tuple_coroutine() {
72 co_return
std::tuple(1, 2.);
// Helper returning future<int> that is expected to fail. Its body is not
// visible in this extract; test_simple_coroutines expects an int exception
// equal to 42.
75 future
<int> failing_coroutine() {
// Non-inlined helper taking an int and returning int. The body is not visible
// in this extract.
// NOTE(review): presumably throws x — failing_coroutine2 calls it with 17 and
// the test expects an int exception equal to 17; confirm against the full file.
80 [[gnu::noinline
]] int throw_exception(int x
) {
// noexcept coroutine returning future<int> whose co_return expression is
// throw_exception(17); test_simple_coroutines expects it to fail with the
// int 17.
84 future
<int> failing_coroutine2() noexcept
{
86 co_return
throw_exception(17);
// Exercises the helper functions defined above: checks the values produced
// by the successful ones and that the failing ones surface an int exception,
// both when awaited directly and when inspected through then_wrapped().
91 SEASTAR_TEST_CASE(test_simple_coroutines
) {
92 BOOST_REQUIRE_EQUAL(co_await
old_fashioned_continuations(), 42);
93 BOOST_REQUIRE_EQUAL(co_await
simple_coroutine(), 53);
// ready_coroutine()'s future is read with get0() instead of being awaited.
94 BOOST_REQUIRE_EQUAL(ready_coroutine().get0(), 64);
95 BOOST_REQUIRE(co_await
tuple_coroutine() == std::tuple(1, 2.));
// Awaiting the failing coroutine must throw an int equal to 42.
96 BOOST_REQUIRE_EXCEPTION((void)co_await
failing_coroutine(), int, [] (auto v
) { return v
== 42; });
// Same failure, observed as a failed future inside a then_wrapped() continuation.
97 BOOST_CHECK_EQUAL(co_await
failing_coroutine().then_wrapped([] (future
<int> f
) -> future
<int> {
98 BOOST_REQUIRE(f
.failed());
100 std::rethrow_exception(f
.get_exception());
// Awaiting failing_coroutine2 must throw an int equal to 17.
105 BOOST_REQUIRE_EXCEPTION((void)co_await
failing_coroutine2(), int, [] (auto v
) { return v
== 17; });
106 BOOST_CHECK_EQUAL(co_await
failing_coroutine2().then_wrapped([] (future
<int> f
) -> future
<int> {
107 BOOST_REQUIRE(f
.failed());
109 std::rethrow_exception(f
.get_exception());
// Runs a coroutine lambda as a continuation of p1's future. Inside it,
// awaiting p3's future is expected to throw broken_promise; the outer test
// awaits p2's future and finally expects the stored future f to yield 1.
// NOTE(review): the statements that fulfil or drop the promises are not
// visible in this extract.
116 SEASTAR_TEST_CASE(test_abandond_coroutine
) {
// Holds the continuation's future so it can be awaited at the end of the test.
117 std::optional
<future
<int>> f
;
119 auto p1
= promise
<>();
120 auto p2
= promise
<>();
121 auto p3
= promise
<>();
122 f
= p1
.get_future().then([&] () -> future
<int> {
124 BOOST_CHECK_THROW(co_await p3
.get_future(), broken_promise
);
128 co_await p2
.get_future();
130 BOOST_CHECK_EQUAL(co_await
std::move(*f
), 1);
// Checks that a coroutine started through with_scheduling_group() keeps
// running in that group across co_await suspension points, while the test
// body itself stays in the default scheduling group. The group is destroyed
// at the end and any captured exception is rethrown afterwards.
133 SEASTAR_TEST_CASE(test_scheduling_group
) {
134 auto other_sg
= co_await
create_scheduling_group("the other group", 10.f
);
// Stores a failure so that the scheduling group can still be destroyed
// before the exception is rethrown.
135 std::exception_ptr ex
;
// p1/p2: their futures are passed into the coroutine, which awaits them.
138 auto p1
= promise
<>();
139 auto p2
= promise
<>();
// p1b/p2b: moved into the coroutine; the test body awaits their futures (f1, f2).
141 auto p1b
= promise
<>();
142 auto p2b
= promise
<>();
143 auto f1
= p1b
.get_future();
144 auto f2
= p2b
.get_future();
146 BOOST_REQUIRE(current_scheduling_group() == default_scheduling_group());
147 auto f_ret
= with_scheduling_group(other_sg
,
148 [other_sg_cap
= other_sg
] (future
<> f1
, future
<> f2
, promise
<> p1
, promise
<> p2
) -> future
<int> {
149 // Make a copy in the coroutine before the lambda is destroyed.
150 auto other_sg
= other_sg_cap
;
151 BOOST_REQUIRE(current_scheduling_group() == other_sg
);
// NOTE(review): this compares other_sg with itself, so it is always true;
// presumably the intent was to compare against other_sg_cap — confirm.
152 BOOST_REQUIRE(other_sg
== other_sg
);
154 co_await
std::move(f1
);
155 BOOST_REQUIRE(current_scheduling_group() == other_sg
);
157 co_await
std::move(f2
);
158 BOOST_REQUIRE(current_scheduling_group() == other_sg
);
160 }, p1
.get_future(), p2
.get_future(), std::move(p1b
), std::move(p2b
));
// The test body must remain in the default group after each of its own awaits.
162 co_await
std::move(f1
);
163 BOOST_REQUIRE(current_scheduling_group() == default_scheduling_group());
165 co_await
std::move(f2
);
166 BOOST_REQUIRE(current_scheduling_group() == default_scheduling_group());
168 BOOST_REQUIRE_EQUAL(co_await
std::move(f_ret
), 42);
169 BOOST_REQUIRE(current_scheduling_group() == default_scheduling_group());
171 ex
= std::current_exception();
173 co_await
destroy_scheduling_group(other_sg
);
175 std::rethrow_exception(std::move(ex
));
// Coroutine that switches to the scheduling group `sg` via
// coroutine::switch_to(), keeps the value produced by the switch in new_sg,
// and requires that the current scheduling group is now `sg`.
// The rest of the body (including what is returned) is not visible here.
179 future
<scheduling_group
> switch_to_with_context(scheduling_group
& sg
) {
180 scheduling_group new_sg
= co_await
coroutine::switch_to(sg
);
181 BOOST_REQUIRE(current_scheduling_group() == sg
);
// Exercises coroutine::switch_to(): after every switch the current
// scheduling group must be the requested one, and the value produced by the
// switch must be the group that was current before it.
185 SEASTAR_TEST_CASE(test_switch_to
) {
186 auto other_sg0
= co_await
create_scheduling_group("other group 0", 10.f
);
187 auto other_sg1
= co_await
create_scheduling_group("other group 1", 10.f
);
188 auto other_sg2
= co_await
create_scheduling_group("other group 2", 10.f
);
// Stores a failure so the groups can be destroyed before it is rethrown.
189 std::exception_ptr ex
;
192 auto base_sg
= current_scheduling_group();
// First switch: the previous group must be the one the test started in.
194 auto prev_sg
= co_await
coroutine::switch_to(other_sg0
);
195 BOOST_REQUIRE(current_scheduling_group() == other_sg0
);
196 BOOST_REQUIRE(prev_sg
== base_sg
);
// Switching to the group that is already current reports that same group.
198 auto same_sg
= co_await
coroutine::switch_to(other_sg0
);
199 BOOST_REQUIRE(current_scheduling_group() == other_sg0
);
200 BOOST_REQUIRE(same_sg
== other_sg0
);
// Switching again from other_sg0 to other_sg1 reports other_sg0 as previous.
202 auto nested_sg
= co_await
coroutine::switch_to(other_sg1
);
203 BOOST_REQUIRE(current_scheduling_group() == other_sg1
);
204 BOOST_REQUIRE(nested_sg
== other_sg0
);
206 co_await
switch_to_with_context(other_sg2
);
208 co_await
coroutine::switch_to(base_sg
);
209 BOOST_REQUIRE(current_scheduling_group() == base_sg
);
211 ex
= std::current_exception();
// NOTE(review): other_sg2 is not destroyed in the lines visible here —
// confirm against the full file whether that is intentional.
214 co_await
destroy_scheduling_group(other_sg1
);
215 co_await
destroy_scheduling_group(other_sg0
);
217 std::rethrow_exception(std::move(ex
));
// Starts a seastar::async() thread and requires, from inside it, that the
// current scheduling group equals expected_sg.
221 future
<> check_thread_inherits_sg_from_coroutine_frame(scheduling_group expected_sg
) {
222 return seastar::async([expected_sg
] {
223 BOOST_REQUIRE(current_scheduling_group() == expected_sg
);
// Requires that the current scheduling group equals expected_sg when this
// function runs (it is co_awaited from another coroutine in this file).
// Any statement preceding the check is not visible in this extract.
227 future
<> check_coroutine_inherits_sg_from_another_one(scheduling_group expected_sg
) {
229 BOOST_REQUIRE(current_scheduling_group() == expected_sg
);
// Requires that it starts in base_sg, switches to new_sg, and then checks
// that both a seastar::async() thread and another coroutine started from
// here observe new_sg as their scheduling group.
232 future
<> switch_to_sg_and_perform_inheriting_checks(scheduling_group base_sg
, scheduling_group new_sg
) {
233 BOOST_REQUIRE(current_scheduling_group() == base_sg
);
234 co_await
coroutine::switch_to(new_sg
);
235 BOOST_REQUIRE(current_scheduling_group() == new_sg
);
237 co_await
check_thread_inherits_sg_from_coroutine_frame(new_sg
);
238 co_await
check_coroutine_inherits_sg_from_another_one(new_sg
);
240 // don't restore previous sg on purpose, expecting it will be restored once coroutine goes out of scope
// Checks that the caller is back in base_sg after
// switch_to_sg_and_perform_inheriting_checks() has switched groups, when that
// helper is awaited directly, called from a seastar::async() thread, and
// followed by a finally() continuation.
243 SEASTAR_TEST_CASE(test_switch_to_sg_restoration_and_inheriting
) {
244 auto new_sg
= co_await
create_scheduling_group("other group 0", 10.f
);
// Stores a failure so the group can be destroyed before it is rethrown.
245 std::exception_ptr ex
;
248 auto base_sg
= current_scheduling_group();
250 co_await
switch_to_sg_and_perform_inheriting_checks(base_sg
, new_sg
);
251 // seastar automatically restores base_sg once it goes out of coroutine frame
252 BOOST_REQUIRE(current_scheduling_group() == base_sg
);
// Same check when the helper is waited for with get() inside a seastar thread.
254 co_await
seastar::async([base_sg
, new_sg
] {
255 switch_to_sg_and_perform_inheriting_checks(base_sg
, new_sg
).get();
256 BOOST_REQUIRE(current_scheduling_group() == base_sg
);
// Same check from a finally() continuation attached to the helper's future.
259 co_await
switch_to_sg_and_perform_inheriting_checks(base_sg
, new_sg
).finally([base_sg
] {
260 BOOST_REQUIRE(current_scheduling_group() == base_sg
);
263 ex
= std::current_exception();
266 co_await
destroy_scheduling_group(new_sg
);
268 std::rethrow_exception(std::move(ex
));
// Awaits ready futures in a loop while a continuation scheduled through
// yield() (capturing x by reference) is pending. The loop ends once x is set
// or need_preempt() has been counted 1000 times; the test then requires
// save_x.
// NOTE(review): the declarations of x and save_x and the continuation body
// are not visible in this extract.
272 SEASTAR_TEST_CASE(test_preemption
) {
274 unsigned preempted
= 0;
275 auto f
= yield().then([&x
] {
279 // try to preempt 1000 times. 1 should be enough if not for
280 // task queue shuffling in debug mode which may cause the coroutine
281 // continuation to run first.
282 while(preempted
< 1000 && !x
) {
283 preempted
+= need_preempt();
284 co_await make_ready_future
<>();
287 // wait for yield() to complete
288 co_await
std::move(f
);
289 BOOST_REQUIRE(save_x
);
// Counterpart of test_preemption: the loop awaits ready futures wrapped in
// coroutine::without_preemption_check(), and the test finally requires that
// save_x is false.
// NOTE(review): the declarations of x and save_x and the continuation body
// are not visible in this extract.
293 SEASTAR_TEST_CASE(test_no_preemption
) {
295 unsigned preempted
= 0;
296 auto f
= yield().then([&x
] {
300 // preemption should not happen, we explicitly asked for continuing if possible
301 while(preempted
< 1000 && !x
) {
302 preempted
+= need_preempt();
303 co_await
coroutine::without_preemption_check(make_ready_future
<>());
306 // wait for yield() to complete
307 co_await
std::move(f
);
308 BOOST_REQUIRE(!save_x
);
// coroutine::all() over two callables returning ready futures: the results
// are unpacked with a structured binding and must be 1 and 2, in argument
// order.
312 SEASTAR_TEST_CASE(test_all_simple
) {
313 auto [a
, b
] = co_await
coroutine::all(
314 [] { return make_ready_future
<int>(1); },
315 [] { return make_ready_future
<int>(2); }
317 BOOST_REQUIRE_EQUAL(a
, 1);
318 BOOST_REQUIRE_EQUAL(b
, 2);
// Runs coroutine::all() over six inputs with different delays and requires
// the results to come back in argument order (0..5). The check is repeated
// for successive permutations of `delays` until next_permutation() reports
// that none is left.
321 SEASTAR_TEST_CASE(test_all_permutations
) {
322 std::vector
<std::chrono::milliseconds
> delays
= { 0ms
, 0ms
, 2ms
, 2ms
, 4ms
, 6ms
};
// Builds the nr-th input: it resolves to nr, immediately when delays[nr] is
// zero and after sleeping for delays[nr] otherwise.
323 auto make_delayed_future_returning_nr
= [&] (int nr
) {
325 auto delay
= delays
[nr
];
326 return delay
== 0ms
? make_ready_future
<int>(nr
) : sleep(delay
).then([nr
] { return make_ready_future
<int>(nr
); });
330 auto [a
, b
, c
, d
, e
, f
] = co_await
coroutine::all(
331 make_delayed_future_returning_nr(0),
332 make_delayed_future_returning_nr(1),
333 make_delayed_future_returning_nr(2),
334 make_delayed_future_returning_nr(3),
335 make_delayed_future_returning_nr(4),
336 make_delayed_future_returning_nr(5)
338 BOOST_REQUIRE_EQUAL(a
, 0);
339 BOOST_REQUIRE_EQUAL(b
, 1);
340 BOOST_REQUIRE_EQUAL(c
, 2);
341 BOOST_REQUIRE_EQUAL(d
, 3);
342 BOOST_REQUIRE_EQUAL(e
, 4);
343 BOOST_REQUIRE_EQUAL(f
, 5);
344 } while (std::ranges::next_permutation(delays
).found
);
// coroutine::all() over two callables that throw 1 and 2 directly: the value
// `e` examined afterwards must be one of the two.
// NOTE(review): the try/catch that binds `e` is not visible in this extract.
347 SEASTAR_TEST_CASE(test_all_ready_exceptions
) {
349 co_await
coroutine::all(
350 [] () -> future
<> { throw 1; },
351 [] () -> future
<> { throw 2; }
354 BOOST_REQUIRE(e
== 1 || e
== 2);
// Awaits coroutine::all() and requires the value `e` examined afterwards to
// be 1 or 2.
// NOTE(review): the arguments passed to all() and the handler that binds `e`
// are not visible in this extract; by the test name the inputs presumably
// fail only after suspending — confirm against the full file.
358 SEASTAR_TEST_CASE(test_all_nonready_exceptions
) {
360 co_await
coroutine::all(
371 BOOST_REQUIRE(e
== 1 || e
== 2);
// coroutine::all() over inputs with different result types (future<int> and
// future<long>): the unpacked results must be 1 and 2L.
// The lambda bodies are not visible in this extract.
375 SEASTAR_TEST_CASE(test_all_heterogeneous_types
) {
376 auto [a
, b
] = co_await
coroutine::all(
377 [] () -> future
<int> {
384 [] () -> future
<long> {
389 BOOST_REQUIRE_EQUAL(a
, 1);
390 BOOST_REQUIRE_EQUAL(b
, 2L);
// coroutine::all() with a move-only result type: the single input co_returns
// a std::unique_ptr<int> holding 6, and the unpacked pointer must point to 6.
393 SEASTAR_TEST_CASE(test_all_noncopyable_types
) {
394 auto [a
] = co_await
coroutine::all(
395 [] () -> future
<std::unique_ptr
<int>> {
396 co_return
std::make_unique
<int>(6);
399 BOOST_REQUIRE_EQUAL(*a
, 6);
// coroutine::all() over three callables returning future<int>. Afterwards
// the test requires that a value `n` equal to 9 was observed (recorded via
// exception_seen) and that nr_completed is 2.
// NOTE(review): the lambda bodies and the handler that binds `n` are not
// visible in this extract; by the test name one input presumably throws
// while the other two complete — confirm against the full file.
402 SEASTAR_TEST_CASE(test_all_throw_in_input_func
) {
403 int nr_completed
= 0;
404 bool exception_seen
= false;
406 co_await
coroutine::all(
407 [&] () -> future
<int> {
412 [&] () -> future
<int> {
415 [&] () -> future
<int> {
422 BOOST_REQUIRE_EQUAL(n
, 9);
423 exception_seen
= true;
425 BOOST_REQUIRE_EQUAL(nr_completed
, 2);
426 BOOST_REQUIRE(exception_seen
);
// Explicit constructor taking the shared counter by reference. The enclosing
// class definition, the initializer list and the body are not visible in this
// extract; per the comment in check_coroutine_throws, the counter tracks the
// number of alive counter_ref objects.
434 explicit counter_ref(int& cnt
)
// Calls fun(counter) twice and requires that it fails with an exception of
// type Ex both times: once by awaiting it directly and once by inspecting
// the failed future in a then_wrapped() continuation. After each failure the
// counter must be back at 0.
// NOTE(review): the declaration of `counter` is not visible in this extract.
444 template<typename Ex
>
445 static future
<> check_coroutine_throws(auto fun
) {
446 // The counter keeps track of the number of alive "counter_ref" objects.
447 // If it is not zero then it means that some destructors weren't run
448 // while quitting the coroutine.
450 BOOST_REQUIRE_THROW(co_await
fun(counter
), Ex
);
451 BOOST_REQUIRE_EQUAL(counter
, 0);
452 co_await
fun(counter
).then_wrapped([&counter
] (auto f
) {
453 BOOST_REQUIRE(f
.failed());
454 BOOST_REQUIRE_THROW(std::rethrow_exception(f
.get_exception()), Ex
);
455 BOOST_REQUIRE_EQUAL(counter
, 0);
// Uses check_coroutine_throws() to verify the ways of failing a coroutine
// without a throw statement: co_return coroutine::exception(...),
// co_await coroutine::exception(...), and co_await
// coroutine::return_exception(...) with both a std::logic_error and a plain
// int. Each coroutine holds a counter_ref while it fails.
459 SEASTAR_TEST_CASE(test_coroutine_exception
) {
460 co_await check_coroutine_throws
<std::runtime_error
>([] (int& counter
) -> future
<int> {
461 counter_ref ref
{counter
};
462 co_return
coroutine::exception(std::make_exception_ptr(std::runtime_error("threw")));
464 co_await check_coroutine_throws
<std::runtime_error
>([] (int& counter
) -> future
<int> {
465 counter_ref ref
{counter
};
466 co_await
coroutine::exception(std::make_exception_ptr(std::runtime_error("threw")));
469 co_await check_coroutine_throws
<std::logic_error
>([] (int& counter
) -> future
<> {
470 counter_ref ref
{counter
};
471 co_await
coroutine::return_exception(std::logic_error("threw"));
474 co_await check_coroutine_throws
<int>([] (int& counter
) -> future
<> {
475 counter_ref ref
{counter
};
476 co_await
coroutine::return_exception(42);
// Uses check_coroutine_throws() to verify coroutine::return_exception() and
// coroutine::return_exception_ptr(); the latter is given a moved
// exception_ptr, an lvalue exception_ptr, and an exception_ptr that wraps
// the int 3.
481 SEASTAR_TEST_CASE(test_coroutine_return_exception_ptr
) {
482 co_await check_coroutine_throws
<std::runtime_error
>([] (int& counter
) -> future
<> {
483 co_await
coroutine::return_exception(std::runtime_error("threw"));
485 co_await check_coroutine_throws
<std::runtime_error
>([] (int& counter
) -> future
<> {
486 auto ex
= std::make_exception_ptr(std::runtime_error("threw"));
487 co_await
coroutine::return_exception_ptr(std::move(ex
));
489 co_await check_coroutine_throws
<std::runtime_error
>([] (int& counter
) -> future
<> {
490 auto ex
= std::make_exception_ptr(std::runtime_error("threw"));
491 co_await
coroutine::return_exception_ptr(ex
);
493 co_await check_coroutine_throws
<int>([] (int& counter
) -> future
<> {
494 co_await
coroutine::return_exception_ptr(std::make_exception_ptr(3));
// Runs a "spinner" coroutine that calls coroutine::maybe_yield(), while the
// test body waits, also through maybe_yield(), until it has seen `var` differ
// from its snapshot ten times. If yielding did not let the other side run,
// the test would hang.
// NOTE(review): the declarations of `var` and `snapshot` and the spinner's
// loop are not visible in this extract.
498 SEASTAR_TEST_CASE(test_maybe_yield
) {
501 auto spinner
= [&] () -> future
<> {
502 // increment a variable continuously, but yield so an observer can see it.
505 co_await
coroutine::maybe_yield();
508 auto spinner_fut
= spinner();
510 for (int nr_changes
= 0; nr_changes
< 10; ++nr_changes
) {
511 // Try to observe the value changing in time, yield to
512 // allow the spinner to advance it.
513 while (snapshot
== var
) {
514 co_await
coroutine::maybe_yield();
519 co_await
std::move(spinner_fut
);
520 BOOST_REQUIRE(true); // the test will hang if it doesn't work.
523 #if __has_include(<coroutine>) && !defined(__clang__)
525 #include "tl-generator.hh"
// tl::generator<int> driven by a loop with i running from 0 up to (but not
// including) max. The loop body is not visible in this extract.
// NOTE(review): presumably co_yields each i — the `generator` test sums the
// values of simple_generator(10) and expects 45; confirm.
526 tl::generator
<int> simple_generator(int max
)
528 for (int i
= 0; i
< max
; ++i
) {
// Feeds simple_generator(10), a move-only view, to
// seastar::parallel_for_each() and seastar::max_concurrent_for_each() (with
// a concurrency limit of 10) and requires `accum` to be 45 after each run.
// NOTE(review): the declaration/reset of `accum` and the accumulation
// statements are not visible in this extract.
533 SEASTAR_TEST_CASE(generator
)
535 // test ability of seastar::parallel_for_each to deal with move-only views
537 co_await
seastar::parallel_for_each(simple_generator(10), [&](int i
) {
539 return seastar::make_ready_future
<>();
541 BOOST_REQUIRE_EQUAL(accum
, 45);
543 // test ability of seastar::max_concurrent_for_each to deal with move-only views
545 co_await
seastar::max_concurrent_for_each(simple_generator(10), 10, [&](int i
) {
547 return seastar::make_ready_future
<>();
549 BOOST_REQUIRE_EQUAL(accum
, 45);
// coroutine::parallel_for_each() over an empty vector must complete, and
// `count` must still be 0 afterwards.
// NOTE(review): the declaration of `count` and the functor body are not
// visible in this extract.
554 SEASTAR_TEST_CASE(test_parallel_for_each_empty
) {
555 std::vector
<int> values
;
558 co_await
coroutine::parallel_for_each(values
, [&] (int x
) {
561 BOOST_REQUIRE_EQUAL(count
, 0); // the test will hang if it doesn't work.
// Checks exception propagation in coroutine::parallel_for_each(): the
// functor throws std::runtime_error on a randomly chosen invocation, the
// exception must reach the awaiter, and the functor must still have been
// invoked for every element. The check is done once with a functor that
// throws directly and once with a coroutine functor that sleeps first.
// NOTE(review): the declaration/reset of `count` is not visible in this
// extract.
564 SEASTAR_TEST_CASE(test_parallel_for_each_exception
) {
565 std::array
<int, 5> values
= { 10, 2, 1, 4, 8 };
567 auto& eng
= testing::local_random_engine
;
568 auto dist
= std::uniform_int_distribution
<unsigned>();
// Index of the invocation (in call order) that will throw.
569 int throw_at
= dist(eng
) % values
.size();
571 BOOST_TEST_MESSAGE(fmt::format("Will throw at value #{}/{}", throw_at
, values
.size()));
573 auto f0
= coroutine::parallel_for_each(values
, [&] (int x
) {
574 if (count
++ == throw_at
) {
575 BOOST_TEST_MESSAGE("throw");
576 throw std::runtime_error("test");
579 // An exception thrown by the functor must be propagated
580 BOOST_REQUIRE_THROW(co_await
std::move(f0
), std::runtime_error
);
581 // Functor must be called on all values, even if there's an exception
582 BOOST_REQUIRE_EQUAL(count
, values
.size());
// Second round: the functor is a coroutine that sleeps x milliseconds
// before it may throw.
585 throw_at
= dist(eng
) % values
.size();
586 BOOST_TEST_MESSAGE(fmt::format("Will throw at value #{}/{}", throw_at
, values
.size()));
588 auto f1
= coroutine::parallel_for_each(values
, [&] (int x
) -> future
<> {
589 co_await
sleep(std::chrono::milliseconds(x
));
590 if (count
++ == throw_at
) {
591 throw std::runtime_error("test");
594 BOOST_REQUIRE_THROW(co_await
std::move(f1
), std::runtime_error
);
595 BOOST_REQUIRE_EQUAL(count
, values
.size());
// Accumulates a sum of squares through coroutine::parallel_for_each() over
// {3, 1, 4} in four variants: a synchronous functor, a coroutine functor
// that sleeps, an iterator pair covering all but the last element, and a
// std::ranges::subrange covering all but the last element (expected sum 10).
// NOTE(review): the body of the std::accumulate lambda and any resets of
// sum_of_squares between the variants are not visible in this extract.
598 SEASTAR_TEST_CASE(test_parallel_for_each
) {
599 std::vector
<int> values
= { 3, 1, 4 };
600 int sum_of_squares
= 0;
602 int expected
= std::accumulate(values
.begin(), values
.end(), 0, [] (int sum
, int x
) {
606 // Test all-ready futures
607 co_await
coroutine::parallel_for_each(values
, [&sum_of_squares
] (int x
) {
608 sum_of_squares
+= x
* x
;
610 BOOST_REQUIRE_EQUAL(sum_of_squares
, expected
);
612 // Test non-ready futures
614 co_await
coroutine::parallel_for_each(values
, [&sum_of_squares
] (int x
) -> future
<> {
616 co_await
sleep(std::chrono::milliseconds(x
));
618 sum_of_squares
+= x
* x
;
620 BOOST_REQUIRE_EQUAL(sum_of_squares
, expected
);
622 // Test legacy subrange
624 co_await
coroutine::parallel_for_each(values
.begin(), values
.end() - 1, [&sum_of_squares
] (int x
) -> future
<> {
626 co_await
sleep(std::chrono::milliseconds(x
));
628 sum_of_squares
+= x
* x
;
630 BOOST_REQUIRE_EQUAL(sum_of_squares
, 10);
632 // clang 13.0.1 doesn't support subrange
633 // so also provide an Iterator/Sentinel based constructor.
634 // See https://github.com/llvm/llvm-project/issues/46091
636 // Test std::ranges::subrange
638 co_await
coroutine::parallel_for_each(std::ranges::subrange(values
.begin(), values
.end() - 1), [&sum_of_squares
] (int x
) -> future
<> {
640 co_await
sleep(std::chrono::milliseconds(x
));
642 sum_of_squares
+= x
* x
;
644 BOOST_REQUIRE_EQUAL(sum_of_squares
, 10);
// coroutine::as_future() on future<>: awaiting it hands back the future
// itself instead of throwing. Covered here: a ready future, an exceptional
// future, a semaphore wait that is signalled after 1ms, and a semaphore wait
// with a 1ms timeout that must fail with semaphore_timed_out.
// NOTE(review): the declaration of `sem` is not visible in this extract.
648 SEASTAR_TEST_CASE(test_void_as_future
) {
649 auto f
= co_await
coroutine::as_future(make_ready_future
<>());
650 BOOST_REQUIRE(f
.available());
652 f
= co_await
coroutine::as_future(make_exception_future
<>(std::runtime_error("exception")));
653 BOOST_REQUIRE_THROW(f
.get(), std::runtime_error
);
// Signal the semaphore from a detached continuation after 1ms.
656 (void)sleep(1ms
).then([&] { sem
.signal(); });
657 f
= co_await
coroutine::as_future(sem
.wait());
658 BOOST_REQUIRE(f
.available());
660 f
= co_await
coroutine::as_future(sem
.wait(duration_cast
<semaphore::duration
>(1ms
)));
661 BOOST_REQUIRE_THROW(f
.get(), semaphore_timed_out
);
// Same scenarios as test_void_as_future, using
// coroutine::as_future_without_preemption_check() instead of as_future().
// NOTE(review): the declaration of `sem` is not visible in this extract.
664 SEASTAR_TEST_CASE(test_void_as_future_without_preemption_check
) {
665 auto f
= co_await
coroutine::as_future_without_preemption_check(make_ready_future
<>());
666 BOOST_REQUIRE(f
.available());
668 f
= co_await
coroutine::as_future_without_preemption_check(make_exception_future
<>(std::runtime_error("exception")));
669 BOOST_REQUIRE_THROW(f
.get(), std::runtime_error
);
// Signal the semaphore from a detached continuation after 1ms.
672 (void)sleep(1ms
).then([&] { sem
.signal(); });
673 f
= co_await
coroutine::as_future_without_preemption_check(sem
.wait());
674 BOOST_REQUIRE(f
.available());
676 f
= co_await
coroutine::as_future_without_preemption_check(sem
.wait(duration_cast
<semaphore::duration
>(1ms
)));
677 BOOST_REQUIRE_THROW(f
.get(), semaphore_timed_out
);
// coroutine::as_future() on future<int>: a ready value (42), an exceptional
// future, a promise fulfilled with 314 after 1ms, and a coroutine that fails
// via co_return coroutine::exception(...).
680 SEASTAR_TEST_CASE(test_non_void_as_future
) {
681 auto f
= co_await
coroutine::as_future(make_ready_future
<int>(42));
682 BOOST_REQUIRE_EQUAL(f
.get0(), 42);
684 f
= co_await
coroutine::as_future(make_exception_future
<int>(std::runtime_error("exception")));
685 BOOST_REQUIRE_THROW(f
.get(), std::runtime_error
);
687 auto p
= promise
<int>();
// Fulfil the promise from a detached continuation after 1ms.
688 (void)sleep(1ms
).then([&] { p
.set_value(314); });
689 f
= co_await
coroutine::as_future(p
.get_future());
690 BOOST_REQUIRE_EQUAL(f
.get0(), 314);
692 auto gen_exception
= [] () -> future
<int> {
694 co_return
coroutine::exception(std::make_exception_ptr(std::runtime_error("exception")));
696 f
= co_await
coroutine::as_future(gen_exception());
697 BOOST_REQUIRE_THROW(f
.get(), std::runtime_error
);
// Same scenarios as test_non_void_as_future, using
// coroutine::as_future_without_preemption_check() instead of as_future().
700 SEASTAR_TEST_CASE(test_non_void_as_future_without_preemption_check
) {
701 auto f
= co_await
coroutine::as_future_without_preemption_check(make_ready_future
<int>(42));
702 BOOST_REQUIRE_EQUAL(f
.get0(), 42);
704 f
= co_await
coroutine::as_future_without_preemption_check(make_exception_future
<int>(std::runtime_error("exception")));
705 BOOST_REQUIRE_THROW(f
.get(), std::runtime_error
);
707 auto p
= promise
<int>();
// Fulfil the promise from a detached continuation after 1ms.
708 (void)sleep(1ms
).then([&] { p
.set_value(314); });
709 f
= co_await
coroutine::as_future_without_preemption_check(p
.get_future());
710 BOOST_REQUIRE_EQUAL(f
.get0(), 314);
712 auto gen_exception
= [] () -> future
<int> {
714 co_return
coroutine::exception(std::make_exception_ptr(std::runtime_error("exception")));
716 f
= co_await
coroutine::as_future_without_preemption_check(gen_exception());
717 BOOST_REQUIRE_THROW(f
.get(), std::runtime_error
);
// Two cooperating coroutines: wait_for_stop() awaits
// as_future(get_ready_future()) and converts a failed result into the
// coroutine's own failure; set_stop() checks f0.available() and calls
// maybe_yield(). At the end f0 must fail with std::runtime_error.
// NOTE(review): the declaration of `stop`, the loops and the statement that
// sets `stop` are not visible in this extract.
720 SEASTAR_TEST_CASE(test_as_future_preemption
) {
// Ready future while `stop` is false, exceptional future once it is true.
723 auto get_ready_future
= [&] {
724 return stop
? make_exception_future
<>(std::runtime_error("exception")) : make_ready_future
<>();
727 auto wait_for_stop
= [&] () -> future
<bool> {
729 auto f
= co_await
coroutine::as_future(get_ready_future());
731 co_return
coroutine::exception(f
.get_exception());
736 auto f0
= wait_for_stop();
738 auto set_stop
= [&] () -> future
<> {
741 if (f0
.available()) {
744 co_await
coroutine::maybe_yield();
750 BOOST_REQUIRE_THROW(f0
.get(), std::runtime_error
);
// Generator coroutine (coroutine::experimental::generator<int, Container>)
// that co_yields up to `count` values. On each step it yields the current
// `a` and advances the pair (a, b) to (b, a + b). Before each step it throws
// std::out_of_range if a + b would exceed the maximum of a's type.
// NOTE(review): the initial values of a and b are not visible in this
// extract; the tests expect the sequence to start 0, 1, 1, 2, ...
755 template<template<typename
> class Container
>
756 coroutine::experimental::generator
<int, Container
>
757 fibonacci_sequence(coroutine::experimental::buffer_size_t size
, unsigned count
) {
759 for (unsigned i
= 0; i
< count
; ++i
) {
// Overflow guard written as max - a < b so that a + b is never computed
// when it would overflow.
760 if (std::numeric_limits
<decltype(a
)>::max() - a
< b
) {
761 throw std::out_of_range(
762 fmt::format("fibonacci[{}] is greater than the largest value of int", i
));
764 co_yield
std::exchange(a
, std::exchange(b
, a
+ b
));
// Consumes a fibonacci_sequence generator (buffer size 2) completely: every
// awaited value must be present and equal to the expected Fibonacci number,
// and one more await after the last element must produce an empty value.
768 template<template<typename
> class Container
>
769 seastar::future
<> test_async_generator_drained() {
770 auto expected_fibs
= {0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55};
771 auto fib
= fibonacci_sequence
<Container
>(coroutine::experimental::buffer_size_t
{2},
772 std::size(expected_fibs
));
773 for (auto expected_fib
: expected_fibs
) {
774 auto actual_fib
= co_await
fib();
775 BOOST_REQUIRE(actual_fib
.has_value());
776 BOOST_REQUIRE_EQUAL(actual_fib
.value(), expected_fib
);
// One extra await past the end: must yield no value.
778 auto sentinel
= co_await
fib();
779 BOOST_REQUIRE(!sentinel
.has_value());
// Alias selecting circular_buffer<T> as the generator's container; it is
// passed as the Container template argument by the *_buffered tests.
// (The template header introducing T is not visible in this extract.)
783 using buffered_container
= circular_buffer
<T
>;
// Runs test_async_generator_drained with buffered_container as the container.
785 SEASTAR_TEST_CASE(test_async_generator_drained_buffered
) {
786 return test_async_generator_drained
<buffered_container
>();
// Runs test_async_generator_drained with std::optional as the container.
789 SEASTAR_TEST_CASE(test_async_generator_drained_unbuffered
) {
790 return test_async_generator_drained
<std::optional
>();
// Creates a fibonacci_sequence generator (buffer size 2) and awaits only its
// first value, which must be present and equal to 0.
// NOTE(review): the count argument passed to fibonacci_sequence is not
// visible in this extract.
793 template<template<typename
> class Container
>
794 seastar::future
<> test_async_generator_not_drained() {
795 auto fib
= fibonacci_sequence
<Container
>(coroutine::experimental::buffer_size_t
{2},
797 auto actual_fib
= co_await
fib();
798 BOOST_REQUIRE(actual_fib
.has_value());
799 BOOST_REQUIRE_EQUAL(actual_fib
.value(), 0);
// Runs test_async_generator_not_drained with buffered_container as the container.
802 SEASTAR_TEST_CASE(test_async_generator_not_drained_buffered
) {
803 return test_async_generator_not_drained
<buffered_container
>();
// Runs test_async_generator_not_drained with std::optional as the container.
806 SEASTAR_TEST_CASE(test_async_generator_not_drained_unbuffered
) {
807 return test_async_generator_not_drained
<std::optional
>();
// Special members of counter_t (the class header and member declarations are
// not visible in this extract).
// Move constructor: takes over `n` and `count` from `other`, leaving
// other.n == -1 and other.count == nullptr.
813 counter_t(counter_t
&& other
) noexcept
814 : n
{std::exchange(other
.n
, -1)},
815 count
{std::exchange(other
.count
, nullptr)}
// Value constructor: stores n and the pointer to the shared count; the body
// is not visible here.
817 counter_t(int n
, int* count
) noexcept
818 : n
{n
}, count
{count
} {
// Destructor: body not visible in this extract.
821 ~counter_t() noexcept
{
// Stream insertion operator for counter_t. The body is not visible in this
// extract.
827 std::ostream
& operator<<(std::ostream
& os
, const counter_t
& c
) {
// Generator coroutine producing counter_t objects built from a loop variable
// `i` and the `total` pointer. One path throws
// std::invalid_argument("Eureka from generator!").
// NOTE(review): the loop and the condition that selects the throwing path
// are not visible in this extract.
831 template<template<typename
> class Container
>
832 coroutine::experimental::generator
<counter_t
, Container
>
833 fiddle(coroutine::experimental::buffer_size_t size
, int n
, int* total
) {
837 throw std::invalid_argument("Eureka from generator!");
839 co_yield counter_t
{i
, total
};
// Drives a fiddle() generator from the count_to coroutine, looping for up to
// 2 * n iterations, and expects count_to(42) to fail with
// std::invalid_argument. After the failure `total` must be 0.
// NOTE(review): the declaration of `total`, the remaining fiddle() arguments
// and the loop body are not visible in this extract.
843 template<template<typename
> class Container
>
844 seastar::future
<> test_async_generator_throws_from_generator() {
846 auto count_to
= [total
=&total
](unsigned n
) -> seastar::future
<> {
847 auto count
= fiddle
<Container
>(coroutine::experimental::buffer_size_t
{2},
849 for (unsigned i
= 0; i
< 2 * n
; i
++) {
853 co_await
count_to(42).then_wrapped([&total
] (auto f
) {
854 BOOST_REQUIRE(f
.failed());
855 BOOST_REQUIRE_THROW(std::rethrow_exception(f
.get_exception()), std::invalid_argument
);
856 BOOST_REQUIRE_EQUAL(total
, 0);
// Runs test_async_generator_throws_from_generator with buffered_container.
860 SEASTAR_TEST_CASE(test_async_generator_throws_from_generator_buffered
) {
861 return test_async_generator_throws_from_generator
<buffered_container
>();
// Runs test_async_generator_throws_from_generator with std::optional.
864 SEASTAR_TEST_CASE(test_async_generator_throws_from_generator_unbuffered
) {
865 return test_async_generator_throws_from_generator
<std::optional
>();
// Counterpart of test_async_generator_throws_from_generator in which the
// consuming loop (up to n iterations) throws
// std::invalid_argument("Eureka from consumer!"). count_to(42) must fail
// with std::invalid_argument, and `total` must be 0 afterwards.
// NOTE(review): the declaration of `total`, the remaining fiddle() arguments
// and the condition guarding the throw are not visible in this extract.
868 template<template<typename
> class Container
>
869 seastar::future
<> test_async_generator_throws_from_consumer() {
871 auto count_to
= [total
=&total
](unsigned n
) -> seastar::future
<> {
872 auto count
= fiddle
<Container
>(coroutine::experimental::buffer_size_t
{2},
874 for (unsigned i
= 0; i
< n
; i
++) {
876 throw std::invalid_argument("Eureka from consumer!");
881 co_await
count_to(42).then_wrapped([&total
] (auto f
) {
882 BOOST_REQUIRE(f
.failed());
883 BOOST_REQUIRE_THROW(std::rethrow_exception(f
.get_exception()), std::invalid_argument
);
884 BOOST_REQUIRE_EQUAL(total
, 0);
// Runs test_async_generator_throws_from_consumer with buffered_container.
888 SEASTAR_TEST_CASE(test_async_generator_throws_from_consumer_buffered
) {
889 return test_async_generator_throws_from_consumer
<buffered_container
>();
// Runs test_async_generator_throws_from_consumer with std::optional.
892 SEASTAR_TEST_CASE(test_async_generator_throws_from_consumer_unbuffered
) {
893 return test_async_generator_throws_from_consumer
<std::optional
>();
// Checks that a coroutine lambda wrapped in coroutine::lambda() and passed
// to then() can still read its captures after suspending. The lambda
// allocates and zero-fills many blocks of 1024..2047 bytes in an attempt to
// overwrite a freed capture, then co_returns sin(n), which must equal the
// value computed up front.
// NOTE(review): the suspension point inside the lambda and the body of the
// loop over `garbage` are not visible in this extract.
898 SEASTAR_TEST_CASE(test_lambda_coroutine_in_continuation
) {
899 auto dist
= std::uniform_real_distribution
<>(0.0, 1.0);
900 auto rand_eng
= std::default_random_engine(std::random_device()());
901 double n
= dist(rand_eng
);
902 auto sin1
= std::sin(n
); // avoid optimizer tricks
903 auto boo
= std::array
<char, 1025>(); // bias coroutine size towards 1024 size class
904 auto sin2
= co_await
yield().then(coroutine::lambda([n
, boo
] () -> future
<double> {
905 // Expect coroutine capture to be freed after co_await without coroutine::lambda
907 // Try to overwrite recently-released coroutine frame by allocating in similar size-class
908 std::vector
<char*> garbage
;
909 for (size_t sz
= 1024; sz
< 2048; ++sz
) {
910 for (int ctr
= 0; ctr
< 100; ++ctr
) {
911 auto p
= static_cast<char*>(malloc(sz
));
912 std::memset(p
, 0, sz
);
913 garbage
.push_back(p
);
916 for (auto p
: garbage
) {
920 co_return
std::sin(n
);
922 BOOST_REQUIRE_EQUAL(sin1
, sin2
);