/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Copyright (C) 2015 Cloudius Systems, Ltd.
 */

#include <seastar/core/app-template.hh>
#include <seastar/core/distributed.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/print.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/util/defer.hh>

using namespace seastar;
using namespace std::chrono_literals;

34 | struct async_service : public seastar::async_sharded_service<async_service> { | |
35 | thread_local static bool deleted; | |
36 | ~async_service() { | |
37 | deleted = true; | |
38 | } | |
39 | void run() { | |
40 | auto ref = shared_from_this(); | |
9f95a23c TL |
41 | // Wait a while and check. |
42 | (void)sleep(std::chrono::milliseconds(100 + 100 * engine().cpu_id())).then([this, ref] { | |
11fdf7f2 TL |
43 | check(); |
44 | }); | |
45 | } | |
46 | virtual void check() { | |
47 | assert(!deleted); | |
48 | } | |
49 | future<> stop() { return make_ready_future<>(); } | |
50 | }; | |
51 | ||
52 | thread_local bool async_service::deleted = false; | |
53 | ||
54 | struct X { | |
55 | sstring echo(sstring arg) { | |
56 | return arg; | |
57 | } | |
58 | int cpu_id_squared() const { | |
59 | auto id = engine().cpu_id(); | |
60 | return id * id; | |
61 | } | |
62 | future<> stop() { return make_ready_future<>(); } | |
63 | }; | |
64 | ||
65 | template <typename T, typename Func> | |
66 | future<> do_with_distributed(Func&& func) { | |
67 | auto x = make_shared<distributed<T>>(); | |
68 | return func(*x).finally([x] { | |
69 | return x->stop(); | |
70 | }).finally([x]{}); | |
71 | } | |
72 | ||
73 | future<> test_that_each_core_gets_the_arguments() { | |
74 | return do_with_distributed<X>([] (auto& x) { | |
75 | return x.start().then([&x] { | |
76 | return x.map_reduce([] (sstring msg){ | |
77 | if (msg != "hello") { | |
78 | throw std::runtime_error("wrong message"); | |
79 | } | |
80 | }, &X::echo, sstring("hello")); | |
81 | }); | |
82 | }); | |
83 | } | |
84 | ||
85 | future<> test_functor_version() { | |
86 | return do_with_distributed<X>([] (auto& x) { | |
87 | return x.start().then([&x] { | |
88 | return x.map_reduce([] (sstring msg){ | |
89 | if (msg != "hello") { | |
90 | throw std::runtime_error("wrong message"); | |
91 | } | |
92 | }, [] (X& x) { return x.echo("hello"); }); | |
93 | }); | |
94 | }); | |
95 | } | |
96 | ||
97 | struct Y { | |
98 | sstring s; | |
99 | Y(sstring s) : s(std::move(s)) {} | |
100 | future<> stop() { return make_ready_future<>(); } | |
101 | }; | |
102 | ||
103 | future<> test_constructor_argument_is_passed_to_each_core() { | |
104 | return do_with_distributed<Y>([] (auto& y) { | |
105 | return y.start(sstring("hello")).then([&y] { | |
106 | return y.invoke_on_all([] (Y& y) { | |
107 | if (y.s != "hello") { | |
108 | throw std::runtime_error(format("expected message mismatch, is \"%s\"", y.s)); | |
109 | } | |
110 | }); | |
111 | }); | |
112 | }); | |
113 | } | |
114 | ||
115 | future<> test_map_reduce() { | |
116 | return do_with_distributed<X>([] (distributed<X>& x) { | |
117 | return x.start().then([&x] { | |
118 | return x.map_reduce0(std::mem_fn(&X::cpu_id_squared), | |
119 | 0, | |
120 | std::plus<int>()).then([] (int result) { | |
121 | int n = smp::count - 1; | |
122 | if (result != (n * (n + 1) * (2*n + 1)) / 6) { | |
123 | throw std::runtime_error("map_reduce failed"); | |
124 | } | |
125 | }); | |
126 | }); | |
127 | }); | |
128 | } | |
129 | ||
130 | future<> test_async() { | |
131 | return do_with_distributed<async_service>([] (distributed<async_service>& x) { | |
132 | return x.start().then([&x] { | |
133 | return x.invoke_on_all(&async_service::run); | |
134 | }); | |
135 | }).then([] { | |
136 | return sleep(std::chrono::milliseconds(100 * (smp::count + 1))); | |
137 | }); | |
138 | } | |
139 | ||
140 | future<> test_invoke_on_others() { | |
141 | return seastar::async([] { | |
142 | struct my_service { | |
143 | int counter = 0; | |
144 | void up() { ++counter; } | |
145 | future<> stop() { return make_ready_future<>(); } | |
146 | }; | |
147 | for (unsigned c = 0; c < smp::count; ++c) { | |
148 | smp::submit_to(c, [c] { | |
149 | return seastar::async([c] { | |
150 | sharded<my_service> s; | |
151 | s.start().get(); | |
152 | s.invoke_on_others([](auto& s) { s.up(); }).get(); | |
153 | if (s.local().counter != 0) { | |
154 | throw std::runtime_error("local modified"); | |
155 | } | |
156 | s.invoke_on_all([c](auto& remote) { | |
157 | if (engine().cpu_id() != c) { | |
158 | if (remote.counter != 1) { | |
159 | throw std::runtime_error("remote not modified"); | |
160 | } | |
161 | } | |
162 | }).get(); | |
163 | s.stop().get(); | |
164 | }); | |
165 | }).get(); | |
166 | } | |
167 | }); | |
168 | } | |
169 | ||
9f95a23c TL |
170 | |
171 | struct remote_worker { | |
172 | unsigned current = 0; | |
173 | unsigned max_concurrent_observed = 0; | |
174 | future<> do_work() { | |
175 | ++current; | |
176 | max_concurrent_observed = std::max(current, max_concurrent_observed); | |
177 | return sleep(100ms).then([this] { | |
178 | max_concurrent_observed = std::max(current, max_concurrent_observed); | |
179 | --current; | |
180 | }); | |
181 | } | |
182 | future<> do_remote_work(shard_id t, smp_service_group ssg) { | |
183 | return smp::submit_to(t, ssg, [this] { | |
184 | return do_work(); | |
185 | }); | |
186 | } | |
187 | }; | |
188 | ||
189 | future<> test_smp_service_groups() { | |
190 | return async([] { | |
191 | smp_service_group_config ssgc1; | |
192 | ssgc1.max_nonlocal_requests = 1; | |
193 | auto ssg1 = create_smp_service_group(ssgc1).get0(); | |
194 | smp_service_group_config ssgc2; | |
195 | ssgc2.max_nonlocal_requests = 1000; | |
196 | auto ssg2 = create_smp_service_group(ssgc2).get0(); | |
197 | shard_id other_shard = smp::count - 1; | |
198 | remote_worker rm1; | |
199 | remote_worker rm2; | |
200 | auto bunch1 = parallel_for_each(boost::irange(0, 20), [&] (int ignore) { return rm1.do_remote_work(other_shard, ssg1); }); | |
201 | auto bunch2 = parallel_for_each(boost::irange(0, 2000), [&] (int ignore) { return rm2.do_remote_work(other_shard, ssg2); }); | |
202 | bunch1.get(); | |
203 | bunch2.get(); | |
204 | if (smp::count > 1) { | |
205 | assert(rm1.max_concurrent_observed == 1); | |
206 | assert(rm2.max_concurrent_observed >= 1000 / (smp::count - 1) && rm2.max_concurrent_observed <= 1000); | |
207 | } | |
208 | destroy_smp_service_group(ssg1).get(); | |
209 | destroy_smp_service_group(ssg2).get(); | |
210 | }); | |
211 | } | |
212 | ||
213 | future<> test_smp_service_groups_re_construction() { | |
214 | // During development of the feature, we saw a bug where the vector | |
215 | // holding the groups did not expand correctly. This test triggers the | |
216 | // bug. | |
217 | return async([] { | |
218 | auto ssg1 = create_smp_service_group({}).get0(); | |
219 | auto ssg2 = create_smp_service_group({}).get0(); | |
220 | destroy_smp_service_group(ssg1).get(); | |
221 | auto ssg3 = create_smp_service_group({}).get0(); | |
222 | destroy_smp_service_group(ssg2).get(); | |
223 | destroy_smp_service_group(ssg3).get(); | |
224 | }); | |
225 | } | |
226 | ||
// Verifies that smp::submit_to() with a timeout fails with
// semaphore_timed_out when the target group's single nonlocal-request unit
// is held by an earlier, deliberately blocked request.
future<> test_smp_timeout() {
    return async([] {
        // Group with capacity for exactly one nonlocal request.
        smp_service_group_config ssgc1;
        ssgc1.max_nonlocal_requests = 1;
        auto ssg1 = create_smp_service_group(ssgc1).get0();

        // Ensure the group is destroyed even if an assertion below throws.
        auto _ = defer([ssg1] {
            destroy_smp_service_group(ssg1).get();
        });

        const shard_id other_shard = smp::count - 1;

        // Ugly but beats using sleeps.
        // The lock is taken here and only released (by moving `lk` into the
        // `notify` defer below) after the timeout has been observed, so
        // request no. 1 stays blocked on the remote shard until then.
        std::mutex mut;
        std::unique_lock<std::mutex> lk(mut);

        // Submitted to the remote shard.
        // Blocks on `mut`, pinning the group's only unit while it runs.
        auto fut1 = smp::submit_to(other_shard, ssg1, [&mut] {
            std::cout << "Running request no. 1" << std::endl;
            std::unique_lock<std::mutex> lk(mut);
            std::cout << "Request no. 1 done" << std::endl;
        });
        // Consume the only unit from the semaphore.
        auto fut2 = smp::submit_to(other_shard, ssg1, [] {
            std::cout << "Running request no. 2 - done" << std::endl;
        });

        // With the unit taken, this submission cannot start within 10ms and
        // must fail with semaphore_timed_out.
        auto fut_timedout = smp::submit_to(other_shard, smp_submit_to_options(ssg1, smp_timeout_clock::now() + 10ms), [] {
            std::cout << "Running timed-out request - done" << std::endl;
        });

        {
            // Releases the mutex (unblocking request no. 1) when this scope
            // exits — i.e. only after the timeout has been checked.
            auto notify = defer([lk = std::move(lk)] { });

            try {
                fut_timedout.get();
                throw std::runtime_error("smp::submit_to() didn't timeout as expected");
            } catch (semaphore_timed_out& e) {
                std::cout << "Expected timeout received: " << e.what() << std::endl;
            } catch (...) {
                std::throw_with_nested(std::runtime_error("smp::submit_to() failed with unexpected exception"));
            }
        }

        fut1.get();
        fut2.get();
    });
}
275 | ||
11fdf7f2 TL |
276 | int main(int argc, char** argv) { |
277 | app_template app; | |
278 | return app.run(argc, argv, [] { | |
279 | return test_that_each_core_gets_the_arguments().then([] { | |
280 | return test_functor_version(); | |
281 | }).then([] { | |
282 | return test_constructor_argument_is_passed_to_each_core(); | |
283 | }).then([] { | |
284 | return test_map_reduce(); | |
285 | }).then([] { | |
286 | return test_async(); | |
287 | }).then([] { | |
288 | return test_invoke_on_others(); | |
9f95a23c TL |
289 | }).then([] { |
290 | return test_smp_service_groups(); | |
291 | }).then([] { | |
292 | return test_smp_service_groups_re_construction(); | |
293 | }).then([] { | |
294 | return test_smp_timeout(); | |
11fdf7f2 TL |
295 | }); |
296 | }); | |
297 | } |