]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/tests/unit/distributed_test.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / seastar / tests / unit / distributed_test.cc
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
21 */
22
23#include <seastar/core/app-template.hh>
24#include <seastar/core/distributed.hh>
25#include <seastar/core/future-util.hh>
26#include <seastar/core/sleep.hh>
27#include <seastar/core/thread.hh>
9f95a23c
TL
28#include <seastar/core/print.hh>
29#include <seastar/util/defer.hh>
11fdf7f2
TL
30
31using namespace seastar;
9f95a23c 32using namespace std::chrono_literals;
11fdf7f2
TL
33
34struct async_service : public seastar::async_sharded_service<async_service> {
35 thread_local static bool deleted;
36 ~async_service() {
37 deleted = true;
38 }
39 void run() {
40 auto ref = shared_from_this();
9f95a23c
TL
41 // Wait a while and check.
42 (void)sleep(std::chrono::milliseconds(100 + 100 * engine().cpu_id())).then([this, ref] {
11fdf7f2
TL
43 check();
44 });
45 }
46 virtual void check() {
47 assert(!deleted);
48 }
49 future<> stop() { return make_ready_future<>(); }
50};
51
52thread_local bool async_service::deleted = false;
53
54struct X {
55 sstring echo(sstring arg) {
56 return arg;
57 }
58 int cpu_id_squared() const {
59 auto id = engine().cpu_id();
60 return id * id;
61 }
62 future<> stop() { return make_ready_future<>(); }
63};
64
65template <typename T, typename Func>
66future<> do_with_distributed(Func&& func) {
67 auto x = make_shared<distributed<T>>();
68 return func(*x).finally([x] {
69 return x->stop();
70 }).finally([x]{});
71}
72
73future<> test_that_each_core_gets_the_arguments() {
74 return do_with_distributed<X>([] (auto& x) {
75 return x.start().then([&x] {
76 return x.map_reduce([] (sstring msg){
77 if (msg != "hello") {
78 throw std::runtime_error("wrong message");
79 }
80 }, &X::echo, sstring("hello"));
81 });
82 });
83}
84
85future<> test_functor_version() {
86 return do_with_distributed<X>([] (auto& x) {
87 return x.start().then([&x] {
88 return x.map_reduce([] (sstring msg){
89 if (msg != "hello") {
90 throw std::runtime_error("wrong message");
91 }
92 }, [] (X& x) { return x.echo("hello"); });
93 });
94 });
95}
96
97struct Y {
98 sstring s;
99 Y(sstring s) : s(std::move(s)) {}
100 future<> stop() { return make_ready_future<>(); }
101};
102
103future<> test_constructor_argument_is_passed_to_each_core() {
104 return do_with_distributed<Y>([] (auto& y) {
105 return y.start(sstring("hello")).then([&y] {
106 return y.invoke_on_all([] (Y& y) {
107 if (y.s != "hello") {
108 throw std::runtime_error(format("expected message mismatch, is \"%s\"", y.s));
109 }
110 });
111 });
112 });
113}
114
115future<> test_map_reduce() {
116 return do_with_distributed<X>([] (distributed<X>& x) {
117 return x.start().then([&x] {
118 return x.map_reduce0(std::mem_fn(&X::cpu_id_squared),
119 0,
120 std::plus<int>()).then([] (int result) {
121 int n = smp::count - 1;
122 if (result != (n * (n + 1) * (2*n + 1)) / 6) {
123 throw std::runtime_error("map_reduce failed");
124 }
125 });
126 });
127 });
128}
129
130future<> test_async() {
131 return do_with_distributed<async_service>([] (distributed<async_service>& x) {
132 return x.start().then([&x] {
133 return x.invoke_on_all(&async_service::run);
134 });
135 }).then([] {
136 return sleep(std::chrono::milliseconds(100 * (smp::count + 1)));
137 });
138}
139
140future<> test_invoke_on_others() {
141 return seastar::async([] {
142 struct my_service {
143 int counter = 0;
144 void up() { ++counter; }
145 future<> stop() { return make_ready_future<>(); }
146 };
147 for (unsigned c = 0; c < smp::count; ++c) {
148 smp::submit_to(c, [c] {
149 return seastar::async([c] {
150 sharded<my_service> s;
151 s.start().get();
152 s.invoke_on_others([](auto& s) { s.up(); }).get();
153 if (s.local().counter != 0) {
154 throw std::runtime_error("local modified");
155 }
156 s.invoke_on_all([c](auto& remote) {
157 if (engine().cpu_id() != c) {
158 if (remote.counter != 1) {
159 throw std::runtime_error("remote not modified");
160 }
161 }
162 }).get();
163 s.stop().get();
164 });
165 }).get();
166 }
167 });
168}
169
9f95a23c
TL
170
171struct remote_worker {
172 unsigned current = 0;
173 unsigned max_concurrent_observed = 0;
174 future<> do_work() {
175 ++current;
176 max_concurrent_observed = std::max(current, max_concurrent_observed);
177 return sleep(100ms).then([this] {
178 max_concurrent_observed = std::max(current, max_concurrent_observed);
179 --current;
180 });
181 }
182 future<> do_remote_work(shard_id t, smp_service_group ssg) {
183 return smp::submit_to(t, ssg, [this] {
184 return do_work();
185 });
186 }
187};
188
189future<> test_smp_service_groups() {
190 return async([] {
191 smp_service_group_config ssgc1;
192 ssgc1.max_nonlocal_requests = 1;
193 auto ssg1 = create_smp_service_group(ssgc1).get0();
194 smp_service_group_config ssgc2;
195 ssgc2.max_nonlocal_requests = 1000;
196 auto ssg2 = create_smp_service_group(ssgc2).get0();
197 shard_id other_shard = smp::count - 1;
198 remote_worker rm1;
199 remote_worker rm2;
200 auto bunch1 = parallel_for_each(boost::irange(0, 20), [&] (int ignore) { return rm1.do_remote_work(other_shard, ssg1); });
201 auto bunch2 = parallel_for_each(boost::irange(0, 2000), [&] (int ignore) { return rm2.do_remote_work(other_shard, ssg2); });
202 bunch1.get();
203 bunch2.get();
204 if (smp::count > 1) {
205 assert(rm1.max_concurrent_observed == 1);
206 assert(rm2.max_concurrent_observed >= 1000 / (smp::count - 1) && rm2.max_concurrent_observed <= 1000);
207 }
208 destroy_smp_service_group(ssg1).get();
209 destroy_smp_service_group(ssg2).get();
210 });
211}
212
213future<> test_smp_service_groups_re_construction() {
214 // During development of the feature, we saw a bug where the vector
215 // holding the groups did not expand correctly. This test triggers the
216 // bug.
217 return async([] {
218 auto ssg1 = create_smp_service_group({}).get0();
219 auto ssg2 = create_smp_service_group({}).get0();
220 destroy_smp_service_group(ssg1).get();
221 auto ssg3 = create_smp_service_group({}).get0();
222 destroy_smp_service_group(ssg2).get();
223 destroy_smp_service_group(ssg3).get();
224 });
225}
226
227future<> test_smp_timeout() {
228 return async([] {
229 smp_service_group_config ssgc1;
230 ssgc1.max_nonlocal_requests = 1;
231 auto ssg1 = create_smp_service_group(ssgc1).get0();
232
233 auto _ = defer([ssg1] {
234 destroy_smp_service_group(ssg1).get();
235 });
236
237 const shard_id other_shard = smp::count - 1;
238
239 // Ugly but beats using sleeps.
240 std::mutex mut;
241 std::unique_lock<std::mutex> lk(mut);
242
243 // Submitted to the remote shard.
244 auto fut1 = smp::submit_to(other_shard, ssg1, [&mut] {
245 std::cout << "Running request no. 1" << std::endl;
246 std::unique_lock<std::mutex> lk(mut);
247 std::cout << "Request no. 1 done" << std::endl;
248 });
249 // Consume the only unit from the semaphore.
250 auto fut2 = smp::submit_to(other_shard, ssg1, [] {
251 std::cout << "Running request no. 2 - done" << std::endl;
252 });
253
254 auto fut_timedout = smp::submit_to(other_shard, smp_submit_to_options(ssg1, smp_timeout_clock::now() + 10ms), [] {
255 std::cout << "Running timed-out request - done" << std::endl;
256 });
257
258 {
259 auto notify = defer([lk = std::move(lk)] { });
260
261 try {
262 fut_timedout.get();
263 throw std::runtime_error("smp::submit_to() didn't timeout as expected");
264 } catch (semaphore_timed_out& e) {
265 std::cout << "Expected timeout received: " << e.what() << std::endl;
266 } catch (...) {
267 std::throw_with_nested(std::runtime_error("smp::submit_to() failed with unexpected exception"));
268 }
269 }
270
271 fut1.get();
272 fut2.get();
273 });
274}
275
11fdf7f2
TL
276int main(int argc, char** argv) {
277 app_template app;
278 return app.run(argc, argv, [] {
279 return test_that_each_core_gets_the_arguments().then([] {
280 return test_functor_version();
281 }).then([] {
282 return test_constructor_argument_is_passed_to_each_core();
283 }).then([] {
284 return test_map_reduce();
285 }).then([] {
286 return test_async();
287 }).then([] {
288 return test_invoke_on_others();
9f95a23c
TL
289 }).then([] {
290 return test_smp_service_groups();
291 }).then([] {
292 return test_smp_service_groups_re_construction();
293 }).then([] {
294 return test_smp_timeout();
11fdf7f2
TL
295 });
296 });
297}