]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2019 ScyllaDB Ltd. | |
20 | */ | |
21 | ||
1e59de90 TL |
22 | #include <exception> |
23 | #include <numeric> | |
24 | #include <ranges> | |
25 | ||
26 | #include <seastar/core/circular_buffer.hh> | |
9f95a23c TL |
27 | #include <seastar/core/future-util.hh> |
28 | #include <seastar/testing/test_case.hh> | |
20effc67 | 29 | #include <seastar/core/sleep.hh> |
1e59de90 TL |
30 | #include <seastar/util/later.hh> |
31 | #include <seastar/core/thread.hh> | |
32 | #include <seastar/testing/random.hh> | |
9f95a23c TL |
33 | |
34 | using namespace seastar; | |
20effc67 | 35 | using namespace std::chrono_literals; |
9f95a23c | 36 | |
f67539c2 TL |
37 | #ifndef SEASTAR_COROUTINES_ENABLED |
38 | ||
39 | SEASTAR_TEST_CASE(test_coroutines_not_compiled_in) { | |
40 | return make_ready_future<>(); | |
41 | } | |
42 | ||
43 | #else | |
44 | ||
45 | #include <seastar/core/coroutine.hh> | |
20effc67 TL |
46 | #include <seastar/coroutine/all.hh> |
47 | #include <seastar/coroutine/maybe_yield.hh> | |
1e59de90 TL |
48 | #include <seastar/coroutine/switch_to.hh> |
49 | #include <seastar/coroutine/parallel_for_each.hh> | |
50 | #include <seastar/coroutine/as_future.hh> | |
51 | #include <seastar/coroutine/exception.hh> | |
52 | #include <seastar/coroutine/generator.hh> | |
f67539c2 | 53 | |
9f95a23c TL |
54 | namespace { |
55 | ||
56 | future<int> old_fashioned_continuations() { | |
1e59de90 | 57 | return yield().then([] { |
9f95a23c TL |
58 | return 42; |
59 | }); | |
60 | } | |
61 | ||
62 | future<int> simple_coroutine() { | |
1e59de90 | 63 | co_await yield(); |
9f95a23c TL |
64 | co_return 53; |
65 | } | |
66 | ||
67 | future<int> ready_coroutine() { | |
68 | co_return 64; | |
69 | } | |
70 | ||
f67539c2 | 71 | future<std::tuple<int, double>> tuple_coroutine() { |
9f95a23c TL |
72 | co_return std::tuple(1, 2.); |
73 | } | |
74 | ||
75 | future<int> failing_coroutine() { | |
1e59de90 | 76 | co_await yield(); |
9f95a23c TL |
77 | throw 42; |
78 | } | |
79 | ||
20effc67 TL |
80 | [[gnu::noinline]] int throw_exception(int x) { |
81 | throw x; | |
82 | } | |
83 | ||
84 | future<int> failing_coroutine2() noexcept { | |
1e59de90 | 85 | co_await yield(); |
20effc67 TL |
86 | co_return throw_exception(17); |
87 | } | |
88 | ||
9f95a23c TL |
89 | } |
90 | ||
91 | SEASTAR_TEST_CASE(test_simple_coroutines) { | |
92 | BOOST_REQUIRE_EQUAL(co_await old_fashioned_continuations(), 42); | |
93 | BOOST_REQUIRE_EQUAL(co_await simple_coroutine(), 53); | |
94 | BOOST_REQUIRE_EQUAL(ready_coroutine().get0(), 64); | |
95 | BOOST_REQUIRE(co_await tuple_coroutine() == std::tuple(1, 2.)); | |
96 | BOOST_REQUIRE_EXCEPTION((void)co_await failing_coroutine(), int, [] (auto v) { return v == 42; }); | |
20effc67 TL |
97 | BOOST_CHECK_EQUAL(co_await failing_coroutine().then_wrapped([] (future<int> f) -> future<int> { |
98 | BOOST_REQUIRE(f.failed()); | |
99 | try { | |
100 | std::rethrow_exception(f.get_exception()); | |
101 | } catch (int v) { | |
102 | co_return v; | |
103 | } | |
104 | }), 42); | |
105 | BOOST_REQUIRE_EXCEPTION((void)co_await failing_coroutine2(), int, [] (auto v) { return v == 17; }); | |
106 | BOOST_CHECK_EQUAL(co_await failing_coroutine2().then_wrapped([] (future<int> f) -> future<int> { | |
107 | BOOST_REQUIRE(f.failed()); | |
108 | try { | |
109 | std::rethrow_exception(f.get_exception()); | |
110 | } catch (int v) { | |
111 | co_return v; | |
112 | } | |
113 | }), 17); | |
f67539c2 TL |
114 | } |
115 | ||
9f95a23c | 116 | SEASTAR_TEST_CASE(test_abandond_coroutine) { |
f67539c2 | 117 | std::optional<future<int>> f; |
9f95a23c TL |
118 | { |
119 | auto p1 = promise<>(); | |
120 | auto p2 = promise<>(); | |
121 | auto p3 = promise<>(); | |
122 | f = p1.get_future().then([&] () -> future<int> { | |
123 | p2.set_value(); | |
124 | BOOST_CHECK_THROW(co_await p3.get_future(), broken_promise); | |
125 | co_return 1; | |
126 | }); | |
127 | p1.set_value(); | |
128 | co_await p2.get_future(); | |
129 | } | |
130 | BOOST_CHECK_EQUAL(co_await std::move(*f), 1); | |
131 | } | |
132 | ||
133 | SEASTAR_TEST_CASE(test_scheduling_group) { | |
134 | auto other_sg = co_await create_scheduling_group("the other group", 10.f); | |
1e59de90 TL |
135 | std::exception_ptr ex; |
136 | ||
137 | try { | |
138 | auto p1 = promise<>(); | |
139 | auto p2 = promise<>(); | |
140 | ||
141 | auto p1b = promise<>(); | |
142 | auto p2b = promise<>(); | |
143 | auto f1 = p1b.get_future(); | |
144 | auto f2 = p2b.get_future(); | |
145 | ||
146 | BOOST_REQUIRE(current_scheduling_group() == default_scheduling_group()); | |
147 | auto f_ret = with_scheduling_group(other_sg, | |
148 | [other_sg_cap = other_sg] (future<> f1, future<> f2, promise<> p1, promise<> p2) -> future<int> { | |
149 | // Make a copy in the coroutine before the lambda is destroyed. | |
150 | auto other_sg = other_sg_cap; | |
151 | BOOST_REQUIRE(current_scheduling_group() == other_sg); | |
152 | BOOST_REQUIRE(other_sg == other_sg); | |
153 | p1.set_value(); | |
154 | co_await std::move(f1); | |
155 | BOOST_REQUIRE(current_scheduling_group() == other_sg); | |
156 | p2.set_value(); | |
157 | co_await std::move(f2); | |
158 | BOOST_REQUIRE(current_scheduling_group() == other_sg); | |
159 | co_return 42; | |
160 | }, p1.get_future(), p2.get_future(), std::move(p1b), std::move(p2b)); | |
9f95a23c | 161 | |
9f95a23c | 162 | co_await std::move(f1); |
1e59de90 TL |
163 | BOOST_REQUIRE(current_scheduling_group() == default_scheduling_group()); |
164 | p1.set_value(); | |
9f95a23c | 165 | co_await std::move(f2); |
1e59de90 TL |
166 | BOOST_REQUIRE(current_scheduling_group() == default_scheduling_group()); |
167 | p2.set_value(); | |
168 | BOOST_REQUIRE_EQUAL(co_await std::move(f_ret), 42); | |
169 | BOOST_REQUIRE(current_scheduling_group() == default_scheduling_group()); | |
170 | } catch (...) { | |
171 | ex = std::current_exception(); | |
172 | } | |
173 | co_await destroy_scheduling_group(other_sg); | |
174 | if (ex) { | |
175 | std::rethrow_exception(std::move(ex)); | |
176 | } | |
177 | } | |
178 | ||
179 | future<scheduling_group> switch_to_with_context(scheduling_group& sg) { | |
180 | scheduling_group new_sg = co_await coroutine::switch_to(sg); | |
181 | BOOST_REQUIRE(current_scheduling_group() == sg); | |
182 | co_return new_sg; | |
183 | } | |
184 | ||
185 | SEASTAR_TEST_CASE(test_switch_to) { | |
186 | auto other_sg0 = co_await create_scheduling_group("other group 0", 10.f); | |
187 | auto other_sg1 = co_await create_scheduling_group("other group 1", 10.f); | |
188 | auto other_sg2 = co_await create_scheduling_group("other group 2", 10.f); | |
189 | std::exception_ptr ex; | |
190 | ||
191 | try { | |
192 | auto base_sg = current_scheduling_group(); | |
193 | ||
194 | auto prev_sg = co_await coroutine::switch_to(other_sg0); | |
195 | BOOST_REQUIRE(current_scheduling_group() == other_sg0); | |
196 | BOOST_REQUIRE(prev_sg == base_sg); | |
9f95a23c | 197 | |
1e59de90 TL |
198 | auto same_sg = co_await coroutine::switch_to(other_sg0); |
199 | BOOST_REQUIRE(current_scheduling_group() == other_sg0); | |
200 | BOOST_REQUIRE(same_sg == other_sg0); | |
201 | ||
202 | auto nested_sg = co_await coroutine::switch_to(other_sg1); | |
203 | BOOST_REQUIRE(current_scheduling_group() == other_sg1); | |
204 | BOOST_REQUIRE(nested_sg == other_sg0); | |
205 | ||
206 | co_await switch_to_with_context(other_sg2); | |
207 | ||
208 | co_await coroutine::switch_to(base_sg); | |
209 | BOOST_REQUIRE(current_scheduling_group() == base_sg); | |
210 | } catch (...) { | |
211 | ex = std::current_exception(); | |
212 | } | |
213 | ||
214 | co_await destroy_scheduling_group(other_sg1); | |
215 | co_await destroy_scheduling_group(other_sg0); | |
216 | if (ex) { | |
217 | std::rethrow_exception(std::move(ex)); | |
218 | } | |
219 | } | |
220 | ||
221 | future<> check_thread_inherits_sg_from_coroutine_frame(scheduling_group expected_sg) { | |
222 | return seastar::async([expected_sg] { | |
223 | BOOST_REQUIRE(current_scheduling_group() == expected_sg); | |
224 | }); | |
225 | } | |
226 | ||
227 | future<> check_coroutine_inherits_sg_from_another_one(scheduling_group expected_sg) { | |
228 | co_await yield(); | |
229 | BOOST_REQUIRE(current_scheduling_group() == expected_sg); | |
230 | } | |
231 | ||
232 | future<> switch_to_sg_and_perform_inheriting_checks(scheduling_group base_sg, scheduling_group new_sg) { | |
233 | BOOST_REQUIRE(current_scheduling_group() == base_sg); | |
234 | co_await coroutine::switch_to(new_sg); | |
235 | BOOST_REQUIRE(current_scheduling_group() == new_sg); | |
236 | ||
237 | co_await check_thread_inherits_sg_from_coroutine_frame(new_sg); | |
238 | co_await check_coroutine_inherits_sg_from_another_one(new_sg); | |
239 | ||
240 | // don't restore previous sg on purpose, expecting it will be restored once coroutine goes out of scope | |
241 | } | |
242 | ||
243 | SEASTAR_TEST_CASE(test_switch_to_sg_restoration_and_inheriting) { | |
244 | auto new_sg = co_await create_scheduling_group("other group 0", 10.f); | |
245 | std::exception_ptr ex; | |
246 | ||
247 | try { | |
248 | auto base_sg = current_scheduling_group(); | |
249 | ||
250 | co_await switch_to_sg_and_perform_inheriting_checks(base_sg, new_sg); | |
251 | // seastar automatically restores base_sg once it goes out of coroutine frame | |
252 | BOOST_REQUIRE(current_scheduling_group() == base_sg); | |
253 | ||
254 | co_await seastar::async([base_sg, new_sg] { | |
255 | switch_to_sg_and_perform_inheriting_checks(base_sg, new_sg).get(); | |
256 | BOOST_REQUIRE(current_scheduling_group() == base_sg); | |
257 | }); | |
258 | ||
259 | co_await switch_to_sg_and_perform_inheriting_checks(base_sg, new_sg).finally([base_sg] { | |
260 | BOOST_REQUIRE(current_scheduling_group() == base_sg); | |
261 | }); | |
262 | } catch (...) { | |
263 | ex = std::current_exception(); | |
264 | } | |
265 | ||
266 | co_await destroy_scheduling_group(new_sg); | |
267 | if (ex) { | |
268 | std::rethrow_exception(std::move(ex)); | |
269 | } | |
9f95a23c | 270 | } |
f67539c2 | 271 | |
20effc67 TL |
272 | SEASTAR_TEST_CASE(test_preemption) { |
273 | bool x = false; | |
274 | unsigned preempted = 0; | |
1e59de90 | 275 | auto f = yield().then([&x] { |
20effc67 TL |
276 | x = true; |
277 | }); | |
278 | ||
279 | // try to preempt 1000 times. 1 should be enough if not for | |
280 | // task queue shaffling in debug mode which may cause co-routine | |
281 | // continuation to run first. | |
282 | while(preempted < 1000 && !x) { | |
283 | preempted += need_preempt(); | |
284 | co_await make_ready_future<>(); | |
285 | } | |
286 | auto save_x = x; | |
1e59de90 | 287 | // wait for yield() to complete |
20effc67 TL |
288 | co_await std::move(f); |
289 | BOOST_REQUIRE(save_x); | |
290 | co_return; | |
291 | } | |
292 | ||
1e59de90 TL |
293 | SEASTAR_TEST_CASE(test_no_preemption) { |
294 | bool x = false; | |
295 | unsigned preempted = 0; | |
296 | auto f = yield().then([&x] { | |
297 | x = true; | |
298 | }); | |
299 | ||
300 | // preemption should not happen, we explicitly asked for continuing if possible | |
301 | while(preempted < 1000 && !x) { | |
302 | preempted += need_preempt(); | |
303 | co_await coroutine::without_preemption_check(make_ready_future<>()); | |
304 | } | |
305 | auto save_x = x; | |
306 | // wait for yield() to complete | |
307 | co_await std::move(f); | |
308 | BOOST_REQUIRE(!save_x); | |
309 | co_return; | |
310 | } | |
311 | ||
20effc67 TL |
312 | SEASTAR_TEST_CASE(test_all_simple) { |
313 | auto [a, b] = co_await coroutine::all( | |
314 | [] { return make_ready_future<int>(1); }, | |
315 | [] { return make_ready_future<int>(2); } | |
316 | ); | |
317 | BOOST_REQUIRE_EQUAL(a, 1); | |
318 | BOOST_REQUIRE_EQUAL(b, 2); | |
319 | } | |
320 | ||
321 | SEASTAR_TEST_CASE(test_all_permutations) { | |
322 | std::vector<std::chrono::milliseconds> delays = { 0ms, 0ms, 2ms, 2ms, 4ms, 6ms }; | |
323 | auto make_delayed_future_returning_nr = [&] (int nr) { | |
324 | return [=] { | |
325 | auto delay = delays[nr]; | |
326 | return delay == 0ms ? make_ready_future<int>(nr) : sleep(delay).then([nr] { return make_ready_future<int>(nr); }); | |
327 | }; | |
328 | }; | |
329 | do { | |
330 | auto [a, b, c, d, e, f] = co_await coroutine::all( | |
331 | make_delayed_future_returning_nr(0), | |
332 | make_delayed_future_returning_nr(1), | |
333 | make_delayed_future_returning_nr(2), | |
334 | make_delayed_future_returning_nr(3), | |
335 | make_delayed_future_returning_nr(4), | |
336 | make_delayed_future_returning_nr(5) | |
337 | ); | |
338 | BOOST_REQUIRE_EQUAL(a, 0); | |
339 | BOOST_REQUIRE_EQUAL(b, 1); | |
340 | BOOST_REQUIRE_EQUAL(c, 2); | |
341 | BOOST_REQUIRE_EQUAL(d, 3); | |
342 | BOOST_REQUIRE_EQUAL(e, 4); | |
343 | BOOST_REQUIRE_EQUAL(f, 5); | |
344 | } while (std::ranges::next_permutation(delays).found); | |
345 | } | |
346 | ||
347 | SEASTAR_TEST_CASE(test_all_ready_exceptions) { | |
348 | try { | |
349 | co_await coroutine::all( | |
350 | [] () -> future<> { throw 1; }, | |
351 | [] () -> future<> { throw 2; } | |
352 | ); | |
353 | } catch (int e) { | |
354 | BOOST_REQUIRE(e == 1 || e == 2); | |
355 | } | |
356 | } | |
357 | ||
358 | SEASTAR_TEST_CASE(test_all_nonready_exceptions) { | |
359 | try { | |
360 | co_await coroutine::all( | |
361 | [] () -> future<> { | |
362 | co_await sleep(1ms); | |
363 | throw 1; | |
364 | }, | |
365 | [] () -> future<> { | |
366 | co_await sleep(1ms); | |
367 | throw 2; | |
368 | } | |
369 | ); | |
370 | } catch (int e) { | |
371 | BOOST_REQUIRE(e == 1 || e == 2); | |
372 | } | |
373 | } | |
374 | ||
375 | SEASTAR_TEST_CASE(test_all_heterogeneous_types) { | |
376 | auto [a, b] = co_await coroutine::all( | |
377 | [] () -> future<int> { | |
378 | co_await sleep(1ms); | |
379 | co_return 1; | |
380 | }, | |
381 | [] () -> future<> { | |
382 | co_await sleep(1ms); | |
383 | }, | |
384 | [] () -> future<long> { | |
385 | co_await sleep(1ms); | |
386 | co_return 2L; | |
387 | } | |
388 | ); | |
389 | BOOST_REQUIRE_EQUAL(a, 1); | |
390 | BOOST_REQUIRE_EQUAL(b, 2L); | |
391 | } | |
392 | ||
393 | SEASTAR_TEST_CASE(test_all_noncopyable_types) { | |
394 | auto [a] = co_await coroutine::all( | |
395 | [] () -> future<std::unique_ptr<int>> { | |
396 | co_return std::make_unique<int>(6); | |
397 | } | |
398 | ); | |
399 | BOOST_REQUIRE_EQUAL(*a, 6); | |
400 | } | |
401 | ||
402 | SEASTAR_TEST_CASE(test_all_throw_in_input_func) { | |
403 | int nr_completed = 0; | |
404 | bool exception_seen = false; | |
405 | try { | |
406 | co_await coroutine::all( | |
407 | [&] () -> future<int> { | |
408 | co_await sleep(1ms); | |
409 | ++nr_completed; | |
410 | co_return 7; | |
411 | }, | |
412 | [&] () -> future<int> { | |
413 | throw 9; | |
414 | }, | |
415 | [&] () -> future<int> { | |
416 | co_await sleep(1ms); | |
417 | ++nr_completed; | |
418 | co_return 7; | |
419 | } | |
420 | ); | |
421 | } catch (int n) { | |
422 | BOOST_REQUIRE_EQUAL(n, 9); | |
423 | exception_seen = true; | |
424 | } | |
425 | BOOST_REQUIRE_EQUAL(nr_completed, 2); | |
426 | BOOST_REQUIRE(exception_seen); | |
427 | } | |
428 | ||
1e59de90 TL |
// Scope guard that tracks how many counter_ref objects are alive: the
// constructor increments the referenced counter and the destructor decrements
// it again. Used by the tests to detect destructors that were not run.
struct counter_ref {
private:
    int& _counter;

public:
    // Registers this object in `cnt`; `cnt` must outlive the object.
    explicit counter_ref(int& cnt)
        : _counter(cnt) {
        ++_counter;
    }

    // Copying is disabled: an implicitly generated copy would decrement the
    // counter on destruction without ever having incremented it, corrupting
    // the "alive objects" bookkeeping.
    counter_ref(const counter_ref&) = delete;
    counter_ref& operator=(const counter_ref&) = delete;

    ~counter_ref() {
        --_counter;
    }
};
443 | ||
444 | template<typename Ex> | |
445 | static future<> check_coroutine_throws(auto fun) { | |
446 | // The counter keeps track of the number of alive "counter_ref" objects. | |
447 | // If it is not zero then it means that some destructors weren't run | |
448 | // while quitting the coroutine. | |
449 | int counter = 0; | |
450 | BOOST_REQUIRE_THROW(co_await fun(counter), Ex); | |
451 | BOOST_REQUIRE_EQUAL(counter, 0); | |
452 | co_await fun(counter).then_wrapped([&counter] (auto f) { | |
453 | BOOST_REQUIRE(f.failed()); | |
454 | BOOST_REQUIRE_THROW(std::rethrow_exception(f.get_exception()), Ex); | |
455 | BOOST_REQUIRE_EQUAL(counter, 0); | |
456 | }); | |
457 | } | |
458 | ||
20effc67 | 459 | SEASTAR_TEST_CASE(test_coroutine_exception) { |
1e59de90 TL |
460 | co_await check_coroutine_throws<std::runtime_error>([] (int& counter) -> future<int> { |
461 | counter_ref ref{counter}; | |
20effc67 | 462 | co_return coroutine::exception(std::make_exception_ptr(std::runtime_error("threw"))); |
20effc67 | 463 | }); |
1e59de90 TL |
464 | co_await check_coroutine_throws<std::runtime_error>([] (int& counter) -> future<int> { |
465 | counter_ref ref{counter}; | |
466 | co_await coroutine::exception(std::make_exception_ptr(std::runtime_error("threw"))); | |
467 | co_return 42; | |
468 | }); | |
469 | co_await check_coroutine_throws<std::logic_error>([] (int& counter) -> future<> { | |
470 | counter_ref ref{counter}; | |
471 | co_await coroutine::return_exception(std::logic_error("threw")); | |
472 | co_return; | |
473 | }); | |
474 | co_await check_coroutine_throws<int>([] (int& counter) -> future<> { | |
475 | counter_ref ref{counter}; | |
476 | co_await coroutine::return_exception(42); | |
477 | co_return; | |
478 | }); | |
479 | } | |
20effc67 | 480 | |
1e59de90 TL |
481 | SEASTAR_TEST_CASE(test_coroutine_return_exception_ptr) { |
482 | co_await check_coroutine_throws<std::runtime_error>([] (int& counter) -> future<> { | |
483 | co_await coroutine::return_exception(std::runtime_error("threw")); | |
484 | }); | |
485 | co_await check_coroutine_throws<std::runtime_error>([] (int& counter) -> future<> { | |
486 | auto ex = std::make_exception_ptr(std::runtime_error("threw")); | |
487 | co_await coroutine::return_exception_ptr(std::move(ex)); | |
488 | }); | |
489 | co_await check_coroutine_throws<std::runtime_error>([] (int& counter) -> future<> { | |
490 | auto ex = std::make_exception_ptr(std::runtime_error("threw")); | |
491 | co_await coroutine::return_exception_ptr(ex); | |
492 | }); | |
493 | co_await check_coroutine_throws<int>([] (int& counter) -> future<> { | |
494 | co_await coroutine::return_exception_ptr(std::make_exception_ptr(3)); | |
20effc67 TL |
495 | }); |
496 | } | |
497 | ||
498 | SEASTAR_TEST_CASE(test_maybe_yield) { | |
499 | int var = 0; | |
500 | bool done = false; | |
501 | auto spinner = [&] () -> future<> { | |
502 | // increment a variable continuously, but yield so an observer can see it. | |
503 | while (!done) { | |
504 | ++var; | |
505 | co_await coroutine::maybe_yield(); | |
506 | } | |
507 | }; | |
508 | auto spinner_fut = spinner(); | |
509 | int snapshot = var; | |
510 | for (int nr_changes = 0; nr_changes < 10; ++nr_changes) { | |
511 | // Try to observe the value changing in time, yield to | |
512 | // allow the spinner to advance it. | |
513 | while (snapshot == var) { | |
514 | co_await coroutine::maybe_yield(); | |
515 | } | |
516 | snapshot = var; | |
517 | } | |
518 | done = true; | |
519 | co_await std::move(spinner_fut); | |
520 | BOOST_REQUIRE(true); // the test will hang if it doesn't work. | |
521 | } | |
522 | ||
1e59de90 TL |
523 | #if __has_include(<coroutine>) && !defined(__clang__) |
524 | ||
525 | #include "tl-generator.hh" | |
526 | tl::generator<int> simple_generator(int max) | |
527 | { | |
528 | for (int i = 0; i < max; ++i) { | |
529 | co_yield i; | |
530 | } | |
531 | } | |
532 | ||
533 | SEASTAR_TEST_CASE(generator) | |
534 | { | |
535 | // test ability of seastar::parallel_for_each to deal with move-only views | |
536 | int accum = 0; | |
537 | co_await seastar::parallel_for_each(simple_generator(10), [&](int i) { | |
538 | accum += i; | |
539 | return seastar::make_ready_future<>(); | |
540 | }); | |
541 | BOOST_REQUIRE_EQUAL(accum, 45); | |
542 | ||
543 | // test ability of seastar::max_concurrent_for_each to deal with move-only views | |
544 | accum = 0; | |
545 | co_await seastar::max_concurrent_for_each(simple_generator(10), 10, [&](int i) { | |
546 | accum += i; | |
547 | return seastar::make_ready_future<>(); | |
548 | }); | |
549 | BOOST_REQUIRE_EQUAL(accum, 45); | |
550 | } | |
551 | ||
552 | #endif | |
553 | ||
554 | SEASTAR_TEST_CASE(test_parallel_for_each_empty) { | |
555 | std::vector<int> values; | |
556 | int count = 0; | |
557 | ||
558 | co_await coroutine::parallel_for_each(values, [&] (int x) { | |
559 | ++count; | |
560 | }); | |
561 | BOOST_REQUIRE_EQUAL(count, 0); // the test will hang if it doesn't work. | |
562 | } | |
563 | ||
564 | SEASTAR_TEST_CASE(test_parallel_for_each_exception) { | |
565 | std::array<int, 5> values = { 10, 2, 1, 4, 8 }; | |
566 | int count = 0; | |
567 | auto& eng = testing::local_random_engine; | |
568 | auto dist = std::uniform_int_distribution<unsigned>(); | |
569 | int throw_at = dist(eng) % values.size(); | |
570 | ||
571 | BOOST_TEST_MESSAGE(fmt::format("Will throw at value #{}/{}", throw_at, values.size())); | |
572 | ||
573 | auto f0 = coroutine::parallel_for_each(values, [&] (int x) { | |
574 | if (count++ == throw_at) { | |
575 | BOOST_TEST_MESSAGE("throw"); | |
576 | throw std::runtime_error("test"); | |
577 | } | |
578 | }); | |
579 | // An exception thrown by the functor must be propagated | |
580 | BOOST_REQUIRE_THROW(co_await std::move(f0), std::runtime_error); | |
581 | // Functor must be called on all values, even if there's an exception | |
582 | BOOST_REQUIRE_EQUAL(count, values.size()); | |
583 | ||
584 | count = 0; | |
585 | throw_at = dist(eng) % values.size(); | |
586 | BOOST_TEST_MESSAGE(fmt::format("Will throw at value #{}/{}", throw_at, values.size())); | |
587 | ||
588 | auto f1 = coroutine::parallel_for_each(values, [&] (int x) -> future<> { | |
589 | co_await sleep(std::chrono::milliseconds(x)); | |
590 | if (count++ == throw_at) { | |
591 | throw std::runtime_error("test"); | |
592 | } | |
593 | }); | |
594 | BOOST_REQUIRE_THROW(co_await std::move(f1), std::runtime_error); | |
595 | BOOST_REQUIRE_EQUAL(count, values.size()); | |
596 | } | |
597 | ||
598 | SEASTAR_TEST_CASE(test_parallel_for_each) { | |
599 | std::vector<int> values = { 3, 1, 4 }; | |
600 | int sum_of_squares = 0; | |
601 | ||
602 | int expected = std::accumulate(values.begin(), values.end(), 0, [] (int sum, int x) { | |
603 | return sum + x * x; | |
604 | }); | |
605 | ||
606 | // Test all-ready futures | |
607 | co_await coroutine::parallel_for_each(values, [&sum_of_squares] (int x) { | |
608 | sum_of_squares += x * x; | |
609 | }); | |
610 | BOOST_REQUIRE_EQUAL(sum_of_squares, expected); | |
611 | ||
612 | // Test non-ready futures | |
613 | sum_of_squares = 0; | |
614 | co_await coroutine::parallel_for_each(values, [&sum_of_squares] (int x) -> future<> { | |
615 | if (x > 1) { | |
616 | co_await sleep(std::chrono::milliseconds(x)); | |
617 | } | |
618 | sum_of_squares += x * x; | |
619 | }); | |
620 | BOOST_REQUIRE_EQUAL(sum_of_squares, expected); | |
621 | ||
622 | // Test legacy subrange | |
623 | sum_of_squares = 0; | |
624 | co_await coroutine::parallel_for_each(values.begin(), values.end() - 1, [&sum_of_squares] (int x) -> future<> { | |
625 | if (x > 1) { | |
626 | co_await sleep(std::chrono::milliseconds(x)); | |
627 | } | |
628 | sum_of_squares += x * x; | |
629 | }); | |
630 | BOOST_REQUIRE_EQUAL(sum_of_squares, 10); | |
631 | ||
632 | // clang 13.0.1 doesn't support subrange | |
633 | // so provide also a Iterator/Sentinel based constructor. | |
634 | // See https://github.com/llvm/llvm-project/issues/46091 | |
635 | #ifndef __clang__ | |
636 | // Test std::ranges::subrange | |
637 | sum_of_squares = 0; | |
638 | co_await coroutine::parallel_for_each(std::ranges::subrange(values.begin(), values.end() - 1), [&sum_of_squares] (int x) -> future<> { | |
639 | if (x > 1) { | |
640 | co_await sleep(std::chrono::milliseconds(x)); | |
641 | } | |
642 | sum_of_squares += x * x; | |
643 | }); | |
644 | BOOST_REQUIRE_EQUAL(sum_of_squares, 10); | |
645 | #endif | |
646 | } | |
647 | ||
648 | SEASTAR_TEST_CASE(test_void_as_future) { | |
649 | auto f = co_await coroutine::as_future(make_ready_future<>()); | |
650 | BOOST_REQUIRE(f.available()); | |
651 | ||
652 | f = co_await coroutine::as_future(make_exception_future<>(std::runtime_error("exception"))); | |
653 | BOOST_REQUIRE_THROW(f.get(), std::runtime_error); | |
654 | ||
655 | semaphore sem(0); | |
656 | (void)sleep(1ms).then([&] { sem.signal(); }); | |
657 | f = co_await coroutine::as_future(sem.wait()); | |
658 | BOOST_REQUIRE(f.available()); | |
659 | ||
660 | f = co_await coroutine::as_future(sem.wait(duration_cast<semaphore::duration>(1ms))); | |
661 | BOOST_REQUIRE_THROW(f.get(), semaphore_timed_out); | |
662 | } | |
663 | ||
664 | SEASTAR_TEST_CASE(test_void_as_future_without_preemption_check) { | |
665 | auto f = co_await coroutine::as_future_without_preemption_check(make_ready_future<>()); | |
666 | BOOST_REQUIRE(f.available()); | |
667 | ||
668 | f = co_await coroutine::as_future_without_preemption_check(make_exception_future<>(std::runtime_error("exception"))); | |
669 | BOOST_REQUIRE_THROW(f.get(), std::runtime_error); | |
670 | ||
671 | semaphore sem(0); | |
672 | (void)sleep(1ms).then([&] { sem.signal(); }); | |
673 | f = co_await coroutine::as_future_without_preemption_check(sem.wait()); | |
674 | BOOST_REQUIRE(f.available()); | |
675 | ||
676 | f = co_await coroutine::as_future_without_preemption_check(sem.wait(duration_cast<semaphore::duration>(1ms))); | |
677 | BOOST_REQUIRE_THROW(f.get(), semaphore_timed_out); | |
678 | } | |
679 | ||
680 | SEASTAR_TEST_CASE(test_non_void_as_future) { | |
681 | auto f = co_await coroutine::as_future(make_ready_future<int>(42)); | |
682 | BOOST_REQUIRE_EQUAL(f.get0(), 42); | |
683 | ||
684 | f = co_await coroutine::as_future(make_exception_future<int>(std::runtime_error("exception"))); | |
685 | BOOST_REQUIRE_THROW(f.get(), std::runtime_error); | |
686 | ||
687 | auto p = promise<int>(); | |
688 | (void)sleep(1ms).then([&] { p.set_value(314); }); | |
689 | f = co_await coroutine::as_future(p.get_future()); | |
690 | BOOST_REQUIRE_EQUAL(f.get0(), 314); | |
691 | ||
692 | auto gen_exception = [] () -> future<int> { | |
693 | co_await sleep(1ms); | |
694 | co_return coroutine::exception(std::make_exception_ptr(std::runtime_error("exception"))); | |
695 | }; | |
696 | f = co_await coroutine::as_future(gen_exception()); | |
697 | BOOST_REQUIRE_THROW(f.get(), std::runtime_error); | |
698 | } | |
699 | ||
700 | SEASTAR_TEST_CASE(test_non_void_as_future_without_preemption_check) { | |
701 | auto f = co_await coroutine::as_future_without_preemption_check(make_ready_future<int>(42)); | |
702 | BOOST_REQUIRE_EQUAL(f.get0(), 42); | |
703 | ||
704 | f = co_await coroutine::as_future_without_preemption_check(make_exception_future<int>(std::runtime_error("exception"))); | |
705 | BOOST_REQUIRE_THROW(f.get(), std::runtime_error); | |
706 | ||
707 | auto p = promise<int>(); | |
708 | (void)sleep(1ms).then([&] { p.set_value(314); }); | |
709 | f = co_await coroutine::as_future_without_preemption_check(p.get_future()); | |
710 | BOOST_REQUIRE_EQUAL(f.get0(), 314); | |
711 | ||
712 | auto gen_exception = [] () -> future<int> { | |
713 | co_await sleep(1ms); | |
714 | co_return coroutine::exception(std::make_exception_ptr(std::runtime_error("exception"))); | |
715 | }; | |
716 | f = co_await coroutine::as_future_without_preemption_check(gen_exception()); | |
717 | BOOST_REQUIRE_THROW(f.get(), std::runtime_error); | |
718 | } | |
719 | ||
720 | SEASTAR_TEST_CASE(test_as_future_preemption) { | |
721 | bool stop = false; | |
722 | ||
723 | auto get_ready_future = [&] { | |
724 | return stop ? make_exception_future<>(std::runtime_error("exception")) : make_ready_future<>(); | |
725 | }; | |
726 | ||
727 | auto wait_for_stop = [&] () -> future<bool> { | |
728 | for (;;) { | |
729 | auto f = co_await coroutine::as_future(get_ready_future()); | |
730 | if (f.failed()) { | |
731 | co_return coroutine::exception(f.get_exception()); | |
732 | } | |
733 | } | |
734 | }; | |
735 | ||
736 | auto f0 = wait_for_stop(); | |
737 | ||
738 | auto set_stop = [&] () -> future<> { | |
739 | for (;;) { | |
740 | stop = true; | |
741 | if (f0.available()) { | |
742 | co_return; | |
743 | } | |
744 | co_await coroutine::maybe_yield(); | |
745 | } | |
746 | }; | |
747 | ||
748 | co_await set_stop(); | |
749 | ||
750 | BOOST_REQUIRE_THROW(f0.get(), std::runtime_error); | |
751 | } | |
752 | ||
753 | #ifndef __clang__ | |
754 | ||
755 | template<template<typename> class Container> | |
756 | coroutine::experimental::generator<int, Container> | |
757 | fibonacci_sequence(coroutine::experimental::buffer_size_t size, unsigned count) { | |
758 | auto a = 0, b = 1; | |
759 | for (unsigned i = 0; i < count; ++i) { | |
760 | if (std::numeric_limits<decltype(a)>::max() - a < b) { | |
761 | throw std::out_of_range( | |
762 | fmt::format("fibonacci[{}] is greater than the largest value of int", i)); | |
763 | } | |
764 | co_yield std::exchange(a, std::exchange(b, a + b)); | |
765 | } | |
766 | } | |
767 | ||
768 | template<template<typename> class Container> | |
769 | seastar::future<> test_async_generator_drained() { | |
770 | auto expected_fibs = {0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55}; | |
771 | auto fib = fibonacci_sequence<Container>(coroutine::experimental::buffer_size_t{2}, | |
772 | std::size(expected_fibs)); | |
773 | for (auto expected_fib : expected_fibs) { | |
774 | auto actual_fib = co_await fib(); | |
775 | BOOST_REQUIRE(actual_fib.has_value()); | |
776 | BOOST_REQUIRE_EQUAL(actual_fib.value(), expected_fib); | |
777 | } | |
778 | auto sentinel = co_await fib(); | |
779 | BOOST_REQUIRE(!sentinel.has_value()); | |
780 | } | |
781 | ||
782 | template<typename T> | |
783 | using buffered_container = circular_buffer<T>; | |
784 | ||
785 | SEASTAR_TEST_CASE(test_async_generator_drained_buffered) { | |
786 | return test_async_generator_drained<buffered_container>(); | |
787 | } | |
788 | ||
789 | SEASTAR_TEST_CASE(test_async_generator_drained_unbuffered) { | |
790 | return test_async_generator_drained<std::optional>(); | |
791 | } | |
792 | ||
793 | template<template<typename> class Container> | |
794 | seastar::future<> test_async_generator_not_drained() { | |
795 | auto fib = fibonacci_sequence<Container>(coroutine::experimental::buffer_size_t{2}, | |
796 | 42); | |
797 | auto actual_fib = co_await fib(); | |
798 | BOOST_REQUIRE(actual_fib.has_value()); | |
799 | BOOST_REQUIRE_EQUAL(actual_fib.value(), 0); | |
800 | } | |
801 | ||
802 | SEASTAR_TEST_CASE(test_async_generator_not_drained_buffered) { | |
803 | return test_async_generator_not_drained<buffered_container>(); | |
804 | } | |
805 | ||
806 | SEASTAR_TEST_CASE(test_async_generator_not_drained_unbuffered) { | |
807 | return test_async_generator_not_drained<std::optional>(); | |
808 | } | |
809 | ||
// Move-only value that keeps an external tally of live instances: the value
// constructor increments *count and the destructor of whichever object
// currently holds the pointer decrements it again.
struct counter_t {
    int n;
    int* count;

    // Creates a live instance carrying `value` and registers it in
    // *live_count.
    counter_t(int value, int* live_count) noexcept
        : n{value}, count{live_count} {
        ++(*count);
    }

    // Takes over the source's value and tally pointer. The source is left
    // with n == -1 and a null pointer, so its destructor does not touch the
    // tally.
    counter_t(counter_t&& other) noexcept
        : n{other.n}, count{other.count} {
        other.n = -1;
        other.count = nullptr;
    }

    ~counter_t() noexcept {
        if (count != nullptr) {
            --(*count);
        }
    }
};

// Streams the carried value `n` of a counter_t.
std::ostream& operator<<(std::ostream& os, const counter_t& c) {
    os << c.n;
    return os;
}
830 | ||
831 | template<template<typename> class Container> | |
832 | coroutine::experimental::generator<counter_t, Container> | |
833 | fiddle(coroutine::experimental::buffer_size_t size, int n, int* total) { | |
834 | int i = 0; | |
835 | while (true) { | |
836 | if (i++ == n) { | |
837 | throw std::invalid_argument("Eureka from generator!"); | |
838 | } | |
839 | co_yield counter_t{i, total}; | |
840 | } | |
841 | } | |
842 | ||
843 | template<template<typename> class Container> | |
844 | seastar::future<> test_async_generator_throws_from_generator() { | |
845 | int total = 0; | |
846 | auto count_to = [total=&total](unsigned n) -> seastar::future<> { | |
847 | auto count = fiddle<Container>(coroutine::experimental::buffer_size_t{2}, | |
848 | n, total); | |
849 | for (unsigned i = 0; i < 2 * n; i++) { | |
850 | co_await count(); | |
851 | } | |
852 | }; | |
853 | co_await count_to(42).then_wrapped([&total] (auto f) { | |
854 | BOOST_REQUIRE(f.failed()); | |
855 | BOOST_REQUIRE_THROW(std::rethrow_exception(f.get_exception()), std::invalid_argument); | |
856 | BOOST_REQUIRE_EQUAL(total, 0); | |
857 | }); | |
858 | } | |
859 | ||
860 | SEASTAR_TEST_CASE(test_async_generator_throws_from_generator_buffered) { | |
861 | return test_async_generator_throws_from_generator<buffered_container>(); | |
862 | } | |
863 | ||
864 | SEASTAR_TEST_CASE(test_async_generator_throws_from_generator_unbuffered) { | |
865 | return test_async_generator_throws_from_generator<std::optional>(); | |
866 | } | |
867 | ||
868 | template<template<typename> class Container> | |
869 | seastar::future<> test_async_generator_throws_from_consumer() { | |
870 | int total = 0; | |
871 | auto count_to = [total=&total](unsigned n) -> seastar::future<> { | |
872 | auto count = fiddle<Container>(coroutine::experimental::buffer_size_t{2}, | |
873 | n, total); | |
874 | for (unsigned i = 0; i < n; i++) { | |
875 | if (i == n / 2) { | |
876 | throw std::invalid_argument("Eureka from consumer!"); | |
877 | } | |
878 | co_await count(); | |
879 | } | |
880 | }; | |
881 | co_await count_to(42).then_wrapped([&total] (auto f) { | |
882 | BOOST_REQUIRE(f.failed()); | |
883 | BOOST_REQUIRE_THROW(std::rethrow_exception(f.get_exception()), std::invalid_argument); | |
884 | BOOST_REQUIRE_EQUAL(total, 0); | |
885 | }); | |
886 | } | |
887 | ||
888 | SEASTAR_TEST_CASE(test_async_generator_throws_from_consumer_buffered) { | |
889 | return test_async_generator_throws_from_consumer<buffered_container>(); | |
890 | } | |
891 | ||
892 | SEASTAR_TEST_CASE(test_async_generator_throws_from_consumer_unbuffered) { | |
893 | return test_async_generator_throws_from_consumer<std::optional>(); | |
894 | } | |
895 | ||
896 | #endif | |
897 | ||
898 | SEASTAR_TEST_CASE(test_lambda_coroutine_in_continuation) { | |
899 | auto dist = std::uniform_real_distribution<>(0.0, 1.0); | |
900 | auto rand_eng = std::default_random_engine(std::random_device()()); | |
901 | double n = dist(rand_eng); | |
902 | auto sin1 = std::sin(n); // avoid optimizer tricks | |
903 | auto boo = std::array<char, 1025>(); // bias coroutine size towards 1024 size class | |
904 | auto sin2 = co_await yield().then(coroutine::lambda([n, boo] () -> future<double> { | |
905 | // Expect coroutine capture to be freed after co_await without coroutine::lambda | |
906 | co_await yield(); | |
907 | // Try to overwrite recently-release coroutine frame by allocating in similar size-class | |
908 | std::vector<char*> garbage; | |
909 | for (size_t sz = 1024; sz < 2048; ++sz) { | |
910 | for (int ctr = 0; ctr < 100; ++ctr) { | |
911 | auto p = static_cast<char*>(malloc(sz)); | |
912 | std::memset(p, 0, sz); | |
913 | garbage.push_back(p); | |
914 | } | |
915 | } | |
916 | for (auto p : garbage) { | |
917 | std::free(p); | |
918 | } | |
919 | (void)boo; | |
920 | co_return std::sin(n); | |
921 | })); | |
922 | BOOST_REQUIRE_EQUAL(sin1, sin2); | |
923 | } | |
924 | ||
f67539c2 | 925 | #endif |