1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <fmt/format.h>
7 #include <fmt/ostream.h>
8 #include <seastar/core/app-template.hh>
9 #include <seastar/core/do_with.hh>
10 #include <seastar/core/future-util.hh>
11 #include <seastar/core/reactor.hh>
12 #include <seastar/core/sleep.hh>
13 #include <seastar/core/with_timeout.hh>
15 #include "common/ceph_argparse.h"
16 #include "messages/MPing.h"
17 #include "messages/MCommand.h"
18 #include "crimson/auth/DummyAuth.h"
19 #include "crimson/common/log.h"
20 #include "crimson/net/Connection.h"
21 #include "crimson/net/Dispatcher.h"
22 #include "crimson/net/Messenger.h"
24 using namespace std::chrono_literals
;
25 namespace bpo
= boost::program_options
;
26 using crimson::common::local_conf
;
27 using payload_seq_t
= uint64_t;
35 payload_seq_t seq
= 0;
38 Payload(Who who
, uint64_t seq
, const bufferlist
& data
)
39 : who(who
), seq(seq
), data(data
)
50 WRITE_CLASS_DENC(Payload
)
53 struct fmt::formatter
<Payload
> : fmt::formatter
<std::string_view
> {
54 template <typename FormatContext
>
55 auto format(const Payload
& pl
, FormatContext
& ctx
) const {
56 return fmt::format_to(ctx
.out(), "reply={} i={}", pl
.who
, pl
.seq
);
62 seastar::logger
& logger() {
63 return crimson::get_logger(ceph_subsys_test
);
66 std::random_device rd
;
67 std::default_random_engine rng
{rd()};
68 std::uniform_int_distribution
<> prob(0,99);
71 entity_addr_t
get_server_addr() {
72 static int port
= 16800;
75 saddr
.parse("127.0.0.1", nullptr);
80 uint64_t get_nonce() {
81 static uint64_t nonce
= 1;
86 struct thrash_params_t
{
89 std::size_t connections
;
90 std::size_t random_op
;
93 class SyntheticWorkload
;
95 class SyntheticDispatcher final
96 : public crimson::net::Dispatcher
{
98 std::map
<crimson::net::Connection
*, std::deque
<payload_seq_t
> > conn_sent
;
99 std::map
<payload_seq_t
, bufferlist
> sent
;
101 SyntheticWorkload
*workload
;
103 SyntheticDispatcher(bool s
, SyntheticWorkload
*wl
):
104 index(0), workload(wl
) {
107 std::optional
<seastar::future
<>> ms_dispatch(crimson::net::ConnectionRef con
,
108 MessageRef m
) final
{
110 logger().warn("{}: con = {}", __func__
, *con
);
112 // MSG_COMMAND is used to disorganize regular message flow
113 if (m
->get_type() == MSG_COMMAND
) {
114 return seastar::now();
118 auto p
= m
->get_data().cbegin();
120 if (pl
.who
== Payload::PING
) {
121 logger().info(" {} conn= {} {}", __func__
, *con
, pl
);
122 return reply_message(m
, con
, pl
);
124 ceph_assert(pl
.who
== Payload::PONG
);
125 if (sent
.count(pl
.seq
)) {
126 logger().info(" {} conn= {} {}", __func__
, *con
, pl
);
127 ceph_assert(conn_sent
[&*con
].front() == pl
.seq
);
128 ceph_assert(pl
.data
.contents_equal(sent
[pl
.seq
]));
129 conn_sent
[&*con
].pop_front();
133 return seastar::now();
137 void ms_handle_accept(
138 crimson::net::ConnectionRef conn
,
139 seastar::shard_id prv_shard
,
140 bool is_replace
) final
{
141 logger().info("{} - Connection:{}", __func__
, *conn
);
142 assert(prv_shard
== seastar::this_shard_id());
145 void ms_handle_connect(
146 crimson::net::ConnectionRef conn
,
147 seastar::shard_id prv_shard
) final
{
148 logger().info("{} - Connection:{}", __func__
, *conn
);
149 assert(prv_shard
== seastar::this_shard_id());
152 void ms_handle_reset(crimson::net::ConnectionRef con
, bool is_replace
) final
;
154 void ms_handle_remote_reset(crimson::net::ConnectionRef con
) final
{
158 std::optional
<seastar::future
<>> reply_message(
160 crimson::net::ConnectionRef con
,
162 pl
.who
= Payload::PONG
;
165 auto rm
= crimson::make_message
<MPing
>();
168 logger().info("{} conn= {} reply i= {}",
169 __func__
, *con
, pl
.seq
);
171 return con
->send(std::move(rm
));
174 seastar::future
<> send_message_wrap(crimson::net::ConnectionRef con
,
175 const bufferlist
& data
) {
176 auto m
= crimson::make_message
<MPing
>();
177 Payload pl
{Payload::PING
, index
++, data
};
181 sent
[pl
.seq
] = pl
.data
;
182 conn_sent
[&*con
].push_back(pl
.seq
);
183 logger().info("{} conn= {} send i= {}",
184 __func__
, *con
, pl
.seq
);
186 return con
->send(std::move(m
));
189 uint64_t get_num_pending_msgs() {
193 void clear_pending(crimson::net::ConnectionRef con
) {
194 for (std::deque
<uint64_t>::iterator it
= conn_sent
[&*con
].begin();
195 it
!= conn_sent
[&*con
].end(); ++it
)
197 conn_sent
.erase(&*con
);
201 for (auto && [connptr
, list
] : conn_sent
) {
203 logger().info("{} {} wait {}", __func__
,
204 (void*)connptr
, list
.size());
210 class SyntheticWorkload
{
211 // a messenger must be freed after its connections
212 std::set
<crimson::net::MessengerRef
> available_servers
;
213 std::set
<crimson::net::MessengerRef
> available_clients
;
215 crimson::net::SocketPolicy server_policy
;
216 crimson::net::SocketPolicy client_policy
;
217 std::map
<crimson::net::ConnectionRef
,
218 std::pair
<crimson::net::MessengerRef
,
219 crimson::net::MessengerRef
>> available_connections
;
220 SyntheticDispatcher dispatcher
;
221 std::vector
<bufferlist
> rand_data
;
222 crimson::auth::DummyAuthClientServer dummy_auth
;
224 seastar::future
<crimson::net::ConnectionRef
> get_random_connection() {
225 return seastar::do_until(
226 [this] { return dispatcher
.get_num_pending_msgs() <= max_in_flight
; },
227 [] { return seastar::sleep(100ms
); }
229 boost::uniform_int
<> choose(0, available_connections
.size() - 1);
230 int index
= choose(rng
);
231 std::map
<crimson::net::ConnectionRef
,
232 std::pair
<crimson::net::MessengerRef
, crimson::net::MessengerRef
>>::iterator i
233 = available_connections
.begin();
234 for (; index
> 0; --index
, ++i
) ;
235 return seastar::make_ready_future
<crimson::net::ConnectionRef
>(i
->first
);
240 const unsigned min_connections
= 10;
241 const unsigned max_in_flight
= 64;
242 const unsigned max_connections
= 128;
243 const unsigned max_message_len
= 1024 * 1024 * 4;
244 const uint64_t servers
, clients
;
246 SyntheticWorkload(int servers
, int clients
, int random_num
,
247 crimson::net::SocketPolicy srv_policy
,
248 crimson::net::SocketPolicy cli_policy
)
249 : server_policy(srv_policy
),
250 client_policy(cli_policy
),
251 dispatcher(false, this),
255 for (int i
= 0; i
< random_num
; i
++) {
257 boost::uniform_int
<> u(32, max_message_len
);
258 uint64_t value_len
= u(rng
);
259 bufferptr
bp(value_len
);
261 for (uint64_t j
= 0; j
< value_len
-sizeof(i
); ) {
262 memcpy(bp
.c_str()+j
, &i
, sizeof(i
));
267 rand_data
.push_back(bl
);
272 bool can_create_connection() {
273 return available_connections
.size() < max_connections
;
276 seastar::future
<> maybe_generate_connection() {
277 if (!can_create_connection()) {
278 return seastar::now();
280 crimson::net::MessengerRef server
, client
;
282 boost::uniform_int
<> choose(0, available_servers
.size() - 1);
283 int index
= choose(rng
);
284 std::set
<crimson::net::MessengerRef
>::iterator i
285 = available_servers
.begin();
286 for (; index
> 0; --index
, ++i
) ;
290 boost::uniform_int
<> choose(0, available_clients
.size() - 1);
291 int index
= choose(rng
);
292 std::set
<crimson::net::MessengerRef
>::iterator i
293 = available_clients
.begin();
294 for (; index
> 0; --index
, ++i
) ;
299 std::pair
<crimson::net::MessengerRef
, crimson::net::MessengerRef
>
302 crimson::net::ConnectionRef conn
= client
->connect(
303 server
->get_myaddr(),
304 entity_name_t::TYPE_OSD
);
305 connected_pair
= std::make_pair(client
, server
);
306 available_connections
[conn
] = connected_pair
;
308 return seastar::now();
311 seastar::future
<> random_op (const uint64_t& iter
) {
312 return seastar::do_with(iter
, [this] (uint64_t& iter
) {
313 return seastar::do_until(
314 [&] { return iter
== 0; },
318 logger().info("{} Op {} : ", __func__
,iter
);
319 print_internal_state();
324 return maybe_generate_connection();
325 } else if (val
> 80) {
326 return drop_connection();
327 } else if (val
> 10) {
328 return send_message();
330 return seastar::sleep(
331 std::chrono::milliseconds(rand() % 1000 + 500));
337 seastar::future
<> generate_connections (const uint64_t& iter
) {
338 return seastar::do_with(iter
, [this] (uint64_t& iter
) {
339 return seastar::do_until(
340 [&] { return iter
== 0; },
344 if (!(connections_count() % 10)) {
345 logger().info("seeding connection {}",
346 connections_count());
348 return maybe_generate_connection();
353 seastar::future
<> init_server(const entity_name_t
& name
,
354 const std::string
& lname
,
355 const uint64_t nonce
,
356 const entity_addr_t
& addr
) {
357 crimson::net::MessengerRef msgr
=
358 crimson::net::Messenger::create(
359 name
, lname
, nonce
, true);
360 msgr
->set_default_policy(server_policy
);
361 msgr
->set_auth_client(&dummy_auth
);
362 msgr
->set_auth_server(&dummy_auth
);
363 available_servers
.insert(msgr
);
364 return msgr
->bind(entity_addrvec_t
{addr
}).safe_then(
366 return msgr
->start({&dispatcher
});
367 }, crimson::net::Messenger::bind_ertr::all_same_way(
368 [addr
] (const std::error_code
& e
) {
369 logger().error("{} test_messenger_thrash(): "
370 "there is another instance running at {}",
376 seastar::future
<> init_client(const entity_name_t
& name
,
377 const std::string
& lname
,
378 const uint64_t nonce
) {
379 crimson::net::MessengerRef msgr
=
380 crimson::net::Messenger::create(
381 name
, lname
, nonce
, true);
382 msgr
->set_default_policy(client_policy
);
383 msgr
->set_auth_client(&dummy_auth
);
384 msgr
->set_auth_server(&dummy_auth
);
385 available_clients
.insert(msgr
);
386 return msgr
->start({&dispatcher
});
389 seastar::future
<> send_message() {
390 return get_random_connection()
391 .then([this] (crimson::net::ConnectionRef conn
) {
392 boost::uniform_int
<> true_false(0, 99);
393 int val
= true_false(rng
);
396 uuid
.generate_random();
397 auto m
= crimson::make_message
<MCommand
>(uuid
);
398 std::vector
<std::string
> cmds
;
399 cmds
.push_back("command");
401 m
->set_priority(200);
402 return conn
->send(std::move(m
));
404 boost::uniform_int
<> u(0, rand_data
.size()-1);
405 return dispatcher
.send_message_wrap(conn
, rand_data
[u(rng
)]);
410 seastar::future
<> drop_connection() {
411 if (available_connections
.size() < min_connections
) {
412 return seastar::now();
415 return get_random_connection()
416 .then([this] (crimson::net::ConnectionRef conn
) {
417 dispatcher
.clear_pending(conn
);
419 if (!client_policy
.server
&&
420 client_policy
.standby
) {
421 // it's a lossless policy, so we need to mark down each side
422 std::pair
<crimson::net::MessengerRef
, crimson::net::MessengerRef
> &p
=
423 available_connections
[conn
];
424 if (!p
.first
->get_default_policy().server
&&
425 !p
.second
->get_default_policy().server
) {
426 //verify that equal-to operator applies here
427 ceph_assert(p
.first
->owns_connection(*conn
));
428 crimson::net::ConnectionRef peer
= p
.second
->connect(
429 p
.first
->get_myaddr(), p
.first
->get_mytype());
431 dispatcher
.clear_pending(peer
);
432 available_connections
.erase(peer
);
435 ceph_assert(available_connections
.erase(conn
) == 1U);
436 return seastar::now();
440 void print_internal_state(bool detail
=false) {
441 logger().info("available_connections: {} inflight messages: {}",
442 available_connections
.size(),
443 dispatcher
.get_num_pending_msgs());
444 if (detail
&& !available_connections
.empty()) {
449 seastar::future
<> wait_for_done() {
451 return seastar::do_until(
452 [this] { return !dispatcher
.get_num_pending_msgs(); },
456 print_internal_state(true);
458 return seastar::sleep(100ms
);
460 return seastar::do_for_each(available_servers
, [] (auto server
) {
462 logger().info("server {} shutdown" , server
->get_myaddrs());
465 return server
->shutdown();
468 return seastar::do_for_each(available_clients
, [] (auto client
) {
470 logger().info("client {} shutdown" , client
->get_myaddrs());
473 return client
->shutdown();
478 void handle_reset(crimson::net::ConnectionRef con
) {
479 available_connections
.erase(con
);
482 uint64_t servers_count() {
483 return available_servers
.size();
486 uint64_t clients_count() {
487 return available_clients
.size();
490 uint64_t connections_count() {
491 return available_connections
.size();
495 void SyntheticDispatcher::ms_handle_reset(crimson::net::ConnectionRef con
,
497 workload
->handle_reset(con
);
501 seastar::future
<> reset_conf() {
502 return seastar::when_all_succeed(
503 local_conf().set_val("ms_inject_socket_failures", "0"),
504 local_conf().set_val("ms_inject_internal_delays", "0"),
505 local_conf().set_val("ms_inject_delay_probability", "0"),
506 local_conf().set_val("ms_inject_delay_max", "0")
508 return seastar::now();
512 // Testing Crimson messenger (with msgr-v2 protocol) robustness against
513 // network delays and failures. The test includes stress tests and
514 // socket level delays/failures injection tests, letting time
515 // and randomness achieve the best test coverage.
518 // Clients: 8 (stateful)
519 // Servers: 32 (lossless)
520 // Connections: 100 (Generated between random clients/server)
521 // Random Operations: 120 (Generate/Drop Connection, Send Message, Sleep)
522 seastar::future
<> test_stress(thrash_params_t tp
)
525 logger().info("test_stress():");
527 SyntheticWorkload
test_msg(tp
.servers
, tp
.clients
, 100,
528 crimson::net::SocketPolicy::stateful_server(0),
529 crimson::net::SocketPolicy::lossless_client(0));
531 return seastar::do_with(test_msg
, [tp
]
532 (SyntheticWorkload
& test_msg
) {
533 return seastar::do_until([&test_msg
] {
534 return test_msg
.servers_count() == test_msg
.servers
; },
536 entity_addr_t bind_addr
= get_server_addr();
537 bind_addr
.set_type(entity_addr_t::TYPE_MSGR2
);
538 uint64_t server_num
= get_nonce();
539 return test_msg
.init_server(entity_name_t::OSD(server_num
),
540 "server", server_num
, bind_addr
);
541 }).then([&test_msg
] {
542 return seastar::do_until([&test_msg
] {
543 return test_msg
.clients_count() == test_msg
.clients
; },
545 return test_msg
.init_client(entity_name_t::CLIENT(-1),
546 "client", get_nonce());
548 }).then([&test_msg
, tp
] {
549 return test_msg
.generate_connections(tp
.connections
);
550 }).then([&test_msg
, tp
] {
551 return test_msg
.random_op(tp
.random_op
);
552 }).then([&test_msg
] {
553 return test_msg
.wait_for_done();
555 logger().info("test_stress() DONE");
556 }).handle_exception([] (auto eptr
) {
558 "test_stress() failed: got exception {}",
566 // Clients: 8 (stateful)
567 // Servers: 32 (lossless)
568 // Connections: 100 (Generated between random clients/server)
569 // Random Operations: 120 (Generate/Drop Connection, Send Message, Sleep)
570 seastar::future
<> test_injection(thrash_params_t tp
)
573 logger().info("test_injection():");
575 SyntheticWorkload
test_msg(tp
.servers
, tp
.clients
, 100,
576 crimson::net::SocketPolicy::stateful_server(0),
577 crimson::net::SocketPolicy::lossless_client(0));
579 return seastar::do_with(test_msg
, [tp
]
580 (SyntheticWorkload
& test_msg
) {
581 return seastar::do_until([&test_msg
] {
582 return test_msg
.servers_count() == test_msg
.servers
; },
584 entity_addr_t bind_addr
= get_server_addr();
585 bind_addr
.set_type(entity_addr_t::TYPE_MSGR2
);
586 uint64_t server_num
= get_nonce();
587 return test_msg
.init_server(entity_name_t::OSD(server_num
),
588 "server", server_num
, bind_addr
);
589 }).then([&test_msg
] {
590 return seastar::do_until([&test_msg
] {
591 return test_msg
.clients_count() == test_msg
.clients
; },
593 return test_msg
.init_client(entity_name_t::CLIENT(-1),
594 "client", get_nonce());
597 return seastar::when_all_succeed(
598 local_conf().set_val("ms_inject_socket_failures", "30"),
599 local_conf().set_val("ms_inject_internal_delays", "0.1"),
600 local_conf().set_val("ms_inject_delay_probability", "1"),
601 local_conf().set_val("ms_inject_delay_max", "5"));
603 return seastar::now();
604 }).then([&test_msg
, tp
] {
605 return test_msg
.generate_connections(tp
.connections
);
606 }).then([&test_msg
, tp
] {
607 return test_msg
.random_op(tp
.random_op
);
608 }).then([&test_msg
] {
609 return test_msg
.wait_for_done();
611 logger().info("test_inejction() DONE");
612 return seastar::now();
615 }).handle_exception([] (auto eptr
) {
617 "test_injection() failed: got exception {}",
626 seastar::future
<int> do_test(seastar::app_template
& app
)
628 std::vector
<const char*> args
;
630 std::string conf_file_list
;
631 auto init_params
= ceph_argparse_early_args(args
,
632 CEPH_ENTITY_TYPE_CLIENT
,
635 return crimson::common::sharded_conf().start(
636 init_params
.name
, cluster
638 return local_conf().start();
639 }).then([conf_file_list
] {
640 return local_conf().parse_config_files(conf_file_list
);
642 auto&& config
= app
.configuration();
643 verbose
= config
["verbose"].as
<bool>();
644 return test_stress(thrash_params_t
{8, 32, 50, 120})
646 return test_injection(thrash_params_t
{16, 32, 50, 120});
648 logger().info("All tests succeeded");
649 // Seastar has bugs that leave events undispatched during shutdown,
650 // which results in memory leaks and thus fails LeakSanitizer.
651 return seastar::sleep(100ms
);
654 return crimson::common::sharded_conf().stop();
657 }).handle_exception([] (auto eptr
) {
658 logger().error("Test failed: got exception {}", eptr
);
663 int main(int argc
, char** argv
)
665 seastar::app_template app
;
667 ("verbose,v", bpo::value
<bool>()->default_value(false),
669 return app
.run(argc
, argv
, [&app
] {