]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/crimson/test_messenger_thrash.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / test / crimson / test_messenger_thrash.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include <iterator>
#include <map>
#include <memory>
#include <random>
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <seastar/core/app-template.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/with_timeout.hh>

#include "common/ceph_argparse.h"
#include "messages/MPing.h"
#include "messages/MCommand.h"
#include "crimson/auth/DummyAuth.h"
#include "crimson/common/log.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Messenger.h"
23
24 using namespace std::chrono_literals;
25 namespace bpo = boost::program_options;
26 using crimson::common::local_conf;
27 using payload_seq_t = uint64_t;
28
29 struct Payload {
30 enum Who : uint8_t {
31 PING = 0,
32 PONG = 1,
33 };
34 uint8_t who = 0;
35 payload_seq_t seq = 0;
36 bufferlist data;
37
38 Payload(Who who, uint64_t seq, const bufferlist& data)
39 : who(who), seq(seq), data(data)
40 {}
41 Payload() = default;
42 DENC(Payload, v, p) {
43 DENC_START(1, 1, p);
44 denc(v.who, p);
45 denc(v.seq, p);
46 denc(v.data, p);
47 DENC_FINISH(p);
48 }
49 };
50 WRITE_CLASS_DENC(Payload)
51
52 template<>
53 struct fmt::formatter<Payload> : fmt::formatter<std::string_view> {
54 template <typename FormatContext>
55 auto format(const Payload& pl, FormatContext& ctx) const {
56 return fmt::format_to(ctx.out(), "reply={} i={}", pl.who, pl.seq);
57 }
58 };
59
60 namespace {
61
62 seastar::logger& logger() {
63 return crimson::get_logger(ceph_subsys_test);
64 }
65
// File-wide randomness: `rd` seeds `rng` once at start-up, `prob` draws a
// percentage in [0, 99] from it.
std::random_device rd{};
std::default_random_engine rng(rd());
std::uniform_int_distribution<int> prob{0, 99};
// Extra logging; set from the --verbose command line option.
bool verbose{false};
70
71 entity_addr_t get_server_addr() {
72 static int port = 16800;
73 ++port;
74 entity_addr_t saddr;
75 saddr.parse("127.0.0.1", nullptr);
76 saddr.set_port(port);
77 return saddr;
78 }
79
// Return a new nonce on every call; the first value handed out is 2.
uint64_t get_nonce() {
  static uint64_t last_nonce = 1;
  return ++last_nonce;
}
85
// Sizing knobs of one thrash run. Still an aggregate, so it can be written
// as thrash_params_t{servers, clients, connections, random_op}; every field
// is value-initialized so a default-constructed instance holds zeros
// instead of indeterminate values.
struct thrash_params_t {
  std::size_t servers{};      // number of server messengers to start
  std::size_t clients{};      // number of client messengers to start
  std::size_t connections{};  // connection-creation attempts made up front
  std::size_t random_op{};    // number of random operations to perform
};
92
93 class SyntheticWorkload;
94
95 class SyntheticDispatcher final
96 : public crimson::net::Dispatcher {
97 public:
98 std::map<crimson::net::Connection*, std::deque<payload_seq_t> > conn_sent;
99 std::map<payload_seq_t, bufferlist> sent;
100 unsigned index;
101 SyntheticWorkload *workload;
102
103 SyntheticDispatcher(bool s, SyntheticWorkload *wl):
104 index(0), workload(wl) {
105 }
106
107 std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef con,
108 MessageRef m) final {
109 if (verbose) {
110 logger().warn("{}: con = {}", __func__, *con);
111 }
112 // MSG_COMMAND is used to disorganize regular message flow
113 if (m->get_type() == MSG_COMMAND) {
114 return seastar::now();
115 }
116
117 Payload pl;
118 auto p = m->get_data().cbegin();
119 decode(pl, p);
120 if (pl.who == Payload::PING) {
121 logger().info(" {} conn= {} {}", __func__, *con, pl);
122 return reply_message(m, con, pl);
123 } else {
124 ceph_assert(pl.who == Payload::PONG);
125 if (sent.count(pl.seq)) {
126 logger().info(" {} conn= {} {}", __func__, *con, pl);
127 ceph_assert(conn_sent[&*con].front() == pl.seq);
128 ceph_assert(pl.data.contents_equal(sent[pl.seq]));
129 conn_sent[&*con].pop_front();
130 sent.erase(pl.seq);
131 }
132
133 return seastar::now();
134 }
135 }
136
137 void ms_handle_accept(
138 crimson::net::ConnectionRef conn,
139 seastar::shard_id prv_shard,
140 bool is_replace) final {
141 logger().info("{} - Connection:{}", __func__, *conn);
142 assert(prv_shard == seastar::this_shard_id());
143 }
144
145 void ms_handle_connect(
146 crimson::net::ConnectionRef conn,
147 seastar::shard_id prv_shard) final {
148 logger().info("{} - Connection:{}", __func__, *conn);
149 assert(prv_shard == seastar::this_shard_id());
150 }
151
152 void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace) final;
153
154 void ms_handle_remote_reset(crimson::net::ConnectionRef con) final {
155 clear_pending(con);
156 }
157
158 std::optional<seastar::future<>> reply_message(
159 const MessageRef m,
160 crimson::net::ConnectionRef con,
161 Payload& pl) {
162 pl.who = Payload::PONG;
163 bufferlist bl;
164 encode(pl, bl);
165 auto rm = crimson::make_message<MPing>();
166 rm->set_data(bl);
167 if (verbose) {
168 logger().info("{} conn= {} reply i= {}",
169 __func__, *con, pl.seq);
170 }
171 return con->send(std::move(rm));
172 }
173
174 seastar::future<> send_message_wrap(crimson::net::ConnectionRef con,
175 const bufferlist& data) {
176 auto m = crimson::make_message<MPing>();
177 Payload pl{Payload::PING, index++, data};
178 bufferlist bl;
179 encode(pl, bl);
180 m->set_data(bl);
181 sent[pl.seq] = pl.data;
182 conn_sent[&*con].push_back(pl.seq);
183 logger().info("{} conn= {} send i= {}",
184 __func__, *con, pl.seq);
185
186 return con->send(std::move(m));
187 }
188
189 uint64_t get_num_pending_msgs() {
190 return sent.size();
191 }
192
193 void clear_pending(crimson::net::ConnectionRef con) {
194 for (std::deque<uint64_t>::iterator it = conn_sent[&*con].begin();
195 it != conn_sent[&*con].end(); ++it)
196 sent.erase(*it);
197 conn_sent.erase(&*con);
198 }
199
200 void print() {
201 for (auto && [connptr, list] : conn_sent) {
202 if (!list.empty()) {
203 logger().info("{} {} wait {}", __func__,
204 (void*)connptr, list.size());
205 }
206 }
207 }
208 };
209
210 class SyntheticWorkload {
211 // messengers must be freed after its connections
212 std::set<crimson::net::MessengerRef> available_servers;
213 std::set<crimson::net::MessengerRef> available_clients;
214
215 crimson::net::SocketPolicy server_policy;
216 crimson::net::SocketPolicy client_policy;
217 std::map<crimson::net::ConnectionRef,
218 std::pair<crimson::net::MessengerRef,
219 crimson::net::MessengerRef>> available_connections;
220 SyntheticDispatcher dispatcher;
221 std::vector<bufferlist> rand_data;
222 crimson::auth::DummyAuthClientServer dummy_auth;
223
224 seastar::future<crimson::net::ConnectionRef> get_random_connection() {
225 return seastar::do_until(
226 [this] { return dispatcher.get_num_pending_msgs() <= max_in_flight; },
227 [] { return seastar::sleep(100ms); }
228 ).then([this] {
229 boost::uniform_int<> choose(0, available_connections.size() - 1);
230 int index = choose(rng);
231 std::map<crimson::net::ConnectionRef,
232 std::pair<crimson::net::MessengerRef, crimson::net::MessengerRef>>::iterator i
233 = available_connections.begin();
234 for (; index > 0; --index, ++i) ;
235 return seastar::make_ready_future<crimson::net::ConnectionRef>(i->first);
236 });
237 }
238
239 public:
240 const unsigned min_connections = 10;
241 const unsigned max_in_flight = 64;
242 const unsigned max_connections = 128;
243 const unsigned max_message_len = 1024 * 1024 * 4;
244 const uint64_t servers, clients;
245
246 SyntheticWorkload(int servers, int clients, int random_num,
247 crimson::net::SocketPolicy srv_policy,
248 crimson::net::SocketPolicy cli_policy)
249 : server_policy(srv_policy),
250 client_policy(cli_policy),
251 dispatcher(false, this),
252 servers(servers),
253 clients(clients) {
254
255 for (int i = 0; i < random_num; i++) {
256 bufferlist bl;
257 boost::uniform_int<> u(32, max_message_len);
258 uint64_t value_len = u(rng);
259 bufferptr bp(value_len);
260 bp.zero();
261 for (uint64_t j = 0; j < value_len-sizeof(i); ) {
262 memcpy(bp.c_str()+j, &i, sizeof(i));
263 j += 4096;
264 }
265
266 bl.append(bp);
267 rand_data.push_back(bl);
268 }
269 }
270
271
272 bool can_create_connection() {
273 return available_connections.size() < max_connections;
274 }
275
276 seastar::future<> maybe_generate_connection() {
277 if (!can_create_connection()) {
278 return seastar::now();
279 }
280 crimson::net::MessengerRef server, client;
281 {
282 boost::uniform_int<> choose(0, available_servers.size() - 1);
283 int index = choose(rng);
284 std::set<crimson::net::MessengerRef>::iterator i
285 = available_servers.begin();
286 for (; index > 0; --index, ++i) ;
287 server = *i;
288 }
289 {
290 boost::uniform_int<> choose(0, available_clients.size() - 1);
291 int index = choose(rng);
292 std::set<crimson::net::MessengerRef>::iterator i
293 = available_clients.begin();
294 for (; index > 0; --index, ++i) ;
295 client = *i;
296 }
297
298
299 std::pair<crimson::net::MessengerRef, crimson::net::MessengerRef>
300 connected_pair;
301 {
302 crimson::net::ConnectionRef conn = client->connect(
303 server->get_myaddr(),
304 entity_name_t::TYPE_OSD);
305 connected_pair = std::make_pair(client, server);
306 available_connections[conn] = connected_pair;
307 }
308 return seastar::now();
309 }
310
311 seastar::future<> random_op (const uint64_t& iter) {
312 return seastar::do_with(iter, [this] (uint64_t& iter) {
313 return seastar::do_until(
314 [&] { return iter == 0; },
315 [&, this]
316 {
317 if (!(iter % 10)) {
318 logger().info("{} Op {} : ", __func__ ,iter);
319 print_internal_state();
320 }
321 --iter;
322 int val = prob(rng);
323 if(val > 90) {
324 return maybe_generate_connection();
325 } else if (val > 80) {
326 return drop_connection();
327 } else if (val > 10) {
328 return send_message();
329 } else {
330 return seastar::sleep(
331 std::chrono::milliseconds(rand() % 1000 + 500));
332 }
333 });
334 });
335 }
336
337 seastar::future<> generate_connections (const uint64_t& iter) {
338 return seastar::do_with(iter, [this] (uint64_t& iter) {
339 return seastar::do_until(
340 [&] { return iter == 0; },
341 [&, this]
342 {
343 --iter;
344 if (!(connections_count() % 10)) {
345 logger().info("seeding connection {}",
346 connections_count());
347 }
348 return maybe_generate_connection();
349 });
350 });
351 }
352
353 seastar::future<> init_server(const entity_name_t& name,
354 const std::string& lname,
355 const uint64_t nonce,
356 const entity_addr_t& addr) {
357 crimson::net::MessengerRef msgr =
358 crimson::net::Messenger::create(
359 name, lname, nonce, true);
360 msgr->set_default_policy(server_policy);
361 msgr->set_auth_client(&dummy_auth);
362 msgr->set_auth_server(&dummy_auth);
363 available_servers.insert(msgr);
364 return msgr->bind(entity_addrvec_t{addr}).safe_then(
365 [this, msgr] {
366 return msgr->start({&dispatcher});
367 }, crimson::net::Messenger::bind_ertr::all_same_way(
368 [addr] (const std::error_code& e) {
369 logger().error("{} test_messenger_thrash(): "
370 "there is another instance running at {}",
371 __func__, addr);
372 ceph_abort();
373 }));
374 }
375
376 seastar::future<> init_client(const entity_name_t& name,
377 const std::string& lname,
378 const uint64_t nonce) {
379 crimson::net::MessengerRef msgr =
380 crimson::net::Messenger::create(
381 name, lname, nonce, true);
382 msgr->set_default_policy(client_policy);
383 msgr->set_auth_client(&dummy_auth);
384 msgr->set_auth_server(&dummy_auth);
385 available_clients.insert(msgr);
386 return msgr->start({&dispatcher});
387 }
388
389 seastar::future<> send_message() {
390 return get_random_connection()
391 .then([this] (crimson::net::ConnectionRef conn) {
392 boost::uniform_int<> true_false(0, 99);
393 int val = true_false(rng);
394 if (val >= 95) {
395 uuid_d uuid;
396 uuid.generate_random();
397 auto m = crimson::make_message<MCommand>(uuid);
398 std::vector<std::string> cmds;
399 cmds.push_back("command");
400 m->cmd = cmds;
401 m->set_priority(200);
402 return conn->send(std::move(m));
403 } else {
404 boost::uniform_int<> u(0, rand_data.size()-1);
405 return dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
406 }
407 });
408 }
409
410 seastar::future<> drop_connection() {
411 if (available_connections.size() < min_connections) {
412 return seastar::now();
413 }
414
415 return get_random_connection()
416 .then([this] (crimson::net::ConnectionRef conn) {
417 dispatcher.clear_pending(conn);
418 conn->mark_down();
419 if (!client_policy.server &&
420 client_policy.standby) {
421 // it's a lossless policy, so we need to mark down each side
422 std::pair<crimson::net::MessengerRef, crimson::net::MessengerRef> &p =
423 available_connections[conn];
424 if (!p.first->get_default_policy().server &&
425 !p.second->get_default_policy().server) {
426 //verify that equal-to operator applies here
427 ceph_assert(p.first->owns_connection(*conn));
428 crimson::net::ConnectionRef peer = p.second->connect(
429 p.first->get_myaddr(), p.first->get_mytype());
430 peer->mark_down();
431 dispatcher.clear_pending(peer);
432 available_connections.erase(peer);
433 }
434 }
435 ceph_assert(available_connections.erase(conn) == 1U);
436 return seastar::now();
437 });
438 }
439
440 void print_internal_state(bool detail=false) {
441 logger().info("available_connections: {} inflight messages: {}",
442 available_connections.size(),
443 dispatcher.get_num_pending_msgs());
444 if (detail && !available_connections.empty()) {
445 dispatcher.print();
446 }
447 }
448
449 seastar::future<> wait_for_done() {
450 int i = 0;
451 return seastar::do_until(
452 [this] { return !dispatcher.get_num_pending_msgs(); },
453 [this, &i]
454 {
455 if (i++ % 50 == 0){
456 print_internal_state(true);
457 }
458 return seastar::sleep(100ms);
459 }).then([this] {
460 return seastar::do_for_each(available_servers, [] (auto server) {
461 if (verbose) {
462 logger().info("server {} shutdown" , server->get_myaddrs());
463 }
464 server->stop();
465 return server->shutdown();
466 });
467 }).then([this] {
468 return seastar::do_for_each(available_clients, [] (auto client) {
469 if (verbose) {
470 logger().info("client {} shutdown" , client->get_myaddrs());
471 }
472 client->stop();
473 return client->shutdown();
474 });
475 });
476 }
477
478 void handle_reset(crimson::net::ConnectionRef con) {
479 available_connections.erase(con);
480 }
481
482 uint64_t servers_count() {
483 return available_servers.size();
484 }
485
486 uint64_t clients_count() {
487 return available_clients.size();
488 }
489
490 uint64_t connections_count() {
491 return available_connections.size();
492 }
493 };
494
495 void SyntheticDispatcher::ms_handle_reset(crimson::net::ConnectionRef con,
496 bool is_replace) {
497 workload->handle_reset(con);
498 clear_pending(con);
499 }
500
501 seastar::future<> reset_conf() {
502 return seastar::when_all_succeed(
503 local_conf().set_val("ms_inject_socket_failures", "0"),
504 local_conf().set_val("ms_inject_internal_delays", "0"),
505 local_conf().set_val("ms_inject_delay_probability", "0"),
506 local_conf().set_val("ms_inject_delay_max", "0")
507 ).then_unpack([] {
508 return seastar::now();
509 });
510 }
511
512 // Testing Crimson messenger (with msgr-v2 protocol) robustness against
513 // network delays and failures. The test includes stress tests and
514 // socket level delays/failures injection tests, letting time
515 // and randomness achieve the best test coverage.
516
// Test parameters, as passed by do_test():
// Servers: 8 (stateful server policy)
// Clients: 32 (lossless client policy)
// Connections: 50 (creation attempts between random client/server pairs)
// Random Operations: 120 (Generate/Drop Connection, Send Message, Sleep)
522 seastar::future<> test_stress(thrash_params_t tp)
523 {
524
525 logger().info("test_stress():");
526
527 SyntheticWorkload test_msg(tp.servers, tp.clients, 100,
528 crimson::net::SocketPolicy::stateful_server(0),
529 crimson::net::SocketPolicy::lossless_client(0));
530
531 return seastar::do_with(test_msg, [tp]
532 (SyntheticWorkload& test_msg) {
533 return seastar::do_until([&test_msg] {
534 return test_msg.servers_count() == test_msg.servers; },
535 [&test_msg] {
536 entity_addr_t bind_addr = get_server_addr();
537 bind_addr.set_type(entity_addr_t::TYPE_MSGR2);
538 uint64_t server_num = get_nonce();
539 return test_msg.init_server(entity_name_t::OSD(server_num),
540 "server", server_num , bind_addr);
541 }).then([&test_msg] {
542 return seastar::do_until([&test_msg] {
543 return test_msg.clients_count() == test_msg.clients; },
544 [&test_msg] {
545 return test_msg.init_client(entity_name_t::CLIENT(-1),
546 "client", get_nonce());
547 });
548 }).then([&test_msg, tp] {
549 return test_msg.generate_connections(tp.connections);
550 }).then([&test_msg, tp] {
551 return test_msg.random_op(tp.random_op);
552 }).then([&test_msg] {
553 return test_msg.wait_for_done();
554 }).then([] {
555 logger().info("test_stress() DONE");
556 }).handle_exception([] (auto eptr) {
557 logger().error(
558 "test_stress() failed: got exception {}",
559 eptr);
560 throw;
561 });
562 });
563 }
564
// Test parameters, as passed by do_test():
// Servers: 16 (stateful server policy)
// Clients: 32 (lossless client policy)
// Connections: 50 (creation attempts between random client/server pairs)
// Random Operations: 120 (Generate/Drop Connection, Send Message, Sleep)
570 seastar::future<> test_injection(thrash_params_t tp)
571 {
572
573 logger().info("test_injection():");
574
575 SyntheticWorkload test_msg(tp.servers, tp.clients, 100,
576 crimson::net::SocketPolicy::stateful_server(0),
577 crimson::net::SocketPolicy::lossless_client(0));
578
579 return seastar::do_with(test_msg, [tp]
580 (SyntheticWorkload& test_msg) {
581 return seastar::do_until([&test_msg] {
582 return test_msg.servers_count() == test_msg.servers; },
583 [&test_msg] {
584 entity_addr_t bind_addr = get_server_addr();
585 bind_addr.set_type(entity_addr_t::TYPE_MSGR2);
586 uint64_t server_num = get_nonce();
587 return test_msg.init_server(entity_name_t::OSD(server_num),
588 "server", server_num , bind_addr);
589 }).then([&test_msg] {
590 return seastar::do_until([&test_msg] {
591 return test_msg.clients_count() == test_msg.clients; },
592 [&test_msg] {
593 return test_msg.init_client(entity_name_t::CLIENT(-1),
594 "client", get_nonce());
595 });
596 }).then([] {
597 return seastar::when_all_succeed(
598 local_conf().set_val("ms_inject_socket_failures", "30"),
599 local_conf().set_val("ms_inject_internal_delays", "0.1"),
600 local_conf().set_val("ms_inject_delay_probability", "1"),
601 local_conf().set_val("ms_inject_delay_max", "5"));
602 }).then_unpack([] {
603 return seastar::now();
604 }).then([&test_msg, tp] {
605 return test_msg.generate_connections(tp.connections);
606 }).then([&test_msg, tp] {
607 return test_msg.random_op(tp.random_op);
608 }).then([&test_msg] {
609 return test_msg.wait_for_done();
610 }).then([] {
611 logger().info("test_inejction() DONE");
612 return seastar::now();
613 }).then([] {
614 return reset_conf();
615 }).handle_exception([] (auto eptr) {
616 logger().error(
617 "test_injection() failed: got exception {}",
618 eptr);
619 throw;
620 });
621 });
622 }
623
624 }
625
626 seastar::future<int> do_test(seastar::app_template& app)
627 {
628 std::vector<const char*> args;
629 std::string cluster;
630 std::string conf_file_list;
631 auto init_params = ceph_argparse_early_args(args,
632 CEPH_ENTITY_TYPE_CLIENT,
633 &cluster,
634 &conf_file_list);
635 return crimson::common::sharded_conf().start(
636 init_params.name, cluster
637 ).then([] {
638 return local_conf().start();
639 }).then([conf_file_list] {
640 return local_conf().parse_config_files(conf_file_list);
641 }).then([&app] {
642 auto&& config = app.configuration();
643 verbose = config["verbose"].as<bool>();
644 return test_stress(thrash_params_t{8, 32, 50, 120})
645 .then([] {
646 return test_injection(thrash_params_t{16, 32, 50, 120});
647 }).then([] {
648 logger().info("All tests succeeded");
649 // Seastar has bugs to have events undispatched during shutdown,
650 // which will result in memory leak and thus fail LeakSanitizer.
651 return seastar::sleep(100ms);
652 });
653 }).then([] {
654 return crimson::common::sharded_conf().stop();
655 }).then([] {
656 return 0;
657 }).handle_exception([] (auto eptr) {
658 logger().error("Test failed: got exception {}", eptr);
659 return 1;
660 });
661 }
662
663 int main(int argc, char** argv)
664 {
665 seastar::app_template app;
666 app.add_options()
667 ("verbose,v", bpo::value<bool>()->default_value(false),
668 "chatty if true");
669 return app.run(argc, argv, [&app] {
670 return do_test(app);
671 });
672 }