/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2015 Cloudius Systems, Ltd.
 */
23 #include <seastar/testing/test_case.hh>
24 #include <seastar/testing/thread_test_case.hh>
25 #include <seastar/core/distributed.hh>
26 #include <seastar/core/loop.hh>
27 #include <seastar/core/semaphore.hh>
28 #include <seastar/core/sleep.hh>
29 #include <seastar/core/thread.hh>
30 #include <seastar/core/print.hh>
31 #include <seastar/util/defer.hh>
32 #include <seastar/util/closeable.hh>
33 #include <seastar/util/later.hh>
36 using namespace seastar
;
37 using namespace std::chrono_literals
;
// NOTE(review): this extraction is garbled — original line numbers are fused
// into the text and several source lines are missing (e.g. between markers
// 40 and 45 the destructor and the run() signature are gone). Code is kept
// verbatim; recover the pristine file from version control before editing.
//
// async_service: sharded service exercised by test_async below via
// invoke_on_all(&async_service::run). It keeps a shared_from_this() ref
// alive across a per-shard delayed sleep() continuation, so the service
// must not be destroyed while the continuation is pending.
39 struct async_service
: public seastar::async_sharded_service
<async_service
> {
// Set (presumably in the missing destructor) when the instance is destroyed;
// thread_local so each shard tracks its own copy. TODO confirm against VCS.
40 thread_local
static bool deleted
;
45 auto ref
= shared_from_this();
46 // Wait a while and check.
// Delay is staggered per shard (100ms + 100ms * shard id); the continuation
// captures `ref` to keep this instance alive until it runs.
47 (void)sleep(std::chrono::milliseconds(100 + 100 * this_shard_id())).then([this, ref
] {
51 virtual void check() {
54 future
<> stop() { return make_ready_future
<>(); }
// Out-of-class definition of the thread_local flag declared above.
57 thread_local
bool async_service::deleted
= false;
// NOTE(review): garbled extraction — the enclosing `struct X {` line and the
// bodies/closing braces of these members are missing. Code kept verbatim.
//
// Interior of the test service `X` (name grounded by the
// do_with_distributed<X> call sites below): echo() returns its argument
// (used as a map function), cpu_id_squared() derives a per-shard value, and
// stop() satisfies the sharded-service stop contract.
60 sstring
echo(sstring arg
) {
// Used by test_map_reduce: each shard contributes id*id, summed across shards.
63 int cpu_id_squared() const {
64 auto id
= this_shard_id();
67 future
<> stop() { return make_ready_future
<>(); }
// NOTE(review): garbled extraction — the finally-lambda body and the
// function's closing braces (original lines 74-77) are missing; presumably
// they stop the distributed<T> instance. TODO confirm against VCS.
//
// Helper used by every test below: heap-allocates a distributed<T> (shared_ptr
// keeps it alive until the returned future resolves), runs `func` on it, and
// cleans up in finally() regardless of how `func` completes.
70 template <typename T
, typename Func
>
71 future
<> do_with_distributed(Func
&& func
) {
72 auto x
= make_shared
<distributed
<T
>>();
73 return func(*x
).finally([x
] {
// NOTE(review): garbled extraction — reducer lambdas' bodies and the tests'
// closing braces are missing. Code kept verbatim.
//
// Two variants of the same check: start a distributed<X>, map_reduce with
// "hello" and verify each core echoes the argument back (a mismatch throws
// "wrong message"). First variant passes the member pointer &X::echo ...
78 SEASTAR_TEST_CASE(test_that_each_core_gets_the_arguments
) {
79 return do_with_distributed
<X
>([] (auto& x
) {
80 return x
.start().then([&x
] {
81 return x
.map_reduce([] (sstring msg
){
83 throw std::runtime_error("wrong message");
85 }, &X::echo
, sstring("hello"));
// ... second variant does the same via an explicit functor/lambda instead of
// a member-function pointer.
90 SEASTAR_TEST_CASE(test_functor_version
) {
91 return do_with_distributed
<X
>([] (auto& x
) {
92 return x
.start().then([&x
] {
93 return x
.map_reduce([] (sstring msg
){
95 throw std::runtime_error("wrong message");
97 }, [] (X
& x
) { return x
.echo("hello"); });
// NOTE(review): garbled extraction — the enclosing `struct Y {` line and the
// `sstring s;` member declaration are missing (the ctor initializes `s` and
// the next test reads y.s). Code kept verbatim.
//
// Test service whose constructor argument is forwarded to every shard by
// distributed<Y>::start(...) in the test below.
104 Y(sstring s
) : s(std::move(s
)) {}
105 future
<> stop() { return make_ready_future
<>(); }
// NOTE(review): garbled extraction — closing braces are missing. Kept verbatim.
//
// Starts distributed<Y> with "hello" and verifies via invoke_on_all that the
// constructor argument reached every shard's instance.
108 SEASTAR_TEST_CASE(test_constructor_argument_is_passed_to_each_core
) {
109 return do_with_distributed
<Y
>([] (auto& y
) {
110 return y
.start(sstring("hello")).then([&y
] {
111 return y
.invoke_on_all([] (Y
& y
) {
112 if (y
.s
!= "hello") {
113 throw std::runtime_error(format("expected message mismatch, is \"%s\"", y
.s
));
// NOTE(review): garbled extraction — the map_reduce0 initial value (original
// line 124) and closing braces are missing. Kept verbatim.
//
// Sums cpu_id_squared() over all shards and checks against the closed form
// for 0^2 + 1^2 + ... + (smp::count-1)^2 = n(n+1)(2n+1)/6 with n = count-1.
120 SEASTAR_TEST_CASE(test_map_reduce
) {
121 return do_with_distributed
<X
>([] (distributed
<X
>& x
) {
122 return x
.start().then([&x
] {
123 return x
.map_reduce0(std::mem_fn(&X::cpu_id_squared
),
125 std::plus
<int>()).then([] (int result
) {
126 int n
= smp::count
- 1;
127 if (result
!= (n
* (n
+ 1) * (2*n
+ 1)) / 6) {
128 throw std::runtime_error("map_reduce failed");
// NOTE(review): garbled extraction — the `struct map { ... }` / `struct
// reduce { ... }` headers, destructors (which presumably set `destroyed`),
// and many closing braces are missing. Code kept verbatim.
//
// Lifetime tests: the mapper/reducer functors yield() before touching state
// and BOOST_REQUIRE(!destroyed) inside the continuation, proving that
// map_reduce / map_reduce0 keep the functor objects alive until every
// asynchronous step has completed. Expected total is again n(n+1)(2n+1)/6.
135 SEASTAR_TEST_CASE(test_map_reduce_lifetime
) {
137 bool destroyed
= false;
141 auto operator()(const X
& x
) {
142 return yield().then([this, &x
] {
143 BOOST_REQUIRE(!destroyed
);
144 return x
.cpu_id_squared();
150 bool destroyed
= false;
154 auto operator()(int x
) {
155 return yield().then([this, x
] {
156 BOOST_REQUIRE(!destroyed
);
161 return do_with_distributed
<X
>([] (distributed
<X
>& x
) {
162 return x
.start().then([&x
] {
163 return do_with(0L, [&x
] (auto& result
) {
164 return x
.map_reduce(reduce
{result
}, map
{}).then([&result
] {
165 long n
= smp::count
- 1;
166 long expected
= (n
* (n
+ 1) * (2*n
+ 1)) / 6;
167 BOOST_REQUIRE_EQUAL(result
, expected
);
// Same idea for map_reduce0: functor-based map and reduce, initial value 0L.
174 SEASTAR_TEST_CASE(test_map_reduce0_lifetime
) {
176 bool destroyed
= false;
180 auto operator()(const X
& x
) const {
181 return yield().then([this, &x
] {
182 BOOST_REQUIRE(!destroyed
);
183 return x
.cpu_id_squared();
188 bool destroyed
= false;
192 auto operator()(long res
, int x
) {
193 BOOST_REQUIRE(!destroyed
);
197 return do_with_distributed
<X
>([] (distributed
<X
>& x
) {
198 return x
.start().then([&x
] {
199 return x
.map_reduce0(map
{}, 0L, reduce
{}).then([] (long result
) {
200 long n
= smp::count
- 1;
201 long expected
= (n
* (n
+ 1) * (2*n
+ 1)) / 6;
202 BOOST_REQUIRE_EQUAL(result
, expected
);
// NOTE(review): garbled extraction — the `struct map { ... }` header, its
// destructor, and closing braces are missing. Code kept verbatim.
//
// Same lifetime pattern for sharded::map(): the functor yields before use,
// then the result vector must have one entry per shard, with result[i] == i*i.
208 SEASTAR_TEST_CASE(test_map_lifetime
) {
210 bool destroyed
= false;
214 auto operator()(const X
& x
) const {
215 return yield().then([this, &x
] {
216 BOOST_REQUIRE(!destroyed
);
217 return x
.cpu_id_squared();
221 return do_with_distributed
<X
>([] (distributed
<X
>& x
) {
222 return x
.start().then([&x
] {
223 return x
.map(map
{}).then([] (std::vector
<int> result
) {
224 BOOST_REQUIRE_EQUAL(result
.size(), smp::count
);
225 for (size_t i
= 0; i
< (size_t)smp::count
; i
++) {
226 BOOST_REQUIRE_EQUAL(result
[i
], i
* i
);
// NOTE(review): garbled extraction — intervening lines (237-238) and closing
// braces are missing. Code kept verbatim.
//
// Kicks async_service::run on every shard, then sleeps long enough
// (100ms * (count+1)) to cover every shard's staggered delayed check before
// the distributed service is torn down.
233 SEASTAR_TEST_CASE(test_async
) {
234 return do_with_distributed
<async_service
>([] (distributed
<async_service
>& x
) {
235 return x
.start().then([&x
] {
236 return x
.invoke_on_all(&async_service::run
);
239 return sleep(std::chrono::milliseconds(100 * (smp::count
+ 1)));
// NOTE(review): garbled extraction — the `struct my_service { int counter =
// 0; ... }` header, the s.start()/s.stop() calls, and closing braces are
// missing. Code kept verbatim.
//
// For each originating shard c: invoke_on_others must bump `counter` on every
// shard EXCEPT c (local counter stays 0), which invoke_on_all then verifies
// from each remote shard's perspective.
243 SEASTAR_TEST_CASE(test_invoke_on_others
) {
244 return seastar::async([] {
247 void up() { ++counter
; }
248 future
<> stop() { return make_ready_future
<>(); }
250 for (unsigned c
= 0; c
< smp::count
; ++c
) {
251 smp::submit_to(c
, [c
] {
252 return seastar::async([c
] {
253 sharded
<my_service
> s
;
255 s
.invoke_on_others([](auto& s
) { s
.up(); }).get();
256 if (s
.local().counter
!= 0) {
257 throw std::runtime_error("local modified");
259 s
.invoke_on_all([c
](auto& remote
) {
260 if (this_shard_id() != c
) {
261 if (remote
.counter
!= 1) {
262 throw std::runtime_error("remote not modified");
// NOTE(review): garbled extraction — the .get() on invoke_on_all and several
// closing braces are missing. Code kept verbatim.
//
// Free-function smp::invoke_on_others: every shard asks all OTHER shards to
// record the sender's id. Afterwards calls[i] must contain every shard id
// except i itself (size smp::count - 1, and `from == i` is never recorded).
// Each inner vector is pre-reserved so concurrent emplace_back never
// reallocates.
273 SEASTAR_TEST_CASE(test_smp_invoke_on_others
) {
274 return seastar::async([] {
275 std::vector
<std::vector
<int>> calls
;
276 calls
.reserve(smp::count
);
277 for (unsigned i
= 0; i
< smp::count
; i
++) {
278 auto& sv
= calls
.emplace_back();
279 sv
.reserve(smp::count
);
282 smp::invoke_on_all([&calls
] {
283 return smp::invoke_on_others([&calls
, from
= this_shard_id()] {
284 calls
[this_shard_id()].emplace_back(from
);
288 for (unsigned i
= 0; i
< smp::count
; i
++) {
289 BOOST_REQUIRE_EQUAL(calls
[i
].size(), smp::count
- 1);
290 for (unsigned f
= 0; f
< smp::count
; f
++) {
291 auto r
= std::find(calls
[i
].begin(), calls
[i
].end(), f
);
292 BOOST_REQUIRE_EQUAL(r
== calls
[i
].end(), i
== f
);
// NOTE(review): garbled extraction — the `semaphore sem` member declaration,
// the do_work()-style method header around marker 307, the ++current /
// --current bookkeeping, and closing braces are missing. Kept verbatim.
//
// Helper for the smp_service_group tests below: tracks how many concurrent
// remote invocations are in flight (`current`) and the peak observed
// (`max_concurrent_observed`); once the peak reaches `expected_max` the
// semaphore is opened wide so all waiters proceed. The 100ms sleep gives
// excess concurrency a chance to show up before re-sampling the peak.
298 struct remote_worker
{
299 unsigned current
= 0;
300 unsigned max_concurrent_observed
= 0;
301 unsigned expected_max
;
303 remote_worker(unsigned expected_max
) : expected_max(expected_max
) {
307 max_concurrent_observed
= std::max(current
, max_concurrent_observed
);
308 if (max_concurrent_observed
>= expected_max
&& sem
.current() == 0) {
309 sem
.signal(semaphore::max_counter());
311 return sem
.wait().then([this] {
312 // Sleep a bit to check if the concurrency goes over the max
313 return sleep(100ms
).then([this] {
314 max_concurrent_observed
= std::max(current
, max_concurrent_observed
);
// Submit the work-tracking lambda to shard t under the given service group,
// so the group's max_nonlocal_requests limit caps observed concurrency.
319 future
<> do_remote_work(shard_id t
, smp_service_group ssg
) {
320 return smp::submit_to(t
, ssg
, [this] {
// NOTE(review): garbled extraction — the enclosing seastar::async([] { ... }
// wrapper (get0()/.get() imply a thread context), the bunch1/bunch2 .get()
// waits, and closing braces are missing. Code kept verbatim.
//
// Two service groups with max_nonlocal_requests of 1 and 1000 run 20 resp.
// 2000 parallel submissions to the same remote shard; each group's observed
// peak concurrency must equal its configured cap (only meaningful when
// smp::count > 1, since local submissions bypass the limit).
326 SEASTAR_TEST_CASE(test_smp_service_groups
) {
328 smp_service_group_config ssgc1
;
329 ssgc1
.max_nonlocal_requests
= 1;
330 auto ssg1
= create_smp_service_group(ssgc1
).get0();
331 smp_service_group_config ssgc2
;
332 ssgc2
.max_nonlocal_requests
= 1000;
333 auto ssg2
= create_smp_service_group(ssgc2
).get0();
334 shard_id other_shard
= smp::count
- 1;
335 remote_worker
rm1(1);
336 remote_worker
rm2(1000);
337 auto bunch1
= parallel_for_each(boost::irange(0, 20), [&] (int ignore
) { return rm1
.do_remote_work(other_shard
, ssg1
); });
338 auto bunch2
= parallel_for_each(boost::irange(0, 2000), [&] (int ignore
) { return rm2
.do_remote_work(other_shard
, ssg2
); });
341 if (smp::count
> 1) {
342 assert(rm1
.max_concurrent_observed
== 1);
343 assert(rm2
.max_concurrent_observed
== 1000);
345 destroy_smp_service_group(ssg1
).get();
346 destroy_smp_service_group(ssg2
).get();
// Regression test: create/destroy groups in an interleaved order so the
// internal vector of groups must grow and reuse slots correctly (the comment
// below is cut off mid-sentence by the extraction).
350 SEASTAR_TEST_CASE(test_smp_service_groups_re_construction
) {
351 // During development of the feature, we saw a bug where the vector
352 // holding the groups did not expand correctly. This test triggers the
355 auto ssg1
= create_smp_service_group({}).get0();
356 auto ssg2
= create_smp_service_group({}).get0();
357 destroy_smp_service_group(ssg1
).get();
358 auto ssg3
= create_smp_service_group({}).get0();
359 destroy_smp_service_group(ssg2
).get();
360 destroy_smp_service_group(ssg3
).get();
// NOTE(review): garbled extraction — the seastar::async wrapper, the
// `std::mutex mut` declaration, the try { fut_timedout.get(); around marker
// 400, the fut1/fut2 waits, and closing braces are missing. Kept verbatim.
//
// Timeout test: with a group allowing a single nonlocal request, request 1
// blocks on a mutex held by the submitting thread, request 2 consumes the
// one queue unit, and a third request with a 10ms smp_timeout deadline must
// fail with semaphore_timed_out (any other exception is rethrown nested).
364 SEASTAR_TEST_CASE(test_smp_timeout
) {
366 smp_service_group_config ssgc1
;
367 ssgc1
.max_nonlocal_requests
= 1;
368 auto ssg1
= create_smp_service_group(ssgc1
).get0();
// Deferred cleanup: destroy the group no matter how the test exits.
370 auto _
= defer([ssg1
] () noexcept
{
371 destroy_smp_service_group(ssg1
).get();
374 const shard_id other_shard
= smp::count
- 1;
376 // Ugly but beats using sleeps.
378 std::unique_lock
<std::mutex
> lk(mut
);
380 // Submitted to the remote shard.
381 auto fut1
= smp::submit_to(other_shard
, ssg1
, [&mut
] {
382 std::cout
<< "Running request no. 1" << std::endl
;
383 std::unique_lock
<std::mutex
> lk(mut
);
384 std::cout
<< "Request no. 1 done" << std::endl
;
386 // Consume the only unit from the semaphore.
387 auto fut2
= smp::submit_to(other_shard
, ssg1
, [] {
388 std::cout
<< "Running request no. 2 - done" << std::endl
;
391 auto fut_timedout
= smp::submit_to(other_shard
, smp_submit_to_options(ssg1
, smp_timeout_clock::now() + 10ms
), [] {
392 std::cout
<< "Running timed-out request - done" << std::endl
;
// Releasing the lock (by destroying the moved-in unique_lock) unblocks
// request no. 1 once the timeout path has been checked.
396 auto notify
= defer([lk
= std::move(lk
)] () noexcept
{ });
400 throw std::runtime_error("smp::submit_to() didn't timeout as expected");
401 } catch (semaphore_timed_out
& e
) {
402 std::cout
<< "Expected timeout received: " << e
.what() << std::endl
;
404 std::throw_with_nested(std::runtime_error("smp::submit_to() failed with unexpected exception"));
// NOTE(review): garbled extraction — the `struct dependency {` header, the
// `bool ok = ...` assignment that wraps the conjunction below, the
// s_dep.start()/s_service.start( call lines, and closing braces are missing.
// Code kept verbatim.
//
// sharded_parameter test: starts a sharded<dependency> whose per-shard `val`
// is this_shard_id()*7, then starts some_service with (a) a verbatim 43,
// (b) a sharded_parameter computing this_shard_id()*3, (c) a reference to
// the local dependency, and (d) a sharded_parameter derived from it (-val).
// The constructor records whether all four arrived correctly; map_reduce0
// with multiplies<> ANDs the per-shard results together.
413 SEASTAR_THREAD_TEST_CASE(test_sharded_parameter
) {
415 unsigned val
= this_shard_id() * 7;
417 struct some_service
{
419 some_service(unsigned non_shard_dependent
, unsigned shard_dependent
, dependency
& dep
, unsigned shard_dependent_2
) {
421 non_shard_dependent
== 43
422 && shard_dependent
== this_shard_id() * 3
423 && dep
.val
== this_shard_id() * 7
424 && shard_dependent_2
== -dep
.val
;
427 sharded
<dependency
> s_dep
;
// deferred_stop: RAII stop of the sharded services on scope exit.
429 auto undo1
= deferred_stop(s_dep
);
431 sharded
<some_service
> s_service
;
433 43, // should be copied verbatim
434 sharded_parameter([] { return this_shard_id() * 3; }),
436 sharded_parameter([] (dependency
& d
) { return -d
.val
; }, std::ref(s_dep
))
438 auto undo2
= deferred_stop(s_service
);
440 auto all_ok
= s_service
.map_reduce0(std::mem_fn(&some_service::ok
), true, std::multiplies
<>()).get0();
441 BOOST_REQUIRE(all_ok
);