]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/tests/unit/distributed_test.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / seastar / tests / unit / distributed_test.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19 /*
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
21 */
22
23 #include <seastar/core/app-template.hh>
24 #include <seastar/core/distributed.hh>
25 #include <seastar/core/future-util.hh>
26 #include <seastar/core/sleep.hh>
27 #include <seastar/core/thread.hh>
28
29 using namespace seastar;
30
31 struct async_service : public seastar::async_sharded_service<async_service> {
32 thread_local static bool deleted;
33 ~async_service() {
34 deleted = true;
35 }
36 void run() {
37 auto ref = shared_from_this();
38 sleep(std::chrono::milliseconds(100 + 100 * engine().cpu_id())).then([this, ref] {
39 check();
40 });
41 }
42 virtual void check() {
43 assert(!deleted);
44 }
45 future<> stop() { return make_ready_future<>(); }
46 };
47
48 thread_local bool async_service::deleted = false;
49
50 struct X {
51 sstring echo(sstring arg) {
52 return arg;
53 }
54 int cpu_id_squared() const {
55 auto id = engine().cpu_id();
56 return id * id;
57 }
58 future<> stop() { return make_ready_future<>(); }
59 };
60
61 template <typename T, typename Func>
62 future<> do_with_distributed(Func&& func) {
63 auto x = make_shared<distributed<T>>();
64 return func(*x).finally([x] {
65 return x->stop();
66 }).finally([x]{});
67 }
68
69 future<> test_that_each_core_gets_the_arguments() {
70 return do_with_distributed<X>([] (auto& x) {
71 return x.start().then([&x] {
72 return x.map_reduce([] (sstring msg){
73 if (msg != "hello") {
74 throw std::runtime_error("wrong message");
75 }
76 }, &X::echo, sstring("hello"));
77 });
78 });
79 }
80
81 future<> test_functor_version() {
82 return do_with_distributed<X>([] (auto& x) {
83 return x.start().then([&x] {
84 return x.map_reduce([] (sstring msg){
85 if (msg != "hello") {
86 throw std::runtime_error("wrong message");
87 }
88 }, [] (X& x) { return x.echo("hello"); });
89 });
90 });
91 }
92
93 struct Y {
94 sstring s;
95 Y(sstring s) : s(std::move(s)) {}
96 future<> stop() { return make_ready_future<>(); }
97 };
98
99 future<> test_constructor_argument_is_passed_to_each_core() {
100 return do_with_distributed<Y>([] (auto& y) {
101 return y.start(sstring("hello")).then([&y] {
102 return y.invoke_on_all([] (Y& y) {
103 if (y.s != "hello") {
104 throw std::runtime_error(format("expected message mismatch, is \"%s\"", y.s));
105 }
106 });
107 });
108 });
109 }
110
111 future<> test_map_reduce() {
112 return do_with_distributed<X>([] (distributed<X>& x) {
113 return x.start().then([&x] {
114 return x.map_reduce0(std::mem_fn(&X::cpu_id_squared),
115 0,
116 std::plus<int>()).then([] (int result) {
117 int n = smp::count - 1;
118 if (result != (n * (n + 1) * (2*n + 1)) / 6) {
119 throw std::runtime_error("map_reduce failed");
120 }
121 });
122 });
123 });
124 }
125
126 future<> test_async() {
127 return do_with_distributed<async_service>([] (distributed<async_service>& x) {
128 return x.start().then([&x] {
129 return x.invoke_on_all(&async_service::run);
130 });
131 }).then([] {
132 return sleep(std::chrono::milliseconds(100 * (smp::count + 1)));
133 });
134 }
135
136 future<> test_invoke_on_others() {
137 return seastar::async([] {
138 struct my_service {
139 int counter = 0;
140 void up() { ++counter; }
141 future<> stop() { return make_ready_future<>(); }
142 };
143 for (unsigned c = 0; c < smp::count; ++c) {
144 smp::submit_to(c, [c] {
145 return seastar::async([c] {
146 sharded<my_service> s;
147 s.start().get();
148 s.invoke_on_others([](auto& s) { s.up(); }).get();
149 if (s.local().counter != 0) {
150 throw std::runtime_error("local modified");
151 }
152 s.invoke_on_all([c](auto& remote) {
153 if (engine().cpu_id() != c) {
154 if (remote.counter != 1) {
155 throw std::runtime_error("remote not modified");
156 }
157 }
158 }).get();
159 s.stop().get();
160 });
161 }).get();
162 }
163 });
164 }
165
166 int main(int argc, char** argv) {
167 app_template app;
168 return app.run(argc, argv, [] {
169 return test_that_each_core_gets_the_arguments().then([] {
170 return test_functor_version();
171 }).then([] {
172 return test_constructor_argument_is_passed_to_each_core();
173 }).then([] {
174 return test_map_reduce();
175 }).then([] {
176 return test_async();
177 }).then([] {
178 return test_invoke_on_others();
179 });
180 });
181 }