]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | ||
19 | /* | |
20 | * Copyright (C) 2015 Cloudius Systems, Ltd. | |
21 | */ | |
22 | ||
f67539c2 TL |
23 | #include <seastar/testing/test_case.hh> |
24 | #include <seastar/testing/thread_test_case.hh> | |
11fdf7f2 | 25 | #include <seastar/core/distributed.hh> |
f67539c2 TL |
26 | #include <seastar/core/loop.hh> |
27 | #include <seastar/core/semaphore.hh> | |
11fdf7f2 TL |
28 | #include <seastar/core/sleep.hh> |
29 | #include <seastar/core/thread.hh> | |
9f95a23c TL |
30 | #include <seastar/core/print.hh> |
31 | #include <seastar/util/defer.hh> | |
20effc67 | 32 | #include <seastar/util/closeable.hh> |
f67539c2 | 33 | #include <mutex> |
11fdf7f2 TL |
34 | |
35 | using namespace seastar; | |
9f95a23c | 36 | using namespace std::chrono_literals; |
11fdf7f2 TL |
37 | |
38 | struct async_service : public seastar::async_sharded_service<async_service> { | |
39 | thread_local static bool deleted; | |
40 | ~async_service() { | |
41 | deleted = true; | |
42 | } | |
43 | void run() { | |
44 | auto ref = shared_from_this(); | |
9f95a23c | 45 | // Wait a while and check. |
f67539c2 | 46 | (void)sleep(std::chrono::milliseconds(100 + 100 * this_shard_id())).then([this, ref] { |
11fdf7f2 TL |
47 | check(); |
48 | }); | |
49 | } | |
50 | virtual void check() { | |
51 | assert(!deleted); | |
52 | } | |
53 | future<> stop() { return make_ready_future<>(); } | |
54 | }; | |
55 | ||
56 | thread_local bool async_service::deleted = false; | |
57 | ||
58 | struct X { | |
59 | sstring echo(sstring arg) { | |
60 | return arg; | |
61 | } | |
62 | int cpu_id_squared() const { | |
f67539c2 | 63 | auto id = this_shard_id(); |
11fdf7f2 TL |
64 | return id * id; |
65 | } | |
66 | future<> stop() { return make_ready_future<>(); } | |
67 | }; | |
68 | ||
69 | template <typename T, typename Func> | |
70 | future<> do_with_distributed(Func&& func) { | |
71 | auto x = make_shared<distributed<T>>(); | |
72 | return func(*x).finally([x] { | |
73 | return x->stop(); | |
74 | }).finally([x]{}); | |
75 | } | |
76 | ||
f67539c2 | 77 | SEASTAR_TEST_CASE(test_that_each_core_gets_the_arguments) { |
11fdf7f2 TL |
78 | return do_with_distributed<X>([] (auto& x) { |
79 | return x.start().then([&x] { | |
80 | return x.map_reduce([] (sstring msg){ | |
81 | if (msg != "hello") { | |
82 | throw std::runtime_error("wrong message"); | |
83 | } | |
84 | }, &X::echo, sstring("hello")); | |
85 | }); | |
86 | }); | |
87 | } | |
88 | ||
f67539c2 | 89 | SEASTAR_TEST_CASE(test_functor_version) { |
11fdf7f2 TL |
90 | return do_with_distributed<X>([] (auto& x) { |
91 | return x.start().then([&x] { | |
92 | return x.map_reduce([] (sstring msg){ | |
93 | if (msg != "hello") { | |
94 | throw std::runtime_error("wrong message"); | |
95 | } | |
96 | }, [] (X& x) { return x.echo("hello"); }); | |
97 | }); | |
98 | }); | |
99 | } | |
100 | ||
101 | struct Y { | |
102 | sstring s; | |
103 | Y(sstring s) : s(std::move(s)) {} | |
104 | future<> stop() { return make_ready_future<>(); } | |
105 | }; | |
106 | ||
f67539c2 | 107 | SEASTAR_TEST_CASE(test_constructor_argument_is_passed_to_each_core) { |
11fdf7f2 TL |
108 | return do_with_distributed<Y>([] (auto& y) { |
109 | return y.start(sstring("hello")).then([&y] { | |
110 | return y.invoke_on_all([] (Y& y) { | |
111 | if (y.s != "hello") { | |
112 | throw std::runtime_error(format("expected message mismatch, is \"%s\"", y.s)); | |
113 | } | |
114 | }); | |
115 | }); | |
116 | }); | |
117 | } | |
118 | ||
f67539c2 | 119 | SEASTAR_TEST_CASE(test_map_reduce) { |
11fdf7f2 TL |
120 | return do_with_distributed<X>([] (distributed<X>& x) { |
121 | return x.start().then([&x] { | |
122 | return x.map_reduce0(std::mem_fn(&X::cpu_id_squared), | |
123 | 0, | |
124 | std::plus<int>()).then([] (int result) { | |
125 | int n = smp::count - 1; | |
126 | if (result != (n * (n + 1) * (2*n + 1)) / 6) { | |
127 | throw std::runtime_error("map_reduce failed"); | |
128 | } | |
129 | }); | |
130 | }); | |
131 | }); | |
132 | } | |
133 | ||
f67539c2 | 134 | SEASTAR_TEST_CASE(test_async) { |
11fdf7f2 TL |
135 | return do_with_distributed<async_service>([] (distributed<async_service>& x) { |
136 | return x.start().then([&x] { | |
137 | return x.invoke_on_all(&async_service::run); | |
138 | }); | |
139 | }).then([] { | |
140 | return sleep(std::chrono::milliseconds(100 * (smp::count + 1))); | |
141 | }); | |
142 | } | |
143 | ||
f67539c2 | 144 | SEASTAR_TEST_CASE(test_invoke_on_others) { |
11fdf7f2 TL |
145 | return seastar::async([] { |
146 | struct my_service { | |
147 | int counter = 0; | |
148 | void up() { ++counter; } | |
149 | future<> stop() { return make_ready_future<>(); } | |
150 | }; | |
151 | for (unsigned c = 0; c < smp::count; ++c) { | |
152 | smp::submit_to(c, [c] { | |
153 | return seastar::async([c] { | |
154 | sharded<my_service> s; | |
155 | s.start().get(); | |
156 | s.invoke_on_others([](auto& s) { s.up(); }).get(); | |
157 | if (s.local().counter != 0) { | |
158 | throw std::runtime_error("local modified"); | |
159 | } | |
160 | s.invoke_on_all([c](auto& remote) { | |
f67539c2 | 161 | if (this_shard_id() != c) { |
11fdf7f2 TL |
162 | if (remote.counter != 1) { |
163 | throw std::runtime_error("remote not modified"); | |
164 | } | |
165 | } | |
166 | }).get(); | |
167 | s.stop().get(); | |
168 | }); | |
169 | }).get(); | |
170 | } | |
171 | }); | |
172 | } | |
173 | ||
20effc67 TL |
174 | SEASTAR_TEST_CASE(test_smp_invoke_on_others) { |
175 | return seastar::async([] { | |
176 | std::vector<std::vector<int>> calls; | |
177 | calls.reserve(smp::count); | |
178 | for (unsigned i = 0; i < smp::count; i++) { | |
179 | auto& sv = calls.emplace_back(); | |
180 | sv.reserve(smp::count); | |
181 | } | |
182 | ||
183 | smp::invoke_on_all([&calls] { | |
184 | return smp::invoke_on_others([&calls, from = this_shard_id()] { | |
185 | calls[this_shard_id()].emplace_back(from); | |
186 | }); | |
187 | }).get(); | |
188 | ||
189 | for (unsigned i = 0; i < smp::count; i++) { | |
190 | BOOST_REQUIRE_EQUAL(calls[i].size(), smp::count - 1); | |
191 | for (unsigned f = 0; f < smp::count; f++) { | |
192 | auto r = std::find(calls[i].begin(), calls[i].end(), f); | |
193 | BOOST_REQUIRE_EQUAL(r == calls[i].end(), i == f); | |
194 | } | |
195 | } | |
196 | }); | |
197 | } | |
9f95a23c TL |
198 | |
199 | struct remote_worker { | |
200 | unsigned current = 0; | |
201 | unsigned max_concurrent_observed = 0; | |
f67539c2 TL |
202 | unsigned expected_max; |
203 | semaphore sem{0}; | |
204 | remote_worker(unsigned expected_max) : expected_max(expected_max) { | |
205 | } | |
9f95a23c TL |
206 | future<> do_work() { |
207 | ++current; | |
208 | max_concurrent_observed = std::max(current, max_concurrent_observed); | |
f67539c2 TL |
209 | if (max_concurrent_observed >= expected_max && sem.current() == 0) { |
210 | sem.signal(semaphore::max_counter()); | |
211 | } | |
212 | return sem.wait().then([this] { | |
213 | // Sleep a bit to check if the concurrency goes over the max | |
214 | return sleep(100ms).then([this] { | |
215 | max_concurrent_observed = std::max(current, max_concurrent_observed); | |
216 | --current; | |
217 | }); | |
9f95a23c TL |
218 | }); |
219 | } | |
220 | future<> do_remote_work(shard_id t, smp_service_group ssg) { | |
221 | return smp::submit_to(t, ssg, [this] { | |
222 | return do_work(); | |
223 | }); | |
224 | } | |
225 | }; | |
226 | ||
f67539c2 | 227 | SEASTAR_TEST_CASE(test_smp_service_groups) { |
9f95a23c TL |
228 | return async([] { |
229 | smp_service_group_config ssgc1; | |
230 | ssgc1.max_nonlocal_requests = 1; | |
231 | auto ssg1 = create_smp_service_group(ssgc1).get0(); | |
232 | smp_service_group_config ssgc2; | |
233 | ssgc2.max_nonlocal_requests = 1000; | |
234 | auto ssg2 = create_smp_service_group(ssgc2).get0(); | |
235 | shard_id other_shard = smp::count - 1; | |
f67539c2 TL |
236 | remote_worker rm1(1); |
237 | remote_worker rm2(1000); | |
9f95a23c TL |
238 | auto bunch1 = parallel_for_each(boost::irange(0, 20), [&] (int ignore) { return rm1.do_remote_work(other_shard, ssg1); }); |
239 | auto bunch2 = parallel_for_each(boost::irange(0, 2000), [&] (int ignore) { return rm2.do_remote_work(other_shard, ssg2); }); | |
240 | bunch1.get(); | |
241 | bunch2.get(); | |
242 | if (smp::count > 1) { | |
243 | assert(rm1.max_concurrent_observed == 1); | |
f67539c2 | 244 | assert(rm2.max_concurrent_observed == 1000); |
9f95a23c TL |
245 | } |
246 | destroy_smp_service_group(ssg1).get(); | |
247 | destroy_smp_service_group(ssg2).get(); | |
248 | }); | |
249 | } | |
250 | ||
f67539c2 | 251 | SEASTAR_TEST_CASE(test_smp_service_groups_re_construction) { |
9f95a23c TL |
252 | // During development of the feature, we saw a bug where the vector |
253 | // holding the groups did not expand correctly. This test triggers the | |
254 | // bug. | |
255 | return async([] { | |
256 | auto ssg1 = create_smp_service_group({}).get0(); | |
257 | auto ssg2 = create_smp_service_group({}).get0(); | |
258 | destroy_smp_service_group(ssg1).get(); | |
259 | auto ssg3 = create_smp_service_group({}).get0(); | |
260 | destroy_smp_service_group(ssg2).get(); | |
261 | destroy_smp_service_group(ssg3).get(); | |
262 | }); | |
263 | } | |
264 | ||
f67539c2 | 265 | SEASTAR_TEST_CASE(test_smp_timeout) { |
9f95a23c TL |
266 | return async([] { |
267 | smp_service_group_config ssgc1; | |
268 | ssgc1.max_nonlocal_requests = 1; | |
269 | auto ssg1 = create_smp_service_group(ssgc1).get0(); | |
270 | ||
20effc67 | 271 | auto _ = defer([ssg1] () noexcept { |
9f95a23c TL |
272 | destroy_smp_service_group(ssg1).get(); |
273 | }); | |
274 | ||
275 | const shard_id other_shard = smp::count - 1; | |
276 | ||
277 | // Ugly but beats using sleeps. | |
278 | std::mutex mut; | |
279 | std::unique_lock<std::mutex> lk(mut); | |
280 | ||
281 | // Submitted to the remote shard. | |
282 | auto fut1 = smp::submit_to(other_shard, ssg1, [&mut] { | |
283 | std::cout << "Running request no. 1" << std::endl; | |
284 | std::unique_lock<std::mutex> lk(mut); | |
285 | std::cout << "Request no. 1 done" << std::endl; | |
286 | }); | |
287 | // Consume the only unit from the semaphore. | |
288 | auto fut2 = smp::submit_to(other_shard, ssg1, [] { | |
289 | std::cout << "Running request no. 2 - done" << std::endl; | |
290 | }); | |
291 | ||
292 | auto fut_timedout = smp::submit_to(other_shard, smp_submit_to_options(ssg1, smp_timeout_clock::now() + 10ms), [] { | |
293 | std::cout << "Running timed-out request - done" << std::endl; | |
294 | }); | |
295 | ||
296 | { | |
20effc67 | 297 | auto notify = defer([lk = std::move(lk)] () noexcept { }); |
9f95a23c TL |
298 | |
299 | try { | |
300 | fut_timedout.get(); | |
301 | throw std::runtime_error("smp::submit_to() didn't timeout as expected"); | |
302 | } catch (semaphore_timed_out& e) { | |
303 | std::cout << "Expected timeout received: " << e.what() << std::endl; | |
304 | } catch (...) { | |
305 | std::throw_with_nested(std::runtime_error("smp::submit_to() failed with unexpected exception")); | |
306 | } | |
307 | } | |
308 | ||
309 | fut1.get(); | |
310 | fut2.get(); | |
311 | }); | |
312 | } | |
313 | ||
f67539c2 TL |
314 | SEASTAR_THREAD_TEST_CASE(test_sharded_parameter) { |
315 | struct dependency { | |
316 | unsigned val = this_shard_id() * 7; | |
317 | }; | |
318 | struct some_service { | |
319 | bool ok = false; | |
320 | some_service(unsigned non_shard_dependent, unsigned shard_dependent, dependency& dep, unsigned shard_dependent_2) { | |
321 | ok = | |
322 | non_shard_dependent == 43 | |
323 | && shard_dependent == this_shard_id() * 3 | |
324 | && dep.val == this_shard_id() * 7 | |
325 | && shard_dependent_2 == -dep.val; | |
326 | } | |
327 | }; | |
328 | sharded<dependency> s_dep; | |
329 | s_dep.start().get(); | |
20effc67 | 330 | auto undo1 = deferred_stop(s_dep); |
f67539c2 TL |
331 | |
332 | sharded<some_service> s_service; | |
333 | s_service.start( | |
334 | 43, // should be copied verbatim | |
335 | sharded_parameter([] { return this_shard_id() * 3; }), | |
336 | std::ref(s_dep), | |
337 | sharded_parameter([] (dependency& d) { return -d.val; }, std::ref(s_dep)) | |
338 | ).get(); | |
20effc67 | 339 | auto undo2 = deferred_stop(s_service); |
f67539c2 TL |
340 | |
341 | auto all_ok = s_service.map_reduce0(std::mem_fn(&some_service::ok), true, std::multiplies<>()).get0(); | |
342 | BOOST_REQUIRE(all_ok); | |
11fdf7f2 | 343 | } |