]>
git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/tests/unit/distributed_test.cc
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
23 #include <seastar/core/app-template.hh>
24 #include <seastar/core/distributed.hh>
25 #include <seastar/core/future-util.hh>
26 #include <seastar/core/sleep.hh>
27 #include <seastar/core/thread.hh>
29 using namespace seastar
;
31 struct async_service
: public seastar::async_sharded_service
<async_service
> {
32 thread_local
static bool deleted
;
37 auto ref
= shared_from_this();
38 sleep(std::chrono::milliseconds(100 + 100 * engine().cpu_id())).then([this, ref
] {
42 virtual void check() {
45 future
<> stop() { return make_ready_future
<>(); }
48 thread_local
bool async_service::deleted
= false;
51 sstring
echo(sstring arg
) {
54 int cpu_id_squared() const {
55 auto id
= engine().cpu_id();
58 future
<> stop() { return make_ready_future
<>(); }
61 template <typename T
, typename Func
>
62 future
<> do_with_distributed(Func
&& func
) {
63 auto x
= make_shared
<distributed
<T
>>();
64 return func(*x
).finally([x
] {
69 future
<> test_that_each_core_gets_the_arguments() {
70 return do_with_distributed
<X
>([] (auto& x
) {
71 return x
.start().then([&x
] {
72 return x
.map_reduce([] (sstring msg
){
74 throw std::runtime_error("wrong message");
76 }, &X::echo
, sstring("hello"));
81 future
<> test_functor_version() {
82 return do_with_distributed
<X
>([] (auto& x
) {
83 return x
.start().then([&x
] {
84 return x
.map_reduce([] (sstring msg
){
86 throw std::runtime_error("wrong message");
88 }, [] (X
& x
) { return x
.echo("hello"); });
95 Y(sstring s
) : s(std::move(s
)) {}
96 future
<> stop() { return make_ready_future
<>(); }
99 future
<> test_constructor_argument_is_passed_to_each_core() {
100 return do_with_distributed
<Y
>([] (auto& y
) {
101 return y
.start(sstring("hello")).then([&y
] {
102 return y
.invoke_on_all([] (Y
& y
) {
103 if (y
.s
!= "hello") {
104 throw std::runtime_error(format("expected message mismatch, is \"%s\"", y
.s
));
111 future
<> test_map_reduce() {
112 return do_with_distributed
<X
>([] (distributed
<X
>& x
) {
113 return x
.start().then([&x
] {
114 return x
.map_reduce0(std::mem_fn(&X::cpu_id_squared
),
116 std::plus
<int>()).then([] (int result
) {
117 int n
= smp::count
- 1;
118 if (result
!= (n
* (n
+ 1) * (2*n
+ 1)) / 6) {
119 throw std::runtime_error("map_reduce failed");
126 future
<> test_async() {
127 return do_with_distributed
<async_service
>([] (distributed
<async_service
>& x
) {
128 return x
.start().then([&x
] {
129 return x
.invoke_on_all(&async_service::run
);
132 return sleep(std::chrono::milliseconds(100 * (smp::count
+ 1)));
136 future
<> test_invoke_on_others() {
137 return seastar::async([] {
140 void up() { ++counter
; }
141 future
<> stop() { return make_ready_future
<>(); }
143 for (unsigned c
= 0; c
< smp::count
; ++c
) {
144 smp::submit_to(c
, [c
] {
145 return seastar::async([c
] {
146 sharded
<my_service
> s
;
148 s
.invoke_on_others([](auto& s
) { s
.up(); }).get();
149 if (s
.local().counter
!= 0) {
150 throw std::runtime_error("local modified");
152 s
.invoke_on_all([c
](auto& remote
) {
153 if (engine().cpu_id() != c
) {
154 if (remote
.counter
!= 1) {
155 throw std::runtime_error("remote not modified");
166 int main(int argc
, char** argv
) {
168 return app
.run(argc
, argv
, [] {
169 return test_that_each_core_gets_the_arguments().then([] {
170 return test_functor_version();
172 return test_constructor_argument_is_passed_to_each_core();
174 return test_map_reduce();
178 return test_invoke_on_others();