git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/tools/perf_crimson_msgr.cc
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / crimson / tools / perf_crimson_msgr.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <map>
5 #include <random>
6 #include <boost/program_options.hpp>
7
8 #include <seastar/core/app-template.hh>
9 #include <seastar/core/do_with.hh>
10 #include <seastar/core/future-util.hh>
11 #include <seastar/core/reactor.hh>
12 #include <seastar/core/sleep.hh>
13 #include <seastar/core/semaphore.hh>
14 #include <seastar/core/smp.hh>
15
16 #include "common/ceph_time.h"
17 #include "messages/MOSDOp.h"
18
19 #include "crimson/auth/DummyAuth.h"
20 #include "crimson/common/log.h"
21 #include "crimson/common/config_proxy.h"
22 #include "crimson/net/Connection.h"
23 #include "crimson/net/Dispatcher.h"
24 #include "crimson/net/Messenger.h"
25
26 using namespace std;
27 using namespace std::chrono_literals;
28
29 namespace bpo = boost::program_options;
30
31 namespace {
32
33 template<typename Message>
34 using Ref = boost::intrusive_ptr<Message>;
35
36 seastar::logger& logger() {
37 return crimson::get_logger(ceph_subsys_ms);
38 }
39
40 template <typename T, typename... Args>
41 seastar::future<T*> create_sharded(Args... args) {
42 // seems we should only construct/stop shards on #0
43 return seastar::smp::submit_to(0, [=] {
44 auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
45 return sharded_obj->start(args...).then([sharded_obj]() {
46 seastar::engine().at_exit([sharded_obj]() {
47 return sharded_obj->stop().then([sharded_obj] {});
48 });
49 return sharded_obj.get();
50 });
51 }).then([] (seastar::sharded<T> *ptr_shard) {
52 // return the pointer valid for the caller CPU
53 return &ptr_shard->local();
54 });
55 }
56
// Which side(s) of the benchmark this process runs.
// The numeric values match the --mode option, which is cast directly
// to this enum in main().
enum class perf_mode_t {
  both = 0,
  client = 1,
  server = 2,
};
62
63 struct client_config {
64 entity_addr_t server_addr;
65 unsigned block_size;
66 unsigned ramptime;
67 unsigned msgtime;
68 unsigned jobs;
69 unsigned depth;
70
71 std::string str() const {
72 std::ostringstream out;
73 out << "client[>> " << server_addr
74 << "](bs=" << block_size
75 << ", ramptime=" << ramptime
76 << ", msgtime=" << msgtime
77 << ", jobs=" << jobs
78 << ", depth=" << depth
79 << ")";
80 return out.str();
81 }
82
83 static client_config load(bpo::variables_map& options) {
84 client_config conf;
85 entity_addr_t addr;
86 ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
87 ceph_assert_always(addr.is_msgr2());
88
89 conf.server_addr = addr;
90 conf.block_size = options["cbs"].as<unsigned>();
91 conf.ramptime = options["ramptime"].as<unsigned>();
92 conf.msgtime = options["msgtime"].as<unsigned>();
93 conf.jobs = options["jobs"].as<unsigned>();
94 conf.depth = options["depth"].as<unsigned>();
95 ceph_assert(conf.depth % conf.jobs == 0);
96 return conf;
97 }
98 };
99
100 struct server_config {
101 entity_addr_t addr;
102 unsigned block_size;
103 unsigned core;
104
105 std::string str() const {
106 std::ostringstream out;
107 out << "server[" << addr
108 << "](bs=" << block_size
109 << ", core=" << core
110 << ")";
111 return out.str();
112 }
113
114 static server_config load(bpo::variables_map& options) {
115 server_config conf;
116 entity_addr_t addr;
117 ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
118 ceph_assert_always(addr.is_msgr2());
119
120 conf.addr = addr;
121 conf.block_size = options["sbs"].as<unsigned>();
122 conf.core = options["core"].as<unsigned>();
123 return conf;
124 }
125 };
126
// Latency is sampled only for messages whose tid is a multiple of this value.
constexpr unsigned SAMPLE_RATE{7};
128
129 static seastar::future<> run(
130 perf_mode_t mode,
131 const client_config& client_conf,
132 const server_config& server_conf,
133 bool crc_enabled)
134 {
135 struct test_state {
136 struct Server;
137 using ServerFRef = seastar::foreign_ptr<std::unique_ptr<Server>>;
138
139 struct Server final
140 : public crimson::net::Dispatcher {
141 crimson::net::MessengerRef msgr;
142 crimson::auth::DummyAuthClientServer dummy_auth;
143 const seastar::shard_id msgr_sid;
144 std::string lname;
145 unsigned msg_len;
146 bufferlist msg_data;
147
148 Server(unsigned msg_len)
149 : msgr_sid{seastar::this_shard_id()},
150 msg_len{msg_len} {
151 lname = "server#";
152 lname += std::to_string(msgr_sid);
153 msg_data.append_zero(msg_len);
154 }
155
156 std::optional<seastar::future<>> ms_dispatch(
157 crimson::net::ConnectionRef c, MessageRef m) override {
158 ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
159
160 // server replies with MOSDOp to generate server-side write workload
161 const static pg_t pgid;
162 const static object_locator_t oloc;
163 const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
164 pgid.pool(), oloc.nspace);
165 static spg_t spgid(pgid);
166 auto rep = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
167 bufferlist data(msg_data);
168 rep->write(0, msg_len, data);
169 rep->set_tid(m->get_tid());
170 std::ignore = c->send(std::move(rep));
171 return {seastar::now()};
172 }
173
174 seastar::future<> init(const entity_addr_t& addr) {
175 return seastar::smp::submit_to(msgr_sid, [addr, this] {
176 // server msgr is always with nonce 0
177 msgr = crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid), lname, 0);
178 msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
179 msgr->set_auth_client(&dummy_auth);
180 msgr->set_auth_server(&dummy_auth);
181 return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
182 return msgr->start({this});
183 }, crimson::net::Messenger::bind_ertr::all_same_way(
184 [addr] (const std::error_code& e) {
185 logger().error("Server: "
186 "there is another instance running at {}", addr);
187 ceph_abort();
188 }));
189 });
190 }
191 seastar::future<> shutdown() {
192 logger().info("{} shutdown...", lname);
193 return seastar::smp::submit_to(msgr_sid, [this] {
194 ceph_assert(msgr);
195 msgr->stop();
196 return msgr->shutdown();
197 });
198 }
199 seastar::future<> wait() {
200 return seastar::smp::submit_to(msgr_sid, [this] {
201 ceph_assert(msgr);
202 return msgr->wait();
203 });
204 }
205
206 static seastar::future<ServerFRef> create(seastar::shard_id msgr_sid, unsigned msg_len) {
207 return seastar::smp::submit_to(msgr_sid, [msg_len] {
208 return seastar::make_foreign(std::make_unique<Server>(msg_len));
209 });
210 }
211 };
212
213 struct Client final
214 : public crimson::net::Dispatcher,
215 public seastar::peering_sharded_service<Client> {
216
217 struct ConnStats {
218 mono_time connecting_time = mono_clock::zero();
219 mono_time connected_time = mono_clock::zero();
220 unsigned received_count = 0u;
221
222 mono_time start_time = mono_clock::zero();
223 unsigned start_count = 0u;
224
225 unsigned sampled_count = 0u;
226 double total_lat_s = 0.0;
227
228 // for reporting only
229 mono_time finish_time = mono_clock::zero();
230
231 void start() {
232 start_time = mono_clock::now();
233 start_count = received_count;
234 sampled_count = 0u;
235 total_lat_s = 0.0;
236 finish_time = mono_clock::zero();
237 }
238 };
239 ConnStats conn_stats;
240
241 struct PeriodStats {
242 mono_time start_time = mono_clock::zero();
243 unsigned start_count = 0u;
244 unsigned sampled_count = 0u;
245 double total_lat_s = 0.0;
246
247 // for reporting only
248 mono_time finish_time = mono_clock::zero();
249 unsigned finish_count = 0u;
250 unsigned depth = 0u;
251
252 void reset(unsigned received_count, PeriodStats* snap = nullptr) {
253 if (snap) {
254 snap->start_time = start_time;
255 snap->start_count = start_count;
256 snap->sampled_count = sampled_count;
257 snap->total_lat_s = total_lat_s;
258 snap->finish_time = mono_clock::now();
259 snap->finish_count = received_count;
260 }
261 start_time = mono_clock::now();
262 start_count = received_count;
263 sampled_count = 0u;
264 total_lat_s = 0.0;
265 }
266 };
267 PeriodStats period_stats;
268
269 const seastar::shard_id sid;
270 std::string lname;
271
272 const unsigned jobs;
273 crimson::net::MessengerRef msgr;
274 const unsigned msg_len;
275 bufferlist msg_data;
276 const unsigned nr_depth;
277 seastar::semaphore depth;
278 std::vector<mono_time> time_msgs_sent;
279 crimson::auth::DummyAuthClientServer dummy_auth;
280
281 unsigned sent_count = 0u;
282 crimson::net::ConnectionRef active_conn = nullptr;
283
284 bool stop_send = false;
285 seastar::promise<> stopped_send_promise;
286
287 Client(unsigned jobs, unsigned msg_len, unsigned depth)
288 : sid{seastar::this_shard_id()},
289 jobs{jobs},
290 msg_len{msg_len},
291 nr_depth{depth/jobs},
292 depth{nr_depth},
293 time_msgs_sent{depth/jobs, mono_clock::zero()} {
294 lname = "client#";
295 lname += std::to_string(sid);
296 msg_data.append_zero(msg_len);
297 }
298
299 unsigned get_current_depth() const {
300 ceph_assert(depth.available_units() >= 0);
301 return nr_depth - depth.current();
302 }
303
304 void ms_handle_connect(crimson::net::ConnectionRef conn) override {
305 conn_stats.connected_time = mono_clock::now();
306 }
307 std::optional<seastar::future<>> ms_dispatch(
308 crimson::net::ConnectionRef, MessageRef m) override {
309 // server replies with MOSDOp to generate server-side write workload
310 ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
311
312 auto msg_id = m->get_tid();
313 if (msg_id % SAMPLE_RATE == 0) {
314 auto index = msg_id % time_msgs_sent.size();
315 ceph_assert(time_msgs_sent[index] != mono_clock::zero());
316 std::chrono::duration<double> cur_latency = mono_clock::now() - time_msgs_sent[index];
317 conn_stats.total_lat_s += cur_latency.count();
318 ++(conn_stats.sampled_count);
319 period_stats.total_lat_s += cur_latency.count();
320 ++(period_stats.sampled_count);
321 time_msgs_sent[index] = mono_clock::zero();
322 }
323
324 ++(conn_stats.received_count);
325 depth.signal(1);
326
327 return {seastar::now()};
328 }
329
330 // should start messenger at this shard?
331 bool is_active() {
332 ceph_assert(seastar::this_shard_id() == sid);
333 return sid != 0 && sid <= jobs;
334 }
335
336 seastar::future<> init() {
337 return container().invoke_on_all([] (auto& client) {
338 if (client.is_active()) {
339 client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid);
340 client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
341 client.msgr->set_auth_client(&client.dummy_auth);
342 client.msgr->set_auth_server(&client.dummy_auth);
343 return client.msgr->start({&client});
344 }
345 return seastar::now();
346 });
347 }
348
349 seastar::future<> shutdown() {
350 return container().invoke_on_all([] (auto& client) {
351 if (client.is_active()) {
352 logger().info("{} shutdown...", client.lname);
353 ceph_assert(client.msgr);
354 client.msgr->stop();
355 return client.msgr->shutdown().then([&client] {
356 return client.stop_dispatch_messages();
357 });
358 }
359 return seastar::now();
360 });
361 }
362
363 seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) {
364 return container().invoke_on_all([peer_addr] (auto& client) {
365 // start clients in active cores (#1 ~ #jobs)
366 if (client.is_active()) {
367 mono_time start_time = mono_clock::now();
368 client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
369 // make sure handshake won't hurt the performance
370 return seastar::sleep(1s).then([&client, start_time] {
371 if (client.conn_stats.connected_time == mono_clock::zero()) {
372 logger().error("\n{} not connected after 1s!\n", client.lname);
373 ceph_assert(false);
374 }
375 client.conn_stats.connecting_time = start_time;
376 });
377 }
378 return seastar::now();
379 });
380 }
381
382 private:
383 class TimerReport {
384 private:
385 const unsigned jobs;
386 const unsigned msgtime;
387 const unsigned bytes_of_block;
388
389 unsigned elapsed = 0u;
390 std::vector<mono_time> start_times;
391 std::vector<PeriodStats> snaps;
392 std::vector<ConnStats> summaries;
393
394 public:
395 TimerReport(unsigned jobs, unsigned msgtime, unsigned bs)
396 : jobs{jobs},
397 msgtime{msgtime},
398 bytes_of_block{bs},
399 start_times{jobs, mono_clock::zero()},
400 snaps{jobs},
401 summaries{jobs} {}
402
403 unsigned get_elapsed() const { return elapsed; }
404
405 PeriodStats& get_snap_by_job(seastar::shard_id sid) {
406 ceph_assert(sid >= 1 && sid <= jobs);
407 return snaps[sid - 1];
408 }
409
410 ConnStats& get_summary_by_job(seastar::shard_id sid) {
411 ceph_assert(sid >= 1 && sid <= jobs);
412 return summaries[sid - 1];
413 }
414
415 bool should_stop() const {
416 return elapsed >= msgtime;
417 }
418
419 seastar::future<> ticktock() {
420 return seastar::sleep(1s).then([this] {
421 ++elapsed;
422 });
423 }
424
425 void report_header() {
426 std::ostringstream sout;
427 sout << std::setfill(' ')
428 << std::setw(7) << "sec"
429 << std::setw(6) << "depth"
430 << std::setw(8) << "IOPS"
431 << std::setw(8) << "MB/s"
432 << std::setw(8) << "lat(ms)";
433 std::cout << sout.str() << std::endl;
434 }
435
436 void report_period() {
437 if (elapsed == 1) {
438 // init this->start_times at the first period
439 for (unsigned i=0; i<jobs; ++i) {
440 start_times[i] = snaps[i].start_time;
441 }
442 }
443 std::chrono::duration<double> elapsed_d = 0s;
444 unsigned depth = 0u;
445 unsigned ops = 0u;
446 unsigned sampled_count = 0u;
447 double total_lat_s = 0.0;
448 for (const auto& snap: snaps) {
449 elapsed_d += (snap.finish_time - snap.start_time);
450 depth += snap.depth;
451 ops += (snap.finish_count - snap.start_count);
452 sampled_count += snap.sampled_count;
453 total_lat_s += snap.total_lat_s;
454 }
455 double elapsed_s = elapsed_d.count() / jobs;
456 double iops = ops/elapsed_s;
457 std::ostringstream sout;
458 sout << setfill(' ')
459 << std::setw(7) << elapsed_s
460 << std::setw(6) << depth
461 << std::setw(8) << iops
462 << std::setw(8) << iops * bytes_of_block / 1048576
463 << std::setw(8) << (total_lat_s / sampled_count * 1000);
464 std::cout << sout.str() << std::endl;
465 }
466
467 void report_summary() const {
468 std::chrono::duration<double> elapsed_d = 0s;
469 unsigned ops = 0u;
470 unsigned sampled_count = 0u;
471 double total_lat_s = 0.0;
472 for (const auto& summary: summaries) {
473 elapsed_d += (summary.finish_time - summary.start_time);
474 ops += (summary.received_count - summary.start_count);
475 sampled_count += summary.sampled_count;
476 total_lat_s += summary.total_lat_s;
477 }
478 double elapsed_s = elapsed_d.count() / jobs;
479 double iops = ops / elapsed_s;
480 std::ostringstream sout;
481 sout << "--------------"
482 << " summary "
483 << "--------------\n"
484 << setfill(' ')
485 << std::setw(7) << elapsed_s
486 << std::setw(6) << "-"
487 << std::setw(8) << iops
488 << std::setw(8) << iops * bytes_of_block / 1048576
489 << std::setw(8) << (total_lat_s / sampled_count * 1000)
490 << "\n";
491 std::cout << sout.str() << std::endl;
492 }
493 };
494
495 seastar::future<> report_period(TimerReport& report) {
496 return container().invoke_on_all([&report] (auto& client) {
497 if (client.is_active()) {
498 PeriodStats& snap = report.get_snap_by_job(client.sid);
499 client.period_stats.reset(client.conn_stats.received_count,
500 &snap);
501 snap.depth = client.get_current_depth();
502 }
503 }).then([&report] {
504 report.report_period();
505 });
506 }
507
508 seastar::future<> report_summary(TimerReport& report) {
509 return container().invoke_on_all([&report] (auto& client) {
510 if (client.is_active()) {
511 ConnStats& summary = report.get_summary_by_job(client.sid);
512 summary = client.conn_stats;
513 summary.finish_time = mono_clock::now();
514 }
515 }).then([&report] {
516 report.report_summary();
517 });
518 }
519
520 public:
521 seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) {
522 logger().info("[all clients]: start sending MOSDOps from {} clients", jobs);
523 return container().invoke_on_all([] (auto& client) {
524 if (client.is_active()) {
525 client.do_dispatch_messages(client.active_conn.get());
526 }
527 }).then([ramptime] {
528 logger().info("[all clients]: ramping up {} seconds...", ramptime);
529 return seastar::sleep(std::chrono::seconds(ramptime));
530 }).then([this] {
531 return container().invoke_on_all([] (auto& client) {
532 if (client.is_active()) {
533 client.conn_stats.start();
534 client.period_stats.reset(client.conn_stats.received_count);
535 }
536 });
537 }).then([this, msgtime] {
538 logger().info("[all clients]: reporting {} seconds...\n", msgtime);
539 return seastar::do_with(
540 TimerReport(jobs, msgtime, msg_len), [this] (auto& report) {
541 report.report_header();
542 return seastar::do_until(
543 [&report] { return report.should_stop(); },
544 [&report, this] {
545 return report.ticktock().then([&report, this] {
546 // report period every 1s
547 return report_period(report);
548 }).then([&report, this] {
549 // report summary every 10s
550 if (report.get_elapsed() % 10 == 0) {
551 return report_summary(report);
552 } else {
553 return seastar::now();
554 }
555 });
556 }
557 ).then([&report, this] {
558 // report the final summary
559 if (report.get_elapsed() % 10 != 0) {
560 return report_summary(report);
561 } else {
562 return seastar::now();
563 }
564 });
565 });
566 });
567 }
568
569 private:
570 seastar::future<> send_msg(crimson::net::Connection* conn) {
571 ceph_assert(seastar::this_shard_id() == sid);
572 return depth.wait(1).then([this, conn] {
573 const static pg_t pgid;
574 const static object_locator_t oloc;
575 const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
576 pgid.pool(), oloc.nspace);
577 static spg_t spgid(pgid);
578 auto m = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
579 bufferlist data(msg_data);
580 m->write(0, msg_len, data);
581 // use tid as the identity of each round
582 m->set_tid(sent_count);
583
584 // sample message latency
585 if (sent_count % SAMPLE_RATE == 0) {
586 auto index = sent_count % time_msgs_sent.size();
587 ceph_assert(time_msgs_sent[index] == mono_clock::zero());
588 time_msgs_sent[index] = mono_clock::now();
589 }
590
591 return conn->send(std::move(m));
592 });
593 }
594
// Exception used to break the depth semaphore when sending is stopped.
struct DepthBroken : std::exception {};
596
597 seastar::future<> stop_dispatch_messages() {
598 stop_send = true;
599 depth.broken(DepthBroken());
600 return stopped_send_promise.get_future();
601 }
602
603 void do_dispatch_messages(crimson::net::Connection* conn) {
604 ceph_assert(seastar::this_shard_id() == sid);
605 ceph_assert(sent_count == 0);
606 conn_stats.start_time = mono_clock::now();
607 // forwarded to stopped_send_promise
608 (void) seastar::do_until(
609 [this] { return stop_send; },
610 [this, conn] {
611 sent_count += 1;
612 return send_msg(conn);
613 }
614 ).handle_exception_type([] (const DepthBroken& e) {
615 // ok, stopped by stop_dispatch_messages()
616 }).then([this, conn] {
617 std::chrono::duration<double> dur_conn = conn_stats.connected_time - conn_stats.connecting_time;
618 std::chrono::duration<double> dur_msg = mono_clock::now() - conn_stats.start_time;
619 unsigned ops = conn_stats.received_count - conn_stats.start_count;
620 logger().info("{}: stopped sending OSDOPs.\n"
621 "{}(depth={}):\n"
622 " connect time: {}s\n"
623 " messages received: {}\n"
624 " messaging time: {}s\n"
625 " latency: {}ms\n"
626 " IOPS: {}\n"
627 " throughput: {}MB/s\n",
628 *conn,
629 lname,
630 nr_depth,
631 dur_conn.count(),
632 ops,
633 dur_msg.count(),
634 conn_stats.total_lat_s / conn_stats.sampled_count * 1000,
635 ops / dur_msg.count(),
636 ops / dur_msg.count() * msg_len / 1048576);
637 stopped_send_promise.set_value();
638 });
639 }
640 };
641 };
642
643 return seastar::when_all(
644 test_state::Server::create(server_conf.core, server_conf.block_size),
645 create_sharded<test_state::Client>(client_conf.jobs, client_conf.block_size, client_conf.depth),
646 crimson::common::sharded_conf().start(EntityName{}, std::string_view{"ceph"}).then([] {
647 return crimson::common::local_conf().start();
648 }).then([crc_enabled] {
649 return crimson::common::local_conf().set_val(
650 "ms_crc_data", crc_enabled ? "true" : "false");
651 })
652 ).then([=](auto&& ret) {
653 auto fp_server = std::move(std::get<0>(ret).get0());
654 auto client = std::move(std::get<1>(ret).get0());
655 test_state::Server* server = fp_server.get();
656 if (mode == perf_mode_t::both) {
657 logger().info("\nperf settings:\n {}\n {}\n",
658 client_conf.str(), server_conf.str());
659 ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
660 ceph_assert(client_conf.jobs > 0);
661 ceph_assert(seastar::smp::count >= 1+server_conf.core);
662 ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs);
663 return seastar::when_all_succeed(
664 server->init(server_conf.addr),
665 client->init()
666 ).then_unpack([client, addr = client_conf.server_addr] {
667 return client->connect_wait_verify(addr);
668 }).then([client, ramptime = client_conf.ramptime,
669 msgtime = client_conf.msgtime] {
670 return client->dispatch_with_timer(ramptime, msgtime);
671 }).then([client] {
672 return client->shutdown();
673 }).then([server, fp_server = std::move(fp_server)] () mutable {
674 return server->shutdown().then([cleanup = std::move(fp_server)] {});
675 });
676 } else if (mode == perf_mode_t::client) {
677 logger().info("\nperf settings:\n {}\n", client_conf.str());
678 ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
679 ceph_assert(client_conf.jobs > 0);
680 return client->init(
681 ).then([client, addr = client_conf.server_addr] {
682 return client->connect_wait_verify(addr);
683 }).then([client, ramptime = client_conf.ramptime,
684 msgtime = client_conf.msgtime] {
685 return client->dispatch_with_timer(ramptime, msgtime);
686 }).then([client] {
687 return client->shutdown();
688 });
689 } else { // mode == perf_mode_t::server
690 ceph_assert(seastar::smp::count >= 1+server_conf.core);
691 logger().info("\nperf settings:\n {}\n", server_conf.str());
692 return server->init(server_conf.addr
693 // dispatch ops
694 ).then([server] {
695 return server->wait();
696 // shutdown
697 }).then([server, fp_server = std::move(fp_server)] () mutable {
698 return server->shutdown().then([cleanup = std::move(fp_server)] {});
699 });
700 }
701 }).finally([] {
702 return crimson::common::sharded_conf().stop();
703 });
704 }
705
706 }
707
708 int main(int argc, char** argv)
709 {
710 seastar::app_template app;
711 app.add_options()
712 ("mode", bpo::value<unsigned>()->default_value(0),
713 "0: both, 1:client, 2:server")
714 ("addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
715 "server address(only support msgr v2 protocol)")
716 ("ramptime", bpo::value<unsigned>()->default_value(5),
717 "seconds of client ramp-up time")
718 ("msgtime", bpo::value<unsigned>()->default_value(15),
719 "seconds of client messaging time")
720 ("jobs", bpo::value<unsigned>()->default_value(1),
721 "number of client jobs (messengers)")
722 ("cbs", bpo::value<unsigned>()->default_value(4096),
723 "client block size")
724 ("depth", bpo::value<unsigned>()->default_value(512),
725 "client io depth")
726 ("core", bpo::value<unsigned>()->default_value(0),
727 "server running core")
728 ("sbs", bpo::value<unsigned>()->default_value(0),
729 "server block size")
730 ("crc-enabled", bpo::value<bool>()->default_value(false),
731 "enable CRC checks");
732 return app.run(argc, argv, [&app] {
733 auto&& config = app.configuration();
734 auto mode = config["mode"].as<unsigned>();
735 ceph_assert(mode <= 2);
736 auto _mode = static_cast<perf_mode_t>(mode);
737 bool crc_enabled = config["crc-enabled"].as<bool>();
738 auto server_conf = server_config::load(config);
739 auto client_conf = client_config::load(config);
740 return run(_mode, client_conf, server_conf, crc_enabled
741 ).then([] {
742 logger().info("\nsuccessful!\n");
743 }).handle_exception([] (auto eptr) {
744 logger().info("\nfailed!\n");
745 return seastar::make_exception_future<>(eptr);
746 });
747 });
748 }