]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/tests/unit/distributed_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / tests / unit / distributed_test.cc
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
21 */
22
f67539c2
TL
23#include <seastar/testing/test_case.hh>
24#include <seastar/testing/thread_test_case.hh>
11fdf7f2 25#include <seastar/core/distributed.hh>
f67539c2
TL
26#include <seastar/core/loop.hh>
27#include <seastar/core/semaphore.hh>
11fdf7f2
TL
28#include <seastar/core/sleep.hh>
29#include <seastar/core/thread.hh>
9f95a23c
TL
30#include <seastar/core/print.hh>
31#include <seastar/util/defer.hh>
20effc67 32#include <seastar/util/closeable.hh>
f67539c2 33#include <mutex>
11fdf7f2
TL
34
35using namespace seastar;
9f95a23c 36using namespace std::chrono_literals;
11fdf7f2
TL
37
// Service used by test_async: verifies that a shard-local service object
// stays alive (is not destroyed by distributed<>::stop()) while an async
// operation still holds a shared reference to it.
struct async_service : public seastar::async_sharded_service<async_service> {
    // Set by the destructor; checked later to detect premature destruction.
    thread_local static bool deleted;
    ~async_service() {
        deleted = true;
    }
    void run() {
        // Keep the service alive past distributed<>::stop() by capturing a
        // shared reference in the continuation below.
        auto ref = shared_from_this();
        // Wait a while and check. The delay scales with the shard id so the
        // checks are staggered across shards; the future is deliberately
        // discarded — `ref` keeps the object alive until it resolves.
        (void)sleep(std::chrono::milliseconds(100 + 100 * this_shard_id())).then([this, ref] {
            check();
        });
    }
    virtual void check() {
        // Must still be alive here even though stop() has already completed.
        assert(!deleted);
    }
    future<> stop() { return make_ready_future<>(); }
};

// Per-shard flag: true once this shard's instance has been destroyed.
thread_local bool async_service::deleted = false;
57
58struct X {
59 sstring echo(sstring arg) {
60 return arg;
61 }
62 int cpu_id_squared() const {
f67539c2 63 auto id = this_shard_id();
11fdf7f2
TL
64 return id * id;
65 }
66 future<> stop() { return make_ready_future<>(); }
67};
68
// Runs `func` against a freshly allocated distributed<T> and stops the
// instances afterwards, regardless of how `func`'s future resolves.
template <typename T, typename Func>
future<> do_with_distributed(Func&& func) {
    auto x = make_shared<distributed<T>>();
    return func(*x).finally([x] {
        return x->stop();
    }).finally([x]{});
    // The second finally() holds another reference so the distributed<T>
    // object itself outlives the stop() future.
}
76
f67539c2 77SEASTAR_TEST_CASE(test_that_each_core_gets_the_arguments) {
11fdf7f2
TL
78 return do_with_distributed<X>([] (auto& x) {
79 return x.start().then([&x] {
80 return x.map_reduce([] (sstring msg){
81 if (msg != "hello") {
82 throw std::runtime_error("wrong message");
83 }
84 }, &X::echo, sstring("hello"));
85 });
86 });
87}
88
f67539c2 89SEASTAR_TEST_CASE(test_functor_version) {
11fdf7f2
TL
90 return do_with_distributed<X>([] (auto& x) {
91 return x.start().then([&x] {
92 return x.map_reduce([] (sstring msg){
93 if (msg != "hello") {
94 throw std::runtime_error("wrong message");
95 }
96 }, [] (X& x) { return x.echo("hello"); });
97 });
98 });
99}
100
101struct Y {
102 sstring s;
103 Y(sstring s) : s(std::move(s)) {}
104 future<> stop() { return make_ready_future<>(); }
105};
106
f67539c2 107SEASTAR_TEST_CASE(test_constructor_argument_is_passed_to_each_core) {
11fdf7f2
TL
108 return do_with_distributed<Y>([] (auto& y) {
109 return y.start(sstring("hello")).then([&y] {
110 return y.invoke_on_all([] (Y& y) {
111 if (y.s != "hello") {
112 throw std::runtime_error(format("expected message mismatch, is \"%s\"", y.s));
113 }
114 });
115 });
116 });
117}
118
f67539c2 119SEASTAR_TEST_CASE(test_map_reduce) {
11fdf7f2
TL
120 return do_with_distributed<X>([] (distributed<X>& x) {
121 return x.start().then([&x] {
122 return x.map_reduce0(std::mem_fn(&X::cpu_id_squared),
123 0,
124 std::plus<int>()).then([] (int result) {
125 int n = smp::count - 1;
126 if (result != (n * (n + 1) * (2*n + 1)) / 6) {
127 throw std::runtime_error("map_reduce failed");
128 }
129 });
130 });
131 });
132}
133
f67539c2 134SEASTAR_TEST_CASE(test_async) {
11fdf7f2
TL
135 return do_with_distributed<async_service>([] (distributed<async_service>& x) {
136 return x.start().then([&x] {
137 return x.invoke_on_all(&async_service::run);
138 });
139 }).then([] {
140 return sleep(std::chrono::milliseconds(100 * (smp::count + 1)));
141 });
142}
143
// For every shard c: start a sharded counter service from c, bump the
// counter on all *other* shards, and verify that c's local instance was
// left untouched while every remote instance was bumped exactly once.
SEASTAR_TEST_CASE(test_invoke_on_others) {
    return seastar::async([] {
        struct my_service {
            int counter = 0;
            void up() { ++counter; }
            future<> stop() { return make_ready_future<>(); }
        };
        for (unsigned c = 0; c < smp::count; ++c) {
            // Run the whole scenario with shard c as the "local" shard.
            smp::submit_to(c, [c] {
                return seastar::async([c] {
                    sharded<my_service> s;
                    s.start().get();
                    s.invoke_on_others([](auto& s) { s.up(); }).get();
                    // invoke_on_others() must skip the calling shard.
                    if (s.local().counter != 0) {
                        throw std::runtime_error("local modified");
                    }
                    // ...and must have reached every other shard exactly once.
                    s.invoke_on_all([c](auto& remote) {
                        if (this_shard_id() != c) {
                            if (remote.counter != 1) {
                                throw std::runtime_error("remote not modified");
                            }
                        }
                    }).get();
                    s.stop().get();
                });
            }).get();
        }
    });
}
173
20effc67
TL
174SEASTAR_TEST_CASE(test_smp_invoke_on_others) {
175 return seastar::async([] {
176 std::vector<std::vector<int>> calls;
177 calls.reserve(smp::count);
178 for (unsigned i = 0; i < smp::count; i++) {
179 auto& sv = calls.emplace_back();
180 sv.reserve(smp::count);
181 }
182
183 smp::invoke_on_all([&calls] {
184 return smp::invoke_on_others([&calls, from = this_shard_id()] {
185 calls[this_shard_id()].emplace_back(from);
186 });
187 }).get();
188
189 for (unsigned i = 0; i < smp::count; i++) {
190 BOOST_REQUIRE_EQUAL(calls[i].size(), smp::count - 1);
191 for (unsigned f = 0; f < smp::count; f++) {
192 auto r = std::find(calls[i].begin(), calls[i].end(), f);
193 BOOST_REQUIRE_EQUAL(r == calls[i].end(), i == f);
194 }
195 }
196 });
197}
9f95a23c
TL
198
// Concurrency probe for smp service groups. Each do_work() call parks on
// `sem` until the expected number of calls is running concurrently, then
// releases everyone at once and re-samples concurrency to catch overshoot
// past `expected_max`.
struct remote_worker {
    // Number of do_work() calls currently in flight on this shard.
    unsigned current = 0;
    // High-water mark of `current` observed so far.
    unsigned max_concurrent_observed = 0;
    // Concurrency level the test expects the service group to allow.
    unsigned expected_max;
    // Starts at 0: every worker blocks until the target concurrency is seen.
    semaphore sem{0};
    remote_worker(unsigned expected_max) : expected_max(expected_max) {
    }
    future<> do_work() {
        ++current;
        max_concurrent_observed = std::max(current, max_concurrent_observed);
        // First worker to see the target concurrency opens the floodgates;
        // the sem.current() == 0 check ensures we only signal once.
        if (max_concurrent_observed >= expected_max && sem.current() == 0) {
            sem.signal(semaphore::max_counter());
        }
        return sem.wait().then([this] {
            // Sleep a bit to check if the concurrency goes over the max
            return sleep(100ms).then([this] {
                max_concurrent_observed = std::max(current, max_concurrent_observed);
                --current;
            });
        });
    }
    // Runs do_work() on shard `t` through service group `ssg`, so in-flight
    // calls are throttled by that group's non-local request limit.
    future<> do_remote_work(shard_id t, smp_service_group ssg) {
        return smp::submit_to(t, ssg, [this] {
            return do_work();
        });
    }
};
226
// Verifies that an smp service group's max_nonlocal_requests setting caps
// the number of concurrently executing cross-shard requests: a group with
// limit 1 must serialize 20 submissions, while a group with limit 1000 must
// run 2000 submissions at full (1000-wide) concurrency.
SEASTAR_TEST_CASE(test_smp_service_groups) {
    return async([] {
        smp_service_group_config ssgc1;
        ssgc1.max_nonlocal_requests = 1;
        auto ssg1 = create_smp_service_group(ssgc1).get0();
        smp_service_group_config ssgc2;
        ssgc2.max_nonlocal_requests = 1000;
        auto ssg2 = create_smp_service_group(ssgc2).get0();
        // Target the last shard so the requests are non-local when smp::count > 1.
        shard_id other_shard = smp::count - 1;
        remote_worker rm1(1);
        remote_worker rm2(1000);
        auto bunch1 = parallel_for_each(boost::irange(0, 20), [&] (int ignore) { return rm1.do_remote_work(other_shard, ssg1); });
        auto bunch2 = parallel_for_each(boost::irange(0, 2000), [&] (int ignore) { return rm2.do_remote_work(other_shard, ssg2); });
        bunch1.get();
        bunch2.get();
        // With a single shard the requests are local and unthrottled, so the
        // concurrency assertions only hold for smp::count > 1.
        if (smp::count > 1) {
            assert(rm1.max_concurrent_observed == 1);
            assert(rm2.max_concurrent_observed == 1000);
        }
        destroy_smp_service_group(ssg1).get();
        destroy_smp_service_group(ssg2).get();
    });
}
250
f67539c2 251SEASTAR_TEST_CASE(test_smp_service_groups_re_construction) {
9f95a23c
TL
252 // During development of the feature, we saw a bug where the vector
253 // holding the groups did not expand correctly. This test triggers the
254 // bug.
255 return async([] {
256 auto ssg1 = create_smp_service_group({}).get0();
257 auto ssg2 = create_smp_service_group({}).get0();
258 destroy_smp_service_group(ssg1).get();
259 auto ssg3 = create_smp_service_group({}).get0();
260 destroy_smp_service_group(ssg2).get();
261 destroy_smp_service_group(ssg3).get();
262 });
263}
264
// Verifies that smp::submit_to() with a timeout fails with
// semaphore_timed_out when the target service group's single non-local
// request slot is occupied.
SEASTAR_TEST_CASE(test_smp_timeout) {
    return async([] {
        // Group with exactly one non-local request slot.
        smp_service_group_config ssgc1;
        ssgc1.max_nonlocal_requests = 1;
        auto ssg1 = create_smp_service_group(ssgc1).get0();

        auto _ = defer([ssg1] () noexcept {
            destroy_smp_service_group(ssg1).get();
        });

        const shard_id other_shard = smp::count - 1;

        // Ugly but beats using sleeps. The mutex is locked here and released
        // only after the timeout is observed, keeping request no. 1 blocked
        // (and the group's only slot occupied) for the whole test window.
        std::mutex mut;
        std::unique_lock<std::mutex> lk(mut);

        // Submitted to the remote shard. Blocks on `mut` until we unlock it.
        auto fut1 = smp::submit_to(other_shard, ssg1, [&mut] {
            std::cout << "Running request no. 1" << std::endl;
            std::unique_lock<std::mutex> lk(mut);
            std::cout << "Request no. 1 done" << std::endl;
        });
        // Consume the only unit from the semaphore.
        auto fut2 = smp::submit_to(other_shard, ssg1, [] {
            std::cout << "Running request no. 2 - done" << std::endl;
        });

        // With the slot occupied, this request must time out after 10ms.
        auto fut_timedout = smp::submit_to(other_shard, smp_submit_to_options(ssg1, smp_timeout_clock::now() + 10ms), [] {
            std::cout << "Running timed-out request - done" << std::endl;
        });

        {
            // Releases `mut` on scope exit (even if a check below throws),
            // unblocking request no. 1.
            auto notify = defer([lk = std::move(lk)] () noexcept { });

            try {
                fut_timedout.get();
                throw std::runtime_error("smp::submit_to() didn't timeout as expected");
            } catch (semaphore_timed_out& e) {
                std::cout << "Expected timeout received: " << e.what() << std::endl;
            } catch (...) {
                std::throw_with_nested(std::runtime_error("smp::submit_to() failed with unexpected exception"));
            }
        }

        fut1.get();
        fut2.get();
    });
}
313
f67539c2
TL
// Verifies sharded<>::start() argument forwarding: plain values are copied
// verbatim to every shard, sharded_parameter() callables are re-evaluated on
// each shard, and std::ref(sharded<T>) resolves to the shard-local instance.
SEASTAR_THREAD_TEST_CASE(test_sharded_parameter) {
    struct dependency {
        // Shard-dependent value, distinct on every shard.
        unsigned val = this_shard_id() * 7;
    };
    struct some_service {
        bool ok = false;
        some_service(unsigned non_shard_dependent, unsigned shard_dependent, dependency& dep, unsigned shard_dependent_2) {
            ok =
                non_shard_dependent == 43
                && shard_dependent == this_shard_id() * 3
                && dep.val == this_shard_id() * 7
                // unsigned negation on both sides, so this compares the
                // wrapped-around values consistently.
                && shard_dependent_2 == -dep.val;
        }
    };
    sharded<dependency> s_dep;
    s_dep.start().get();
    auto undo1 = deferred_stop(s_dep);

    sharded<some_service> s_service;
    s_service.start(
        43, // should be copied verbatim
        sharded_parameter([] { return this_shard_id() * 3; }),
        std::ref(s_dep),
        sharded_parameter([] (dependency& d) { return -d.val; }, std::ref(s_dep))
    ).get();
    auto undo2 = deferred_stop(s_service);

    // All shards' constructors must have seen the expected values.
    auto all_ok = s_service.map_reduce0(std::mem_fn(&some_service::ok), true, std::multiplies<>()).get0();
    BOOST_REQUIRE(all_ok);
}