// ceph/src/test/crimson/test_messenger.cc
#include "common/ceph_time.h"
#include "messages/MPing.h"
#include "crimson/common/log.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Messenger.h"

#include <map>
#include <random>
#include <boost/program_options.hpp>
#include <seastar/core/app-template.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sleep.hh>

namespace bpo = boost::program_options;

namespace {

seastar::logger& logger() {
  return ceph::get_logger(ceph_subsys_ms);
}

static std::random_device rd;
static std::default_random_engine rng{rd()};
static bool verbose = false;
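
// test_echo():
// Start two sharded servers and two sharded clients, then run ping/pong in
// all four client/server combinations. Each session sends `rounds` pings
// (randomly interleaved with keepalives) and waits for the matching pongs,
// timing the handshake and the ping/pong phase separately.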
static seastar::future<> test_echo(unsigned rounds,
                                   double keepalive_ratio)
{
  struct test_state {
    struct Server final
        : public ceph::net::Dispatcher,
          public seastar::peering_sharded_service<Server> {
      ceph::net::Messenger *msgr = nullptr;
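      // the `false` tells the intrusive MessageRef to adopt the new
      // message's initial reference instead of taking an extra one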
      MessageRef msg_pong{new MPing(), false};

      Dispatcher* get_local_shard() override {
        return &(container().local());
      }
      seastar::future<> stop() {
        return seastar::make_ready_future<>();
      }
      seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
                                    MessageRef m) override {
        if (verbose) {
          logger().info("server got {}", *m);
        }
        // reply with a pong
        return c->send(msg_pong);
      }
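
      // create the messenger, publish a shard-local pointer to it on every
      // core, bind to the given address, and start dispatching to us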
      seastar::future<> init(const entity_name_t& name,
                             const std::string& lname,
                             const uint64_t nonce,
                             const entity_addr_t& addr) {
        auto&& fut = ceph::net::Messenger::create(name, lname, nonce);
        return fut.then([this, addr](ceph::net::Messenger *messenger) {
          return container().invoke_on_all([messenger](auto& server) {
            server.msgr = messenger->get_local_shard();
          }).then([messenger, addr] {
            return messenger->bind(entity_addrvec_t{addr});
          }).then([this, messenger] {
            return messenger->start(this);
          });
        });
      }
      seastar::future<> shutdown() {
        ceph_assert(msgr);
        return msgr->shutdown();
      }
    };
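
    // the client drives the ping/pong rounds; keepalive_ratio controls how
    // often keepalives are interleaved with the pings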
    struct Client final
        : public ceph::net::Dispatcher,
          public seastar::peering_sharded_service<Client> {

      struct PingSession : public seastar::enable_shared_from_this<PingSession> {
        unsigned count = 0u;
        mono_time connected_time;
        mono_time finish_time;
      };
      using PingSessionRef = seastar::shared_ptr<PingSession>;

      unsigned rounds;
      std::bernoulli_distribution keepalive_dist;
      ceph::net::Messenger *msgr = nullptr;
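      // pending_conns is mirrored on every shard (filled in
      // do_dispatch_pingpong via invoke_on_all) and keyed by the raw
      // Connection*, so completion can be signalled from any shard;
      // sessions is only touched on the shard owning the connection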
      std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
      std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
      MessageRef msg_ping{new MPing(), false};

      Client(unsigned rounds, double keepalive_ratio)
        : rounds(rounds),
          keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}

      PingSessionRef find_session(ceph::net::ConnectionRef c) {
        auto found = sessions.find(c);
        ceph_assert(found != sessions.end());
        return found->second;
      }
      Dispatcher* get_local_shard() override {
        return &(container().local());
      }
      seastar::future<> stop() {
        return seastar::now();
      }
      seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
        logger().info("{}: connected to {}", *conn, conn->get_peer_addr());
        auto session = seastar::make_shared<PingSession>();
        auto [i, added] = sessions.emplace(conn, session);
        std::ignore = i;
        ceph_assert(added);
        session->connected_time = mono_clock::now();
        return seastar::now();
      }
      seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
                                    MessageRef m) override {
        auto session = find_session(c);
        ++(session->count);
        if (verbose) {
          logger().info("client ms_dispatch {}", session->count);
        }

        if (session->count == rounds) {
          logger().info("{}: finished receiving {} pongs", *c, session->count);
          session->finish_time = mono_clock::now();
          return container().invoke_on_all([conn = c.get()](auto &client) {
            auto found = client.pending_conns.find(conn);
            ceph_assert(found != client.pending_conns.end());
            found->second.set_value();
          });
        } else {
          return seastar::now();
        }
      }

      seastar::future<> init(const entity_name_t& name,
                             const std::string& lname,
                             const uint64_t nonce) {
        return ceph::net::Messenger::create(name, lname, nonce)
          .then([this](ceph::net::Messenger *messenger) {
            return container().invoke_on_all([messenger](auto& client) {
              client.msgr = messenger->get_local_shard();
            }).then([this, messenger] {
              return messenger->start(this);
            });
          });
      }

      seastar::future<> shutdown() {
        ceph_assert(msgr);
        return msgr->shutdown();
      }
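
      // connect to peer_addr and run one ping/pong session over the new
      // connection. With foreign_dispatch the session is driven from the
      // current core through the foreign connection reference; otherwise we
      // hop to the connection's home shard and drive it locally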
      seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr,
                                          bool foreign_dispatch=true) {
        mono_time start_time = mono_clock::now();
        return msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
          .then([this, foreign_dispatch, start_time](auto conn) {
            return seastar::futurize_apply([this, conn, foreign_dispatch] {
              if (foreign_dispatch) {
                return do_dispatch_pingpong(&**conn);
              } else {
                // NOTE: this could be faster if we don't switch cores in
                // do_dispatch_pingpong().
                return container().invoke_on(conn->get()->shard_id(),
                                             [conn = &**conn](auto &client) {
                  return client.do_dispatch_pingpong(conn);
                });
              }
            }).finally([this, conn, start_time] {
              return container().invoke_on(conn->get()->shard_id(),
                                           [conn, start_time](auto &client) {
                auto session = client.find_session((*conn)->shared_from_this());
                std::chrono::duration<double> dur_handshake =
                    session->connected_time - start_time;
                std::chrono::duration<double> dur_pingpong =
                    session->finish_time - session->connected_time;
                logger().info("{}: handshake {}, pingpong {}",
                              **conn, dur_handshake.count(), dur_pingpong.count());
              });
            });
          });
      }

    private:
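      // register a completion promise for this connection on every shard,
      // run the send loop (each do_until step sends keepalives until one
      // ping goes out), then wait for ms_dispatch to resolve the promise
      // once all pongs have been received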
      seastar::future<> do_dispatch_pingpong(ceph::net::Connection* conn) {
        return container().invoke_on_all([conn](auto& client) {
          auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
          std::ignore = i;
          ceph_assert(added);
        }).then([this, conn] {
          return seastar::do_with(0u, 0u,
              [this, conn](auto &count_ping, auto &count_keepalive) {
            return seastar::do_until(
              [this, conn, &count_ping, &count_keepalive] {
                bool stop = (count_ping == rounds);
                if (stop) {
                  logger().info("{}: finished sending {} pings with {} keepalives",
                                *conn, count_ping, count_keepalive);
                }
                return stop;
              },
              [this, conn, &count_ping, &count_keepalive] {
                return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
                  if (keepalive_dist(rng)) {
                    return conn->keepalive()
                      .then([&count_keepalive] {
                        count_keepalive += 1;
                        return seastar::make_ready_future<seastar::stop_iteration>(
                            seastar::stop_iteration::no);
                      });
                  } else {
                    return conn->send(msg_ping)
                      .then([&count_ping] {
                        count_ping += 1;
                        return seastar::make_ready_future<seastar::stop_iteration>(
                            seastar::stop_iteration::yes);
                      });
                  }
                });
              }).then([this, conn] {
                auto found = pending_conns.find(conn);
                return found->second.get_future();
              });
          });
        });
      }
    };
  };
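
  // wire everything up: one sharded instance per server/client, two legacy
  // v1 listening addresses, and all four client/server combinations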
  logger().info("test_echo():");
  return seastar::when_all_succeed(
      ceph::net::create_sharded<test_state::Server>(),
      ceph::net::create_sharded<test_state::Server>(),
      ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio),
      ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio))
    .then([rounds, keepalive_ratio](test_state::Server *server1,
                                    test_state::Server *server2,
                                    test_state::Client *client1,
                                    test_state::Client *client2) {
      // start servers and clients
      entity_addr_t addr1;
      addr1.parse("127.0.0.1:9010", nullptr);
      addr1.set_type(entity_addr_t::TYPE_LEGACY);
      entity_addr_t addr2;
      addr2.parse("127.0.0.1:9011", nullptr);
      addr2.set_type(entity_addr_t::TYPE_LEGACY);
      return seastar::when_all_succeed(
          server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
          server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
          client1->init(entity_name_t::OSD(2), "client1", 3),
          client2->init(entity_name_t::OSD(3), "client2", 4))
        // dispatch pingpong
        .then([client1, client2, server1, server2] {
          return seastar::when_all_succeed(
              // test connecting in parallel, accepting in parallel,
              // and operating the connection reference from a foreign/local core
              client1->dispatch_pingpong(server1->msgr->get_myaddr(), true),
              client1->dispatch_pingpong(server2->msgr->get_myaddr(), false),
              client2->dispatch_pingpong(server1->msgr->get_myaddr(), false),
              client2->dispatch_pingpong(server2->msgr->get_myaddr(), true));
        // shutdown
        }).finally([client1] {
          logger().info("client1 shutdown...");
          return client1->shutdown();
        }).finally([client2] {
          logger().info("client2 shutdown...");
          return client2->shutdown();
        }).finally([server1] {
          logger().info("server1 shutdown...");
          return server1->shutdown();
        }).finally([server2] {
          logger().info("server2 shutdown...");
          return server2->shutdown();
        });
    });
}
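
// test_concurrent_dispatch():
// Verify that dispatch is not serialized per connection: the server blocks
// the first ms_dispatch until the second message is dispatched, so the test
// only completes if both messages are dispatched concurrently.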
static seastar::future<> test_concurrent_dispatch()
{
  struct test_state {
    struct Server final
        : public ceph::net::Dispatcher,
          public seastar::peering_sharded_service<Server> {
      ceph::net::Messenger *msgr = nullptr;
      int count = 0;
      seastar::promise<> on_second; // satisfied on second dispatch
      seastar::promise<> on_done;   // satisfied when first dispatch unblocks
      seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
                                    MessageRef m) override {
        switch (++count) {
        case 1:
          // block on the first request until we reenter with the second
          return on_second.get_future()
            .then([this] {
              return container().invoke_on_all([](Server& server) {
                server.on_done.set_value();
              });
            });
        case 2:
          on_second.set_value();
          return seastar::now();
        default:
          throw std::runtime_error("unexpected count");
        }
      }

      seastar::future<> wait() { return on_done.get_future(); }

      seastar::future<> init(const entity_name_t& name,
                             const std::string& lname,
                             const uint64_t nonce,
                             const entity_addr_t& addr) {
        return ceph::net::Messenger::create(name, lname, nonce)
          .then([this, addr](ceph::net::Messenger *messenger) {
            return container().invoke_on_all([messenger](auto& server) {
              server.msgr = messenger->get_local_shard();
            }).then([messenger, addr] {
              return messenger->bind(entity_addrvec_t{addr});
            }).then([this, messenger] {
              return messenger->start(this);
            });
          });
      }

      Dispatcher* get_local_shard() override {
        return &(container().local());
      }
      seastar::future<> stop() {
        return seastar::make_ready_future<>();
      }
    };
    struct Client final
        : public ceph::net::Dispatcher,
          public seastar::peering_sharded_service<Client> {
      ceph::net::Messenger *msgr = nullptr;

      seastar::future<> init(const entity_name_t& name,
                             const std::string& lname,
                             const uint64_t nonce) {
        return ceph::net::Messenger::create(name, lname, nonce)
          .then([this](ceph::net::Messenger *messenger) {
            return container().invoke_on_all([messenger](auto& client) {
              client.msgr = messenger->get_local_shard();
            }).then([this, messenger] {
              return messenger->start(this);
            });
          });
      }

      Dispatcher* get_local_shard() override {
        return &(container().local());
      }
      seastar::future<> stop() {
        return seastar::make_ready_future<>();
      }
    };
  };
  logger().info("test_concurrent_dispatch():");
  return seastar::when_all_succeed(
      ceph::net::create_sharded<test_state::Server>(),
      ceph::net::create_sharded<test_state::Client>())
    .then([](test_state::Server *server,
             test_state::Client *client) {
      entity_addr_t addr;
      addr.parse("127.0.0.1:9010", nullptr);
      addr.set_type(entity_addr_t::TYPE_LEGACY);
      addr.set_family(AF_INET);
      return seastar::when_all_succeed(
          server->init(entity_name_t::OSD(4), "server3", 5, addr),
          client->init(entity_name_t::OSD(5), "client3", 6))
        .then([server, client] {
          return client->msgr->connect(server->msgr->get_myaddr(),
                                       entity_name_t::TYPE_OSD);
        }).then([](ceph::net::ConnectionXRef conn) {
          // send two messages back to back, chaining the futures so neither
          // is dropped; the second reaches the server while the first is
          // still blocked in ms_dispatch
          return (*conn)->send(MessageRef{new MPing, false})
            .then([conn] {
              return (*conn)->send(MessageRef{new MPing, false});
            });
        }).then([server] {
          // wait until the first dispatch has been unblocked by the second
          return server->wait();
        }).finally([client] {
          logger().info("client shutdown...");
          return client->msgr->shutdown();
        }).finally([server] {
          logger().info("server shutdown...");
          return server->msgr->shutdown();
        });
    });
}

} // anonymous namespace
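
// Example invocation (binary name and seastar reactor flags depend on the
// build; --rounds, --keepalive-ratio and --verbose are defined below):
//   ./test_messenger --rounds=1024 --keepalive-ratio=0.1 --verbose=true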
int main(int argc, char** argv)
{
  seastar::app_template app;
  app.add_options()
    ("verbose,v", bpo::value<bool>()->default_value(false),
     "chatty if true")
    ("rounds", bpo::value<unsigned>()->default_value(512),
     "number of pingpong rounds")
    ("keepalive-ratio", bpo::value<double>()->default_value(0.1),
     "ratio of keepalive in ping messages");
  return app.run(argc, argv, [&app] {
    auto&& config = app.configuration();
    verbose = config["verbose"].as<bool>();
    auto rounds = config["rounds"].as<unsigned>();
    auto keepalive_ratio = config["keepalive-ratio"].as<double>();
    return test_echo(rounds, keepalive_ratio)
      .then([] {
        return test_concurrent_dispatch();
      }).then([] {
        std::cout << "All tests succeeded" << std::endl;
      }).handle_exception([](auto eptr) {
        std::cout << "Test failure" << std::endl;
        return seastar::make_exception_future<>(eptr);
      });
  });
}