/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Copyright (C) 2015 Cloudius Systems, Ltd.
 */
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/core/distributed.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/print.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/closeable.hh>

#include <algorithm>
#include <cassert>
#include <chrono>
#include <functional>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <vector>
35 using namespace seastar
;
36 using namespace std::chrono_literals
;
38 struct async_service
: public seastar::async_sharded_service
<async_service
> {
39 thread_local
static bool deleted
;
44 auto ref
= shared_from_this();
45 // Wait a while and check.
46 (void)sleep(std::chrono::milliseconds(100 + 100 * this_shard_id())).then([this, ref
] {
50 virtual void check() {
53 future
<> stop() { return make_ready_future
<>(); }
56 thread_local
bool async_service::deleted
= false;
59 sstring
echo(sstring arg
) {
62 int cpu_id_squared() const {
63 auto id
= this_shard_id();
66 future
<> stop() { return make_ready_future
<>(); }
69 template <typename T
, typename Func
>
70 future
<> do_with_distributed(Func
&& func
) {
71 auto x
= make_shared
<distributed
<T
>>();
72 return func(*x
).finally([x
] {
77 SEASTAR_TEST_CASE(test_that_each_core_gets_the_arguments
) {
78 return do_with_distributed
<X
>([] (auto& x
) {
79 return x
.start().then([&x
] {
80 return x
.map_reduce([] (sstring msg
){
82 throw std::runtime_error("wrong message");
84 }, &X::echo
, sstring("hello"));
89 SEASTAR_TEST_CASE(test_functor_version
) {
90 return do_with_distributed
<X
>([] (auto& x
) {
91 return x
.start().then([&x
] {
92 return x
.map_reduce([] (sstring msg
){
94 throw std::runtime_error("wrong message");
96 }, [] (X
& x
) { return x
.echo("hello"); });
103 Y(sstring s
) : s(std::move(s
)) {}
104 future
<> stop() { return make_ready_future
<>(); }
107 SEASTAR_TEST_CASE(test_constructor_argument_is_passed_to_each_core
) {
108 return do_with_distributed
<Y
>([] (auto& y
) {
109 return y
.start(sstring("hello")).then([&y
] {
110 return y
.invoke_on_all([] (Y
& y
) {
111 if (y
.s
!= "hello") {
112 throw std::runtime_error(format("expected message mismatch, is \"%s\"", y
.s
));
119 SEASTAR_TEST_CASE(test_map_reduce
) {
120 return do_with_distributed
<X
>([] (distributed
<X
>& x
) {
121 return x
.start().then([&x
] {
122 return x
.map_reduce0(std::mem_fn(&X::cpu_id_squared
),
124 std::plus
<int>()).then([] (int result
) {
125 int n
= smp::count
- 1;
126 if (result
!= (n
* (n
+ 1) * (2*n
+ 1)) / 6) {
127 throw std::runtime_error("map_reduce failed");
134 SEASTAR_TEST_CASE(test_async
) {
135 return do_with_distributed
<async_service
>([] (distributed
<async_service
>& x
) {
136 return x
.start().then([&x
] {
137 return x
.invoke_on_all(&async_service::run
);
140 return sleep(std::chrono::milliseconds(100 * (smp::count
+ 1)));
144 SEASTAR_TEST_CASE(test_invoke_on_others
) {
145 return seastar::async([] {
148 void up() { ++counter
; }
149 future
<> stop() { return make_ready_future
<>(); }
151 for (unsigned c
= 0; c
< smp::count
; ++c
) {
152 smp::submit_to(c
, [c
] {
153 return seastar::async([c
] {
154 sharded
<my_service
> s
;
156 s
.invoke_on_others([](auto& s
) { s
.up(); }).get();
157 if (s
.local().counter
!= 0) {
158 throw std::runtime_error("local modified");
160 s
.invoke_on_all([c
](auto& remote
) {
161 if (this_shard_id() != c
) {
162 if (remote
.counter
!= 1) {
163 throw std::runtime_error("remote not modified");
174 SEASTAR_TEST_CASE(test_smp_invoke_on_others
) {
175 return seastar::async([] {
176 std::vector
<std::vector
<int>> calls
;
177 calls
.reserve(smp::count
);
178 for (unsigned i
= 0; i
< smp::count
; i
++) {
179 auto& sv
= calls
.emplace_back();
180 sv
.reserve(smp::count
);
183 smp::invoke_on_all([&calls
] {
184 return smp::invoke_on_others([&calls
, from
= this_shard_id()] {
185 calls
[this_shard_id()].emplace_back(from
);
189 for (unsigned i
= 0; i
< smp::count
; i
++) {
190 BOOST_REQUIRE_EQUAL(calls
[i
].size(), smp::count
- 1);
191 for (unsigned f
= 0; f
< smp::count
; f
++) {
192 auto r
= std::find(calls
[i
].begin(), calls
[i
].end(), f
);
193 BOOST_REQUIRE_EQUAL(r
== calls
[i
].end(), i
== f
);
199 struct remote_worker
{
200 unsigned current
= 0;
201 unsigned max_concurrent_observed
= 0;
202 unsigned expected_max
;
204 remote_worker(unsigned expected_max
) : expected_max(expected_max
) {
208 max_concurrent_observed
= std::max(current
, max_concurrent_observed
);
209 if (max_concurrent_observed
>= expected_max
&& sem
.current() == 0) {
210 sem
.signal(semaphore::max_counter());
212 return sem
.wait().then([this] {
213 // Sleep a bit to check if the concurrency goes over the max
214 return sleep(100ms
).then([this] {
215 max_concurrent_observed
= std::max(current
, max_concurrent_observed
);
220 future
<> do_remote_work(shard_id t
, smp_service_group ssg
) {
221 return smp::submit_to(t
, ssg
, [this] {
227 SEASTAR_TEST_CASE(test_smp_service_groups
) {
229 smp_service_group_config ssgc1
;
230 ssgc1
.max_nonlocal_requests
= 1;
231 auto ssg1
= create_smp_service_group(ssgc1
).get0();
232 smp_service_group_config ssgc2
;
233 ssgc2
.max_nonlocal_requests
= 1000;
234 auto ssg2
= create_smp_service_group(ssgc2
).get0();
235 shard_id other_shard
= smp::count
- 1;
236 remote_worker
rm1(1);
237 remote_worker
rm2(1000);
238 auto bunch1
= parallel_for_each(boost::irange(0, 20), [&] (int ignore
) { return rm1
.do_remote_work(other_shard
, ssg1
); });
239 auto bunch2
= parallel_for_each(boost::irange(0, 2000), [&] (int ignore
) { return rm2
.do_remote_work(other_shard
, ssg2
); });
242 if (smp::count
> 1) {
243 assert(rm1
.max_concurrent_observed
== 1);
244 assert(rm2
.max_concurrent_observed
== 1000);
246 destroy_smp_service_group(ssg1
).get();
247 destroy_smp_service_group(ssg2
).get();
251 SEASTAR_TEST_CASE(test_smp_service_groups_re_construction
) {
252 // During development of the feature, we saw a bug where the vector
253 // holding the groups did not expand correctly. This test triggers the
256 auto ssg1
= create_smp_service_group({}).get0();
257 auto ssg2
= create_smp_service_group({}).get0();
258 destroy_smp_service_group(ssg1
).get();
259 auto ssg3
= create_smp_service_group({}).get0();
260 destroy_smp_service_group(ssg2
).get();
261 destroy_smp_service_group(ssg3
).get();
265 SEASTAR_TEST_CASE(test_smp_timeout
) {
267 smp_service_group_config ssgc1
;
268 ssgc1
.max_nonlocal_requests
= 1;
269 auto ssg1
= create_smp_service_group(ssgc1
).get0();
271 auto _
= defer([ssg1
] () noexcept
{
272 destroy_smp_service_group(ssg1
).get();
275 const shard_id other_shard
= smp::count
- 1;
277 // Ugly but beats using sleeps.
279 std::unique_lock
<std::mutex
> lk(mut
);
281 // Submitted to the remote shard.
282 auto fut1
= smp::submit_to(other_shard
, ssg1
, [&mut
] {
283 std::cout
<< "Running request no. 1" << std::endl
;
284 std::unique_lock
<std::mutex
> lk(mut
);
285 std::cout
<< "Request no. 1 done" << std::endl
;
287 // Consume the only unit from the semaphore.
288 auto fut2
= smp::submit_to(other_shard
, ssg1
, [] {
289 std::cout
<< "Running request no. 2 - done" << std::endl
;
292 auto fut_timedout
= smp::submit_to(other_shard
, smp_submit_to_options(ssg1
, smp_timeout_clock::now() + 10ms
), [] {
293 std::cout
<< "Running timed-out request - done" << std::endl
;
297 auto notify
= defer([lk
= std::move(lk
)] () noexcept
{ });
301 throw std::runtime_error("smp::submit_to() didn't timeout as expected");
302 } catch (semaphore_timed_out
& e
) {
303 std::cout
<< "Expected timeout received: " << e
.what() << std::endl
;
305 std::throw_with_nested(std::runtime_error("smp::submit_to() failed with unexpected exception"));
314 SEASTAR_THREAD_TEST_CASE(test_sharded_parameter
) {
316 unsigned val
= this_shard_id() * 7;
318 struct some_service
{
320 some_service(unsigned non_shard_dependent
, unsigned shard_dependent
, dependency
& dep
, unsigned shard_dependent_2
) {
322 non_shard_dependent
== 43
323 && shard_dependent
== this_shard_id() * 3
324 && dep
.val
== this_shard_id() * 7
325 && shard_dependent_2
== -dep
.val
;
328 sharded
<dependency
> s_dep
;
330 auto undo1
= deferred_stop(s_dep
);
332 sharded
<some_service
> s_service
;
334 43, // should be copied verbatim
335 sharded_parameter([] { return this_shard_id() * 3; }),
337 sharded_parameter([] (dependency
& d
) { return -d
.val
; }, std::ref(s_dep
))
339 auto undo2
= deferred_stop(s_service
);
341 auto all_ok
= s_service
.map_reduce0(std::mem_fn(&some_service::ok
), true, std::multiplies
<>()).get0();
342 BOOST_REQUIRE(all_ok
);