1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <boost/program_options.hpp>
8 #include <seastar/core/app-template.hh>
9 #include <seastar/core/do_with.hh>
10 #include <seastar/core/future-util.hh>
11 #include <seastar/core/reactor.hh>
12 #include <seastar/core/sleep.hh>
13 #include <seastar/core/semaphore.hh>
14 #include <seastar/core/smp.hh>
16 #include "common/ceph_time.h"
17 #include "messages/MOSDOp.h"
19 #include "crimson/auth/DummyAuth.h"
20 #include "crimson/common/log.h"
21 #include "crimson/common/config_proxy.h"
22 #include "crimson/net/Connection.h"
23 #include "crimson/net/Dispatcher.h"
24 #include "crimson/net/Messenger.h"
27 using namespace std::chrono_literals
;
29 namespace bpo
= boost::program_options
;
33 template<typename Message
>
34 using Ref
= boost::intrusive_ptr
<Message
>;
36 seastar::logger
& logger() {
37 return crimson::get_logger(ceph_subsys_ms
);
40 template <typename T
, typename
... Args
>
41 seastar::future
<T
*> create_sharded(Args
... args
) {
42 // seems we should only construct/stop shards on #0
43 return seastar::smp::submit_to(0, [=] {
44 auto sharded_obj
= seastar::make_lw_shared
<seastar::sharded
<T
>>();
45 return sharded_obj
->start(args
...).then([sharded_obj
]() {
46 seastar::engine().at_exit([sharded_obj
]() {
47 return sharded_obj
->stop().then([sharded_obj
] {});
49 return sharded_obj
.get();
51 }).then([] (seastar::sharded
<T
> *ptr_shard
) {
52 // return the pointer valid for the caller CPU
53 return &ptr_shard
->local();
57 enum class perf_mode_t
{
63 struct client_config
{
64 entity_addr_t server_addr
;
71 std::string
str() const {
72 std::ostringstream out
;
73 out
<< "client[>> " << server_addr
74 << "](bs=" << block_size
75 << ", ramptime=" << ramptime
76 << ", msgtime=" << msgtime
78 << ", depth=" << depth
83 static client_config
load(bpo::variables_map
& options
) {
86 ceph_assert(addr
.parse(options
["addr"].as
<std::string
>().c_str(), nullptr));
87 ceph_assert_always(addr
.is_msgr2());
89 conf
.server_addr
= addr
;
90 conf
.block_size
= options
["cbs"].as
<unsigned>();
91 conf
.ramptime
= options
["ramptime"].as
<unsigned>();
92 conf
.msgtime
= options
["msgtime"].as
<unsigned>();
93 conf
.jobs
= options
["jobs"].as
<unsigned>();
94 conf
.depth
= options
["depth"].as
<unsigned>();
95 ceph_assert(conf
.depth
% conf
.jobs
== 0);
100 struct server_config
{
105 std::string
str() const {
106 std::ostringstream out
;
107 out
<< "server[" << addr
108 << "](bs=" << block_size
114 static server_config
load(bpo::variables_map
& options
) {
117 ceph_assert(addr
.parse(options
["addr"].as
<std::string
>().c_str(), nullptr));
118 ceph_assert_always(addr
.is_msgr2());
121 conf
.block_size
= options
["sbs"].as
<unsigned>();
122 conf
.core
= options
["core"].as
<unsigned>();
127 const unsigned SAMPLE_RATE
= 7;
129 static seastar::future
<> run(
131 const client_config
& client_conf
,
132 const server_config
& server_conf
,
137 using ServerFRef
= seastar::foreign_ptr
<std::unique_ptr
<Server
>>;
140 : public crimson::net::Dispatcher
{
141 crimson::net::MessengerRef msgr
;
142 crimson::auth::DummyAuthClientServer dummy_auth
;
143 const seastar::shard_id msgr_sid
;
148 Server(unsigned msg_len
)
149 : msgr_sid
{seastar::this_shard_id()},
152 lname
+= std::to_string(msgr_sid
);
153 msg_data
.append_zero(msg_len
);
156 std::optional
<seastar::future
<>> ms_dispatch(
157 crimson::net::ConnectionRef c
, MessageRef m
) override
{
158 ceph_assert(m
->get_type() == CEPH_MSG_OSD_OP
);
160 // server replies with MOSDOp to generate server-side write workload
161 const static pg_t pgid
;
162 const static object_locator_t oloc
;
163 const static hobject_t
hobj(object_t(), oloc
.key
, CEPH_NOSNAP
, pgid
.ps(),
164 pgid
.pool(), oloc
.nspace
);
165 static spg_t
spgid(pgid
);
166 auto rep
= crimson::make_message
<MOSDOp
>(0, 0, hobj
, spgid
, 0, 0, 0);
167 bufferlist
data(msg_data
);
168 rep
->write(0, msg_len
, data
);
169 rep
->set_tid(m
->get_tid());
170 std::ignore
= c
->send(std::move(rep
));
171 return {seastar::now()};
174 seastar::future
<> init(const entity_addr_t
& addr
) {
175 return seastar::smp::submit_to(msgr_sid
, [addr
, this] {
176 // server msgr is always with nonce 0
177 msgr
= crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid
), lname
, 0);
178 msgr
->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
179 msgr
->set_auth_client(&dummy_auth
);
180 msgr
->set_auth_server(&dummy_auth
);
181 return msgr
->bind(entity_addrvec_t
{addr
}).safe_then([this] {
182 return msgr
->start({this});
183 }, crimson::net::Messenger::bind_ertr::all_same_way(
184 [addr
] (const std::error_code
& e
) {
185 logger().error("Server: "
186 "there is another instance running at {}", addr
);
191 seastar::future
<> shutdown() {
192 logger().info("{} shutdown...", lname
);
193 return seastar::smp::submit_to(msgr_sid
, [this] {
196 return msgr
->shutdown();
199 seastar::future
<> wait() {
200 return seastar::smp::submit_to(msgr_sid
, [this] {
206 static seastar::future
<ServerFRef
> create(seastar::shard_id msgr_sid
, unsigned msg_len
) {
207 return seastar::smp::submit_to(msgr_sid
, [msg_len
] {
208 return seastar::make_foreign(std::make_unique
<Server
>(msg_len
));
214 : public crimson::net::Dispatcher
,
215 public seastar::peering_sharded_service
<Client
> {
218 mono_time connecting_time
= mono_clock::zero();
219 mono_time connected_time
= mono_clock::zero();
220 unsigned received_count
= 0u;
222 mono_time start_time
= mono_clock::zero();
223 unsigned start_count
= 0u;
225 unsigned sampled_count
= 0u;
226 double total_lat_s
= 0.0;
228 // for reporting only
229 mono_time finish_time
= mono_clock::zero();
232 start_time
= mono_clock::now();
233 start_count
= received_count
;
236 finish_time
= mono_clock::zero();
239 ConnStats conn_stats
;
242 mono_time start_time
= mono_clock::zero();
243 unsigned start_count
= 0u;
244 unsigned sampled_count
= 0u;
245 double total_lat_s
= 0.0;
247 // for reporting only
248 mono_time finish_time
= mono_clock::zero();
249 unsigned finish_count
= 0u;
252 void reset(unsigned received_count
, PeriodStats
* snap
= nullptr) {
254 snap
->start_time
= start_time
;
255 snap
->start_count
= start_count
;
256 snap
->sampled_count
= sampled_count
;
257 snap
->total_lat_s
= total_lat_s
;
258 snap
->finish_time
= mono_clock::now();
259 snap
->finish_count
= received_count
;
261 start_time
= mono_clock::now();
262 start_count
= received_count
;
267 PeriodStats period_stats
;
269 const seastar::shard_id sid
;
273 crimson::net::MessengerRef msgr
;
274 const unsigned msg_len
;
276 const unsigned nr_depth
;
277 seastar::semaphore depth
;
278 std::vector
<mono_time
> time_msgs_sent
;
279 crimson::auth::DummyAuthClientServer dummy_auth
;
281 unsigned sent_count
= 0u;
282 crimson::net::ConnectionRef active_conn
= nullptr;
284 bool stop_send
= false;
285 seastar::promise
<> stopped_send_promise
;
287 Client(unsigned jobs
, unsigned msg_len
, unsigned depth
)
288 : sid
{seastar::this_shard_id()},
291 nr_depth
{depth
/jobs
},
293 time_msgs_sent
{depth
/jobs
, mono_clock::zero()} {
295 lname
+= std::to_string(sid
);
296 msg_data
.append_zero(msg_len
);
299 unsigned get_current_depth() const {
300 ceph_assert(depth
.available_units() >= 0);
301 return nr_depth
- depth
.current();
304 void ms_handle_connect(crimson::net::ConnectionRef conn
) override
{
305 conn_stats
.connected_time
= mono_clock::now();
307 std::optional
<seastar::future
<>> ms_dispatch(
308 crimson::net::ConnectionRef
, MessageRef m
) override
{
309 // server replies with MOSDOp to generate server-side write workload
310 ceph_assert(m
->get_type() == CEPH_MSG_OSD_OP
);
312 auto msg_id
= m
->get_tid();
313 if (msg_id
% SAMPLE_RATE
== 0) {
314 auto index
= msg_id
% time_msgs_sent
.size();
315 ceph_assert(time_msgs_sent
[index
] != mono_clock::zero());
316 std::chrono::duration
<double> cur_latency
= mono_clock::now() - time_msgs_sent
[index
];
317 conn_stats
.total_lat_s
+= cur_latency
.count();
318 ++(conn_stats
.sampled_count
);
319 period_stats
.total_lat_s
+= cur_latency
.count();
320 ++(period_stats
.sampled_count
);
321 time_msgs_sent
[index
] = mono_clock::zero();
324 ++(conn_stats
.received_count
);
327 return {seastar::now()};
330 // should start messenger at this shard?
332 ceph_assert(seastar::this_shard_id() == sid
);
333 return sid
!= 0 && sid
<= jobs
;
336 seastar::future
<> init() {
337 return container().invoke_on_all([] (auto& client
) {
338 if (client
.is_active()) {
339 client
.msgr
= crimson::net::Messenger::create(entity_name_t::OSD(client
.sid
), client
.lname
, client
.sid
);
340 client
.msgr
->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
341 client
.msgr
->set_auth_client(&client
.dummy_auth
);
342 client
.msgr
->set_auth_server(&client
.dummy_auth
);
343 return client
.msgr
->start({&client
});
345 return seastar::now();
349 seastar::future
<> shutdown() {
350 return container().invoke_on_all([] (auto& client
) {
351 if (client
.is_active()) {
352 logger().info("{} shutdown...", client
.lname
);
353 ceph_assert(client
.msgr
);
355 return client
.msgr
->shutdown().then([&client
] {
356 return client
.stop_dispatch_messages();
359 return seastar::now();
363 seastar::future
<> connect_wait_verify(const entity_addr_t
& peer_addr
) {
364 return container().invoke_on_all([peer_addr
] (auto& client
) {
365 // start clients in active cores (#1 ~ #jobs)
366 if (client
.is_active()) {
367 mono_time start_time
= mono_clock::now();
368 client
.active_conn
= client
.msgr
->connect(peer_addr
, entity_name_t::TYPE_OSD
);
369 // make sure handshake won't hurt the performance
370 return seastar::sleep(1s
).then([&client
, start_time
] {
371 if (client
.conn_stats
.connected_time
== mono_clock::zero()) {
372 logger().error("\n{} not connected after 1s!\n", client
.lname
);
375 client
.conn_stats
.connecting_time
= start_time
;
378 return seastar::now();
386 const unsigned msgtime
;
387 const unsigned bytes_of_block
;
389 unsigned elapsed
= 0u;
390 std::vector
<mono_time
> start_times
;
391 std::vector
<PeriodStats
> snaps
;
392 std::vector
<ConnStats
> summaries
;
395 TimerReport(unsigned jobs
, unsigned msgtime
, unsigned bs
)
399 start_times
{jobs
, mono_clock::zero()},
403 unsigned get_elapsed() const { return elapsed
; }
405 PeriodStats
& get_snap_by_job(seastar::shard_id sid
) {
406 ceph_assert(sid
>= 1 && sid
<= jobs
);
407 return snaps
[sid
- 1];
410 ConnStats
& get_summary_by_job(seastar::shard_id sid
) {
411 ceph_assert(sid
>= 1 && sid
<= jobs
);
412 return summaries
[sid
- 1];
415 bool should_stop() const {
416 return elapsed
>= msgtime
;
419 seastar::future
<> ticktock() {
420 return seastar::sleep(1s
).then([this] {
425 void report_header() {
426 std::ostringstream sout
;
427 sout
<< std::setfill(' ')
428 << std::setw(7) << "sec"
429 << std::setw(6) << "depth"
430 << std::setw(8) << "IOPS"
431 << std::setw(8) << "MB/s"
432 << std::setw(8) << "lat(ms)";
433 std::cout
<< sout
.str() << std::endl
;
436 void report_period() {
438 // init this->start_times at the first period
439 for (unsigned i
=0; i
<jobs
; ++i
) {
440 start_times
[i
] = snaps
[i
].start_time
;
443 std::chrono::duration
<double> elapsed_d
= 0s
;
446 unsigned sampled_count
= 0u;
447 double total_lat_s
= 0.0;
448 for (const auto& snap
: snaps
) {
449 elapsed_d
+= (snap
.finish_time
- snap
.start_time
);
451 ops
+= (snap
.finish_count
- snap
.start_count
);
452 sampled_count
+= snap
.sampled_count
;
453 total_lat_s
+= snap
.total_lat_s
;
455 double elapsed_s
= elapsed_d
.count() / jobs
;
456 double iops
= ops
/elapsed_s
;
457 std::ostringstream sout
;
459 << std::setw(7) << elapsed_s
460 << std::setw(6) << depth
461 << std::setw(8) << iops
462 << std::setw(8) << iops
* bytes_of_block
/ 1048576
463 << std::setw(8) << (total_lat_s
/ sampled_count
* 1000);
464 std::cout
<< sout
.str() << std::endl
;
467 void report_summary() const {
468 std::chrono::duration
<double> elapsed_d
= 0s
;
470 unsigned sampled_count
= 0u;
471 double total_lat_s
= 0.0;
472 for (const auto& summary
: summaries
) {
473 elapsed_d
+= (summary
.finish_time
- summary
.start_time
);
474 ops
+= (summary
.received_count
- summary
.start_count
);
475 sampled_count
+= summary
.sampled_count
;
476 total_lat_s
+= summary
.total_lat_s
;
478 double elapsed_s
= elapsed_d
.count() / jobs
;
479 double iops
= ops
/ elapsed_s
;
480 std::ostringstream sout
;
481 sout
<< "--------------"
483 << "--------------\n"
485 << std::setw(7) << elapsed_s
486 << std::setw(6) << "-"
487 << std::setw(8) << iops
488 << std::setw(8) << iops
* bytes_of_block
/ 1048576
489 << std::setw(8) << (total_lat_s
/ sampled_count
* 1000)
491 std::cout
<< sout
.str() << std::endl
;
495 seastar::future
<> report_period(TimerReport
& report
) {
496 return container().invoke_on_all([&report
] (auto& client
) {
497 if (client
.is_active()) {
498 PeriodStats
& snap
= report
.get_snap_by_job(client
.sid
);
499 client
.period_stats
.reset(client
.conn_stats
.received_count
,
501 snap
.depth
= client
.get_current_depth();
504 report
.report_period();
508 seastar::future
<> report_summary(TimerReport
& report
) {
509 return container().invoke_on_all([&report
] (auto& client
) {
510 if (client
.is_active()) {
511 ConnStats
& summary
= report
.get_summary_by_job(client
.sid
);
512 summary
= client
.conn_stats
;
513 summary
.finish_time
= mono_clock::now();
516 report
.report_summary();
521 seastar::future
<> dispatch_with_timer(unsigned ramptime
, unsigned msgtime
) {
522 logger().info("[all clients]: start sending MOSDOps from {} clients", jobs
);
523 return container().invoke_on_all([] (auto& client
) {
524 if (client
.is_active()) {
525 client
.do_dispatch_messages(client
.active_conn
.get());
528 logger().info("[all clients]: ramping up {} seconds...", ramptime
);
529 return seastar::sleep(std::chrono::seconds(ramptime
));
531 return container().invoke_on_all([] (auto& client
) {
532 if (client
.is_active()) {
533 client
.conn_stats
.start();
534 client
.period_stats
.reset(client
.conn_stats
.received_count
);
537 }).then([this, msgtime
] {
538 logger().info("[all clients]: reporting {} seconds...\n", msgtime
);
539 return seastar::do_with(
540 TimerReport(jobs
, msgtime
, msg_len
), [this] (auto& report
) {
541 report
.report_header();
542 return seastar::do_until(
543 [&report
] { return report
.should_stop(); },
545 return report
.ticktock().then([&report
, this] {
546 // report period every 1s
547 return report_period(report
);
548 }).then([&report
, this] {
549 // report summary every 10s
550 if (report
.get_elapsed() % 10 == 0) {
551 return report_summary(report
);
553 return seastar::now();
557 ).then([&report
, this] {
558 // report the final summary
559 if (report
.get_elapsed() % 10 != 0) {
560 return report_summary(report
);
562 return seastar::now();
570 seastar::future
<> send_msg(crimson::net::Connection
* conn
) {
571 ceph_assert(seastar::this_shard_id() == sid
);
572 return depth
.wait(1).then([this, conn
] {
573 const static pg_t pgid
;
574 const static object_locator_t oloc
;
575 const static hobject_t
hobj(object_t(), oloc
.key
, CEPH_NOSNAP
, pgid
.ps(),
576 pgid
.pool(), oloc
.nspace
);
577 static spg_t
spgid(pgid
);
578 auto m
= crimson::make_message
<MOSDOp
>(0, 0, hobj
, spgid
, 0, 0, 0);
579 bufferlist
data(msg_data
);
580 m
->write(0, msg_len
, data
);
581 // use tid as the identity of each round
582 m
->set_tid(sent_count
);
584 // sample message latency
585 if (sent_count
% SAMPLE_RATE
== 0) {
586 auto index
= sent_count
% time_msgs_sent
.size();
587 ceph_assert(time_msgs_sent
[index
] == mono_clock::zero());
588 time_msgs_sent
[index
] = mono_clock::now();
591 return conn
->send(std::move(m
));
595 class DepthBroken
: public std::exception
{};
597 seastar::future
<> stop_dispatch_messages() {
599 depth
.broken(DepthBroken());
600 return stopped_send_promise
.get_future();
603 void do_dispatch_messages(crimson::net::Connection
* conn
) {
604 ceph_assert(seastar::this_shard_id() == sid
);
605 ceph_assert(sent_count
== 0);
606 conn_stats
.start_time
= mono_clock::now();
607 // forwarded to stopped_send_promise
608 (void) seastar::do_until(
609 [this] { return stop_send
; },
612 return send_msg(conn
);
614 ).handle_exception_type([] (const DepthBroken
& e
) {
615 // ok, stopped by stop_dispatch_messages()
616 }).then([this, conn
] {
617 std::chrono::duration
<double> dur_conn
= conn_stats
.connected_time
- conn_stats
.connecting_time
;
618 std::chrono::duration
<double> dur_msg
= mono_clock::now() - conn_stats
.start_time
;
619 unsigned ops
= conn_stats
.received_count
- conn_stats
.start_count
;
620 logger().info("{}: stopped sending OSDOPs.\n"
622 " connect time: {}s\n"
623 " messages received: {}\n"
624 " messaging time: {}s\n"
627 " throughput: {}MB/s\n",
634 conn_stats
.total_lat_s
/ conn_stats
.sampled_count
* 1000,
635 ops
/ dur_msg
.count(),
636 ops
/ dur_msg
.count() * msg_len
/ 1048576);
637 stopped_send_promise
.set_value();
643 return seastar::when_all(
644 test_state::Server::create(server_conf
.core
, server_conf
.block_size
),
645 create_sharded
<test_state::Client
>(client_conf
.jobs
, client_conf
.block_size
, client_conf
.depth
),
646 crimson::common::sharded_conf().start(EntityName
{}, std::string_view
{"ceph"}).then([] {
647 return crimson::common::local_conf().start();
648 }).then([crc_enabled
] {
649 return crimson::common::local_conf().set_val(
650 "ms_crc_data", crc_enabled
? "true" : "false");
652 ).then([=](auto&& ret
) {
653 auto fp_server
= std::move(std::get
<0>(ret
).get0());
654 auto client
= std::move(std::get
<1>(ret
).get0());
655 test_state::Server
* server
= fp_server
.get();
656 if (mode
== perf_mode_t::both
) {
657 logger().info("\nperf settings:\n {}\n {}\n",
658 client_conf
.str(), server_conf
.str());
659 ceph_assert(seastar::smp::count
>= 1+client_conf
.jobs
);
660 ceph_assert(client_conf
.jobs
> 0);
661 ceph_assert(seastar::smp::count
>= 1+server_conf
.core
);
662 ceph_assert(server_conf
.core
== 0 || server_conf
.core
> client_conf
.jobs
);
663 return seastar::when_all_succeed(
664 server
->init(server_conf
.addr
),
666 ).then_unpack([client
, addr
= client_conf
.server_addr
] {
667 return client
->connect_wait_verify(addr
);
668 }).then([client
, ramptime
= client_conf
.ramptime
,
669 msgtime
= client_conf
.msgtime
] {
670 return client
->dispatch_with_timer(ramptime
, msgtime
);
672 return client
->shutdown();
673 }).then([server
, fp_server
= std::move(fp_server
)] () mutable {
674 return server
->shutdown().then([cleanup
= std::move(fp_server
)] {});
676 } else if (mode
== perf_mode_t::client
) {
677 logger().info("\nperf settings:\n {}\n", client_conf
.str());
678 ceph_assert(seastar::smp::count
>= 1+client_conf
.jobs
);
679 ceph_assert(client_conf
.jobs
> 0);
681 ).then([client
, addr
= client_conf
.server_addr
] {
682 return client
->connect_wait_verify(addr
);
683 }).then([client
, ramptime
= client_conf
.ramptime
,
684 msgtime
= client_conf
.msgtime
] {
685 return client
->dispatch_with_timer(ramptime
, msgtime
);
687 return client
->shutdown();
689 } else { // mode == perf_mode_t::server
690 ceph_assert(seastar::smp::count
>= 1+server_conf
.core
);
691 logger().info("\nperf settings:\n {}\n", server_conf
.str());
692 return server
->init(server_conf
.addr
695 return server
->wait();
697 }).then([server
, fp_server
= std::move(fp_server
)] () mutable {
698 return server
->shutdown().then([cleanup
= std::move(fp_server
)] {});
702 return crimson::common::sharded_conf().stop();
708 int main(int argc
, char** argv
)
710 seastar::app_template app
;
712 ("mode", bpo::value
<unsigned>()->default_value(0),
713 "0: both, 1:client, 2:server")
714 ("addr", bpo::value
<std::string
>()->default_value("v2:127.0.0.1:9010"),
715 "server address(only support msgr v2 protocol)")
716 ("ramptime", bpo::value
<unsigned>()->default_value(5),
717 "seconds of client ramp-up time")
718 ("msgtime", bpo::value
<unsigned>()->default_value(15),
719 "seconds of client messaging time")
720 ("jobs", bpo::value
<unsigned>()->default_value(1),
721 "number of client jobs (messengers)")
722 ("cbs", bpo::value
<unsigned>()->default_value(4096),
724 ("depth", bpo::value
<unsigned>()->default_value(512),
726 ("core", bpo::value
<unsigned>()->default_value(0),
727 "server running core")
728 ("sbs", bpo::value
<unsigned>()->default_value(0),
730 ("crc-enabled", bpo::value
<bool>()->default_value(false),
731 "enable CRC checks");
732 return app
.run(argc
, argv
, [&app
] {
733 auto&& config
= app
.configuration();
734 auto mode
= config
["mode"].as
<unsigned>();
735 ceph_assert(mode
<= 2);
736 auto _mode
= static_cast<perf_mode_t
>(mode
);
737 bool crc_enabled
= config
["crc-enabled"].as
<bool>();
738 auto server_conf
= server_config::load(config
);
739 auto client_conf
= client_config::load(config
);
740 return run(_mode
, client_conf
, server_conf
, crc_enabled
742 logger().info("\nsuccessful!\n");
743 }).handle_exception([] (auto eptr
) {
744 logger().info("\nfailed!\n");
745 return seastar::make_exception_future
<>(eptr
);