ceph/src/crimson/tools/perf_crimson_msgr.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <map>
5 #include <boost/program_options.hpp>
6 #include <boost/iterator/counting_iterator.hpp>
7
8 #include <seastar/core/app-template.hh>
9 #include <seastar/core/do_with.hh>
10 #include <seastar/core/future-util.hh>
11 #include <seastar/core/lowres_clock.hh>
12 #include <seastar/core/reactor.hh>
13 #include <seastar/core/sleep.hh>
14 #include <seastar/core/semaphore.hh>
15 #include <seastar/core/smp.hh>
16 #include <seastar/core/thread.hh>
17
18 #include "common/ceph_time.h"
19 #include "messages/MOSDOp.h"
20 #include "include/random.h"
21
22 #include "crimson/auth/DummyAuth.h"
23 #include "crimson/common/log.h"
24 #include "crimson/common/config_proxy.h"
25 #include "crimson/net/Connection.h"
26 #include "crimson/net/Dispatcher.h"
27 #include "crimson/net/Messenger.h"
28 #include "crimson/osd/stop_signal.h"
29
30 using namespace std;
31 using namespace std::chrono_literals;
32
33 using lowres_clock_t = seastar::lowres_system_clock;
34
35 namespace bpo = boost::program_options;
36
37 namespace {
38
39 template<typename Message>
40 using Ref = boost::intrusive_ptr<Message>;
41
42 seastar::logger& logger() {
43 return crimson::get_logger(ceph_subsys_ms);
44 }
45
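// Starts a seastar::sharded<T> service: construction is submitted to shard #0,
// the service is registered to stop() at reactor exit, and the returned
// pointer refers to the instance local to the calling shard.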
46 template <typename T, typename... Args>
47 seastar::future<T*> create_sharded(Args... args) {
48   // it seems sharded services should only be constructed and stopped on shard #0
49 return seastar::smp::submit_to(0, [=] {
50 auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
51 return sharded_obj->start(args...).then([sharded_obj]() {
52 seastar::engine().at_exit([sharded_obj]() {
53 return sharded_obj->stop().then([sharded_obj] {});
54 });
55 return sharded_obj.get();
56 });
57 }).then([] (seastar::sharded<T> *ptr_shard) {
58     // return the instance pointer that is valid on the calling shard
59 return &ptr_shard->local();
60 });
61 }
62
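// Reads the "reactor_utilization" gauge from this shard's seastar metrics
// registry, i.e. how busy the local reactor currently is.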
63 double get_reactor_utilization() {
64 auto &value_map = seastar::metrics::impl::get_value_map();
65 auto found = value_map.find("reactor_utilization");
66 assert(found != value_map.end());
67 auto &[full_name, metric_family] = *found;
68 std::ignore = full_name;
69 assert(metric_family.size() == 1);
70 const auto& [labels, metric] = *metric_family.begin();
71 std::ignore = labels;
72 auto value = (*metric)();
73 return value.ui();
74 }
75
76 enum class perf_mode_t {
77 both,
78 client,
79 server
80 };
81
82 struct client_config {
83 entity_addr_t server_addr;
84 unsigned block_size;
85 unsigned ramptime;
86 unsigned msgtime;
87 unsigned num_clients;
88 unsigned num_conns;
89 unsigned depth;
90 bool skip_core_0;
91
92 std::string str() const {
93 std::ostringstream out;
94 out << "client[>> " << server_addr
95 << "](bs=" << block_size
96 << ", ramptime=" << ramptime
97 << ", msgtime=" << msgtime
98 << ", num_clients=" << num_clients
99 << ", num_conns=" << num_conns
100 << ", depth=" << depth
101 << ", skip_core_0=" << skip_core_0
102 << ")";
103 return out.str();
104 }
105
106 static client_config load(bpo::variables_map& options) {
107 client_config conf;
108 entity_addr_t addr;
109 ceph_assert(addr.parse(options["server-addr"].as<std::string>().c_str(), nullptr));
110 ceph_assert_always(addr.is_msgr2());
111
112 conf.server_addr = addr;
113 conf.block_size = options["client-bs"].as<unsigned>();
114 conf.ramptime = options["ramptime"].as<unsigned>();
115 conf.msgtime = options["msgtime"].as<unsigned>();
116 conf.num_clients = options["clients"].as<unsigned>();
117 ceph_assert_always(conf.num_clients > 0);
118 conf.num_conns = options["conns-per-client"].as<unsigned>();
119 ceph_assert_always(conf.num_conns > 0);
120 conf.depth = options["depth"].as<unsigned>();
121 conf.skip_core_0 = options["client-skip-core-0"].as<bool>();
122 return conf;
123 }
124 };
125
126 struct server_config {
127 entity_addr_t addr;
128 unsigned block_size;
129 bool is_fixed_cpu;
130 unsigned core;
131
132 std::string str() const {
133 std::ostringstream out;
134 out << "server[" << addr
135 << "](bs=" << block_size
136 << ", is_fixed_cpu=" << is_fixed_cpu
137 << ", core=" << core
138 << ")";
139 return out.str();
140 }
141
142 static server_config load(bpo::variables_map& options) {
143 server_config conf;
144 entity_addr_t addr;
145 ceph_assert(addr.parse(options["server-addr"].as<std::string>().c_str(), nullptr));
146 ceph_assert_always(addr.is_msgr2());
147
148 conf.addr = addr;
149 conf.block_size = options["server-bs"].as<unsigned>();
150 conf.is_fixed_cpu = options["server-fixed-cpu"].as<bool>();
151 conf.core = options["server-core"].as<unsigned>();
152 return conf;
153 }
154 };
155
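// Latency is only sampled for every SAMPLE_RATE-th message (by tid) to keep
// the measurement overhead low.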
156 const unsigned SAMPLE_RATE = 256;
157
158 static seastar::future<> run(
159 perf_mode_t mode,
160 const client_config& client_conf,
161 const server_config& server_conf,
162 bool crc_enabled)
163 {
164 struct test_state {
165 struct Server final
166 : public crimson::net::Dispatcher,
167 public seastar::peering_sharded_service<Server> {
168       // used only on the msgr_sid shard
169 crimson::net::MessengerRef msgr;
170 crimson::auth::DummyAuthClientServer dummy_auth;
171 const seastar::shard_id msgr_sid;
172 std::string lname;
173
174 bool is_fixed_cpu = true;
175 bool is_stopped = false;
176 std::optional<seastar::future<>> fut_report;
177
178 unsigned conn_count = 0;
179 unsigned msg_count = 0;
180 MessageRef last_msg;
181
182       // available on all shards
183 unsigned msg_len;
184 bufferlist msg_data;
185
186 Server(seastar::shard_id msgr_sid, unsigned msg_len, bool needs_report)
187 : msgr_sid{msgr_sid},
188 msg_len{msg_len} {
189 lname = fmt::format("server@{}", msgr_sid);
190 msg_data.append_zero(msg_len);
191
192 if (seastar::this_shard_id() == msgr_sid &&
193 needs_report) {
194 start_report();
195 }
196 }
197
198 void ms_handle_connect(
199 crimson::net::ConnectionRef,
200 seastar::shard_id) override {
201 ceph_abort("impossible, server won't connect");
202 }
203
204 void ms_handle_accept(
205 crimson::net::ConnectionRef,
206 seastar::shard_id new_shard,
207 bool is_replace) override {
208 ceph_assert_always(new_shard == seastar::this_shard_id());
209 auto &server = container().local();
210 ++server.conn_count;
211 }
212
213 void ms_handle_reset(
214 crimson::net::ConnectionRef,
215 bool) override {
216 auto &server = container().local();
217 --server.conn_count;
218 }
219
220 std::optional<seastar::future<>> ms_dispatch(
221 crimson::net::ConnectionRef c, MessageRef m) override {
222 assert(c->get_shard_id() == seastar::this_shard_id());
223 ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
224
225 auto &server = container().local();
226
227 // server replies with MOSDOp to generate server-side write workload
228 const static pg_t pgid;
229 const static object_locator_t oloc;
230 const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
231 pgid.pool(), oloc.nspace);
232 static spg_t spgid(pgid);
233 auto rep = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
234 bufferlist data(server.msg_data);
235 rep->write(0, server.msg_len, data);
236 rep->set_tid(m->get_tid());
237 ++server.msg_count;
238 std::ignore = c->send(std::move(rep));
239
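            // keep a recent request around so get_report() can recover the
            // client's message size without decoding every message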
240 if (server.msg_count % 16 == 0) {
241 server.last_msg = std::move(m);
242 }
243 return {seastar::now()};
244 }
245
246 seastar::future<> init(const entity_addr_t& addr, bool is_fixed_cpu) {
247 return container().invoke_on(
248 msgr_sid, [addr, is_fixed_cpu](auto &server) {
249           // the server messenger always uses nonce 0
250 server.msgr = crimson::net::Messenger::create(
251 entity_name_t::OSD(server.msgr_sid),
252 server.lname, 0, is_fixed_cpu);
253 server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
254 server.msgr->set_auth_client(&server.dummy_auth);
255 server.msgr->set_auth_server(&server.dummy_auth);
256 server.is_fixed_cpu = is_fixed_cpu;
257 return server.msgr->bind(entity_addrvec_t{addr}
258 ).safe_then([&server] {
259 return server.msgr->start({&server});
260 }, crimson::net::Messenger::bind_ertr::all_same_way(
261 [addr] (const std::error_code& e) {
262 logger().error("Server: "
263 "there is another instance running at {}", addr);
264 ceph_abort();
265 }));
266 });
267 }
268
269 seastar::future<> shutdown() {
270 logger().info("{} shutdown...", lname);
271 return container().invoke_on(
272 msgr_sid, [](auto &server) {
273 server.is_stopped = true;
274 ceph_assert(server.msgr);
275 server.msgr->stop();
276 return server.msgr->shutdown(
277 ).then([&server] {
278 if (server.fut_report.has_value()) {
279 return std::move(server.fut_report.value());
280 } else {
281 return seastar::now();
282 }
283 });
284 });
285 }
286
287 private:
288 struct ShardReport {
289 unsigned msg_count = 0;
290
291 // per-interval metrics
292 double reactor_utilization;
293 unsigned conn_count = 0;
294 int msg_size = 0;
295 unsigned msg_count_interval = 0;
296 };
297
298       // should not be called frequently, to avoid impacting performance
299 void get_report(ShardReport& last) {
300 unsigned last_msg_count = last.msg_count;
301 int msg_size = -1;
302 if (last_msg) {
303 auto msg = boost::static_pointer_cast<MOSDOp>(last_msg);
304 msg->finish_decode();
305 ceph_assert_always(msg->ops.size() == 1);
306 msg_size = msg->ops[0].op.extent.length;
307 last_msg.reset();
308 }
309
310 last.msg_count = msg_count;
311 last.reactor_utilization = get_reactor_utilization();
312 last.conn_count = conn_count;
313 last.msg_size = msg_size;
314 last.msg_count_interval = msg_count - last_msg_count;
315 }
316
317 struct TimerReport {
318 unsigned elapsed = 0u;
319 mono_time start_time = mono_clock::zero();
320 std::vector<ShardReport> reports;
321
322 TimerReport(unsigned shards) : reports(shards) {}
323 };
324
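      // Background reporting loop, used only in standalone server mode: every
      // 2s collect per-shard counters (just msgr_sid when fixed-cpu, otherwise
      // all shards) and print elapsed time, IOPS, throughput, reactor
      // utilization and connection count until shutdown() sets is_stopped.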
325 void start_report() {
326 seastar::promise<> pr_report;
327 fut_report = pr_report.get_future();
328 seastar::do_with(
329 TimerReport(seastar::smp::count),
330 [this](auto &report) {
331 return seastar::do_until(
332 [this] { return is_stopped; },
333 [&report, this] {
334 return seastar::sleep(2s
335 ).then([&report, this] {
336 report.elapsed += 2;
337 if (is_fixed_cpu) {
338 return seastar::smp::submit_to(msgr_sid,
339 [&report, this] {
340 auto &server = container().local();
341 server.get_report(report.reports[seastar::this_shard_id()]);
342 }).then([&report, this] {
343 auto now = mono_clock::now();
344 auto prv = report.start_time;
345 report.start_time = now;
346 if (prv == mono_clock::zero()) {
347 // cannot compute duration
348 return;
349 }
350 std::chrono::duration<double> duration_d = now - prv;
351 double duration = duration_d.count();
352 auto &ireport = report.reports[msgr_sid];
353 double iops = ireport.msg_count_interval / duration;
354 double throughput_MB = -1;
355 if (ireport.msg_size >= 0) {
356 throughput_MB = iops * ireport.msg_size / 1048576;
357 }
358 std::ostringstream sout;
359 sout << setfill(' ')
360 << report.elapsed
361 << "(" << std::setw(5) << duration << ") "
362 << std::setw(9) << iops << "IOPS "
363 << std::setw(8) << throughput_MB << "MiB/s "
364 << ireport.reactor_utilization
365 << "(" << ireport.conn_count << ")";
366 std::cout << sout.str() << std::endl;
367 });
368 } else {
369 return seastar::smp::invoke_on_all([&report, this] {
370 auto &server = container().local();
371 server.get_report(report.reports[seastar::this_shard_id()]);
372 }).then([&report, this] {
373 auto now = mono_clock::now();
374 auto prv = report.start_time;
375 report.start_time = now;
376 if (prv == mono_clock::zero()) {
377 // cannot compute duration
378 return;
379 }
380 std::chrono::duration<double> duration_d = now - prv;
381 double duration = duration_d.count();
382 unsigned num_msgs = 0;
383 // -1 means unavailable, -2 means mismatch
384 int msg_size = -1;
385 for (auto &i : report.reports) {
386 if (i.msg_size >= 0) {
387 if (msg_size == -2) {
388 // pass
389 } else if (msg_size == -1) {
390 msg_size = i.msg_size;
391 } else {
392 if (msg_size != i.msg_size) {
393 msg_size = -2;
394 }
395 }
396 }
397 num_msgs += i.msg_count_interval;
398 }
399 double iops = num_msgs / duration;
400 double throughput_MB = msg_size;
401 if (msg_size >= 0) {
402 throughput_MB = iops * msg_size / 1048576;
403 }
404 std::ostringstream sout;
405 sout << setfill(' ')
406 << report.elapsed
407 << "(" << std::setw(5) << duration << ") "
408 << std::setw(9) << iops << "IOPS "
409 << std::setw(8) << throughput_MB << "MiB/s ";
410 for (auto &i : report.reports) {
411 sout << i.reactor_utilization
412 << "(" << i.conn_count << ") ";
413 }
414 std::cout << sout.str() << std::endl;
415 });
416 }
417 });
418 }
419 );
420 }).then([this] {
421 logger().info("report is stopped!");
422 }).forward_to(std::move(pr_report));
423 }
424 };
425
426 struct Client final
427 : public crimson::net::Dispatcher,
428 public seastar::peering_sharded_service<Client> {
429
430 struct ConnStats {
431 mono_time connecting_time = mono_clock::zero();
432 mono_time connected_time = mono_clock::zero();
433 unsigned received_count = 0u;
434
435 mono_time start_time = mono_clock::zero();
436 unsigned start_count = 0u;
437
438 unsigned sampled_count = 0u;
439 double sampled_total_lat_s = 0.0;
440
441 // for reporting only
442 mono_time finish_time = mono_clock::zero();
443
444 void start_connecting() {
445 connecting_time = mono_clock::now();
446 }
447
448 void finish_connecting() {
449 ceph_assert_always(connected_time == mono_clock::zero());
450 connected_time = mono_clock::now();
451 }
452
453 void start_collect() {
454 ceph_assert_always(connected_time != mono_clock::zero());
455 start_time = mono_clock::now();
456 start_count = received_count;
457 sampled_count = 0u;
458 sampled_total_lat_s = 0.0;
459 finish_time = mono_clock::zero();
460 }
461
462 void prepare_summary(const ConnStats &current) {
463 *this = current;
464 finish_time = mono_clock::now();
465 }
466 };
467
468 struct PeriodStats {
469 mono_time start_time = mono_clock::zero();
470 unsigned start_count = 0u;
471 unsigned sampled_count = 0u;
472 double sampled_total_lat_s = 0.0;
473
474 // for reporting only
475 mono_time finish_time = mono_clock::zero();
476 unsigned finish_count = 0u;
477 unsigned depth = 0u;
478
479 void start_collect(unsigned received_count) {
480 start_time = mono_clock::now();
481 start_count = received_count;
482 sampled_count = 0u;
483 sampled_total_lat_s = 0.0;
484 }
485
486 void reset_period(
487 unsigned received_count, unsigned _depth, PeriodStats &snapshot) {
488 snapshot.start_time = start_time;
489 snapshot.start_count = start_count;
490 snapshot.sampled_count = sampled_count;
491 snapshot.sampled_total_lat_s = sampled_total_lat_s;
492 snapshot.finish_time = mono_clock::now();
493 snapshot.finish_count = received_count;
494 snapshot.depth = _depth;
495
496 start_collect(received_count);
497 }
498 };
499
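      // Per-connection results produced when sending stops; shutdown()
      // accumulates them into per-client and overall summaries.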
500 struct JobReport {
501 std::string name;
502 unsigned depth = 0;
503 double connect_time_s = 0;
504 unsigned total_msgs = 0;
505 double messaging_time_s = 0;
506 double latency_ms = 0;
507 double iops = 0;
508 double throughput_mbps = 0;
509
510 void account(const JobReport &stats) {
511 depth += stats.depth;
512 connect_time_s += stats.connect_time_s;
513 total_msgs += stats.total_msgs;
514 messaging_time_s += stats.messaging_time_s;
515 latency_ms += stats.latency_ms;
516 iops += stats.iops;
517 throughput_mbps += stats.throughput_mbps;
518 }
519
520 void report() const {
521 auto str = fmt::format(
522 "{}(depth={}):\n"
523 " connect time: {:08f}s\n"
524 " messages received: {}\n"
525 " messaging time: {:08f}s\n"
526 " latency: {:08f}ms\n"
527 " IOPS: {:08f}\n"
528 " out throughput: {:08f}MB/s",
529 name, depth, connect_time_s,
530 total_msgs, messaging_time_s,
531 latency_ms, iops,
532 throughput_mbps);
533 std::cout << str << std::endl;
534 }
535 };
536
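      // Each connection stores its index into conn_states as user-private
      // data so that dispatch callbacks can find the matching state.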
537 struct ConnectionPriv : public crimson::net::Connection::user_private_t {
538 unsigned index;
539 ConnectionPriv(unsigned i) : index{i} {}
540 };
541
542 struct ConnState {
543 crimson::net::MessengerRef msgr;
544 ConnStats conn_stats;
545 PeriodStats period_stats;
546 seastar::semaphore depth;
547 std::vector<lowres_clock_t::time_point> time_msgs_sent;
548 unsigned sent_count = 0u;
549 crimson::net::ConnectionRef active_conn;
550 bool stop_send = false;
551 seastar::promise<JobReport> stopped_send_promise;
552
553 ConnState(std::size_t _depth)
554 : depth{_depth},
555 time_msgs_sent{_depth, lowres_clock_t::time_point::min()} {}
556
557 unsigned get_current_units() const {
558 ceph_assert(depth.available_units() >= 0);
559 return depth.current();
560 }
561
562 seastar::future<JobReport> stop_dispatch_messages() {
563 stop_send = true;
564 depth.broken(DepthBroken());
565 return stopped_send_promise.get_future();
566 }
567 };
568
569 const seastar::shard_id sid;
570 const unsigned id;
571 const std::optional<unsigned> server_sid;
572
573 const unsigned num_clients;
574 const unsigned num_conns;
575 const unsigned msg_len;
576 bufferlist msg_data;
577 const unsigned nr_depth;
578 const unsigned nonce_base;
579 crimson::auth::DummyAuthClientServer dummy_auth;
580
581 std::vector<ConnState> conn_states;
582
583 Client(unsigned num_clients,
584 unsigned num_conns,
585 unsigned msg_len,
586 unsigned _depth,
587 unsigned nonce_base,
588 std::optional<unsigned> server_sid)
589 : sid{seastar::this_shard_id()},
590 id{sid + num_clients - seastar::smp::count},
591 server_sid{server_sid},
592 num_clients{num_clients},
593 num_conns{num_conns},
594 msg_len{msg_len},
595 nr_depth{_depth},
596 nonce_base{nonce_base} {
597 if (is_active()) {
598 for (unsigned i = 0; i < num_conns; ++i) {
599 conn_states.emplace_back(nr_depth);
600 }
601 }
602 msg_data.append_zero(msg_len);
603 }
604
605 std::string get_name(unsigned i) {
606 return fmt::format("client{}Conn{}@{}", id, i, sid);
607 }
608
609 void ms_handle_connect(
610 crimson::net::ConnectionRef conn,
611 seastar::shard_id prv_shard) override {
612 ceph_assert_always(prv_shard == seastar::this_shard_id());
613 assert(is_active());
614 unsigned index = static_cast<ConnectionPriv&>(conn->get_user_private()).index;
615 auto &conn_state = conn_states[index];
616 conn_state.conn_stats.finish_connecting();
617 }
618
619 std::optional<seastar::future<>> ms_dispatch(
620 crimson::net::ConnectionRef conn, MessageRef m) override {
621 assert(is_active());
622 // server replies with MOSDOp to generate server-side write workload
623 ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
624
625 unsigned index = static_cast<ConnectionPriv&>(conn->get_user_private()).index;
626 assert(index < num_conns);
627 auto &conn_state = conn_states[index];
628
629 auto msg_id = m->get_tid();
630 if (msg_id % SAMPLE_RATE == 0) {
631 auto msg_index = msg_id % conn_state.time_msgs_sent.size();
632 ceph_assert(conn_state.time_msgs_sent[msg_index] !=
633 lowres_clock_t::time_point::min());
634 std::chrono::duration<double> cur_latency =
635 lowres_clock_t::now() - conn_state.time_msgs_sent[msg_index];
636 conn_state.conn_stats.sampled_total_lat_s += cur_latency.count();
637 ++(conn_state.conn_stats.sampled_count);
638 conn_state.period_stats.sampled_total_lat_s += cur_latency.count();
639 ++(conn_state.period_stats.sampled_count);
640 conn_state.time_msgs_sent[msg_index] = lowres_clock_t::time_point::min();
641 }
642
643 ++(conn_state.conn_stats.received_count);
644 conn_state.depth.signal(1);
645
646 return {seastar::now()};
647 }
648
649     // should messengers be started on this shard? only the last num_clients
650     //   shards host clients, leaving core 0 free when there are spare cores
650 bool is_active() {
651 ceph_assert(seastar::this_shard_id() == sid);
652 return sid + num_clients >= seastar::smp::count;
653 }
654
655 seastar::future<> init() {
656 return container().invoke_on_all([](auto& client) {
657 if (client.is_active()) {
658 return seastar::do_for_each(
659 boost::make_counting_iterator(0u),
660 boost::make_counting_iterator(client.num_conns),
661 [&client](auto i) {
662 auto &conn_state = client.conn_states[i];
663 std::string name = client.get_name(i);
664 conn_state.msgr = crimson::net::Messenger::create(
665 entity_name_t::OSD(client.id * client.num_conns + i),
666 name, client.nonce_base + client.id * client.num_conns + i, true);
667 conn_state.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
668 conn_state.msgr->set_auth_client(&client.dummy_auth);
669 conn_state.msgr->set_auth_server(&client.dummy_auth);
670 return conn_state.msgr->start({&client});
671 });
672 }
673 return seastar::now();
674 });
675 }
676
677 seastar::future<> shutdown() {
678 return seastar::do_with(
679 std::vector<JobReport>(num_clients * num_conns),
680 [this](auto &all_stats) {
681 return container().invoke_on_all([&all_stats](auto& client) {
682 if (!client.is_active()) {
683 return seastar::now();
684 }
685
686 return seastar::parallel_for_each(
687 boost::make_counting_iterator(0u),
688 boost::make_counting_iterator(client.num_conns),
689 [&all_stats, &client](auto i) {
690 logger().info("{} shutdown...", client.get_name(i));
691 auto &conn_state = client.conn_states[i];
692 return conn_state.stop_dispatch_messages(
693 ).then([&all_stats, &client, i](auto stats) {
694 all_stats[client.id * client.num_conns + i] = stats;
695 });
696 }).then([&client] {
697 return seastar::do_for_each(
698 boost::make_counting_iterator(0u),
699 boost::make_counting_iterator(client.num_conns),
700 [&client](auto i) {
701 auto &conn_state = client.conn_states[i];
702 ceph_assert(conn_state.msgr);
703 conn_state.msgr->stop();
704 return conn_state.msgr->shutdown();
705 });
706 });
707 }).then([&all_stats, this] {
708 auto nr_jobs = all_stats.size();
709 JobReport summary;
710 std::vector<JobReport> clients(num_clients);
711
712 for (unsigned i = 0; i < nr_jobs; ++i) {
713 auto &stats = all_stats[i];
714 stats.report();
715 clients[i / num_conns].account(stats);
716 summary.account(stats);
717 }
718
719 std::cout << std::endl;
720 std::cout << "per client:" << std::endl;
721 for (unsigned i = 0; i < num_clients; ++i) {
722 auto &stats = clients[i];
723 stats.name = fmt::format("client{}", i);
724 stats.connect_time_s /= num_conns;
725 stats.messaging_time_s /= num_conns;
726 stats.latency_ms /= num_conns;
727 stats.report();
728 }
729
730 std::cout << std::endl;
731             summary.name = "all";
732 summary.connect_time_s /= nr_jobs;
733 summary.messaging_time_s /= nr_jobs;
734 summary.latency_ms /= nr_jobs;
735 summary.report();
736 });
737 });
738 }
739
740 seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) {
741 return container().invoke_on_all([peer_addr](auto& client) {
742         // start clients on the active cores
743 if (client.is_active()) {
744 for (unsigned i = 0; i < client.num_conns; ++i) {
745 auto &conn_state = client.conn_states[i];
746 conn_state.conn_stats.start_connecting();
747 conn_state.active_conn = conn_state.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
748 conn_state.active_conn->set_user_private(
749 std::make_unique<ConnectionPriv>(i));
750 }
751           // wait so that the handshake cost won't affect the measured performance
752 return seastar::sleep(1s).then([&client] {
753 for (unsigned i = 0; i < client.num_conns; ++i) {
754 auto &conn_state = client.conn_states[i];
755 if (conn_state.conn_stats.connected_time == mono_clock::zero()) {
756 logger().error("\n{} not connected after 1s!\n",
757 client.get_name(i));
758 ceph_assert(false);
759 }
760 }
761 });
762 }
763 return seastar::now();
764 });
765 }
766
767 private:
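      // Collects per-connection period snapshots and running summaries across
      // all client shards and prints one row per second (sec, depth, IOPS,
      // MB/s, lat(ms)) followed by the server and client reactor utilizations.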
768 class TimerReport {
769 private:
770 const unsigned num_clients;
771 const unsigned num_conns;
772 const unsigned msgtime;
773 const unsigned bytes_of_block;
774
775 unsigned elapsed = 0u;
776 std::vector<PeriodStats> snaps;
777 std::vector<ConnStats> summaries;
778 std::vector<double> client_reactor_utilizations;
779 std::optional<double> server_reactor_utilization;
780
781 public:
782 TimerReport(unsigned num_clients, unsigned num_conns, unsigned msgtime, unsigned bs)
783 : num_clients{num_clients},
784 num_conns{num_conns},
785 msgtime{msgtime},
786 bytes_of_block{bs},
787 snaps{num_clients * num_conns},
788 summaries{num_clients * num_conns},
789 client_reactor_utilizations(num_clients) {}
790
791 unsigned get_elapsed() const { return elapsed; }
792
793 PeriodStats& get_snap(unsigned client_id, unsigned i) {
794 return snaps[client_id * num_conns + i];
795 }
796
797 ConnStats& get_summary(unsigned client_id, unsigned i) {
798 return summaries[client_id * num_conns + i];
799 }
800
801 void set_client_reactor_utilization(unsigned client_id, double ru) {
802 client_reactor_utilizations[client_id] = ru;
803 }
804
805 void set_server_reactor_utilization(double ru) {
806 server_reactor_utilization = ru;
807 }
808
809 bool should_stop() const {
810 return elapsed >= msgtime;
811 }
812
813 seastar::future<> ticktock() {
814 return seastar::sleep(1s).then([this] {
815 ++elapsed;
816 });
817 }
818
819 void report_header() const {
820 std::ostringstream sout;
821 sout << std::setfill(' ')
822 << std::setw(6) << "sec"
823 << std::setw(7) << "depth"
824 << std::setw(10) << "IOPS"
825 << std::setw(9) << "MB/s"
826 << std::setw(9) << "lat(ms)";
827 std::cout << sout.str() << std::endl;
828 }
829
830 void report_period() {
831 std::chrono::duration<double> elapsed_d = 0s;
832 unsigned depth = 0u;
833 unsigned ops = 0u;
834 unsigned sampled_count = 0u;
835 double sampled_total_lat_s = 0.0;
836 for (const auto& snap: snaps) {
837 elapsed_d += (snap.finish_time - snap.start_time);
838 depth += snap.depth;
839 ops += (snap.finish_count - snap.start_count);
840 sampled_count += snap.sampled_count;
841 sampled_total_lat_s += snap.sampled_total_lat_s;
842 }
843 double elapsed_s = elapsed_d.count() / (num_clients * num_conns);
844 double iops = ops/elapsed_s;
845 std::ostringstream sout;
846 sout << setfill(' ')
847 << std::setw(5) << elapsed_s
848 << " "
849 << std::setw(6) << depth
850 << " "
851 << std::setw(9) << iops
852 << " "
853 << std::setw(8) << iops * bytes_of_block / 1048576
854 << " "
855 << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000)
856 << " -- ";
857 if (server_reactor_utilization.has_value()) {
858 sout << *server_reactor_utilization << " -- ";
859 }
860 for (double cru : client_reactor_utilizations) {
861 sout << cru << ",";
862 }
863 std::cout << sout.str() << std::endl;
864 }
865
866 void report_summary() const {
867 std::chrono::duration<double> elapsed_d = 0s;
868 unsigned ops = 0u;
869 unsigned sampled_count = 0u;
870 double sampled_total_lat_s = 0.0;
871 for (const auto& summary: summaries) {
872 elapsed_d += (summary.finish_time - summary.start_time);
873 ops += (summary.received_count - summary.start_count);
874 sampled_count += summary.sampled_count;
875 sampled_total_lat_s += summary.sampled_total_lat_s;
876 }
877 double elapsed_s = elapsed_d.count() / (num_clients * num_conns);
878 double iops = ops / elapsed_s;
879 std::ostringstream sout;
880 sout << "--------------"
881 << " summary "
882 << "--------------\n"
883 << setfill(' ')
884 << std::setw(7) << elapsed_s
885 << std::setw(6) << "-"
886 << std::setw(8) << iops
887 << std::setw(8) << iops * bytes_of_block / 1048576
888 << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000)
889 << "\n";
890 std::cout << sout.str() << std::endl;
891 }
892 };
893
894 seastar::future<> report_period(TimerReport& report) {
895 return container().invoke_on_all([&report] (auto& client) {
896 if (client.is_active()) {
897 for (unsigned i = 0; i < client.num_conns; ++i) {
898 auto &conn_state = client.conn_states[i];
899 PeriodStats& snap = report.get_snap(client.id, i);
900 conn_state.period_stats.reset_period(
901 conn_state.conn_stats.received_count,
902 client.nr_depth - conn_state.get_current_units(),
903 snap);
904 }
905 report.set_client_reactor_utilization(client.id, get_reactor_utilization());
906 }
907 if (client.server_sid.has_value() &&
908 seastar::this_shard_id() == *client.server_sid) {
909 assert(!client.is_active());
910 report.set_server_reactor_utilization(get_reactor_utilization());
911 }
912 }).then([&report] {
913 report.report_period();
914 });
915 }
916
917 seastar::future<> report_summary(TimerReport& report) {
918 return container().invoke_on_all([&report] (auto& client) {
919 if (client.is_active()) {
920 for (unsigned i = 0; i < client.num_conns; ++i) {
921 auto &conn_state = client.conn_states[i];
922 ConnStats& summary = report.get_summary(client.id, i);
923 summary.prepare_summary(conn_state.conn_stats);
924 }
925 }
926 }).then([&report] {
927 report.report_summary();
928 });
929 }
930
931 public:
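      // Drive the workload: start sending on all active shards, sleep through
      // the ramp-up period, reset the counters, then report every second for
      // msgtime seconds, with an additional summary every 10 seconds.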
932 seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) {
933 logger().info("[all clients]: start sending MOSDOps from {} clients * {} conns",
934 num_clients, num_conns);
935 return container().invoke_on_all([] (auto& client) {
936 if (client.is_active()) {
937 for (unsigned i = 0; i < client.num_conns; ++i) {
938 client.do_dispatch_messages(i);
939 }
940 }
941 }).then([ramptime] {
942 logger().info("[all clients]: ramping up {} seconds...", ramptime);
943 return seastar::sleep(std::chrono::seconds(ramptime));
944 }).then([this] {
945 return container().invoke_on_all([] (auto& client) {
946 if (client.is_active()) {
947 for (unsigned i = 0; i < client.num_conns; ++i) {
948 auto &conn_state = client.conn_states[i];
949 conn_state.conn_stats.start_collect();
950 conn_state.period_stats.start_collect(conn_state.conn_stats.received_count);
951 }
952 }
953 });
954 }).then([this, msgtime] {
955 logger().info("[all clients]: reporting {} seconds...\n", msgtime);
956 return seastar::do_with(
957 TimerReport(num_clients, num_conns, msgtime, msg_len),
958 [this](auto& report) {
959 report.report_header();
960 return seastar::do_until(
961 [&report] { return report.should_stop(); },
962 [&report, this] {
963 return report.ticktock().then([&report, this] {
964 // report period every 1s
965 return report_period(report);
966 }).then([&report, this] {
967 // report summary every 10s
968 if (report.get_elapsed() % 10 == 0) {
969 return report_summary(report);
970 } else {
971 return seastar::now();
972 }
973 });
974 }
975 ).then([&report, this] {
976 // report the final summary
977 if (report.get_elapsed() % 10 != 0) {
978 return report_summary(report);
979 } else {
980 return seastar::now();
981 }
982 });
983 });
984 });
985 }
986
987 private:
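      // Send one MOSDOp on the connection. The per-connection depth semaphore
      // caps the number of in-flight requests, and the send time of every
      // SAMPLE_RATE-th message is recorded for latency sampling.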
988 seastar::future<> send_msg(ConnState &conn_state) {
989 ceph_assert(seastar::this_shard_id() == sid);
990 conn_state.sent_count += 1;
991 return conn_state.depth.wait(1
992 ).then([this, &conn_state] {
993 const static pg_t pgid;
994 const static object_locator_t oloc;
995 const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
996 pgid.pool(), oloc.nspace);
997 static spg_t spgid(pgid);
998 auto m = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
999 bufferlist data(msg_data);
1000 m->write(0, msg_len, data);
1001 // use tid as the identity of each round
1002 m->set_tid(conn_state.sent_count);
1003
1004 // sample message latency
1005 if (unlikely(conn_state.sent_count % SAMPLE_RATE == 0)) {
1006 auto index = conn_state.sent_count % conn_state.time_msgs_sent.size();
1007 ceph_assert(conn_state.time_msgs_sent[index] ==
1008 lowres_clock_t::time_point::min());
1009 conn_state.time_msgs_sent[index] = lowres_clock_t::now();
1010 }
1011
1012 return conn_state.active_conn->send(std::move(m));
1013 });
1014 }
1015
1016 class DepthBroken: public std::exception {};
1017
1018 seastar::future<JobReport> stop_dispatch_messages(unsigned i) {
1019 auto &conn_state = conn_states[i];
1020 conn_state.stop_send = true;
1021 conn_state.depth.broken(DepthBroken());
1022 return conn_state.stopped_send_promise.get_future();
1023 }
1024
1025 void do_dispatch_messages(unsigned i) {
1026 ceph_assert(seastar::this_shard_id() == sid);
1027 auto &conn_state = conn_states[i];
1028 ceph_assert(conn_state.sent_count == 0);
1029 conn_state.conn_stats.start_time = mono_clock::now();
1030 // forwarded to stopped_send_promise
1031 (void) seastar::do_until(
1032 [&conn_state] { return conn_state.stop_send; },
1033 [this, &conn_state] { return send_msg(conn_state); }
1034 ).handle_exception_type([] (const DepthBroken& e) {
1035 // ok, stopped by stop_dispatch_messages()
1036 }).then([this, &conn_state, i] {
1037 std::string name = get_name(i);
1038 logger().info("{} {}: stopped sending OSDOPs",
1039 name, *conn_state.active_conn);
1040
1041 std::chrono::duration<double> dur_conn =
1042 conn_state.conn_stats.connected_time -
1043 conn_state.conn_stats.connecting_time;
1044 std::chrono::duration<double> dur_msg =
1045 mono_clock::now() - conn_state.conn_stats.start_time;
1046 unsigned ops =
1047 conn_state.conn_stats.received_count -
1048 conn_state.conn_stats.start_count;
1049
1050 JobReport stats;
1051 stats.name = name;
1052 stats.depth = nr_depth;
1053 stats.connect_time_s = dur_conn.count();
1054 stats.total_msgs = ops;
1055 stats.messaging_time_s = dur_msg.count();
1056 stats.latency_ms =
1057 conn_state.conn_stats.sampled_total_lat_s /
1058 conn_state.conn_stats.sampled_count * 1000;
1059 stats.iops = ops / dur_msg.count();
1060 stats.throughput_mbps = ops / dur_msg.count() * msg_len / 1048576;
1061
1062 conn_state.stopped_send_promise.set_value(stats);
1063 });
1064 }
1065 };
1066 };
1067
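  // In mode "both" the server must run on a fixed core so that the client-side
  // reporter knows which shard's reactor utilization belongs to the server;
  // a standalone server starts its own reporting loop instead.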
1068 std::optional<unsigned> server_sid;
1069 bool server_needs_report = false;
1070 if (mode == perf_mode_t::both) {
1071 ceph_assert(server_conf.is_fixed_cpu == true);
1072 server_sid = server_conf.core;
1073 } else if (mode == perf_mode_t::server) {
1074 server_needs_report = true;
1075 }
1076 return seastar::when_all(
1077 seastar::futurize_invoke([mode, server_conf, server_needs_report] {
1078 if (mode == perf_mode_t::client) {
1079 return seastar::make_ready_future<test_state::Server*>(nullptr);
1080 } else {
1081 return create_sharded<test_state::Server>(
1082 server_conf.core,
1083 server_conf.block_size,
1084 server_needs_report);
1085 }
1086 }),
1087 seastar::futurize_invoke([mode, client_conf, server_sid] {
1088 if (mode == perf_mode_t::server) {
1089 return seastar::make_ready_future<test_state::Client*>(nullptr);
1090 } else {
1091 unsigned nonce_base = ceph::util::generate_random_number<unsigned>();
1092 logger().info("client nonce_base={}", nonce_base);
1093 return create_sharded<test_state::Client>(
1094 client_conf.num_clients,
1095 client_conf.num_conns,
1096 client_conf.block_size,
1097 client_conf.depth,
1098 nonce_base,
1099 server_sid);
1100 }
1101 }),
1102 crimson::common::sharded_conf().start(
1103 EntityName{}, std::string_view{"ceph"}
1104 ).then([] {
1105 return crimson::common::local_conf().start();
1106 }).then([crc_enabled] {
1107 return crimson::common::local_conf().set_val(
1108 "ms_crc_data", crc_enabled ? "true" : "false");
1109 })
1110 ).then([=](auto&& ret) {
1111 auto server = std::move(std::get<0>(ret).get0());
1112 auto client = std::move(std::get<1>(ret).get0());
1113 // reserve core 0 for potentially better performance
1114 if (mode == perf_mode_t::both) {
1115 logger().info("\nperf settings:\n smp={}\n {}\n {}\n",
1116 seastar::smp::count, client_conf.str(), server_conf.str());
1117 if (client_conf.skip_core_0) {
1118 ceph_assert(seastar::smp::count > client_conf.num_clients);
1119 } else {
1120 ceph_assert(seastar::smp::count >= client_conf.num_clients);
1121 }
1122 ceph_assert(client_conf.num_clients > 0);
1123 ceph_assert(seastar::smp::count > server_conf.core + client_conf.num_clients);
1124 return seastar::when_all_succeed(
1125         // it is not reasonable to allow the server and clients to share cores for
1126         // performance benchmarking purposes.
1127 server->init(server_conf.addr, server_conf.is_fixed_cpu),
1128 client->init()
1129 ).then_unpack([client, addr = client_conf.server_addr] {
1130 return client->connect_wait_verify(addr);
1131 }).then([client, ramptime = client_conf.ramptime,
1132 msgtime = client_conf.msgtime] {
1133 return client->dispatch_with_timer(ramptime, msgtime);
1134 }).then([client] {
1135 return client->shutdown();
1136 }).then([server] {
1137 return server->shutdown();
1138 });
1139 } else if (mode == perf_mode_t::client) {
1140 logger().info("\nperf settings:\n smp={}\n {}\n",
1141 seastar::smp::count, client_conf.str());
1142 if (client_conf.skip_core_0) {
1143 ceph_assert(seastar::smp::count > client_conf.num_clients);
1144 } else {
1145 ceph_assert(seastar::smp::count >= client_conf.num_clients);
1146 }
1147 ceph_assert(client_conf.num_clients > 0);
1148 return client->init(
1149 ).then([client, addr = client_conf.server_addr] {
1150 return client->connect_wait_verify(addr);
1151 }).then([client, ramptime = client_conf.ramptime,
1152 msgtime = client_conf.msgtime] {
1153 return client->dispatch_with_timer(ramptime, msgtime);
1154 }).then([client] {
1155 return client->shutdown();
1156 });
1157 } else { // mode == perf_mode_t::server
1158 ceph_assert(seastar::smp::count > server_conf.core);
1159 logger().info("\nperf settings:\n smp={}\n {}\n",
1160 seastar::smp::count, server_conf.str());
1161 return seastar::async([server, server_conf] {
1162 // FIXME: SIGINT is not received by stop_signal
1163 seastar_apps_lib::stop_signal should_stop;
1164 server->init(server_conf.addr, server_conf.is_fixed_cpu).get();
1165 should_stop.wait().get();
1166 server->shutdown().get();
1167 });
1168 }
1169 }).finally([] {
1170 return crimson::common::sharded_conf().stop();
1171 });
1172 }
1173
1174 }
1175
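// A possible invocation (the binary name is assumed here; the options below
// are defined in main(), and --smp is seastar's option for the number of
// reactor threads):
//   perf_crimson_msgr --mode=0 --server-core=1 --clients=2 --conns-per-client=2 \
//                     --depth=512 --client-bs=4096 --smp=4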
1176 int main(int argc, char** argv)
1177 {
1178 seastar::app_template app;
1179 app.add_options()
1180 ("mode", bpo::value<unsigned>()->default_value(0),
1181      "0: both, 1: client, 2: server")
1182 ("server-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
1183      "server address (only the msgr v2 protocol is supported)")
1184 ("ramptime", bpo::value<unsigned>()->default_value(5),
1185 "seconds of client ramp-up time")
1186 ("msgtime", bpo::value<unsigned>()->default_value(15),
1187 "seconds of client messaging time")
1188 ("clients", bpo::value<unsigned>()->default_value(1),
1189 "number of client messengers")
1190 ("conns-per-client", bpo::value<unsigned>()->default_value(1),
1191 "number of connections per client")
1192 ("client-bs", bpo::value<unsigned>()->default_value(4096),
1193 "client block size")
1194 ("depth", bpo::value<unsigned>()->default_value(512),
1195 "client io depth per job")
1196 ("client-skip-core-0", bpo::value<bool>()->default_value(true),
1197 "client skip core 0")
1198 ("server-fixed-cpu", bpo::value<bool>()->default_value(true),
1199      "run the server in fixed-cpu mode; non-fixed mode does not support mode 0 (both)")
1200 ("server-core", bpo::value<unsigned>()->default_value(1),
1201 "server messenger running core")
1202 ("server-bs", bpo::value<unsigned>()->default_value(0),
1203 "server block size")
1204 ("crc-enabled", bpo::value<bool>()->default_value(false),
1205 "enable CRC checks");
1206 return app.run(argc, argv, [&app] {
1207 auto&& config = app.configuration();
1208 auto mode = config["mode"].as<unsigned>();
1209 ceph_assert(mode <= 2);
1210 auto _mode = static_cast<perf_mode_t>(mode);
1211 bool crc_enabled = config["crc-enabled"].as<bool>();
1212 auto server_conf = server_config::load(config);
1213 auto client_conf = client_config::load(config);
1214 return run(_mode, client_conf, server_conf, crc_enabled
1215 ).then([] {
1216 logger().info("\nsuccessful!\n");
1217 }).handle_exception([] (auto eptr) {
1218 logger().info("\nfailed!\n");
1219 return seastar::make_exception_future<>(eptr);
1220 });
1221 });
1222 }