/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Copyright (C) 2015 Cloudius Systems, Ltd.
 */

#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/core/distributed.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/print.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/closeable.hh>
#include <seastar/util/later.hh>
#include <boost/range/irange.hpp> // boost::irange
#include <algorithm>              // std::find, std::max
#include <functional>             // std::mem_fn, std::plus, std::multiplies
#include <iostream>               // std::cout
#include <mutex>
#include <vector>

using namespace seastar;
using namespace std::chrono_literals;

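// Service whose run() schedules a background check; the reference taken via
// shared_from_this() must keep the instance alive (deleted stays false)
// until that check fires, even after the service has been stopped.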
struct async_service : public seastar::async_sharded_service<async_service> {
    thread_local static bool deleted;
    ~async_service() {
        deleted = true;
    }
    void run() {
        auto ref = shared_from_this();
        // Wait a while and check.
        (void)sleep(std::chrono::milliseconds(100 + 100 * this_shard_id())).then([this, ref] {
            check();
        });
    }
    virtual void check() {
        assert(!deleted);
    }
    future<> stop() { return make_ready_future<>(); }
};

thread_local bool async_service::deleted = false;

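// Minimal service used by the argument-passing and map/reduce tests below.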
struct X {
    sstring echo(sstring arg) {
        return arg;
    }
    int cpu_id_squared() const {
        auto id = this_shard_id();
        return id * id;
    }
    future<> stop() { return make_ready_future<>(); }
};

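// Runs func against a freshly created distributed<T> and guarantees that
// stop() is called before the distributed<T> itself is destroyed.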
template <typename T, typename Func>
future<> do_with_distributed(Func&& func) {
    auto x = make_shared<distributed<T>>();
    return func(*x).finally([x] {
        return x->stop();
    }).finally([x]{}); // the second finally keeps x alive until stop() completes
}

SEASTAR_TEST_CASE(test_that_each_core_gets_the_arguments) {
    return do_with_distributed<X>([] (auto& x) {
        return x.start().then([&x] {
            return x.map_reduce([] (sstring msg){
                if (msg != "hello") {
                    throw std::runtime_error("wrong message");
                }
            }, &X::echo, sstring("hello"));
        });
    });
}

SEASTAR_TEST_CASE(test_functor_version) {
    return do_with_distributed<X>([] (auto& x) {
        return x.start().then([&x] {
            return x.map_reduce([] (sstring msg){
                if (msg != "hello") {
                    throw std::runtime_error("wrong message");
                }
            }, [] (X& x) { return x.echo("hello"); });
        });
    });
}

struct Y {
    sstring s;
    Y(sstring s) : s(std::move(s)) {}
    future<> stop() { return make_ready_future<>(); }
};

SEASTAR_TEST_CASE(test_constructor_argument_is_passed_to_each_core) {
    return do_with_distributed<Y>([] (auto& y) {
        return y.start(sstring("hello")).then([&y] {
            return y.invoke_on_all([] (Y& y) {
                if (y.s != "hello") {
                    throw std::runtime_error(format("message mismatch, got \"{}\"", y.s));
                }
            });
        });
    });
}

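// map_reduce0 sums cpu_id_squared() over all shards; for shard ids 0..n
// with n = smp::count - 1 the expected sum is n(n+1)(2n+1)/6.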
SEASTAR_TEST_CASE(test_map_reduce) {
    return do_with_distributed<X>([] (distributed<X>& x) {
        return x.start().then([&x] {
            return x.map_reduce0(std::mem_fn(&X::cpu_id_squared),
                                 0,
                                 std::plus<int>()).then([] (int result) {
                int n = smp::count - 1;
                if (result != (n * (n + 1) * (2*n + 1)) / 6) {
                    throw std::runtime_error("map_reduce failed");
                }
            });
        });
    });
}

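// The three lifetime tests below verify that the map and reduce functors
// passed to map_reduce(), map_reduce0() and map() are kept alive until all
// of their asynchronous invocations have completed.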
SEASTAR_TEST_CASE(test_map_reduce_lifetime) {
    struct map {
        bool destroyed = false;
        ~map() {
            destroyed = true;
        }
        auto operator()(const X& x) {
            return yield().then([this, &x] {
                BOOST_REQUIRE(!destroyed);
                return x.cpu_id_squared();
            });
        }
    };
    struct reduce {
        long& res;
        bool destroyed = false;
        ~reduce() {
            destroyed = true;
        }
        auto operator()(int x) {
            return yield().then([this, x] {
                BOOST_REQUIRE(!destroyed);
                res += x;
            });
        }
    };
    return do_with_distributed<X>([] (distributed<X>& x) {
        return x.start().then([&x] {
            return do_with(0L, [&x] (auto& result) {
                return x.map_reduce(reduce{result}, map{}).then([&result] {
                    long n = smp::count - 1;
                    long expected = (n * (n + 1) * (2*n + 1)) / 6;
                    BOOST_REQUIRE_EQUAL(result, expected);
                });
            });
        });
    });
}

SEASTAR_TEST_CASE(test_map_reduce0_lifetime) {
    struct map {
        bool destroyed = false;
        ~map() {
            destroyed = true;
        }
        auto operator()(const X& x) const {
            return yield().then([this, &x] {
                BOOST_REQUIRE(!destroyed);
                return x.cpu_id_squared();
            });
        }
    };
    struct reduce {
        bool destroyed = false;
        ~reduce() {
            destroyed = true;
        }
        auto operator()(long res, int x) {
            BOOST_REQUIRE(!destroyed);
            return res + x;
        }
    };
    return do_with_distributed<X>([] (distributed<X>& x) {
        return x.start().then([&x] {
            return x.map_reduce0(map{}, 0L, reduce{}).then([] (long result) {
                long n = smp::count - 1;
                long expected = (n * (n + 1) * (2*n + 1)) / 6;
                BOOST_REQUIRE_EQUAL(result, expected);
            });
        });
    });
}

SEASTAR_TEST_CASE(test_map_lifetime) {
    struct map {
        bool destroyed = false;
        ~map() {
            destroyed = true;
        }
        auto operator()(const X& x) const {
            return yield().then([this, &x] {
                BOOST_REQUIRE(!destroyed);
                return x.cpu_id_squared();
            });
        }
    };
    return do_with_distributed<X>([] (distributed<X>& x) {
        return x.start().then([&x] {
            return x.map(map{}).then([] (std::vector<int> result) {
                BOOST_REQUIRE_EQUAL(result.size(), smp::count);
                for (size_t i = 0; i < (size_t)smp::count; i++) {
                    BOOST_REQUIRE_EQUAL(result[i], i * i);
                }
            });
        });
    });
}

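// Starts async_service everywhere, then sleeps long enough for every
// background check scheduled by run() to fire after the service was stopped.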
SEASTAR_TEST_CASE(test_async) {
    return do_with_distributed<async_service>([] (distributed<async_service>& x) {
        return x.start().then([&x] {
            return x.invoke_on_all(&async_service::run);
        });
    }).then([] {
        return sleep(std::chrono::milliseconds(100 * (smp::count + 1)));
    });
}

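// invoke_on_others() must run the function on every shard except the calling
// one, leaving the local instance untouched.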
SEASTAR_TEST_CASE(test_invoke_on_others) {
    return seastar::async([] {
        struct my_service {
            int counter = 0;
            void up() { ++counter; }
            future<> stop() { return make_ready_future<>(); }
        };
        for (unsigned c = 0; c < smp::count; ++c) {
            smp::submit_to(c, [c] {
                return seastar::async([c] {
                    sharded<my_service> s;
                    s.start().get();
                    s.invoke_on_others([](auto& s) { s.up(); }).get();
                    if (s.local().counter != 0) {
                        throw std::runtime_error("local modified");
                    }
                    s.invoke_on_all([c](auto& remote) {
                        if (this_shard_id() != c) {
                            if (remote.counter != 1) {
                                throw std::runtime_error("remote not modified");
                            }
                        }
                    }).get();
                    s.stop().get();
                });
            }).get();
        }
    });
}

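// Every shard calls smp::invoke_on_others(); afterwards each shard must have
// observed exactly one call from every shard other than itself.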
SEASTAR_TEST_CASE(test_smp_invoke_on_others) {
    return seastar::async([] {
        std::vector<std::vector<int>> calls;
        calls.reserve(smp::count);
        for (unsigned i = 0; i < smp::count; i++) {
            auto& sv = calls.emplace_back();
            sv.reserve(smp::count);
        }

        smp::invoke_on_all([&calls] {
            return smp::invoke_on_others([&calls, from = this_shard_id()] {
                calls[this_shard_id()].emplace_back(from);
            });
        }).get();

        for (unsigned i = 0; i < smp::count; i++) {
            BOOST_REQUIRE_EQUAL(calls[i].size(), smp::count - 1);
            for (unsigned f = 0; f < smp::count; f++) {
                auto r = std::find(calls[i].begin(), calls[i].end(), f);
                BOOST_REQUIRE_EQUAL(r == calls[i].end(), i == f);
            }
        }
    });
}

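// Records the highest number of concurrent do_work() invocations seen. The
// semaphore blocks all callers until the expected concurrency is reached and
// then releases them all, so the observed maximum reflects the real limit.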
struct remote_worker {
    unsigned current = 0;
    unsigned max_concurrent_observed = 0;
    unsigned expected_max;
    semaphore sem{0};
    remote_worker(unsigned expected_max) : expected_max(expected_max) {
    }
    future<> do_work() {
        ++current;
        max_concurrent_observed = std::max(current, max_concurrent_observed);
        if (max_concurrent_observed >= expected_max && sem.current() == 0) {
            sem.signal(semaphore::max_counter());
        }
        return sem.wait().then([this] {
            // Sleep a bit to check if the concurrency goes over the max
            return sleep(100ms).then([this] {
                max_concurrent_observed = std::max(current, max_concurrent_observed);
                --current;
            });
        });
    }
    future<> do_remote_work(shard_id t, smp_service_group ssg) {
        return smp::submit_to(t, ssg, [this] {
            return do_work();
        });
    }
};

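// max_nonlocal_requests bounds the number of concurrent cross-shard requests
// per group: ssg1 must never exceed 1 in flight, ssg2 may reach 1000.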
SEASTAR_TEST_CASE(test_smp_service_groups) {
    return async([] {
        smp_service_group_config ssgc1;
        ssgc1.max_nonlocal_requests = 1;
        auto ssg1 = create_smp_service_group(ssgc1).get0();
        smp_service_group_config ssgc2;
        ssgc2.max_nonlocal_requests = 1000;
        auto ssg2 = create_smp_service_group(ssgc2).get0();
        shard_id other_shard = smp::count - 1;
        remote_worker rm1(1);
        remote_worker rm2(1000);
        auto bunch1 = parallel_for_each(boost::irange(0, 20), [&] (int ignore) { return rm1.do_remote_work(other_shard, ssg1); });
        auto bunch2 = parallel_for_each(boost::irange(0, 2000), [&] (int ignore) { return rm2.do_remote_work(other_shard, ssg2); });
        bunch1.get();
        bunch2.get();
        if (smp::count > 1) {
            assert(rm1.max_concurrent_observed == 1);
            assert(rm2.max_concurrent_observed == 1000);
        }
        destroy_smp_service_group(ssg1).get();
        destroy_smp_service_group(ssg2).get();
    });
}

SEASTAR_TEST_CASE(test_smp_service_groups_re_construction) {
    // During development of the feature, we saw a bug where the vector
    // holding the groups did not expand correctly. This test triggers the
    // bug.
    return async([] {
        auto ssg1 = create_smp_service_group({}).get0();
        auto ssg2 = create_smp_service_group({}).get0();
        destroy_smp_service_group(ssg1).get();
        auto ssg3 = create_smp_service_group({}).get0();
        destroy_smp_service_group(ssg2).get();
        destroy_smp_service_group(ssg3).get();
    });
}

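// A request submitted with a timeout must fail with semaphore_timed_out while
// the group's single concurrency unit is held by a request blocked on the mutex.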
SEASTAR_TEST_CASE(test_smp_timeout) {
    return async([] {
        smp_service_group_config ssgc1;
        ssgc1.max_nonlocal_requests = 1;
        auto ssg1 = create_smp_service_group(ssgc1).get0();

        auto _ = defer([ssg1] () noexcept {
            destroy_smp_service_group(ssg1).get();
        });

        const shard_id other_shard = smp::count - 1;

        // Ugly but beats using sleeps.
        std::mutex mut;
        std::unique_lock<std::mutex> lk(mut);

        // Submitted to the remote shard.
        auto fut1 = smp::submit_to(other_shard, ssg1, [&mut] {
            std::cout << "Running request no. 1" << std::endl;
            std::unique_lock<std::mutex> lk(mut);
            std::cout << "Request no. 1 done" << std::endl;
        });
        // Consume the only unit from the semaphore.
        auto fut2 = smp::submit_to(other_shard, ssg1, [] {
            std::cout << "Running request no. 2 - done" << std::endl;
        });

        auto fut_timedout = smp::submit_to(other_shard, smp_submit_to_options(ssg1, smp_timeout_clock::now() + 10ms), [] {
            std::cout << "Running timed-out request - done" << std::endl;
        });

        {
            // Destroying this deferred action releases the mutex, unblocking
            // request no. 1 once the timeout below has been verified.
            auto notify = defer([lk = std::move(lk)] () noexcept { });

            try {
                fut_timedout.get();
                throw std::runtime_error("smp::submit_to() didn't timeout as expected");
            } catch (semaphore_timed_out& e) {
                std::cout << "Expected timeout received: " << e.what() << std::endl;
            } catch (...) {
                std::throw_with_nested(std::runtime_error("smp::submit_to() failed with unexpected exception"));
            }
        }

        fut1.get();
        fut2.get();
    });
}

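// sharded_parameter() is evaluated on each shard at start() time, allowing
// per-shard constructor arguments that depend on this_shard_id() or on other
// sharded services.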
SEASTAR_THREAD_TEST_CASE(test_sharded_parameter) {
    struct dependency {
        unsigned val = this_shard_id() * 7;
    };
    struct some_service {
        bool ok = false;
        some_service(unsigned non_shard_dependent, unsigned shard_dependent, dependency& dep, unsigned shard_dependent_2) {
            ok =
                non_shard_dependent == 43
                && shard_dependent == this_shard_id() * 3
                && dep.val == this_shard_id() * 7
                && shard_dependent_2 == -dep.val;
        }
    };
    sharded<dependency> s_dep;
    s_dep.start().get();
    auto undo1 = deferred_stop(s_dep);

    sharded<some_service> s_service;
    s_service.start(
        43, // should be copied verbatim
        sharded_parameter([] { return this_shard_id() * 3; }),
        std::ref(s_dep),
        sharded_parameter([] (dependency& d) { return -d.val; }, std::ref(s_dep))
    ).get();
    auto undo2 = deferred_stop(s_service);

    auto all_ok = s_service.map_reduce0(std::mem_fn(&some_service::ok), true, std::multiplies<>()).get0();
    BOOST_REQUIRE(all_ok);
}