]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/tests/unit/coroutines_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / tests / unit / coroutines_test.cc
CommitLineData
9f95a23c
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2019 ScyllaDB Ltd.
20 */
21
1e59de90
TL
22#include <exception>
23#include <numeric>
24#include <ranges>
25
26#include <seastar/core/circular_buffer.hh>
9f95a23c
TL
27#include <seastar/core/future-util.hh>
28#include <seastar/testing/test_case.hh>
20effc67 29#include <seastar/core/sleep.hh>
1e59de90
TL
30#include <seastar/util/later.hh>
31#include <seastar/core/thread.hh>
32#include <seastar/testing/random.hh>
9f95a23c
TL
33
34using namespace seastar;
20effc67 35using namespace std::chrono_literals;
9f95a23c 36
f67539c2
TL
37#ifndef SEASTAR_COROUTINES_ENABLED
38
39SEASTAR_TEST_CASE(test_coroutines_not_compiled_in) {
40 return make_ready_future<>();
41}
42
43#else
44
45#include <seastar/core/coroutine.hh>
20effc67
TL
46#include <seastar/coroutine/all.hh>
47#include <seastar/coroutine/maybe_yield.hh>
1e59de90
TL
48#include <seastar/coroutine/switch_to.hh>
49#include <seastar/coroutine/parallel_for_each.hh>
50#include <seastar/coroutine/as_future.hh>
51#include <seastar/coroutine/exception.hh>
52#include <seastar/coroutine/generator.hh>
f67539c2 53
9f95a23c
TL
54namespace {
55
56future<int> old_fashioned_continuations() {
1e59de90 57 return yield().then([] {
9f95a23c
TL
58 return 42;
59 });
60}
61
62future<int> simple_coroutine() {
1e59de90 63 co_await yield();
9f95a23c
TL
64 co_return 53;
65}
66
67future<int> ready_coroutine() {
68 co_return 64;
69}
70
f67539c2 71future<std::tuple<int, double>> tuple_coroutine() {
9f95a23c
TL
72 co_return std::tuple(1, 2.);
73}
74
75future<int> failing_coroutine() {
1e59de90 76 co_await yield();
9f95a23c
TL
77 throw 42;
78}
79
20effc67
TL
80[[gnu::noinline]] int throw_exception(int x) {
81 throw x;
82}
83
84future<int> failing_coroutine2() noexcept {
1e59de90 85 co_await yield();
20effc67
TL
86 co_return throw_exception(17);
87}
88
9f95a23c
TL
89}
90
91SEASTAR_TEST_CASE(test_simple_coroutines) {
92 BOOST_REQUIRE_EQUAL(co_await old_fashioned_continuations(), 42);
93 BOOST_REQUIRE_EQUAL(co_await simple_coroutine(), 53);
94 BOOST_REQUIRE_EQUAL(ready_coroutine().get0(), 64);
95 BOOST_REQUIRE(co_await tuple_coroutine() == std::tuple(1, 2.));
96 BOOST_REQUIRE_EXCEPTION((void)co_await failing_coroutine(), int, [] (auto v) { return v == 42; });
20effc67
TL
97 BOOST_CHECK_EQUAL(co_await failing_coroutine().then_wrapped([] (future<int> f) -> future<int> {
98 BOOST_REQUIRE(f.failed());
99 try {
100 std::rethrow_exception(f.get_exception());
101 } catch (int v) {
102 co_return v;
103 }
104 }), 42);
105 BOOST_REQUIRE_EXCEPTION((void)co_await failing_coroutine2(), int, [] (auto v) { return v == 17; });
106 BOOST_CHECK_EQUAL(co_await failing_coroutine2().then_wrapped([] (future<int> f) -> future<int> {
107 BOOST_REQUIRE(f.failed());
108 try {
109 std::rethrow_exception(f.get_exception());
110 } catch (int v) {
111 co_return v;
112 }
113 }), 17);
f67539c2
TL
114}
115
9f95a23c 116SEASTAR_TEST_CASE(test_abandond_coroutine) {
f67539c2 117 std::optional<future<int>> f;
9f95a23c
TL
118 {
119 auto p1 = promise<>();
120 auto p2 = promise<>();
121 auto p3 = promise<>();
122 f = p1.get_future().then([&] () -> future<int> {
123 p2.set_value();
124 BOOST_CHECK_THROW(co_await p3.get_future(), broken_promise);
125 co_return 1;
126 });
127 p1.set_value();
128 co_await p2.get_future();
129 }
130 BOOST_CHECK_EQUAL(co_await std::move(*f), 1);
131}
132
133SEASTAR_TEST_CASE(test_scheduling_group) {
134 auto other_sg = co_await create_scheduling_group("the other group", 10.f);
1e59de90
TL
135 std::exception_ptr ex;
136
137 try {
138 auto p1 = promise<>();
139 auto p2 = promise<>();
140
141 auto p1b = promise<>();
142 auto p2b = promise<>();
143 auto f1 = p1b.get_future();
144 auto f2 = p2b.get_future();
145
146 BOOST_REQUIRE(current_scheduling_group() == default_scheduling_group());
147 auto f_ret = with_scheduling_group(other_sg,
148 [other_sg_cap = other_sg] (future<> f1, future<> f2, promise<> p1, promise<> p2) -> future<int> {
149 // Make a copy in the coroutine before the lambda is destroyed.
150 auto other_sg = other_sg_cap;
151 BOOST_REQUIRE(current_scheduling_group() == other_sg);
152 BOOST_REQUIRE(other_sg == other_sg);
153 p1.set_value();
154 co_await std::move(f1);
155 BOOST_REQUIRE(current_scheduling_group() == other_sg);
156 p2.set_value();
157 co_await std::move(f2);
158 BOOST_REQUIRE(current_scheduling_group() == other_sg);
159 co_return 42;
160 }, p1.get_future(), p2.get_future(), std::move(p1b), std::move(p2b));
9f95a23c 161
9f95a23c 162 co_await std::move(f1);
1e59de90
TL
163 BOOST_REQUIRE(current_scheduling_group() == default_scheduling_group());
164 p1.set_value();
9f95a23c 165 co_await std::move(f2);
1e59de90
TL
166 BOOST_REQUIRE(current_scheduling_group() == default_scheduling_group());
167 p2.set_value();
168 BOOST_REQUIRE_EQUAL(co_await std::move(f_ret), 42);
169 BOOST_REQUIRE(current_scheduling_group() == default_scheduling_group());
170 } catch (...) {
171 ex = std::current_exception();
172 }
173 co_await destroy_scheduling_group(other_sg);
174 if (ex) {
175 std::rethrow_exception(std::move(ex));
176 }
177}
178
179future<scheduling_group> switch_to_with_context(scheduling_group& sg) {
180 scheduling_group new_sg = co_await coroutine::switch_to(sg);
181 BOOST_REQUIRE(current_scheduling_group() == sg);
182 co_return new_sg;
183}
184
185SEASTAR_TEST_CASE(test_switch_to) {
186 auto other_sg0 = co_await create_scheduling_group("other group 0", 10.f);
187 auto other_sg1 = co_await create_scheduling_group("other group 1", 10.f);
188 auto other_sg2 = co_await create_scheduling_group("other group 2", 10.f);
189 std::exception_ptr ex;
190
191 try {
192 auto base_sg = current_scheduling_group();
193
194 auto prev_sg = co_await coroutine::switch_to(other_sg0);
195 BOOST_REQUIRE(current_scheduling_group() == other_sg0);
196 BOOST_REQUIRE(prev_sg == base_sg);
9f95a23c 197
1e59de90
TL
198 auto same_sg = co_await coroutine::switch_to(other_sg0);
199 BOOST_REQUIRE(current_scheduling_group() == other_sg0);
200 BOOST_REQUIRE(same_sg == other_sg0);
201
202 auto nested_sg = co_await coroutine::switch_to(other_sg1);
203 BOOST_REQUIRE(current_scheduling_group() == other_sg1);
204 BOOST_REQUIRE(nested_sg == other_sg0);
205
206 co_await switch_to_with_context(other_sg2);
207
208 co_await coroutine::switch_to(base_sg);
209 BOOST_REQUIRE(current_scheduling_group() == base_sg);
210 } catch (...) {
211 ex = std::current_exception();
212 }
213
214 co_await destroy_scheduling_group(other_sg1);
215 co_await destroy_scheduling_group(other_sg0);
216 if (ex) {
217 std::rethrow_exception(std::move(ex));
218 }
219}
220
221future<> check_thread_inherits_sg_from_coroutine_frame(scheduling_group expected_sg) {
222 return seastar::async([expected_sg] {
223 BOOST_REQUIRE(current_scheduling_group() == expected_sg);
224 });
225}
226
227future<> check_coroutine_inherits_sg_from_another_one(scheduling_group expected_sg) {
228 co_await yield();
229 BOOST_REQUIRE(current_scheduling_group() == expected_sg);
230}
231
232future<> switch_to_sg_and_perform_inheriting_checks(scheduling_group base_sg, scheduling_group new_sg) {
233 BOOST_REQUIRE(current_scheduling_group() == base_sg);
234 co_await coroutine::switch_to(new_sg);
235 BOOST_REQUIRE(current_scheduling_group() == new_sg);
236
237 co_await check_thread_inherits_sg_from_coroutine_frame(new_sg);
238 co_await check_coroutine_inherits_sg_from_another_one(new_sg);
239
240 // don't restore previous sg on purpose, expecting it will be restored once coroutine goes out of scope
241}
242
243SEASTAR_TEST_CASE(test_switch_to_sg_restoration_and_inheriting) {
244 auto new_sg = co_await create_scheduling_group("other group 0", 10.f);
245 std::exception_ptr ex;
246
247 try {
248 auto base_sg = current_scheduling_group();
249
250 co_await switch_to_sg_and_perform_inheriting_checks(base_sg, new_sg);
251 // seastar automatically restores base_sg once it goes out of coroutine frame
252 BOOST_REQUIRE(current_scheduling_group() == base_sg);
253
254 co_await seastar::async([base_sg, new_sg] {
255 switch_to_sg_and_perform_inheriting_checks(base_sg, new_sg).get();
256 BOOST_REQUIRE(current_scheduling_group() == base_sg);
257 });
258
259 co_await switch_to_sg_and_perform_inheriting_checks(base_sg, new_sg).finally([base_sg] {
260 BOOST_REQUIRE(current_scheduling_group() == base_sg);
261 });
262 } catch (...) {
263 ex = std::current_exception();
264 }
265
266 co_await destroy_scheduling_group(new_sg);
267 if (ex) {
268 std::rethrow_exception(std::move(ex));
269 }
9f95a23c 270}
f67539c2 271
20effc67
TL
272SEASTAR_TEST_CASE(test_preemption) {
273 bool x = false;
274 unsigned preempted = 0;
1e59de90 275 auto f = yield().then([&x] {
20effc67
TL
276 x = true;
277 });
278
279 // try to preempt 1000 times. 1 should be enough if not for
280 // task queue shaffling in debug mode which may cause co-routine
281 // continuation to run first.
282 while(preempted < 1000 && !x) {
283 preempted += need_preempt();
284 co_await make_ready_future<>();
285 }
286 auto save_x = x;
1e59de90 287 // wait for yield() to complete
20effc67
TL
288 co_await std::move(f);
289 BOOST_REQUIRE(save_x);
290 co_return;
291}
292
1e59de90
TL
293SEASTAR_TEST_CASE(test_no_preemption) {
294 bool x = false;
295 unsigned preempted = 0;
296 auto f = yield().then([&x] {
297 x = true;
298 });
299
300 // preemption should not happen, we explicitly asked for continuing if possible
301 while(preempted < 1000 && !x) {
302 preempted += need_preempt();
303 co_await coroutine::without_preemption_check(make_ready_future<>());
304 }
305 auto save_x = x;
306 // wait for yield() to complete
307 co_await std::move(f);
308 BOOST_REQUIRE(!save_x);
309 co_return;
310}
311
20effc67
TL
312SEASTAR_TEST_CASE(test_all_simple) {
313 auto [a, b] = co_await coroutine::all(
314 [] { return make_ready_future<int>(1); },
315 [] { return make_ready_future<int>(2); }
316 );
317 BOOST_REQUIRE_EQUAL(a, 1);
318 BOOST_REQUIRE_EQUAL(b, 2);
319}
320
321SEASTAR_TEST_CASE(test_all_permutations) {
322 std::vector<std::chrono::milliseconds> delays = { 0ms, 0ms, 2ms, 2ms, 4ms, 6ms };
323 auto make_delayed_future_returning_nr = [&] (int nr) {
324 return [=] {
325 auto delay = delays[nr];
326 return delay == 0ms ? make_ready_future<int>(nr) : sleep(delay).then([nr] { return make_ready_future<int>(nr); });
327 };
328 };
329 do {
330 auto [a, b, c, d, e, f] = co_await coroutine::all(
331 make_delayed_future_returning_nr(0),
332 make_delayed_future_returning_nr(1),
333 make_delayed_future_returning_nr(2),
334 make_delayed_future_returning_nr(3),
335 make_delayed_future_returning_nr(4),
336 make_delayed_future_returning_nr(5)
337 );
338 BOOST_REQUIRE_EQUAL(a, 0);
339 BOOST_REQUIRE_EQUAL(b, 1);
340 BOOST_REQUIRE_EQUAL(c, 2);
341 BOOST_REQUIRE_EQUAL(d, 3);
342 BOOST_REQUIRE_EQUAL(e, 4);
343 BOOST_REQUIRE_EQUAL(f, 5);
344 } while (std::ranges::next_permutation(delays).found);
345}
346
347SEASTAR_TEST_CASE(test_all_ready_exceptions) {
348 try {
349 co_await coroutine::all(
350 [] () -> future<> { throw 1; },
351 [] () -> future<> { throw 2; }
352 );
353 } catch (int e) {
354 BOOST_REQUIRE(e == 1 || e == 2);
355 }
356}
357
358SEASTAR_TEST_CASE(test_all_nonready_exceptions) {
359 try {
360 co_await coroutine::all(
361 [] () -> future<> {
362 co_await sleep(1ms);
363 throw 1;
364 },
365 [] () -> future<> {
366 co_await sleep(1ms);
367 throw 2;
368 }
369 );
370 } catch (int e) {
371 BOOST_REQUIRE(e == 1 || e == 2);
372 }
373}
374
375SEASTAR_TEST_CASE(test_all_heterogeneous_types) {
376 auto [a, b] = co_await coroutine::all(
377 [] () -> future<int> {
378 co_await sleep(1ms);
379 co_return 1;
380 },
381 [] () -> future<> {
382 co_await sleep(1ms);
383 },
384 [] () -> future<long> {
385 co_await sleep(1ms);
386 co_return 2L;
387 }
388 );
389 BOOST_REQUIRE_EQUAL(a, 1);
390 BOOST_REQUIRE_EQUAL(b, 2L);
391}
392
393SEASTAR_TEST_CASE(test_all_noncopyable_types) {
394 auto [a] = co_await coroutine::all(
395 [] () -> future<std::unique_ptr<int>> {
396 co_return std::make_unique<int>(6);
397 }
398 );
399 BOOST_REQUIRE_EQUAL(*a, 6);
400}
401
402SEASTAR_TEST_CASE(test_all_throw_in_input_func) {
403 int nr_completed = 0;
404 bool exception_seen = false;
405 try {
406 co_await coroutine::all(
407 [&] () -> future<int> {
408 co_await sleep(1ms);
409 ++nr_completed;
410 co_return 7;
411 },
412 [&] () -> future<int> {
413 throw 9;
414 },
415 [&] () -> future<int> {
416 co_await sleep(1ms);
417 ++nr_completed;
418 co_return 7;
419 }
420 );
421 } catch (int n) {
422 BOOST_REQUIRE_EQUAL(n, 9);
423 exception_seen = true;
424 }
425 BOOST_REQUIRE_EQUAL(nr_completed, 2);
426 BOOST_REQUIRE(exception_seen);
427}
428
1e59de90
TL
// Scope guard over an external int: increments it on construction and
// decrements it on destruction, so the int tracks the number of live
// counter_ref objects bound to it.
struct counter_ref {
public:
    explicit counter_ref(int& cnt) : _counter(cnt) {
        _counter += 1;
    }

    ~counter_ref() {
        _counter -= 1;
    }

private:
    int& _counter;
};
443
444template<typename Ex>
445static future<> check_coroutine_throws(auto fun) {
446 // The counter keeps track of the number of alive "counter_ref" objects.
447 // If it is not zero then it means that some destructors weren't run
448 // while quitting the coroutine.
449 int counter = 0;
450 BOOST_REQUIRE_THROW(co_await fun(counter), Ex);
451 BOOST_REQUIRE_EQUAL(counter, 0);
452 co_await fun(counter).then_wrapped([&counter] (auto f) {
453 BOOST_REQUIRE(f.failed());
454 BOOST_REQUIRE_THROW(std::rethrow_exception(f.get_exception()), Ex);
455 BOOST_REQUIRE_EQUAL(counter, 0);
456 });
457}
458
20effc67 459SEASTAR_TEST_CASE(test_coroutine_exception) {
1e59de90
TL
460 co_await check_coroutine_throws<std::runtime_error>([] (int& counter) -> future<int> {
461 counter_ref ref{counter};
20effc67 462 co_return coroutine::exception(std::make_exception_ptr(std::runtime_error("threw")));
20effc67 463 });
1e59de90
TL
464 co_await check_coroutine_throws<std::runtime_error>([] (int& counter) -> future<int> {
465 counter_ref ref{counter};
466 co_await coroutine::exception(std::make_exception_ptr(std::runtime_error("threw")));
467 co_return 42;
468 });
469 co_await check_coroutine_throws<std::logic_error>([] (int& counter) -> future<> {
470 counter_ref ref{counter};
471 co_await coroutine::return_exception(std::logic_error("threw"));
472 co_return;
473 });
474 co_await check_coroutine_throws<int>([] (int& counter) -> future<> {
475 counter_ref ref{counter};
476 co_await coroutine::return_exception(42);
477 co_return;
478 });
479}
20effc67 480
1e59de90
TL
481SEASTAR_TEST_CASE(test_coroutine_return_exception_ptr) {
482 co_await check_coroutine_throws<std::runtime_error>([] (int& counter) -> future<> {
483 co_await coroutine::return_exception(std::runtime_error("threw"));
484 });
485 co_await check_coroutine_throws<std::runtime_error>([] (int& counter) -> future<> {
486 auto ex = std::make_exception_ptr(std::runtime_error("threw"));
487 co_await coroutine::return_exception_ptr(std::move(ex));
488 });
489 co_await check_coroutine_throws<std::runtime_error>([] (int& counter) -> future<> {
490 auto ex = std::make_exception_ptr(std::runtime_error("threw"));
491 co_await coroutine::return_exception_ptr(ex);
492 });
493 co_await check_coroutine_throws<int>([] (int& counter) -> future<> {
494 co_await coroutine::return_exception_ptr(std::make_exception_ptr(3));
20effc67
TL
495 });
496}
497
498SEASTAR_TEST_CASE(test_maybe_yield) {
499 int var = 0;
500 bool done = false;
501 auto spinner = [&] () -> future<> {
502 // increment a variable continuously, but yield so an observer can see it.
503 while (!done) {
504 ++var;
505 co_await coroutine::maybe_yield();
506 }
507 };
508 auto spinner_fut = spinner();
509 int snapshot = var;
510 for (int nr_changes = 0; nr_changes < 10; ++nr_changes) {
511 // Try to observe the value changing in time, yield to
512 // allow the spinner to advance it.
513 while (snapshot == var) {
514 co_await coroutine::maybe_yield();
515 }
516 snapshot = var;
517 }
518 done = true;
519 co_await std::move(spinner_fut);
520 BOOST_REQUIRE(true); // the test will hang if it doesn't work.
521}
522
1e59de90
TL
523#if __has_include(<coroutine>) && !defined(__clang__)
524
525#include "tl-generator.hh"
526tl::generator<int> simple_generator(int max)
527{
528 for (int i = 0; i < max; ++i) {
529 co_yield i;
530 }
531}
532
533SEASTAR_TEST_CASE(generator)
534{
535 // test ability of seastar::parallel_for_each to deal with move-only views
536 int accum = 0;
537 co_await seastar::parallel_for_each(simple_generator(10), [&](int i) {
538 accum += i;
539 return seastar::make_ready_future<>();
540 });
541 BOOST_REQUIRE_EQUAL(accum, 45);
542
543 // test ability of seastar::max_concurrent_for_each to deal with move-only views
544 accum = 0;
545 co_await seastar::max_concurrent_for_each(simple_generator(10), 10, [&](int i) {
546 accum += i;
547 return seastar::make_ready_future<>();
548 });
549 BOOST_REQUIRE_EQUAL(accum, 45);
550}
551
552#endif
553
554SEASTAR_TEST_CASE(test_parallel_for_each_empty) {
555 std::vector<int> values;
556 int count = 0;
557
558 co_await coroutine::parallel_for_each(values, [&] (int x) {
559 ++count;
560 });
561 BOOST_REQUIRE_EQUAL(count, 0); // the test will hang if it doesn't work.
562}
563
564SEASTAR_TEST_CASE(test_parallel_for_each_exception) {
565 std::array<int, 5> values = { 10, 2, 1, 4, 8 };
566 int count = 0;
567 auto& eng = testing::local_random_engine;
568 auto dist = std::uniform_int_distribution<unsigned>();
569 int throw_at = dist(eng) % values.size();
570
571 BOOST_TEST_MESSAGE(fmt::format("Will throw at value #{}/{}", throw_at, values.size()));
572
573 auto f0 = coroutine::parallel_for_each(values, [&] (int x) {
574 if (count++ == throw_at) {
575 BOOST_TEST_MESSAGE("throw");
576 throw std::runtime_error("test");
577 }
578 });
579 // An exception thrown by the functor must be propagated
580 BOOST_REQUIRE_THROW(co_await std::move(f0), std::runtime_error);
581 // Functor must be called on all values, even if there's an exception
582 BOOST_REQUIRE_EQUAL(count, values.size());
583
584 count = 0;
585 throw_at = dist(eng) % values.size();
586 BOOST_TEST_MESSAGE(fmt::format("Will throw at value #{}/{}", throw_at, values.size()));
587
588 auto f1 = coroutine::parallel_for_each(values, [&] (int x) -> future<> {
589 co_await sleep(std::chrono::milliseconds(x));
590 if (count++ == throw_at) {
591 throw std::runtime_error("test");
592 }
593 });
594 BOOST_REQUIRE_THROW(co_await std::move(f1), std::runtime_error);
595 BOOST_REQUIRE_EQUAL(count, values.size());
596}
597
598SEASTAR_TEST_CASE(test_parallel_for_each) {
599 std::vector<int> values = { 3, 1, 4 };
600 int sum_of_squares = 0;
601
602 int expected = std::accumulate(values.begin(), values.end(), 0, [] (int sum, int x) {
603 return sum + x * x;
604 });
605
606 // Test all-ready futures
607 co_await coroutine::parallel_for_each(values, [&sum_of_squares] (int x) {
608 sum_of_squares += x * x;
609 });
610 BOOST_REQUIRE_EQUAL(sum_of_squares, expected);
611
612 // Test non-ready futures
613 sum_of_squares = 0;
614 co_await coroutine::parallel_for_each(values, [&sum_of_squares] (int x) -> future<> {
615 if (x > 1) {
616 co_await sleep(std::chrono::milliseconds(x));
617 }
618 sum_of_squares += x * x;
619 });
620 BOOST_REQUIRE_EQUAL(sum_of_squares, expected);
621
622 // Test legacy subrange
623 sum_of_squares = 0;
624 co_await coroutine::parallel_for_each(values.begin(), values.end() - 1, [&sum_of_squares] (int x) -> future<> {
625 if (x > 1) {
626 co_await sleep(std::chrono::milliseconds(x));
627 }
628 sum_of_squares += x * x;
629 });
630 BOOST_REQUIRE_EQUAL(sum_of_squares, 10);
631
632 // clang 13.0.1 doesn't support subrange
633 // so provide also a Iterator/Sentinel based constructor.
634 // See https://github.com/llvm/llvm-project/issues/46091
635#ifndef __clang__
636 // Test std::ranges::subrange
637 sum_of_squares = 0;
638 co_await coroutine::parallel_for_each(std::ranges::subrange(values.begin(), values.end() - 1), [&sum_of_squares] (int x) -> future<> {
639 if (x > 1) {
640 co_await sleep(std::chrono::milliseconds(x));
641 }
642 sum_of_squares += x * x;
643 });
644 BOOST_REQUIRE_EQUAL(sum_of_squares, 10);
645#endif
646}
647
648SEASTAR_TEST_CASE(test_void_as_future) {
649 auto f = co_await coroutine::as_future(make_ready_future<>());
650 BOOST_REQUIRE(f.available());
651
652 f = co_await coroutine::as_future(make_exception_future<>(std::runtime_error("exception")));
653 BOOST_REQUIRE_THROW(f.get(), std::runtime_error);
654
655 semaphore sem(0);
656 (void)sleep(1ms).then([&] { sem.signal(); });
657 f = co_await coroutine::as_future(sem.wait());
658 BOOST_REQUIRE(f.available());
659
660 f = co_await coroutine::as_future(sem.wait(duration_cast<semaphore::duration>(1ms)));
661 BOOST_REQUIRE_THROW(f.get(), semaphore_timed_out);
662}
663
664SEASTAR_TEST_CASE(test_void_as_future_without_preemption_check) {
665 auto f = co_await coroutine::as_future_without_preemption_check(make_ready_future<>());
666 BOOST_REQUIRE(f.available());
667
668 f = co_await coroutine::as_future_without_preemption_check(make_exception_future<>(std::runtime_error("exception")));
669 BOOST_REQUIRE_THROW(f.get(), std::runtime_error);
670
671 semaphore sem(0);
672 (void)sleep(1ms).then([&] { sem.signal(); });
673 f = co_await coroutine::as_future_without_preemption_check(sem.wait());
674 BOOST_REQUIRE(f.available());
675
676 f = co_await coroutine::as_future_without_preemption_check(sem.wait(duration_cast<semaphore::duration>(1ms)));
677 BOOST_REQUIRE_THROW(f.get(), semaphore_timed_out);
678}
679
680SEASTAR_TEST_CASE(test_non_void_as_future) {
681 auto f = co_await coroutine::as_future(make_ready_future<int>(42));
682 BOOST_REQUIRE_EQUAL(f.get0(), 42);
683
684 f = co_await coroutine::as_future(make_exception_future<int>(std::runtime_error("exception")));
685 BOOST_REQUIRE_THROW(f.get(), std::runtime_error);
686
687 auto p = promise<int>();
688 (void)sleep(1ms).then([&] { p.set_value(314); });
689 f = co_await coroutine::as_future(p.get_future());
690 BOOST_REQUIRE_EQUAL(f.get0(), 314);
691
692 auto gen_exception = [] () -> future<int> {
693 co_await sleep(1ms);
694 co_return coroutine::exception(std::make_exception_ptr(std::runtime_error("exception")));
695 };
696 f = co_await coroutine::as_future(gen_exception());
697 BOOST_REQUIRE_THROW(f.get(), std::runtime_error);
698}
699
700SEASTAR_TEST_CASE(test_non_void_as_future_without_preemption_check) {
701 auto f = co_await coroutine::as_future_without_preemption_check(make_ready_future<int>(42));
702 BOOST_REQUIRE_EQUAL(f.get0(), 42);
703
704 f = co_await coroutine::as_future_without_preemption_check(make_exception_future<int>(std::runtime_error("exception")));
705 BOOST_REQUIRE_THROW(f.get(), std::runtime_error);
706
707 auto p = promise<int>();
708 (void)sleep(1ms).then([&] { p.set_value(314); });
709 f = co_await coroutine::as_future_without_preemption_check(p.get_future());
710 BOOST_REQUIRE_EQUAL(f.get0(), 314);
711
712 auto gen_exception = [] () -> future<int> {
713 co_await sleep(1ms);
714 co_return coroutine::exception(std::make_exception_ptr(std::runtime_error("exception")));
715 };
716 f = co_await coroutine::as_future_without_preemption_check(gen_exception());
717 BOOST_REQUIRE_THROW(f.get(), std::runtime_error);
718}
719
720SEASTAR_TEST_CASE(test_as_future_preemption) {
721 bool stop = false;
722
723 auto get_ready_future = [&] {
724 return stop ? make_exception_future<>(std::runtime_error("exception")) : make_ready_future<>();
725 };
726
727 auto wait_for_stop = [&] () -> future<bool> {
728 for (;;) {
729 auto f = co_await coroutine::as_future(get_ready_future());
730 if (f.failed()) {
731 co_return coroutine::exception(f.get_exception());
732 }
733 }
734 };
735
736 auto f0 = wait_for_stop();
737
738 auto set_stop = [&] () -> future<> {
739 for (;;) {
740 stop = true;
741 if (f0.available()) {
742 co_return;
743 }
744 co_await coroutine::maybe_yield();
745 }
746 };
747
748 co_await set_stop();
749
750 BOOST_REQUIRE_THROW(f0.get(), std::runtime_error);
751}
752
753#ifndef __clang__
754
755template<template<typename> class Container>
756coroutine::experimental::generator<int, Container>
757fibonacci_sequence(coroutine::experimental::buffer_size_t size, unsigned count) {
758 auto a = 0, b = 1;
759 for (unsigned i = 0; i < count; ++i) {
760 if (std::numeric_limits<decltype(a)>::max() - a < b) {
761 throw std::out_of_range(
762 fmt::format("fibonacci[{}] is greater than the largest value of int", i));
763 }
764 co_yield std::exchange(a, std::exchange(b, a + b));
765 }
766}
767
768template<template<typename> class Container>
769seastar::future<> test_async_generator_drained() {
770 auto expected_fibs = {0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55};
771 auto fib = fibonacci_sequence<Container>(coroutine::experimental::buffer_size_t{2},
772 std::size(expected_fibs));
773 for (auto expected_fib : expected_fibs) {
774 auto actual_fib = co_await fib();
775 BOOST_REQUIRE(actual_fib.has_value());
776 BOOST_REQUIRE_EQUAL(actual_fib.value(), expected_fib);
777 }
778 auto sentinel = co_await fib();
779 BOOST_REQUIRE(!sentinel.has_value());
780}
781
782template<typename T>
783using buffered_container = circular_buffer<T>;
784
785SEASTAR_TEST_CASE(test_async_generator_drained_buffered) {
786 return test_async_generator_drained<buffered_container>();
787}
788
789SEASTAR_TEST_CASE(test_async_generator_drained_unbuffered) {
790 return test_async_generator_drained<std::optional>();
791}
792
793template<template<typename> class Container>
794seastar::future<> test_async_generator_not_drained() {
795 auto fib = fibonacci_sequence<Container>(coroutine::experimental::buffer_size_t{2},
796 42);
797 auto actual_fib = co_await fib();
798 BOOST_REQUIRE(actual_fib.has_value());
799 BOOST_REQUIRE_EQUAL(actual_fib.value(), 0);
800}
801
802SEASTAR_TEST_CASE(test_async_generator_not_drained_buffered) {
803 return test_async_generator_not_drained<buffered_container>();
804}
805
806SEASTAR_TEST_CASE(test_async_generator_not_drained_unbuffered) {
807 return test_async_generator_not_drained<std::optional>();
808}
809
// Move-only value that keeps an external tally of live instances.
// Construction from (n, count) increments *count and destruction decrements
// it. A moved-from instance has n == -1 and a null `count`, so its
// destructor leaves the tally alone.
struct counter_t {
    int n;
    int* count;

    counter_t(int n, int* count) noexcept
        : n(n), count(count) {
        *count += 1;
    }

    counter_t(counter_t&& other) noexcept
        : n(std::exchange(other.n, -1)),
          count(std::exchange(other.count, nullptr))
    {}

    ~counter_t() noexcept {
        if (count != nullptr) {
            *count -= 1;
        }
    }
};
827std::ostream& operator<<(std::ostream& os, const counter_t& c) {
828 return os << c.n;
829}
830
831template<template<typename> class Container>
832coroutine::experimental::generator<counter_t, Container>
833fiddle(coroutine::experimental::buffer_size_t size, int n, int* total) {
834 int i = 0;
835 while (true) {
836 if (i++ == n) {
837 throw std::invalid_argument("Eureka from generator!");
838 }
839 co_yield counter_t{i, total};
840 }
841}
842
843template<template<typename> class Container>
844seastar::future<> test_async_generator_throws_from_generator() {
845 int total = 0;
846 auto count_to = [total=&total](unsigned n) -> seastar::future<> {
847 auto count = fiddle<Container>(coroutine::experimental::buffer_size_t{2},
848 n, total);
849 for (unsigned i = 0; i < 2 * n; i++) {
850 co_await count();
851 }
852 };
853 co_await count_to(42).then_wrapped([&total] (auto f) {
854 BOOST_REQUIRE(f.failed());
855 BOOST_REQUIRE_THROW(std::rethrow_exception(f.get_exception()), std::invalid_argument);
856 BOOST_REQUIRE_EQUAL(total, 0);
857 });
858}
859
860SEASTAR_TEST_CASE(test_async_generator_throws_from_generator_buffered) {
861 return test_async_generator_throws_from_generator<buffered_container>();
862}
863
864SEASTAR_TEST_CASE(test_async_generator_throws_from_generator_unbuffered) {
865 return test_async_generator_throws_from_generator<std::optional>();
866}
867
868template<template<typename> class Container>
869seastar::future<> test_async_generator_throws_from_consumer() {
870 int total = 0;
871 auto count_to = [total=&total](unsigned n) -> seastar::future<> {
872 auto count = fiddle<Container>(coroutine::experimental::buffer_size_t{2},
873 n, total);
874 for (unsigned i = 0; i < n; i++) {
875 if (i == n / 2) {
876 throw std::invalid_argument("Eureka from consumer!");
877 }
878 co_await count();
879 }
880 };
881 co_await count_to(42).then_wrapped([&total] (auto f) {
882 BOOST_REQUIRE(f.failed());
883 BOOST_REQUIRE_THROW(std::rethrow_exception(f.get_exception()), std::invalid_argument);
884 BOOST_REQUIRE_EQUAL(total, 0);
885 });
886}
887
888SEASTAR_TEST_CASE(test_async_generator_throws_from_consumer_buffered) {
889 return test_async_generator_throws_from_consumer<buffered_container>();
890}
891
892SEASTAR_TEST_CASE(test_async_generator_throws_from_consumer_unbuffered) {
893 return test_async_generator_throws_from_consumer<std::optional>();
894}
895
896#endif
897
898SEASTAR_TEST_CASE(test_lambda_coroutine_in_continuation) {
899 auto dist = std::uniform_real_distribution<>(0.0, 1.0);
900 auto rand_eng = std::default_random_engine(std::random_device()());
901 double n = dist(rand_eng);
902 auto sin1 = std::sin(n); // avoid optimizer tricks
903 auto boo = std::array<char, 1025>(); // bias coroutine size towards 1024 size class
904 auto sin2 = co_await yield().then(coroutine::lambda([n, boo] () -> future<double> {
905 // Expect coroutine capture to be freed after co_await without coroutine::lambda
906 co_await yield();
907 // Try to overwrite recently-release coroutine frame by allocating in similar size-class
908 std::vector<char*> garbage;
909 for (size_t sz = 1024; sz < 2048; ++sz) {
910 for (int ctr = 0; ctr < 100; ++ctr) {
911 auto p = static_cast<char*>(malloc(sz));
912 std::memset(p, 0, sz);
913 garbage.push_back(p);
914 }
915 }
916 for (auto p : garbage) {
917 std::free(p);
918 }
919 (void)boo;
920 co_return std::sin(n);
921 }));
922 BOOST_REQUIRE_EQUAL(sin1, sin2);
923}
924
f67539c2 925#endif