1 #include "common/ceph_time.h"
2 #include "messages/MPing.h"
3 #include "crimson/common/log.h"
4 #include "crimson/net/Connection.h"
5 #include "crimson/net/Dispatcher.h"
6 #include "crimson/net/Messenger.h"
10 #include <boost/program_options.hpp>
11 #include <seastar/core/app-template.hh>
12 #include <seastar/core/do_with.hh>
13 #include <seastar/core/future-util.hh>
14 #include <seastar/core/reactor.hh>
15 #include <seastar/core/sleep.hh>
// Shorthand alias for the boost::program_options namespace (used by main()).
17 namespace bpo
= boost::program_options
;
// Returns the logger that ceph::get_logger() yields for ceph_subsys_ms.
21 seastar::logger
& logger() {
22 return ceph::get_logger(ceph_subsys_ms
);
// File-local random source: `rd` provides the seed for the engine `rng`.
25 static std::random_device rd
;
26 static std::default_random_engine rng
{rd()};
// Verbosity flag, off by default; main() assigns it from the "verbose" option.
27 static bool verbose
= false;
// Ping-pong echo test.
// `rounds` and `keepalive_ratio` are forwarded to every test_state::Client
// that the body of this function creates.
29 static seastar::future
<> test_echo(unsigned rounds
,
30 double keepalive_ratio
)
// Server side of the echo test: a ceph::net::Dispatcher that is also a
// seastar::peering_sharded_service<Server>.
34 : public ceph::net::Dispatcher
,
35 public seastar::peering_sharded_service
<Server
> {
// Messenger used by this instance; assigned in init() from get_local_shard().
36 ceph::net::Messenger
*msgr
= nullptr;
// MPing message that ms_dispatch() sends back as the reply.
37 MessageRef msg_pong
{new MPing(), false};
// Dispatcher override: returns the address of container().local().
39 Dispatcher
* get_local_shard() override
{
40 return &(container().local());
// Returns an already-resolved future; no other work is done here.
42 seastar::future
<> stop() {
43 return seastar::make_ready_future
<>();
// Dispatcher override called with the connection `c` and the message `m`.
// Replies on the same connection with msg_pong and returns the send future.
45 seastar::future
<> ms_dispatch(ceph::net::ConnectionRef c
,
46 MessageRef m
) override
{
// Info-level log of the received message.
// NOTE(review): any guard around this log is not visible in this excerpt.
48 logger().info("server got {}", *m
);
51 return c
->send(msg_pong
);
// Creates a messenger and brings the server up on `addr`.
// Steps, chained as futures:
//   1. ceph::net::Messenger::create(name, lname, nonce)
//   2. on every Server instance (invoke_on_all), set msgr to
//      messenger->get_local_shard()
//   3. bind the messenger to `addr`
//   4. start the messenger with this object as its dispatcher
// NOTE(review): `nonce` is used below but its declaration is not visible in
// this excerpt — callers pass it as the third argument.
54 seastar::future
<> init(const entity_name_t
& name
,
55 const std::string
& lname
,
57 const entity_addr_t
& addr
) {
58 auto&& fut
= ceph::net::Messenger::create(name
, lname
, nonce
);
// `addr` is captured by copy so it is still available when bind() runs.
59 return fut
.then([this, addr
](ceph::net::Messenger
*messenger
) {
60 return container().invoke_on_all([messenger
](auto& server
) {
61 server
.msgr
= messenger
->get_local_shard();
62 }).then([messenger
, addr
] {
63 return messenger
->bind(entity_addrvec_t
{addr
});
64 }).then([this, messenger
] {
65 return messenger
->start(this);
// Returns the future produced by msgr->shutdown().
69 seastar::future
<> shutdown() {
71 return msgr
->shutdown();
// Client side of the echo test: a ceph::net::Dispatcher that is also a
// seastar::peering_sharded_service<Client>.
76 : public ceph::net::Dispatcher
,
77 public seastar::peering_sharded_service
<Client
> {
// Per-connection timing record, held through a seastar::shared_ptr.
79 struct PingSession
: public seastar::enable_shared_from_this
<PingSession
> {
// Assigned in ms_handle_connect().
81 mono_time connected_time
;
// Assigned in ms_dispatch() once the session's count equals `rounds`.
82 mono_time finish_time
;
// Shared-pointer handle to a PingSession.
84 using PingSessionRef
= seastar::shared_ptr
<PingSession
>;
// Sampled in do_dispatch_pingpong() to choose between a keepalive and a ping.
87 std::bernoulli_distribution keepalive_dist
;
// Messenger used by this instance; assigned in init() from get_local_shard().
88 ceph::net::Messenger
*msgr
= nullptr;
// One promise per connection pointer: inserted by do_dispatch_pingpong(),
// fulfilled by ms_dispatch() when the round count is reached.
89 std::map
<ceph::net::Connection
*, seastar::promise
<>> pending_conns
;
// Connection -> session record; entries are added in ms_handle_connect().
90 std::map
<ceph::net::ConnectionRef
, PingSessionRef
> sessions
;
// MPing message sent by do_dispatch_pingpong().
91 MessageRef msg_ping
{new MPing(), false};
// Builds keepalive_dist as a Bernoulli distribution with probability
// `keepalive_ratio`.
// NOTE(review): the initializer that stores `rounds` is not visible in this
// excerpt.
93 Client(unsigned rounds
, double keepalive_ratio
)
95 keepalive_dist(std::bernoulli_distribution
{keepalive_ratio
}) {}
// Looks up `c` in `sessions` and returns the mapped PingSessionRef.
97 PingSessionRef
find_session(ceph::net::ConnectionRef c
) {
98 auto found
= sessions
.find(c
);
// NOTE(review): the body of the not-found branch is not visible in this
// excerpt; presumably it aborts, since `found` is dereferenced below.
99 if (found
== sessions
.end()) {
102 return found
->second
;
// Dispatcher override: returns the address of container().local().
105 Dispatcher
* get_local_shard() override
{
106 return &(container().local());
// Returns an already-resolved future (seastar::now()).
108 seastar::future
<> stop() {
109 return seastar::now();
// Dispatcher override called with the connected `conn`.
// Logs the connection and its peer address, registers a fresh PingSession
// for it in `sessions`, and stamps the session's connected_time.
111 seastar::future
<> ms_handle_connect(ceph::net::ConnectionRef conn
) override
{
112 logger().info("{}: connected to {}", *conn
, conn
->get_peer_addr());
113 auto session
= seastar::make_shared
<PingSession
>();
// `added` reports whether the insertion happened.
// NOTE(review): how `i` and `added` are used afterwards is not visible here.
114 auto [i
, added
] = sessions
.emplace(conn
, session
);
117 session
->connected_time
= mono_clock::now();
118 return seastar::now();
// Dispatcher override called with the connection `c` and the message `m`.
// When the session's count equals `rounds`, stamps finish_time and fulfils
// the pending promise for this connection on every Client instance.
// NOTE(review): the statement that updates session->count is not visible in
// this excerpt.
120 seastar::future
<> ms_dispatch(ceph::net::ConnectionRef c
,
121 MessageRef m
) override
{
122 auto session
= find_session(c
);
125 logger().info("client ms_dispatch {}", session
->count
);
128 if (session
->count
== rounds
) {
129 logger().info("{}: finished receiving {} pongs", *c
.get(), session
->count
);
130 session
->finish_time
= mono_clock::now();
// Only the raw connection pointer is captured; it is the key type of
// pending_conns.
131 return container().invoke_on_all([conn
= c
.get()](auto &client
) {
132 auto found
= client
.pending_conns
.find(conn
);
// Every instance is expected to hold a promise for this connection
// (do_dispatch_pingpong() inserts one on all of them).
133 ceph_assert(found
!= client
.pending_conns
.end());
134 found
->second
.set_value();
137 return seastar::now();
// Creates a messenger and starts the client (no bind step).
// Steps, chained as futures:
//   1. ceph::net::Messenger::create(name, lname, nonce)
//   2. on every Client instance (invoke_on_all), set msgr to
//      messenger->get_local_shard()
//   3. start the messenger with this object as its dispatcher
141 seastar::future
<> init(const entity_name_t
& name
,
142 const std::string
& lname
,
143 const uint64_t nonce
) {
144 return ceph::net::Messenger::create(name
, lname
, nonce
)
145 .then([this](ceph::net::Messenger
*messenger
) {
146 return container().invoke_on_all([messenger
](auto& client
) {
147 client
.msgr
= messenger
->get_local_shard();
148 }).then([this, messenger
] {
149 return messenger
->start(this);
// Returns the future produced by msgr->shutdown().
154 seastar::future
<> shutdown() {
156 return msgr
->shutdown();
// Connects to `peer_addr` (peer type TYPE_OSD), runs the ping-pong exchange
// on the resulting connection, and finally logs how long the handshake and
// the ping-pong phase took.
//   foreign_dispatch == true : do_dispatch_pingpong() is called directly from
//                              this continuation.
//   foreign_dispatch == false: do_dispatch_pingpong() is invoked on the shard
//                              reported by the connection's shard_id().
159 seastar::future
<> dispatch_pingpong(const entity_addr_t
& peer_addr
, bool foreign_dispatch
=true) {
// Reference point for the handshake duration logged at the end.
160 mono_time start_time
= mono_clock::now();
161 return msgr
->connect(peer_addr
, entity_name_t::TYPE_OSD
)
162 .then([this, foreign_dispatch
, start_time
](auto conn
) {
163 return seastar::futurize_apply([this, conn
, foreign_dispatch
] {
164 if (foreign_dispatch
) {
165 return do_dispatch_pingpong(&**conn
);
167 // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong().
168 return container().invoke_on(conn
->get()->shard_id(), [conn
= &**conn
](auto &client
) {
169 return client
.do_dispatch_pingpong(conn
);
// The report runs on the connection's shard, where its session is looked up.
172 }).finally([this, conn
, start_time
] {
173 return container().invoke_on(conn
->get()->shard_id(), [conn
, start_time
](auto &client
) {
174 auto session
= client
.find_session((*conn
)->shared_from_this());
// handshake = connected_time - start_time
175 std::chrono::duration
<double> dur_handshake
= session
->connected_time
- start_time
;
// pingpong = finish_time - connected_time
176 std::chrono::duration
<double> dur_pingpong
= session
->finish_time
- session
->connected_time
;
177 logger().info("{}: handshake {}, pingpong {}",
178 **conn
, dur_handshake
.count(), dur_pingpong
.count());
// Drives the sending side of the ping-pong exchange on `conn`.
//   1. Inserts a fresh promise for `conn` into pending_conns on every Client
//      instance (invoke_on_all).
//   2. Loops until count_ping equals `rounds`. Each outer iteration runs a
//      seastar::repeat that sends keepalives (stop_iteration::no) until one
//      ping has been sent (stop_iteration::yes).
//   3. Returns the future of this instance's pending promise for `conn`.
// NOTE(review): the statement that updates count_ping is not visible in this
// excerpt.
185 seastar::future
<> do_dispatch_pingpong(ceph::net::Connection
* conn
) {
186 return container().invoke_on_all([conn
](auto& client
) {
187 auto [i
, added
] = client
.pending_conns
.emplace(conn
, seastar::promise
<>());
190 }).then([this, conn
] {
// Both counters start at 0 and are kept alive by do_with for the whole loop.
191 return seastar::do_with(0u, 0u,
192 [this, conn
](auto &count_ping
, auto &count_keepalive
) {
193 return seastar::do_until(
// Stop condition of the outer loop.
194 [this, conn
, &count_ping
, &count_keepalive
] {
195 bool stop
= (count_ping
== rounds
);
197 logger().info("{}: finished sending {} pings with {} keepalives",
198 *conn
, count_ping
, count_keepalive
);
// Loop body: one ping, preceded by zero or more keepalives.
202 [this, conn
, &count_ping
, &count_keepalive
] {
203 return seastar::repeat([this, conn
, &count_ping
, &count_keepalive
] {
// keepalive_dist decides whether this step is a keepalive.
204 if (keepalive_dist(rng
)) {
205 return conn
->keepalive()
206 .then([&count_keepalive
] {
207 count_keepalive
+= 1;
208 return seastar::make_ready_future
<seastar::stop_iteration
>(
209 seastar::stop_iteration::no
);
212 return conn
->send(msg_ping
)
213 .then([&count_ping
] {
215 return seastar::make_ready_future
<seastar::stop_iteration
>(
216 seastar::stop_iteration::yes
);
// The promise looked up here is fulfilled by ms_dispatch().
220 }).then([this, conn
] {
221 auto found
= pending_conns
.find(conn
);
222 return found
->second
.get_future();
// Body of the echo test: create two sharded servers and two sharded clients,
// initialize them, run four ping-pong dispatches in parallel, then shut
// everything down.
230 logger().info("test_echo():");
231 return seastar::when_all_succeed(
232 ceph::net::create_sharded
<test_state::Server
>(),
233 ceph::net::create_sharded
<test_state::Server
>(),
234 ceph::net::create_sharded
<test_state::Client
>(rounds
, keepalive_ratio
),
235 ceph::net::create_sharded
<test_state::Client
>(rounds
, keepalive_ratio
))
236 .then([rounds
, keepalive_ratio
](test_state::Server
*server1
,
237 test_state::Server
*server2
,
238 test_state::Client
*client1
,
239 test_state::Client
*client2
) {
240 // start servers and clients
// NOTE(review): the declarations of addr1/addr2 are not visible in this
// excerpt.
242 addr1
.parse("127.0.0.1:9010", nullptr);
243 addr1
.set_type(entity_addr_t::TYPE_LEGACY
);
245 addr2
.parse("127.0.0.1:9011", nullptr);
246 addr2
.set_type(entity_addr_t::TYPE_LEGACY
);
// init() arguments: entity name, logical name, nonce, and (servers only)
// the bind address.
247 return seastar::when_all_succeed(
248 server1
->init(entity_name_t::OSD(0), "server1", 1, addr1
),
249 server2
->init(entity_name_t::OSD(1), "server2", 2, addr2
),
250 client1
->init(entity_name_t::OSD(2), "client1", 3),
251 client2
->init(entity_name_t::OSD(3), "client2", 4))
252 // dispatch pingpong
253 .then([client1
, client2
, server1
, server2
] {
254 return seastar::when_all_succeed(
255 // test connecting in parallel, accepting in parallel,
256 // and operating the connection reference from a foreign/local core
257 client1
->dispatch_pingpong(server1
->msgr
->get_myaddr(), true),
258 client1
->dispatch_pingpong(server2
->msgr
->get_myaddr(), false),
259 client2
->dispatch_pingpong(server1
->msgr
->get_myaddr(), false),
260 client2
->dispatch_pingpong(server2
->msgr
->get_myaddr(), true));
// Shutdown steps are chained with finally(): clients first, then servers.
262 }).finally([client1
] {
263 logger().info("client1 shutdown...");
264 return client1
->shutdown();
265 }).finally([client2
] {
266 logger().info("client2 shutdown...");
267 return client2
->shutdown();
268 }).finally([server1
] {
269 logger().info("server1 shutdown...");
270 return server1
->shutdown();
271 }).finally([server2
] {
272 logger().info("server2 shutdown...");
273 return server2
->shutdown();
// Concurrent-dispatch test: the body sends two messages on one connection to
// a server whose first dispatch blocks until the second one arrives.
278 static seastar::future
<> test_concurrent_dispatch()
// Server side of the concurrent-dispatch test: a ceph::net::Dispatcher that
// is also a seastar::peering_sharded_service<Server>.
282 : public ceph::net::Dispatcher
,
283 public seastar::peering_sharded_service
<Server
> {
// Messenger used by this instance; assigned in init() from get_local_shard().
284 ceph::net::Messenger
*msgr
= nullptr;
286 seastar::promise
<> on_second
; // satisfied on second dispatch
287 seastar::promise
<> on_done
; // satisfied when first dispatch unblocks
// Dispatcher override called with the connection `c` and the message `m`.
// Visible paths:
//   - wait on on_second, then fulfil on_done on every Server instance;
//   - fulfil on_second and return a ready future;
//   - throw std::runtime_error("unexpected count").
// NOTE(review): the code that selects between these paths is not visible in
// this excerpt.
289 seastar::future
<> ms_dispatch(ceph::net::ConnectionRef c
,
290 MessageRef m
) override
{
293 // block on the first request until we reenter with the second
294 return on_second
.get_future()
296 return container().invoke_on_all([](Server
& server
) {
297 server
.on_done
.set_value();
301 on_second
.set_value();
302 return seastar::now();
304 throw std::runtime_error("unexpected count");
// Returns the future of on_done, which ms_dispatch() fulfils.
308 seastar::future
<> wait() { return on_done
.get_future(); }
// Creates a messenger and brings the server up on `addr`.
// Steps, chained as futures:
//   1. ceph::net::Messenger::create(name, lname, nonce)
//   2. on every Server instance (invoke_on_all), set msgr to
//      messenger->get_local_shard()
//   3. bind the messenger to `addr`
//   4. start the messenger with this object as its dispatcher
310 seastar::future
<> init(const entity_name_t
& name
,
311 const std::string
& lname
,
312 const uint64_t nonce
,
313 const entity_addr_t
& addr
) {
314 return ceph::net::Messenger::create(name
, lname
, nonce
)
// `addr` is captured by copy so it is still available when bind() runs.
315 .then([this, addr
](ceph::net::Messenger
*messenger
) {
316 return container().invoke_on_all([messenger
](auto& server
) {
317 server
.msgr
= messenger
->get_local_shard();
318 }).then([messenger
, addr
] {
319 return messenger
->bind(entity_addrvec_t
{addr
});
320 }).then([this, messenger
] {
321 return messenger
->start(this);
// Dispatcher override: returns the address of container().local().
326 Dispatcher
* get_local_shard() override
{
327 return &(container().local());
// Returns an already-resolved future; no other work is done here.
329 seastar::future
<> stop() {
330 return seastar::make_ready_future
<>();
// Client side of the concurrent-dispatch test: a ceph::net::Dispatcher that
// is also a seastar::peering_sharded_service<Client>.
335 : public ceph::net::Dispatcher
,
336 public seastar::peering_sharded_service
<Client
> {
// Messenger used by this instance; assigned in init() from get_local_shard().
337 ceph::net::Messenger
*msgr
= nullptr;
// Creates a messenger and starts the client (no bind step).
// Steps, chained as futures:
//   1. ceph::net::Messenger::create(name, lname, nonce)
//   2. on every Client instance (invoke_on_all), set msgr to
//      messenger->get_local_shard()
//   3. start the messenger with this object as its dispatcher
339 seastar::future
<> init(const entity_name_t
& name
,
340 const std::string
& lname
,
341 const uint64_t nonce
) {
342 return ceph::net::Messenger::create(name
, lname
, nonce
)
343 .then([this](ceph::net::Messenger
*messenger
) {
344 return container().invoke_on_all([messenger
](auto& client
) {
345 client
.msgr
= messenger
->get_local_shard();
346 }).then([this, messenger
] {
347 return messenger
->start(this);
// Dispatcher override: returns the address of container().local().
352 Dispatcher
* get_local_shard() override
{
353 return &(container().local());
// Returns an already-resolved future; no other work is done here.
355 seastar::future
<> stop() {
356 return seastar::make_ready_future
<>();
// Body of the concurrent-dispatch test: create one sharded server and one
// sharded client, initialize both, connect the client to the server, send two
// MPing messages on that connection, then shut both messengers down.
361 logger().info("test_concurrent_dispatch():");
362 return seastar::when_all_succeed(
363 ceph::net::create_sharded
<test_state::Server
>(),
364 ceph::net::create_sharded
<test_state::Client
>())
365 .then([](test_state::Server
*server
,
366 test_state::Client
*client
) {
// NOTE(review): the declaration of `addr` is not visible in this excerpt.
368 addr
.parse("127.0.0.1:9010", nullptr);
369 addr
.set_type(entity_addr_t::TYPE_LEGACY
);
370 addr
.set_family(AF_INET
);
371 return seastar::when_all_succeed(
372 server
->init(entity_name_t::OSD(4), "server3", 5, addr
),
373 client
->init(entity_name_t::OSD(5), "client3", 6))
374 .then([server
, client
] {
375 return client
->msgr
->connect(server
->msgr
->get_myaddr(),
376 entity_name_t::TYPE_OSD
);
377 }).then([](ceph::net::ConnectionXRef conn
) {
// Two sends are issued back to back; their returned futures are not used in
// the visible code.
379 (*conn
)->send(MessageRef
{new MPing
, false});
380 (*conn
)->send(MessageRef
{new MPing
, false});
// Shutdown steps are chained with finally(): client first, then server.
383 }).finally([client
] {
384 logger().info("client shutdown...");
385 return client
->msgr
->shutdown();
386 }).finally([server
] {
387 logger().info("server shutdown...");
388 return server
->msgr
->shutdown();
// Entry point: registers the command-line options, then runs test_echo()
// followed by test_concurrent_dispatch() inside the seastar application.
// Options visible here:
//   --verbose,-v       bool,     default false
//   --rounds           unsigned, default 512
//   --keepalive-ratio  double,   default 0.1
395 int main(int argc
, char** argv
)
397 seastar::app_template app
;
399 ("verbose,v", bpo::value
<bool>()->default_value(false),
401 ("rounds", bpo::value
<unsigned>()->default_value(512),
402 "number of pingpong rounds")
403 ("keepalive-ratio", bpo::value
<double>()->default_value(0.1),
404 "ratio of keepalive in ping messages");
405 return app
.run(argc
, argv
, [&app
] {
406 auto&& config
= app
.configuration();
// Copy the parsed option into the file-level `verbose` flag.
407 verbose
= config
["verbose"].as
<bool>();
408 auto rounds
= config
["rounds"].as
<unsigned>();
409 auto keepalive_ratio
= config
["keepalive-ratio"].as
<double>();
410 return test_echo(rounds
, keepalive_ratio
)
412 return test_concurrent_dispatch();
414 std::cout
<< "All tests succeeded" << std::endl
;
// On failure: report it, then pass the same exception on as a failed future.
415 }).handle_exception([] (auto eptr
) {
416 std::cout
<< "Test failure" << std::endl
;
417 return seastar::make_exception_future
<>(eptr
);