1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include <boost/program_options.hpp>
6 #include <boost/iterator/counting_iterator.hpp>
8 #include <seastar/core/app-template.hh>
9 #include <seastar/core/do_with.hh>
10 #include <seastar/core/future-util.hh>
11 #include <seastar/core/lowres_clock.hh>
12 #include <seastar/core/reactor.hh>
13 #include <seastar/core/sleep.hh>
14 #include <seastar/core/semaphore.hh>
15 #include <seastar/core/smp.hh>
16 #include <seastar/core/thread.hh>
18 #include "common/ceph_time.h"
19 #include "messages/MOSDOp.h"
20 #include "include/random.h"
22 #include "crimson/auth/DummyAuth.h"
23 #include "crimson/common/log.h"
24 #include "crimson/common/config_proxy.h"
25 #include "crimson/net/Connection.h"
26 #include "crimson/net/Dispatcher.h"
27 #include "crimson/net/Messenger.h"
28 #include "crimson/osd/stop_signal.h"
31 using namespace std::chrono_literals
;
33 using lowres_clock_t
= seastar::lowres_system_clock
;
35 namespace bpo
= boost::program_options
;
39 template<typename Message
>
40 using Ref
= boost::intrusive_ptr
<Message
>;
// Logger used throughout this tool; it is the crimson logger registered for
// the ceph_subsys_ms subsystem.
42 seastar::logger
& logger() {
43 return crimson::get_logger(ceph_subsys_ms
);
// Start a seastar::sharded<T> service, forwarding `args` to sharded::start(),
// and resolve to a pointer to the instance local to the calling shard.
// Startup is driven from shard 0. A seastar::engine().at_exit hook registered
// there stops the service when the engine exits.
// NOTE(review): the submit_to lambda captures `[=]`, so every argument type
// must be copyable.
46 template <typename T
, typename
... Args
>
47 seastar::future
<T
*> create_sharded(Args
... args
) {
48 // seems we should only construct/stop shards on #0
49 return seastar::smp::submit_to(0, [=] {
50 auto sharded_obj
= seastar::make_lw_shared
<seastar::sharded
<T
>>();
51 return sharded_obj
->start(args
...).then([sharded_obj
]() {
// The at_exit hook, and the continuation chained after stop(), hold their
// own copies of the lw_shared_ptr. This keeps the sharded object alive until
// stop() has finished.
52 seastar::engine().at_exit([sharded_obj
]() {
53 return sharded_obj
->stop().then([sharded_obj
] {});
// Pass the raw sharded<T>* back across the submit_to boundary. Ownership
// stays with the copy captured by the at_exit hook.
55 return sharded_obj
.get();
57 }).then([] (seastar::sharded
<T
> *ptr_shard
) {
58 // return the pointer valid for the caller CPU
59 return &ptr_shard
->local();
// Sample the "reactor_utilization" metric by looking it up in seastar's
// metrics value map. Callers invoke this on each shard they report on; the
// map is presumably shard-local (TODO confirm).
// NOTE(review): this reaches into seastar::metrics::impl, which looks like an
// internal seastar API. Confirm it stays available across seastar upgrades.
63 double get_reactor_utilization() {
64 auto &value_map
= seastar::metrics::impl::get_value_map();
65 auto found
= value_map
.find("reactor_utilization");
// NOTE(review): this is a plain assert(). Under NDEBUG a missing metric is not
// detected, and `*found` below would dereference end().
66 assert(found
!= value_map
.end());
67 auto &[full_name
, metric_family
] = *found
;
68 std::ignore
= full_name
;
// Exactly one instance of this metric is expected.
69 assert(metric_family
.size() == 1);
70 const auto& [labels
, metric
] = *metric_family
.begin();
// Evaluate the registered metric to obtain its current value.
72 auto value
= (*metric
)();
76 enum class perf_mode_t
{
82 struct client_config
{
83 entity_addr_t server_addr
;
92 std::string
str() const {
93 std::ostringstream out
;
94 out
<< "client[>> " << server_addr
95 << "](bs=" << block_size
96 << ", ramptime=" << ramptime
97 << ", msgtime=" << msgtime
98 << ", num_clients=" << num_clients
99 << ", num_conns=" << num_conns
100 << ", depth=" << depth
101 << ", skip_core_0=" << skip_core_0
106 static client_config
load(bpo::variables_map
& options
) {
109 ceph_assert(addr
.parse(options
["server-addr"].as
<std::string
>().c_str(), nullptr));
110 ceph_assert_always(addr
.is_msgr2());
112 conf
.server_addr
= addr
;
113 conf
.block_size
= options
["client-bs"].as
<unsigned>();
114 conf
.ramptime
= options
["ramptime"].as
<unsigned>();
115 conf
.msgtime
= options
["msgtime"].as
<unsigned>();
116 conf
.num_clients
= options
["clients"].as
<unsigned>();
117 ceph_assert_always(conf
.num_clients
> 0);
118 conf
.num_conns
= options
["conns-per-client"].as
<unsigned>();
119 ceph_assert_always(conf
.num_conns
> 0);
120 conf
.depth
= options
["depth"].as
<unsigned>();
121 conf
.skip_core_0
= options
["client-skip-core-0"].as
<bool>();
126 struct server_config
{
132 std::string
str() const {
133 std::ostringstream out
;
134 out
<< "server[" << addr
135 << "](bs=" << block_size
136 << ", is_fixed_cpu=" << is_fixed_cpu
142 static server_config
load(bpo::variables_map
& options
) {
145 ceph_assert(addr
.parse(options
["server-addr"].as
<std::string
>().c_str(), nullptr));
146 ceph_assert_always(addr
.is_msgr2());
149 conf
.block_size
= options
["server-bs"].as
<unsigned>();
150 conf
.is_fixed_cpu
= options
["server-fixed-cpu"].as
<bool>();
151 conf
.core
= options
["server-core"].as
<unsigned>();
// Latency sampling period. The client uses the per-connection send count as
// the message tid, and only messages whose tid is a multiple of SAMPLE_RATE
// have their round-trip latency recorded.
constexpr unsigned SAMPLE_RATE = 256;
158 static seastar::future
<> run(
160 const client_config
& client_conf
,
161 const server_config
& server_conf
,
166 : public crimson::net::Dispatcher
,
167 public seastar::peering_sharded_service
<Server
> {
168 // available only in msgr_sid
169 crimson::net::MessengerRef msgr
;
170 crimson::auth::DummyAuthClientServer dummy_auth
;
171 const seastar::shard_id msgr_sid
;
174 bool is_fixed_cpu
= true;
175 bool is_stopped
= false;
176 std::optional
<seastar::future
<>> fut_report
;
178 unsigned conn_count
= 0;
179 unsigned msg_count
= 0;
182 // available in all shards
186 Server(seastar::shard_id msgr_sid
, unsigned msg_len
, bool needs_report
)
187 : msgr_sid
{msgr_sid
},
189 lname
= fmt::format("server@{}", msgr_sid
);
190 msg_data
.append_zero(msg_len
);
192 if (seastar::this_shard_id() == msgr_sid
&&
198 void ms_handle_connect(
199 crimson::net::ConnectionRef
,
200 seastar::shard_id
) override
{
201 ceph_abort("impossible, server won't connect");
204 void ms_handle_accept(
205 crimson::net::ConnectionRef
,
206 seastar::shard_id new_shard
,
207 bool is_replace
) override
{
208 ceph_assert_always(new_shard
== seastar::this_shard_id());
209 auto &server
= container().local();
213 void ms_handle_reset(
214 crimson::net::ConnectionRef
,
216 auto &server
= container().local();
220 std::optional
<seastar::future
<>> ms_dispatch(
221 crimson::net::ConnectionRef c
, MessageRef m
) override
{
222 assert(c
->get_shard_id() == seastar::this_shard_id());
223 ceph_assert(m
->get_type() == CEPH_MSG_OSD_OP
);
225 auto &server
= container().local();
227 // server replies with MOSDOp to generate server-side write workload
228 const static pg_t pgid
;
229 const static object_locator_t oloc
;
230 const static hobject_t
hobj(object_t(), oloc
.key
, CEPH_NOSNAP
, pgid
.ps(),
231 pgid
.pool(), oloc
.nspace
);
232 static spg_t
spgid(pgid
);
233 auto rep
= crimson::make_message
<MOSDOp
>(0, 0, hobj
, spgid
, 0, 0, 0);
234 bufferlist
data(server
.msg_data
);
235 rep
->write(0, server
.msg_len
, data
);
236 rep
->set_tid(m
->get_tid());
238 std::ignore
= c
->send(std::move(rep
));
240 if (server
.msg_count
% 16 == 0) {
241 server
.last_msg
= std::move(m
);
243 return {seastar::now()};
246 seastar::future
<> init(const entity_addr_t
& addr
, bool is_fixed_cpu
) {
247 return container().invoke_on(
248 msgr_sid
, [addr
, is_fixed_cpu
](auto &server
) {
249 // server msgr is always with nonce 0
250 server
.msgr
= crimson::net::Messenger::create(
251 entity_name_t::OSD(server
.msgr_sid
),
252 server
.lname
, 0, is_fixed_cpu
);
253 server
.msgr
->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
254 server
.msgr
->set_auth_client(&server
.dummy_auth
);
255 server
.msgr
->set_auth_server(&server
.dummy_auth
);
256 server
.is_fixed_cpu
= is_fixed_cpu
;
257 return server
.msgr
->bind(entity_addrvec_t
{addr
}
258 ).safe_then([&server
] {
259 return server
.msgr
->start({&server
});
260 }, crimson::net::Messenger::bind_ertr::all_same_way(
261 [addr
] (const std::error_code
& e
) {
262 logger().error("Server: "
263 "there is another instance running at {}", addr
);
269 seastar::future
<> shutdown() {
270 logger().info("{} shutdown...", lname
);
271 return container().invoke_on(
272 msgr_sid
, [](auto &server
) {
273 server
.is_stopped
= true;
274 ceph_assert(server
.msgr
);
276 return server
.msgr
->shutdown(
278 if (server
.fut_report
.has_value()) {
279 return std::move(server
.fut_report
.value());
281 return seastar::now();
289 unsigned msg_count
= 0;
291 // per-interval metrics
292 double reactor_utilization
;
293 unsigned conn_count
= 0;
295 unsigned msg_count_interval
= 0;
298 // should not be called frequently to impact performance
299 void get_report(ShardReport
& last
) {
300 unsigned last_msg_count
= last
.msg_count
;
303 auto msg
= boost::static_pointer_cast
<MOSDOp
>(last_msg
);
304 msg
->finish_decode();
305 ceph_assert_always(msg
->ops
.size() == 1);
306 msg_size
= msg
->ops
[0].op
.extent
.length
;
310 last
.msg_count
= msg_count
;
311 last
.reactor_utilization
= get_reactor_utilization();
312 last
.conn_count
= conn_count
;
313 last
.msg_size
= msg_size
;
314 last
.msg_count_interval
= msg_count
- last_msg_count
;
318 unsigned elapsed
= 0u;
319 mono_time start_time
= mono_clock::zero();
320 std::vector
<ShardReport
> reports
;
322 TimerReport(unsigned shards
) : reports(shards
) {}
325 void start_report() {
326 seastar::promise
<> pr_report
;
327 fut_report
= pr_report
.get_future();
329 TimerReport(seastar::smp::count
),
330 [this](auto &report
) {
331 return seastar::do_until(
332 [this] { return is_stopped
; },
334 return seastar::sleep(2s
335 ).then([&report
, this] {
338 return seastar::smp::submit_to(msgr_sid
,
340 auto &server
= container().local();
341 server
.get_report(report
.reports
[seastar::this_shard_id()]);
342 }).then([&report
, this] {
343 auto now
= mono_clock::now();
344 auto prv
= report
.start_time
;
345 report
.start_time
= now
;
346 if (prv
== mono_clock::zero()) {
347 // cannot compute duration
350 std::chrono::duration
<double> duration_d
= now
- prv
;
351 double duration
= duration_d
.count();
352 auto &ireport
= report
.reports
[msgr_sid
];
353 double iops
= ireport
.msg_count_interval
/ duration
;
354 double throughput_MB
= -1;
355 if (ireport
.msg_size
>= 0) {
356 throughput_MB
= iops
* ireport
.msg_size
/ 1048576;
358 std::ostringstream sout
;
361 << "(" << std::setw(5) << duration
<< ") "
362 << std::setw(9) << iops
<< "IOPS "
363 << std::setw(8) << throughput_MB
<< "MiB/s "
364 << ireport
.reactor_utilization
365 << "(" << ireport
.conn_count
<< ")";
366 std::cout
<< sout
.str() << std::endl
;
369 return seastar::smp::invoke_on_all([&report
, this] {
370 auto &server
= container().local();
371 server
.get_report(report
.reports
[seastar::this_shard_id()]);
372 }).then([&report
, this] {
373 auto now
= mono_clock::now();
374 auto prv
= report
.start_time
;
375 report
.start_time
= now
;
376 if (prv
== mono_clock::zero()) {
377 // cannot compute duration
380 std::chrono::duration
<double> duration_d
= now
- prv
;
381 double duration
= duration_d
.count();
382 unsigned num_msgs
= 0;
383 // -1 means unavailable, -2 means mismatch
385 for (auto &i
: report
.reports
) {
386 if (i
.msg_size
>= 0) {
387 if (msg_size
== -2) {
389 } else if (msg_size
== -1) {
390 msg_size
= i
.msg_size
;
392 if (msg_size
!= i
.msg_size
) {
397 num_msgs
+= i
.msg_count_interval
;
399 double iops
= num_msgs
/ duration
;
400 double throughput_MB
= msg_size
;
402 throughput_MB
= iops
* msg_size
/ 1048576;
404 std::ostringstream sout
;
407 << "(" << std::setw(5) << duration
<< ") "
408 << std::setw(9) << iops
<< "IOPS "
409 << std::setw(8) << throughput_MB
<< "MiB/s ";
410 for (auto &i
: report
.reports
) {
411 sout
<< i
.reactor_utilization
412 << "(" << i
.conn_count
<< ") ";
414 std::cout
<< sout
.str() << std::endl
;
421 logger().info("report is stopped!");
422 }).forward_to(std::move(pr_report
));
427 : public crimson::net::Dispatcher
,
428 public seastar::peering_sharded_service
<Client
> {
431 mono_time connecting_time
= mono_clock::zero();
432 mono_time connected_time
= mono_clock::zero();
433 unsigned received_count
= 0u;
435 mono_time start_time
= mono_clock::zero();
436 unsigned start_count
= 0u;
438 unsigned sampled_count
= 0u;
439 double sampled_total_lat_s
= 0.0;
441 // for reporting only
442 mono_time finish_time
= mono_clock::zero();
444 void start_connecting() {
445 connecting_time
= mono_clock::now();
448 void finish_connecting() {
449 ceph_assert_always(connected_time
== mono_clock::zero());
450 connected_time
= mono_clock::now();
453 void start_collect() {
454 ceph_assert_always(connected_time
!= mono_clock::zero());
455 start_time
= mono_clock::now();
456 start_count
= received_count
;
458 sampled_total_lat_s
= 0.0;
459 finish_time
= mono_clock::zero();
462 void prepare_summary(const ConnStats
¤t
) {
464 finish_time
= mono_clock::now();
469 mono_time start_time
= mono_clock::zero();
470 unsigned start_count
= 0u;
471 unsigned sampled_count
= 0u;
472 double sampled_total_lat_s
= 0.0;
474 // for reporting only
475 mono_time finish_time
= mono_clock::zero();
476 unsigned finish_count
= 0u;
479 void start_collect(unsigned received_count
) {
480 start_time
= mono_clock::now();
481 start_count
= received_count
;
483 sampled_total_lat_s
= 0.0;
487 unsigned received_count
, unsigned _depth
, PeriodStats
&snapshot
) {
488 snapshot
.start_time
= start_time
;
489 snapshot
.start_count
= start_count
;
490 snapshot
.sampled_count
= sampled_count
;
491 snapshot
.sampled_total_lat_s
= sampled_total_lat_s
;
492 snapshot
.finish_time
= mono_clock::now();
493 snapshot
.finish_count
= received_count
;
494 snapshot
.depth
= _depth
;
496 start_collect(received_count
);
503 double connect_time_s
= 0;
504 unsigned total_msgs
= 0;
505 double messaging_time_s
= 0;
506 double latency_ms
= 0;
508 double throughput_mbps
= 0;
510 void account(const JobReport
&stats
) {
511 depth
+= stats
.depth
;
512 connect_time_s
+= stats
.connect_time_s
;
513 total_msgs
+= stats
.total_msgs
;
514 messaging_time_s
+= stats
.messaging_time_s
;
515 latency_ms
+= stats
.latency_ms
;
517 throughput_mbps
+= stats
.throughput_mbps
;
520 void report() const {
521 auto str
= fmt::format(
523 " connect time: {:08f}s\n"
524 " messages received: {}\n"
525 " messaging time: {:08f}s\n"
526 " latency: {:08f}ms\n"
528 " out throughput: {:08f}MB/s",
529 name
, depth
, connect_time_s
,
530 total_msgs
, messaging_time_s
,
533 std::cout
<< str
<< std::endl
;
537 struct ConnectionPriv
: public crimson::net::Connection::user_private_t
{
539 ConnectionPriv(unsigned i
) : index
{i
} {}
543 crimson::net::MessengerRef msgr
;
544 ConnStats conn_stats
;
545 PeriodStats period_stats
;
546 seastar::semaphore depth
;
547 std::vector
<lowres_clock_t::time_point
> time_msgs_sent
;
548 unsigned sent_count
= 0u;
549 crimson::net::ConnectionRef active_conn
;
550 bool stop_send
= false;
551 seastar::promise
<JobReport
> stopped_send_promise
;
553 ConnState(std::size_t _depth
)
555 time_msgs_sent
{_depth
, lowres_clock_t::time_point::min()} {}
557 unsigned get_current_units() const {
558 ceph_assert(depth
.available_units() >= 0);
559 return depth
.current();
562 seastar::future
<JobReport
> stop_dispatch_messages() {
564 depth
.broken(DepthBroken());
565 return stopped_send_promise
.get_future();
569 const seastar::shard_id sid
;
571 const std::optional
<unsigned> server_sid
;
573 const unsigned num_clients
;
574 const unsigned num_conns
;
575 const unsigned msg_len
;
577 const unsigned nr_depth
;
578 const unsigned nonce_base
;
579 crimson::auth::DummyAuthClientServer dummy_auth
;
581 std::vector
<ConnState
> conn_states
;
583 Client(unsigned num_clients
,
588 std::optional
<unsigned> server_sid
)
589 : sid
{seastar::this_shard_id()},
590 id
{sid
+ num_clients
- seastar::smp::count
},
591 server_sid
{server_sid
},
592 num_clients
{num_clients
},
593 num_conns
{num_conns
},
596 nonce_base
{nonce_base
} {
598 for (unsigned i
= 0; i
< num_conns
; ++i
) {
599 conn_states
.emplace_back(nr_depth
);
602 msg_data
.append_zero(msg_len
);
605 std::string
get_name(unsigned i
) {
606 return fmt::format("client{}Conn{}@{}", id
, i
, sid
);
609 void ms_handle_connect(
610 crimson::net::ConnectionRef conn
,
611 seastar::shard_id prv_shard
) override
{
612 ceph_assert_always(prv_shard
== seastar::this_shard_id());
614 unsigned index
= static_cast<ConnectionPriv
&>(conn
->get_user_private()).index
;
615 auto &conn_state
= conn_states
[index
];
616 conn_state
.conn_stats
.finish_connecting();
619 std::optional
<seastar::future
<>> ms_dispatch(
620 crimson::net::ConnectionRef conn
, MessageRef m
) override
{
622 // server replies with MOSDOp to generate server-side write workload
623 ceph_assert(m
->get_type() == CEPH_MSG_OSD_OP
);
625 unsigned index
= static_cast<ConnectionPriv
&>(conn
->get_user_private()).index
;
626 assert(index
< num_conns
);
627 auto &conn_state
= conn_states
[index
];
629 auto msg_id
= m
->get_tid();
630 if (msg_id
% SAMPLE_RATE
== 0) {
631 auto msg_index
= msg_id
% conn_state
.time_msgs_sent
.size();
632 ceph_assert(conn_state
.time_msgs_sent
[msg_index
] !=
633 lowres_clock_t::time_point::min());
634 std::chrono::duration
<double> cur_latency
=
635 lowres_clock_t::now() - conn_state
.time_msgs_sent
[msg_index
];
636 conn_state
.conn_stats
.sampled_total_lat_s
+= cur_latency
.count();
637 ++(conn_state
.conn_stats
.sampled_count
);
638 conn_state
.period_stats
.sampled_total_lat_s
+= cur_latency
.count();
639 ++(conn_state
.period_stats
.sampled_count
);
640 conn_state
.time_msgs_sent
[msg_index
] = lowres_clock_t::time_point::min();
643 ++(conn_state
.conn_stats
.received_count
);
644 conn_state
.depth
.signal(1);
646 return {seastar::now()};
649 // should start messenger at this shard?
651 ceph_assert(seastar::this_shard_id() == sid
);
652 return sid
+ num_clients
>= seastar::smp::count
;
655 seastar::future
<> init() {
656 return container().invoke_on_all([](auto& client
) {
657 if (client
.is_active()) {
658 return seastar::do_for_each(
659 boost::make_counting_iterator(0u),
660 boost::make_counting_iterator(client
.num_conns
),
662 auto &conn_state
= client
.conn_states
[i
];
663 std::string name
= client
.get_name(i
);
664 conn_state
.msgr
= crimson::net::Messenger::create(
665 entity_name_t::OSD(client
.id
* client
.num_conns
+ i
),
666 name
, client
.nonce_base
+ client
.id
* client
.num_conns
+ i
, true);
667 conn_state
.msgr
->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
668 conn_state
.msgr
->set_auth_client(&client
.dummy_auth
);
669 conn_state
.msgr
->set_auth_server(&client
.dummy_auth
);
670 return conn_state
.msgr
->start({&client
});
673 return seastar::now();
677 seastar::future
<> shutdown() {
678 return seastar::do_with(
679 std::vector
<JobReport
>(num_clients
* num_conns
),
680 [this](auto &all_stats
) {
681 return container().invoke_on_all([&all_stats
](auto& client
) {
682 if (!client
.is_active()) {
683 return seastar::now();
686 return seastar::parallel_for_each(
687 boost::make_counting_iterator(0u),
688 boost::make_counting_iterator(client
.num_conns
),
689 [&all_stats
, &client
](auto i
) {
690 logger().info("{} shutdown...", client
.get_name(i
));
691 auto &conn_state
= client
.conn_states
[i
];
692 return conn_state
.stop_dispatch_messages(
693 ).then([&all_stats
, &client
, i
](auto stats
) {
694 all_stats
[client
.id
* client
.num_conns
+ i
] = stats
;
697 return seastar::do_for_each(
698 boost::make_counting_iterator(0u),
699 boost::make_counting_iterator(client
.num_conns
),
701 auto &conn_state
= client
.conn_states
[i
];
702 ceph_assert(conn_state
.msgr
);
703 conn_state
.msgr
->stop();
704 return conn_state
.msgr
->shutdown();
707 }).then([&all_stats
, this] {
708 auto nr_jobs
= all_stats
.size();
710 std::vector
<JobReport
> clients(num_clients
);
712 for (unsigned i
= 0; i
< nr_jobs
; ++i
) {
713 auto &stats
= all_stats
[i
];
715 clients
[i
/ num_conns
].account(stats
);
716 summary
.account(stats
);
719 std::cout
<< std::endl
;
720 std::cout
<< "per client:" << std::endl
;
721 for (unsigned i
= 0; i
< num_clients
; ++i
) {
722 auto &stats
= clients
[i
];
723 stats
.name
= fmt::format("client{}", i
);
724 stats
.connect_time_s
/= num_conns
;
725 stats
.messaging_time_s
/= num_conns
;
726 stats
.latency_ms
/= num_conns
;
730 std::cout
<< std::endl
;
731 summary
.name
= fmt::format("all", nr_jobs
);
732 summary
.connect_time_s
/= nr_jobs
;
733 summary
.messaging_time_s
/= nr_jobs
;
734 summary
.latency_ms
/= nr_jobs
;
740 seastar::future
<> connect_wait_verify(const entity_addr_t
& peer_addr
) {
741 return container().invoke_on_all([peer_addr
](auto& client
) {
742 // start clients in active cores
743 if (client
.is_active()) {
744 for (unsigned i
= 0; i
< client
.num_conns
; ++i
) {
745 auto &conn_state
= client
.conn_states
[i
];
746 conn_state
.conn_stats
.start_connecting();
747 conn_state
.active_conn
= conn_state
.msgr
->connect(peer_addr
, entity_name_t::TYPE_OSD
);
748 conn_state
.active_conn
->set_user_private(
749 std::make_unique
<ConnectionPriv
>(i
));
751 // make sure handshake won't hurt the performance
752 return seastar::sleep(1s
).then([&client
] {
753 for (unsigned i
= 0; i
< client
.num_conns
; ++i
) {
754 auto &conn_state
= client
.conn_states
[i
];
755 if (conn_state
.conn_stats
.connected_time
== mono_clock::zero()) {
756 logger().error("\n{} not connected after 1s!\n",
763 return seastar::now();
770 const unsigned num_clients
;
771 const unsigned num_conns
;
772 const unsigned msgtime
;
773 const unsigned bytes_of_block
;
775 unsigned elapsed
= 0u;
776 std::vector
<PeriodStats
> snaps
;
777 std::vector
<ConnStats
> summaries
;
778 std::vector
<double> client_reactor_utilizations
;
779 std::optional
<double> server_reactor_utilization
;
782 TimerReport(unsigned num_clients
, unsigned num_conns
, unsigned msgtime
, unsigned bs
)
783 : num_clients
{num_clients
},
784 num_conns
{num_conns
},
787 snaps
{num_clients
* num_conns
},
788 summaries
{num_clients
* num_conns
},
789 client_reactor_utilizations(num_clients
) {}
791 unsigned get_elapsed() const { return elapsed
; }
793 PeriodStats
& get_snap(unsigned client_id
, unsigned i
) {
794 return snaps
[client_id
* num_conns
+ i
];
797 ConnStats
& get_summary(unsigned client_id
, unsigned i
) {
798 return summaries
[client_id
* num_conns
+ i
];
801 void set_client_reactor_utilization(unsigned client_id
, double ru
) {
802 client_reactor_utilizations
[client_id
] = ru
;
805 void set_server_reactor_utilization(double ru
) {
806 server_reactor_utilization
= ru
;
809 bool should_stop() const {
810 return elapsed
>= msgtime
;
813 seastar::future
<> ticktock() {
814 return seastar::sleep(1s
).then([this] {
819 void report_header() const {
820 std::ostringstream sout
;
821 sout
<< std::setfill(' ')
822 << std::setw(6) << "sec"
823 << std::setw(7) << "depth"
824 << std::setw(10) << "IOPS"
825 << std::setw(9) << "MB/s"
826 << std::setw(9) << "lat(ms)";
827 std::cout
<< sout
.str() << std::endl
;
830 void report_period() {
831 std::chrono::duration
<double> elapsed_d
= 0s
;
834 unsigned sampled_count
= 0u;
835 double sampled_total_lat_s
= 0.0;
836 for (const auto& snap
: snaps
) {
837 elapsed_d
+= (snap
.finish_time
- snap
.start_time
);
839 ops
+= (snap
.finish_count
- snap
.start_count
);
840 sampled_count
+= snap
.sampled_count
;
841 sampled_total_lat_s
+= snap
.sampled_total_lat_s
;
843 double elapsed_s
= elapsed_d
.count() / (num_clients
* num_conns
);
844 double iops
= ops
/elapsed_s
;
845 std::ostringstream sout
;
847 << std::setw(5) << elapsed_s
849 << std::setw(6) << depth
851 << std::setw(9) << iops
853 << std::setw(8) << iops
* bytes_of_block
/ 1048576
855 << std::setw(8) << (sampled_total_lat_s
/ sampled_count
* 1000)
857 if (server_reactor_utilization
.has_value()) {
858 sout
<< *server_reactor_utilization
<< " -- ";
860 for (double cru
: client_reactor_utilizations
) {
863 std::cout
<< sout
.str() << std::endl
;
866 void report_summary() const {
867 std::chrono::duration
<double> elapsed_d
= 0s
;
869 unsigned sampled_count
= 0u;
870 double sampled_total_lat_s
= 0.0;
871 for (const auto& summary
: summaries
) {
872 elapsed_d
+= (summary
.finish_time
- summary
.start_time
);
873 ops
+= (summary
.received_count
- summary
.start_count
);
874 sampled_count
+= summary
.sampled_count
;
875 sampled_total_lat_s
+= summary
.sampled_total_lat_s
;
877 double elapsed_s
= elapsed_d
.count() / (num_clients
* num_conns
);
878 double iops
= ops
/ elapsed_s
;
879 std::ostringstream sout
;
880 sout
<< "--------------"
882 << "--------------\n"
884 << std::setw(7) << elapsed_s
885 << std::setw(6) << "-"
886 << std::setw(8) << iops
887 << std::setw(8) << iops
* bytes_of_block
/ 1048576
888 << std::setw(8) << (sampled_total_lat_s
/ sampled_count
* 1000)
890 std::cout
<< sout
.str() << std::endl
;
894 seastar::future
<> report_period(TimerReport
& report
) {
895 return container().invoke_on_all([&report
] (auto& client
) {
896 if (client
.is_active()) {
897 for (unsigned i
= 0; i
< client
.num_conns
; ++i
) {
898 auto &conn_state
= client
.conn_states
[i
];
899 PeriodStats
& snap
= report
.get_snap(client
.id
, i
);
900 conn_state
.period_stats
.reset_period(
901 conn_state
.conn_stats
.received_count
,
902 client
.nr_depth
- conn_state
.get_current_units(),
905 report
.set_client_reactor_utilization(client
.id
, get_reactor_utilization());
907 if (client
.server_sid
.has_value() &&
908 seastar::this_shard_id() == *client
.server_sid
) {
909 assert(!client
.is_active());
910 report
.set_server_reactor_utilization(get_reactor_utilization());
913 report
.report_period();
917 seastar::future
<> report_summary(TimerReport
& report
) {
918 return container().invoke_on_all([&report
] (auto& client
) {
919 if (client
.is_active()) {
920 for (unsigned i
= 0; i
< client
.num_conns
; ++i
) {
921 auto &conn_state
= client
.conn_states
[i
];
922 ConnStats
& summary
= report
.get_summary(client
.id
, i
);
923 summary
.prepare_summary(conn_state
.conn_stats
);
927 report
.report_summary();
932 seastar::future
<> dispatch_with_timer(unsigned ramptime
, unsigned msgtime
) {
933 logger().info("[all clients]: start sending MOSDOps from {} clients * {} conns",
934 num_clients
, num_conns
);
935 return container().invoke_on_all([] (auto& client
) {
936 if (client
.is_active()) {
937 for (unsigned i
= 0; i
< client
.num_conns
; ++i
) {
938 client
.do_dispatch_messages(i
);
942 logger().info("[all clients]: ramping up {} seconds...", ramptime
);
943 return seastar::sleep(std::chrono::seconds(ramptime
));
945 return container().invoke_on_all([] (auto& client
) {
946 if (client
.is_active()) {
947 for (unsigned i
= 0; i
< client
.num_conns
; ++i
) {
948 auto &conn_state
= client
.conn_states
[i
];
949 conn_state
.conn_stats
.start_collect();
950 conn_state
.period_stats
.start_collect(conn_state
.conn_stats
.received_count
);
954 }).then([this, msgtime
] {
955 logger().info("[all clients]: reporting {} seconds...\n", msgtime
);
956 return seastar::do_with(
957 TimerReport(num_clients
, num_conns
, msgtime
, msg_len
),
958 [this](auto& report
) {
959 report
.report_header();
960 return seastar::do_until(
961 [&report
] { return report
.should_stop(); },
963 return report
.ticktock().then([&report
, this] {
964 // report period every 1s
965 return report_period(report
);
966 }).then([&report
, this] {
967 // report summary every 10s
968 if (report
.get_elapsed() % 10 == 0) {
969 return report_summary(report
);
971 return seastar::now();
975 ).then([&report
, this] {
976 // report the final summary
977 if (report
.get_elapsed() % 10 != 0) {
978 return report_summary(report
);
980 return seastar::now();
988 seastar::future
<> send_msg(ConnState
&conn_state
) {
989 ceph_assert(seastar::this_shard_id() == sid
);
990 conn_state
.sent_count
+= 1;
991 return conn_state
.depth
.wait(1
992 ).then([this, &conn_state
] {
993 const static pg_t pgid
;
994 const static object_locator_t oloc
;
995 const static hobject_t
hobj(object_t(), oloc
.key
, CEPH_NOSNAP
, pgid
.ps(),
996 pgid
.pool(), oloc
.nspace
);
997 static spg_t
spgid(pgid
);
998 auto m
= crimson::make_message
<MOSDOp
>(0, 0, hobj
, spgid
, 0, 0, 0);
999 bufferlist
data(msg_data
);
1000 m
->write(0, msg_len
, data
);
1001 // use tid as the identity of each round
1002 m
->set_tid(conn_state
.sent_count
);
1004 // sample message latency
1005 if (unlikely(conn_state
.sent_count
% SAMPLE_RATE
== 0)) {
1006 auto index
= conn_state
.sent_count
% conn_state
.time_msgs_sent
.size();
1007 ceph_assert(conn_state
.time_msgs_sent
[index
] ==
1008 lowres_clock_t::time_point::min());
1009 conn_state
.time_msgs_sent
[index
] = lowres_clock_t::now();
1012 return conn_state
.active_conn
->send(std::move(m
));
// Exception installed via `depth.broken(DepthBroken())` to wake a sender
// blocked on the per-connection depth semaphore when sending is stopped. The
// dispatch loop catches it (handle_exception_type) as a normal stop.
struct DepthBroken : std::exception {};
1018 seastar::future
<JobReport
> stop_dispatch_messages(unsigned i
) {
1019 auto &conn_state
= conn_states
[i
];
1020 conn_state
.stop_send
= true;
1021 conn_state
.depth
.broken(DepthBroken());
1022 return conn_state
.stopped_send_promise
.get_future();
1025 void do_dispatch_messages(unsigned i
) {
1026 ceph_assert(seastar::this_shard_id() == sid
);
1027 auto &conn_state
= conn_states
[i
];
1028 ceph_assert(conn_state
.sent_count
== 0);
1029 conn_state
.conn_stats
.start_time
= mono_clock::now();
1030 // forwarded to stopped_send_promise
1031 (void) seastar::do_until(
1032 [&conn_state
] { return conn_state
.stop_send
; },
1033 [this, &conn_state
] { return send_msg(conn_state
); }
1034 ).handle_exception_type([] (const DepthBroken
& e
) {
1035 // ok, stopped by stop_dispatch_messages()
1036 }).then([this, &conn_state
, i
] {
1037 std::string name
= get_name(i
);
1038 logger().info("{} {}: stopped sending OSDOPs",
1039 name
, *conn_state
.active_conn
);
1041 std::chrono::duration
<double> dur_conn
=
1042 conn_state
.conn_stats
.connected_time
-
1043 conn_state
.conn_stats
.connecting_time
;
1044 std::chrono::duration
<double> dur_msg
=
1045 mono_clock::now() - conn_state
.conn_stats
.start_time
;
1047 conn_state
.conn_stats
.received_count
-
1048 conn_state
.conn_stats
.start_count
;
1052 stats
.depth
= nr_depth
;
1053 stats
.connect_time_s
= dur_conn
.count();
1054 stats
.total_msgs
= ops
;
1055 stats
.messaging_time_s
= dur_msg
.count();
1057 conn_state
.conn_stats
.sampled_total_lat_s
/
1058 conn_state
.conn_stats
.sampled_count
* 1000;
1059 stats
.iops
= ops
/ dur_msg
.count();
1060 stats
.throughput_mbps
= ops
/ dur_msg
.count() * msg_len
/ 1048576;
1062 conn_state
.stopped_send_promise
.set_value(stats
);
1068 std::optional
<unsigned> server_sid
;
1069 bool server_needs_report
= false;
1070 if (mode
== perf_mode_t::both
) {
1071 ceph_assert(server_conf
.is_fixed_cpu
== true);
1072 server_sid
= server_conf
.core
;
1073 } else if (mode
== perf_mode_t::server
) {
1074 server_needs_report
= true;
1076 return seastar::when_all(
1077 seastar::futurize_invoke([mode
, server_conf
, server_needs_report
] {
1078 if (mode
== perf_mode_t::client
) {
1079 return seastar::make_ready_future
<test_state::Server
*>(nullptr);
1081 return create_sharded
<test_state::Server
>(
1083 server_conf
.block_size
,
1084 server_needs_report
);
1087 seastar::futurize_invoke([mode
, client_conf
, server_sid
] {
1088 if (mode
== perf_mode_t::server
) {
1089 return seastar::make_ready_future
<test_state::Client
*>(nullptr);
1091 unsigned nonce_base
= ceph::util::generate_random_number
<unsigned>();
1092 logger().info("client nonce_base={}", nonce_base
);
1093 return create_sharded
<test_state::Client
>(
1094 client_conf
.num_clients
,
1095 client_conf
.num_conns
,
1096 client_conf
.block_size
,
1102 crimson::common::sharded_conf().start(
1103 EntityName
{}, std::string_view
{"ceph"}
1105 return crimson::common::local_conf().start();
1106 }).then([crc_enabled
] {
1107 return crimson::common::local_conf().set_val(
1108 "ms_crc_data", crc_enabled
? "true" : "false");
1110 ).then([=](auto&& ret
) {
1111 auto server
= std::move(std::get
<0>(ret
).get0());
1112 auto client
= std::move(std::get
<1>(ret
).get0());
1113 // reserve core 0 for potentially better performance
1114 if (mode
== perf_mode_t::both
) {
1115 logger().info("\nperf settings:\n smp={}\n {}\n {}\n",
1116 seastar::smp::count
, client_conf
.str(), server_conf
.str());
1117 if (client_conf
.skip_core_0
) {
1118 ceph_assert(seastar::smp::count
> client_conf
.num_clients
);
1120 ceph_assert(seastar::smp::count
>= client_conf
.num_clients
);
1122 ceph_assert(client_conf
.num_clients
> 0);
1123 ceph_assert(seastar::smp::count
> server_conf
.core
+ client_conf
.num_clients
);
1124 return seastar::when_all_succeed(
1125 // it is not reasonable to allow server/client to shared cores for
1126 // performance benchmarking purposes.
1127 server
->init(server_conf
.addr
, server_conf
.is_fixed_cpu
),
1129 ).then_unpack([client
, addr
= client_conf
.server_addr
] {
1130 return client
->connect_wait_verify(addr
);
1131 }).then([client
, ramptime
= client_conf
.ramptime
,
1132 msgtime
= client_conf
.msgtime
] {
1133 return client
->dispatch_with_timer(ramptime
, msgtime
);
1135 return client
->shutdown();
1137 return server
->shutdown();
1139 } else if (mode
== perf_mode_t::client
) {
1140 logger().info("\nperf settings:\n smp={}\n {}\n",
1141 seastar::smp::count
, client_conf
.str());
1142 if (client_conf
.skip_core_0
) {
1143 ceph_assert(seastar::smp::count
> client_conf
.num_clients
);
1145 ceph_assert(seastar::smp::count
>= client_conf
.num_clients
);
1147 ceph_assert(client_conf
.num_clients
> 0);
1148 return client
->init(
1149 ).then([client
, addr
= client_conf
.server_addr
] {
1150 return client
->connect_wait_verify(addr
);
1151 }).then([client
, ramptime
= client_conf
.ramptime
,
1152 msgtime
= client_conf
.msgtime
] {
1153 return client
->dispatch_with_timer(ramptime
, msgtime
);
1155 return client
->shutdown();
1157 } else { // mode == perf_mode_t::server
1158 ceph_assert(seastar::smp::count
> server_conf
.core
);
1159 logger().info("\nperf settings:\n smp={}\n {}\n",
1160 seastar::smp::count
, server_conf
.str());
1161 return seastar::async([server
, server_conf
] {
1162 // FIXME: SIGINT is not received by stop_signal
1163 seastar_apps_lib::stop_signal should_stop
;
1164 server
->init(server_conf
.addr
, server_conf
.is_fixed_cpu
).get();
1165 should_stop
.wait().get();
1166 server
->shutdown().get();
1170 return crimson::common::sharded_conf().stop();
1176 int main(int argc
, char** argv
)
1178 seastar::app_template app
;
1180 ("mode", bpo::value
<unsigned>()->default_value(0),
1181 "0: both, 1:client, 2:server")
1182 ("server-addr", bpo::value
<std::string
>()->default_value("v2:127.0.0.1:9010"),
1183 "server address(only support msgr v2 protocol)")
1184 ("ramptime", bpo::value
<unsigned>()->default_value(5),
1185 "seconds of client ramp-up time")
1186 ("msgtime", bpo::value
<unsigned>()->default_value(15),
1187 "seconds of client messaging time")
1188 ("clients", bpo::value
<unsigned>()->default_value(1),
1189 "number of client messengers")
1190 ("conns-per-client", bpo::value
<unsigned>()->default_value(1),
1191 "number of connections per client")
1192 ("client-bs", bpo::value
<unsigned>()->default_value(4096),
1193 "client block size")
1194 ("depth", bpo::value
<unsigned>()->default_value(512),
1195 "client io depth per job")
1196 ("client-skip-core-0", bpo::value
<bool>()->default_value(true),
1197 "client skip core 0")
1198 ("server-fixed-cpu", bpo::value
<bool>()->default_value(true),
1199 "server is in the fixed cpu mode, non-fixed doesn't support the mode both")
1200 ("server-core", bpo::value
<unsigned>()->default_value(1),
1201 "server messenger running core")
1202 ("server-bs", bpo::value
<unsigned>()->default_value(0),
1203 "server block size")
1204 ("crc-enabled", bpo::value
<bool>()->default_value(false),
1205 "enable CRC checks");
1206 return app
.run(argc
, argv
, [&app
] {
1207 auto&& config
= app
.configuration();
1208 auto mode
= config
["mode"].as
<unsigned>();
1209 ceph_assert(mode
<= 2);
1210 auto _mode
= static_cast<perf_mode_t
>(mode
);
1211 bool crc_enabled
= config
["crc-enabled"].as
<bool>();
1212 auto server_conf
= server_config::load(config
);
1213 auto client_conf
= client_config::load(config
);
1214 return run(_mode
, client_conf
, server_conf
, crc_enabled
1216 logger().info("\nsuccessful!\n");
1217 }).handle_exception([] (auto eptr
) {
1218 logger().info("\nfailed!\n");
1219 return seastar::make_exception_future
<>(eptr
);