X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fcrimson%2Ftools%2Fperf_crimson_msgr.cc;h=aa5753442e28e44ff32d093226fc02feb8b0ddf7;hb=aee94f6923ba628a85d855d0c5316d0da78bfa2a;hp=ef5602b0f27bf8b8ed2c9b31c1a6099024406495;hpb=27f45121cc74e31203777ad565f78d8aad9b92a2;p=ceph.git diff --git a/ceph/src/crimson/tools/perf_crimson_msgr.cc b/ceph/src/crimson/tools/perf_crimson_msgr.cc index ef5602b0f..aa5753442 100644 --- a/ceph/src/crimson/tools/perf_crimson_msgr.cc +++ b/ceph/src/crimson/tools/perf_crimson_msgr.cc @@ -2,19 +2,22 @@ // vim: ts=8 sw=2 smarttab #include -#include #include +#include #include #include #include +#include #include #include #include #include +#include #include "common/ceph_time.h" #include "messages/MOSDOp.h" +#include "include/random.h" #include "crimson/auth/DummyAuth.h" #include "crimson/common/log.h" @@ -22,10 +25,13 @@ #include "crimson/net/Connection.h" #include "crimson/net/Dispatcher.h" #include "crimson/net/Messenger.h" +#include "crimson/osd/stop_signal.h" using namespace std; using namespace std::chrono_literals; +using lowres_clock_t = seastar::lowres_system_clock; + namespace bpo = boost::program_options; namespace { @@ -54,6 +60,19 @@ seastar::future create_sharded(Args... args) { }); } +double get_reactor_utilization() { + auto &value_map = seastar::metrics::impl::get_value_map(); + auto found = value_map.find("reactor_utilization"); + assert(found != value_map.end()); + auto &[full_name, metric_family] = *found; + std::ignore = full_name; + assert(metric_family.size() == 1); + const auto& [labels, metric] = *metric_family.begin(); + std::ignore = labels; + auto value = (*metric)(); + return value.ui(); +} + enum class perf_mode_t { both, client, @@ -65,8 +84,10 @@ struct client_config { unsigned block_size; unsigned ramptime; unsigned msgtime; - unsigned jobs; + unsigned num_clients; + unsigned num_conns; unsigned depth; + bool skip_core_0; std::string str() const { std::ostringstream out; @@ -74,8 +95,10 @@ struct client_config { << "](bs=" << block_size << ", ramptime=" << ramptime << ", msgtime=" << msgtime - << ", jobs=" << jobs + << ", num_clients=" << num_clients + << ", num_conns=" << num_conns << ", depth=" << depth + << ", skip_core_0=" << skip_core_0 << ")"; return out.str(); } @@ -83,16 +106,19 @@ struct client_config { static client_config load(bpo::variables_map& options) { client_config conf; entity_addr_t addr; - ceph_assert(addr.parse(options["addr"].as().c_str(), nullptr)); + ceph_assert(addr.parse(options["server-addr"].as().c_str(), nullptr)); ceph_assert_always(addr.is_msgr2()); conf.server_addr = addr; - conf.block_size = options["cbs"].as(); + conf.block_size = options["client-bs"].as(); conf.ramptime = options["ramptime"].as(); conf.msgtime = options["msgtime"].as(); - conf.jobs = options["jobs"].as(); + conf.num_clients = options["clients"].as(); + ceph_assert_always(conf.num_clients > 0); + conf.num_conns = options["conns-per-client"].as(); + ceph_assert_always(conf.num_conns > 0); conf.depth = options["depth"].as(); - ceph_assert(conf.depth % conf.jobs == 0); + conf.skip_core_0 = options["client-skip-core-0"].as(); return conf; } }; @@ -100,12 +126,14 @@ struct client_config { struct server_config { entity_addr_t addr; unsigned block_size; + bool is_fixed_cpu; unsigned core; std::string str() const { std::ostringstream out; out << "server[" << addr << "](bs=" << block_size + << ", is_fixed_cpu=" << is_fixed_cpu << ", core=" << core << ")"; return out.str(); @@ -114,17 +142,18 @@ struct server_config { static 
server_config load(bpo::variables_map& options) { server_config conf; entity_addr_t addr; - ceph_assert(addr.parse(options["addr"].as().c_str(), nullptr)); + ceph_assert(addr.parse(options["server-addr"].as().c_str(), nullptr)); ceph_assert_always(addr.is_msgr2()); conf.addr = addr; - conf.block_size = options["sbs"].as(); - conf.core = options["core"].as(); + conf.block_size = options["server-bs"].as(); + conf.is_fixed_cpu = options["server-fixed-cpu"].as(); + conf.core = options["server-core"].as(); return conf; } }; -const unsigned SAMPLE_RATE = 7; +const unsigned SAMPLE_RATE = 256; static seastar::future<> run( perf_mode_t mode, @@ -133,30 +162,68 @@ static seastar::future<> run( bool crc_enabled) { struct test_state { - struct Server; - using ServerFRef = seastar::foreign_ptr>; - struct Server final - : public crimson::net::Dispatcher { + : public crimson::net::Dispatcher, + public seastar::peering_sharded_service { + // available only in msgr_sid crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; const seastar::shard_id msgr_sid; std::string lname; + + bool is_fixed_cpu = true; + bool is_stopped = false; + std::optional> fut_report; + + unsigned conn_count = 0; + unsigned msg_count = 0; + MessageRef last_msg; + + // available in all shards unsigned msg_len; bufferlist msg_data; - Server(unsigned msg_len) - : msgr_sid{seastar::this_shard_id()}, + Server(seastar::shard_id msgr_sid, unsigned msg_len, bool needs_report) + : msgr_sid{msgr_sid}, msg_len{msg_len} { - lname = "server#"; - lname += std::to_string(msgr_sid); + lname = fmt::format("server@{}", msgr_sid); msg_data.append_zero(msg_len); + + if (seastar::this_shard_id() == msgr_sid && + needs_report) { + start_report(); + } + } + + void ms_handle_connect( + crimson::net::ConnectionRef, + seastar::shard_id) override { + ceph_abort("impossible, server won't connect"); + } + + void ms_handle_accept( + crimson::net::ConnectionRef, + seastar::shard_id new_shard, + bool is_replace) override { + ceph_assert_always(new_shard == seastar::this_shard_id()); + auto &server = container().local(); + ++server.conn_count; + } + + void ms_handle_reset( + crimson::net::ConnectionRef, + bool) override { + auto &server = container().local(); + --server.conn_count; } std::optional> ms_dispatch( crimson::net::ConnectionRef c, MessageRef m) override { + assert(c->get_shard_id() == seastar::this_shard_id()); ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); + auto &server = container().local(); + // server replies with MOSDOp to generate server-side write workload const static pg_t pgid; const static object_locator_t oloc; @@ -164,22 +231,32 @@ static seastar::future<> run( pgid.pool(), oloc.nspace); static spg_t spgid(pgid); auto rep = crimson::make_message(0, 0, hobj, spgid, 0, 0, 0); - bufferlist data(msg_data); - rep->write(0, msg_len, data); + bufferlist data(server.msg_data); + rep->write(0, server.msg_len, data); rep->set_tid(m->get_tid()); + ++server.msg_count; std::ignore = c->send(std::move(rep)); + + if (server.msg_count % 16 == 0) { + server.last_msg = std::move(m); + } return {seastar::now()}; } - seastar::future<> init(const entity_addr_t& addr) { - return seastar::smp::submit_to(msgr_sid, [addr, this] { + seastar::future<> init(const entity_addr_t& addr, bool is_fixed_cpu) { + return container().invoke_on( + msgr_sid, [addr, is_fixed_cpu](auto &server) { // server msgr is always with nonce 0 - msgr = crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid), lname, 0); - 
msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); - msgr->set_auth_client(&dummy_auth); - msgr->set_auth_server(&dummy_auth); - return msgr->bind(entity_addrvec_t{addr}).safe_then([this] { - return msgr->start({this}); + server.msgr = crimson::net::Messenger::create( + entity_name_t::OSD(server.msgr_sid), + server.lname, 0, is_fixed_cpu); + server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); + server.msgr->set_auth_client(&server.dummy_auth); + server.msgr->set_auth_server(&server.dummy_auth); + server.is_fixed_cpu = is_fixed_cpu; + return server.msgr->bind(entity_addrvec_t{addr} + ).safe_then([&server] { + return server.msgr->start({&server}); }, crimson::net::Messenger::bind_ertr::all_same_way( [addr] (const std::error_code& e) { logger().error("Server: " @@ -188,25 +265,161 @@ static seastar::future<> run( })); }); } + seastar::future<> shutdown() { logger().info("{} shutdown...", lname); - return seastar::smp::submit_to(msgr_sid, [this] { - ceph_assert(msgr); - msgr->stop(); - return msgr->shutdown(); + return container().invoke_on( + msgr_sid, [](auto &server) { + server.is_stopped = true; + ceph_assert(server.msgr); + server.msgr->stop(); + return server.msgr->shutdown( + ).then([&server] { + if (server.fut_report.has_value()) { + return std::move(server.fut_report.value()); + } else { + return seastar::now(); + } + }); }); } - seastar::future<> wait() { - return seastar::smp::submit_to(msgr_sid, [this] { - ceph_assert(msgr); - return msgr->wait(); - }); + + private: + struct ShardReport { + unsigned msg_count = 0; + + // per-interval metrics + double reactor_utilization; + unsigned conn_count = 0; + int msg_size = 0; + unsigned msg_count_interval = 0; + }; + + // should not be called frequently to impact performance + void get_report(ShardReport& last) { + unsigned last_msg_count = last.msg_count; + int msg_size = -1; + if (last_msg) { + auto msg = boost::static_pointer_cast(last_msg); + msg->finish_decode(); + ceph_assert_always(msg->ops.size() == 1); + msg_size = msg->ops[0].op.extent.length; + last_msg.reset(); + } + + last.msg_count = msg_count; + last.reactor_utilization = get_reactor_utilization(); + last.conn_count = conn_count; + last.msg_size = msg_size; + last.msg_count_interval = msg_count - last_msg_count; } - static seastar::future create(seastar::shard_id msgr_sid, unsigned msg_len) { - return seastar::smp::submit_to(msgr_sid, [msg_len] { - return seastar::make_foreign(std::make_unique(msg_len)); - }); + struct TimerReport { + unsigned elapsed = 0u; + mono_time start_time = mono_clock::zero(); + std::vector reports; + + TimerReport(unsigned shards) : reports(shards) {} + }; + + void start_report() { + seastar::promise<> pr_report; + fut_report = pr_report.get_future(); + seastar::do_with( + TimerReport(seastar::smp::count), + [this](auto &report) { + return seastar::do_until( + [this] { return is_stopped; }, + [&report, this] { + return seastar::sleep(2s + ).then([&report, this] { + report.elapsed += 2; + if (is_fixed_cpu) { + return seastar::smp::submit_to(msgr_sid, + [&report, this] { + auto &server = container().local(); + server.get_report(report.reports[seastar::this_shard_id()]); + }).then([&report, this] { + auto now = mono_clock::now(); + auto prv = report.start_time; + report.start_time = now; + if (prv == mono_clock::zero()) { + // cannot compute duration + return; + } + std::chrono::duration duration_d = now - prv; + double duration = duration_d.count(); + auto &ireport = report.reports[msgr_sid]; + 
double iops = ireport.msg_count_interval / duration; + double throughput_MB = -1; + if (ireport.msg_size >= 0) { + throughput_MB = iops * ireport.msg_size / 1048576; + } + std::ostringstream sout; + sout << setfill(' ') + << report.elapsed + << "(" << std::setw(5) << duration << ") " + << std::setw(9) << iops << "IOPS " + << std::setw(8) << throughput_MB << "MiB/s " + << ireport.reactor_utilization + << "(" << ireport.conn_count << ")"; + std::cout << sout.str() << std::endl; + }); + } else { + return seastar::smp::invoke_on_all([&report, this] { + auto &server = container().local(); + server.get_report(report.reports[seastar::this_shard_id()]); + }).then([&report, this] { + auto now = mono_clock::now(); + auto prv = report.start_time; + report.start_time = now; + if (prv == mono_clock::zero()) { + // cannot compute duration + return; + } + std::chrono::duration duration_d = now - prv; + double duration = duration_d.count(); + unsigned num_msgs = 0; + // -1 means unavailable, -2 means mismatch + int msg_size = -1; + for (auto &i : report.reports) { + if (i.msg_size >= 0) { + if (msg_size == -2) { + // pass + } else if (msg_size == -1) { + msg_size = i.msg_size; + } else { + if (msg_size != i.msg_size) { + msg_size = -2; + } + } + } + num_msgs += i.msg_count_interval; + } + double iops = num_msgs / duration; + double throughput_MB = msg_size; + if (msg_size >= 0) { + throughput_MB = iops * msg_size / 1048576; + } + std::ostringstream sout; + sout << setfill(' ') + << report.elapsed + << "(" << std::setw(5) << duration << ") " + << std::setw(9) << iops << "IOPS " + << std::setw(8) << throughput_MB << "MiB/s "; + for (auto &i : report.reports) { + sout << i.reactor_utilization + << "(" << i.conn_count << ") "; + } + std::cout << sout.str() << std::endl; + }); + } + }); + } + ); + }).then([this] { + logger().info("report is stopped!"); + }).forward_to(std::move(pr_report)); } }; @@ -223,106 +436,212 @@ static seastar::future<> run( unsigned start_count = 0u; unsigned sampled_count = 0u; - double total_lat_s = 0.0; + double sampled_total_lat_s = 0.0; // for reporting only mono_time finish_time = mono_clock::zero(); - void start() { + void start_connecting() { + connecting_time = mono_clock::now(); + } + + void finish_connecting() { + ceph_assert_always(connected_time == mono_clock::zero()); + connected_time = mono_clock::now(); + } + + void start_collect() { + ceph_assert_always(connected_time != mono_clock::zero()); start_time = mono_clock::now(); start_count = received_count; sampled_count = 0u; - total_lat_s = 0.0; + sampled_total_lat_s = 0.0; finish_time = mono_clock::zero(); } + + void prepare_summary(const ConnStats ¤t) { + *this = current; + finish_time = mono_clock::now(); + } }; - ConnStats conn_stats; struct PeriodStats { mono_time start_time = mono_clock::zero(); unsigned start_count = 0u; unsigned sampled_count = 0u; - double total_lat_s = 0.0; + double sampled_total_lat_s = 0.0; // for reporting only mono_time finish_time = mono_clock::zero(); unsigned finish_count = 0u; unsigned depth = 0u; - void reset(unsigned received_count, PeriodStats* snap = nullptr) { - if (snap) { - snap->start_time = start_time; - snap->start_count = start_count; - snap->sampled_count = sampled_count; - snap->total_lat_s = total_lat_s; - snap->finish_time = mono_clock::now(); - snap->finish_count = received_count; - } + void start_collect(unsigned received_count) { start_time = mono_clock::now(); start_count = received_count; sampled_count = 0u; - total_lat_s = 0.0; + sampled_total_lat_s = 0.0; + } + + 
void reset_period( + unsigned received_count, unsigned _depth, PeriodStats &snapshot) { + snapshot.start_time = start_time; + snapshot.start_count = start_count; + snapshot.sampled_count = sampled_count; + snapshot.sampled_total_lat_s = sampled_total_lat_s; + snapshot.finish_time = mono_clock::now(); + snapshot.finish_count = received_count; + snapshot.depth = _depth; + + start_collect(received_count); + } + }; + + struct JobReport { + std::string name; + unsigned depth = 0; + double connect_time_s = 0; + unsigned total_msgs = 0; + double messaging_time_s = 0; + double latency_ms = 0; + double iops = 0; + double throughput_mbps = 0; + + void account(const JobReport &stats) { + depth += stats.depth; + connect_time_s += stats.connect_time_s; + total_msgs += stats.total_msgs; + messaging_time_s += stats.messaging_time_s; + latency_ms += stats.latency_ms; + iops += stats.iops; + throughput_mbps += stats.throughput_mbps; + } + + void report() const { + auto str = fmt::format( + "{}(depth={}):\n" + " connect time: {:08f}s\n" + " messages received: {}\n" + " messaging time: {:08f}s\n" + " latency: {:08f}ms\n" + " IOPS: {:08f}\n" + " out throughput: {:08f}MB/s", + name, depth, connect_time_s, + total_msgs, messaging_time_s, + latency_ms, iops, + throughput_mbps); + std::cout << str << std::endl; + } + }; + + struct ConnectionPriv : public crimson::net::Connection::user_private_t { + unsigned index; + ConnectionPriv(unsigned i) : index{i} {} + }; + + struct ConnState { + crimson::net::MessengerRef msgr; + ConnStats conn_stats; + PeriodStats period_stats; + seastar::semaphore depth; + std::vector time_msgs_sent; + unsigned sent_count = 0u; + crimson::net::ConnectionRef active_conn; + bool stop_send = false; + seastar::promise stopped_send_promise; + + ConnState(std::size_t _depth) + : depth{_depth}, + time_msgs_sent{_depth, lowres_clock_t::time_point::min()} {} + + unsigned get_current_units() const { + ceph_assert(depth.available_units() >= 0); + return depth.current(); + } + + seastar::future stop_dispatch_messages() { + stop_send = true; + depth.broken(DepthBroken()); + return stopped_send_promise.get_future(); } }; - PeriodStats period_stats; const seastar::shard_id sid; - std::string lname; + const unsigned id; + const std::optional server_sid; - const unsigned jobs; - crimson::net::MessengerRef msgr; + const unsigned num_clients; + const unsigned num_conns; const unsigned msg_len; bufferlist msg_data; const unsigned nr_depth; - seastar::semaphore depth; - std::vector time_msgs_sent; + const unsigned nonce_base; crimson::auth::DummyAuthClientServer dummy_auth; - unsigned sent_count = 0u; - crimson::net::ConnectionRef active_conn = nullptr; + std::vector conn_states; - bool stop_send = false; - seastar::promise<> stopped_send_promise; - - Client(unsigned jobs, unsigned msg_len, unsigned depth) + Client(unsigned num_clients, + unsigned num_conns, + unsigned msg_len, + unsigned _depth, + unsigned nonce_base, + std::optional server_sid) : sid{seastar::this_shard_id()}, - jobs{jobs}, + id{sid + num_clients - seastar::smp::count}, + server_sid{server_sid}, + num_clients{num_clients}, + num_conns{num_conns}, msg_len{msg_len}, - nr_depth{depth/jobs}, - depth{nr_depth}, - time_msgs_sent{depth/jobs, mono_clock::zero()} { - lname = "client#"; - lname += std::to_string(sid); + nr_depth{_depth}, + nonce_base{nonce_base} { + if (is_active()) { + for (unsigned i = 0; i < num_conns; ++i) { + conn_states.emplace_back(nr_depth); + } + } msg_data.append_zero(msg_len); } - unsigned get_current_depth() const { - 
ceph_assert(depth.available_units() >= 0); - return nr_depth - depth.current(); + std::string get_name(unsigned i) { + return fmt::format("client{}Conn{}@{}", id, i, sid); } - void ms_handle_connect(crimson::net::ConnectionRef conn) override { - conn_stats.connected_time = mono_clock::now(); + void ms_handle_connect( + crimson::net::ConnectionRef conn, + seastar::shard_id prv_shard) override { + ceph_assert_always(prv_shard == seastar::this_shard_id()); + assert(is_active()); + unsigned index = static_cast(conn->get_user_private()).index; + auto &conn_state = conn_states[index]; + conn_state.conn_stats.finish_connecting(); } + std::optional> ms_dispatch( - crimson::net::ConnectionRef, MessageRef m) override { + crimson::net::ConnectionRef conn, MessageRef m) override { + assert(is_active()); // server replies with MOSDOp to generate server-side write workload ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); + unsigned index = static_cast(conn->get_user_private()).index; + assert(index < num_conns); + auto &conn_state = conn_states[index]; + auto msg_id = m->get_tid(); if (msg_id % SAMPLE_RATE == 0) { - auto index = msg_id % time_msgs_sent.size(); - ceph_assert(time_msgs_sent[index] != mono_clock::zero()); - std::chrono::duration cur_latency = mono_clock::now() - time_msgs_sent[index]; - conn_stats.total_lat_s += cur_latency.count(); - ++(conn_stats.sampled_count); - period_stats.total_lat_s += cur_latency.count(); - ++(period_stats.sampled_count); - time_msgs_sent[index] = mono_clock::zero(); + auto msg_index = msg_id % conn_state.time_msgs_sent.size(); + ceph_assert(conn_state.time_msgs_sent[msg_index] != + lowres_clock_t::time_point::min()); + std::chrono::duration cur_latency = + lowres_clock_t::now() - conn_state.time_msgs_sent[msg_index]; + conn_state.conn_stats.sampled_total_lat_s += cur_latency.count(); + ++(conn_state.conn_stats.sampled_count); + conn_state.period_stats.sampled_total_lat_s += cur_latency.count(); + ++(conn_state.period_stats.sampled_count); + conn_state.time_msgs_sent[msg_index] = lowres_clock_t::time_point::min(); } - ++(conn_stats.received_count); - depth.signal(1); + ++(conn_state.conn_stats.received_count); + conn_state.depth.signal(1); return {seastar::now()}; } @@ -330,49 +649,115 @@ static seastar::future<> run( // should start messenger at this shard? 
bool is_active() { ceph_assert(seastar::this_shard_id() == sid); - return sid != 0 && sid <= jobs; + return sid + num_clients >= seastar::smp::count; } seastar::future<> init() { - return container().invoke_on_all([] (auto& client) { + return container().invoke_on_all([](auto& client) { if (client.is_active()) { - client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid); - client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); - client.msgr->set_auth_client(&client.dummy_auth); - client.msgr->set_auth_server(&client.dummy_auth); - return client.msgr->start({&client}); + return seastar::do_for_each( + boost::make_counting_iterator(0u), + boost::make_counting_iterator(client.num_conns), + [&client](auto i) { + auto &conn_state = client.conn_states[i]; + std::string name = client.get_name(i); + conn_state.msgr = crimson::net::Messenger::create( + entity_name_t::OSD(client.id * client.num_conns + i), + name, client.nonce_base + client.id * client.num_conns + i, true); + conn_state.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); + conn_state.msgr->set_auth_client(&client.dummy_auth); + conn_state.msgr->set_auth_server(&client.dummy_auth); + return conn_state.msgr->start({&client}); + }); } return seastar::now(); }); } seastar::future<> shutdown() { - return container().invoke_on_all([] (auto& client) { - if (client.is_active()) { - logger().info("{} shutdown...", client.lname); - ceph_assert(client.msgr); - client.msgr->stop(); - return client.msgr->shutdown().then([&client] { - return client.stop_dispatch_messages(); + return seastar::do_with( + std::vector(num_clients * num_conns), + [this](auto &all_stats) { + return container().invoke_on_all([&all_stats](auto& client) { + if (!client.is_active()) { + return seastar::now(); + } + + return seastar::parallel_for_each( + boost::make_counting_iterator(0u), + boost::make_counting_iterator(client.num_conns), + [&all_stats, &client](auto i) { + logger().info("{} shutdown...", client.get_name(i)); + auto &conn_state = client.conn_states[i]; + return conn_state.stop_dispatch_messages( + ).then([&all_stats, &client, i](auto stats) { + all_stats[client.id * client.num_conns + i] = stats; + }); + }).then([&client] { + return seastar::do_for_each( + boost::make_counting_iterator(0u), + boost::make_counting_iterator(client.num_conns), + [&client](auto i) { + auto &conn_state = client.conn_states[i]; + ceph_assert(conn_state.msgr); + conn_state.msgr->stop(); + return conn_state.msgr->shutdown(); + }); }); - } - return seastar::now(); + }).then([&all_stats, this] { + auto nr_jobs = all_stats.size(); + JobReport summary; + std::vector clients(num_clients); + + for (unsigned i = 0; i < nr_jobs; ++i) { + auto &stats = all_stats[i]; + stats.report(); + clients[i / num_conns].account(stats); + summary.account(stats); + } + + std::cout << std::endl; + std::cout << "per client:" << std::endl; + for (unsigned i = 0; i < num_clients; ++i) { + auto &stats = clients[i]; + stats.name = fmt::format("client{}", i); + stats.connect_time_s /= num_conns; + stats.messaging_time_s /= num_conns; + stats.latency_ms /= num_conns; + stats.report(); + } + + std::cout << std::endl; + summary.name = fmt::format("all", nr_jobs); + summary.connect_time_s /= nr_jobs; + summary.messaging_time_s /= nr_jobs; + summary.latency_ms /= nr_jobs; + summary.report(); + }); }); } seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) { - return container().invoke_on_all([peer_addr] (auto& 
client) { - // start clients in active cores (#1 ~ #jobs) + return container().invoke_on_all([peer_addr](auto& client) { + // start clients in active cores if (client.is_active()) { - mono_time start_time = mono_clock::now(); - client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD); + for (unsigned i = 0; i < client.num_conns; ++i) { + auto &conn_state = client.conn_states[i]; + conn_state.conn_stats.start_connecting(); + conn_state.active_conn = conn_state.msgr->connect(peer_addr, entity_name_t::TYPE_OSD); + conn_state.active_conn->set_user_private( + std::make_unique(i)); + } // make sure handshake won't hurt the performance - return seastar::sleep(1s).then([&client, start_time] { - if (client.conn_stats.connected_time == mono_clock::zero()) { - logger().error("\n{} not connected after 1s!\n", client.lname); - ceph_assert(false); + return seastar::sleep(1s).then([&client] { + for (unsigned i = 0; i < client.num_conns; ++i) { + auto &conn_state = client.conn_states[i]; + if (conn_state.conn_stats.connected_time == mono_clock::zero()) { + logger().error("\n{} not connected after 1s!\n", + client.get_name(i)); + ceph_assert(false); + } } - client.conn_stats.connecting_time = start_time; }); } return seastar::now(); @@ -382,34 +767,43 @@ static seastar::future<> run( private: class TimerReport { private: - const unsigned jobs; + const unsigned num_clients; + const unsigned num_conns; const unsigned msgtime; const unsigned bytes_of_block; unsigned elapsed = 0u; - std::vector start_times; std::vector snaps; std::vector summaries; + std::vector client_reactor_utilizations; + std::optional server_reactor_utilization; public: - TimerReport(unsigned jobs, unsigned msgtime, unsigned bs) - : jobs{jobs}, + TimerReport(unsigned num_clients, unsigned num_conns, unsigned msgtime, unsigned bs) + : num_clients{num_clients}, + num_conns{num_conns}, msgtime{msgtime}, bytes_of_block{bs}, - start_times{jobs, mono_clock::zero()}, - snaps{jobs}, - summaries{jobs} {} + snaps{num_clients * num_conns}, + summaries{num_clients * num_conns}, + client_reactor_utilizations(num_clients) {} unsigned get_elapsed() const { return elapsed; } - PeriodStats& get_snap_by_job(seastar::shard_id sid) { - ceph_assert(sid >= 1 && sid <= jobs); - return snaps[sid - 1]; + PeriodStats& get_snap(unsigned client_id, unsigned i) { + return snaps[client_id * num_conns + i]; } - ConnStats& get_summary_by_job(seastar::shard_id sid) { - ceph_assert(sid >= 1 && sid <= jobs); - return summaries[sid - 1]; + ConnStats& get_summary(unsigned client_id, unsigned i) { + return summaries[client_id * num_conns + i]; + } + + void set_client_reactor_utilization(unsigned client_id, double ru) { + client_reactor_utilizations[client_id] = ru; + } + + void set_server_reactor_utilization(double ru) { + server_reactor_utilization = ru; } bool should_stop() const { @@ -422,45 +816,50 @@ static seastar::future<> run( }); } - void report_header() { + void report_header() const { std::ostringstream sout; sout << std::setfill(' ') - << std::setw(7) << "sec" - << std::setw(6) << "depth" - << std::setw(8) << "IOPS" - << std::setw(8) << "MB/s" - << std::setw(8) << "lat(ms)"; + << std::setw(6) << "sec" + << std::setw(7) << "depth" + << std::setw(10) << "IOPS" + << std::setw(9) << "MB/s" + << std::setw(9) << "lat(ms)"; std::cout << sout.str() << std::endl; } void report_period() { - if (elapsed == 1) { - // init this->start_times at the first period - for (unsigned i=0; i elapsed_d = 0s; unsigned depth = 0u; unsigned ops = 0u; unsigned 
sampled_count = 0u; - double total_lat_s = 0.0; + double sampled_total_lat_s = 0.0; for (const auto& snap: snaps) { elapsed_d += (snap.finish_time - snap.start_time); depth += snap.depth; ops += (snap.finish_count - snap.start_count); sampled_count += snap.sampled_count; - total_lat_s += snap.total_lat_s; + sampled_total_lat_s += snap.sampled_total_lat_s; } - double elapsed_s = elapsed_d.count() / jobs; + double elapsed_s = elapsed_d.count() / (num_clients * num_conns); double iops = ops/elapsed_s; std::ostringstream sout; sout << setfill(' ') - << std::setw(7) << elapsed_s + << std::setw(5) << elapsed_s + << " " << std::setw(6) << depth - << std::setw(8) << iops + << " " + << std::setw(9) << iops + << " " << std::setw(8) << iops * bytes_of_block / 1048576 - << std::setw(8) << (total_lat_s / sampled_count * 1000); + << " " + << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000) + << " -- "; + if (server_reactor_utilization.has_value()) { + sout << *server_reactor_utilization << " -- "; + } + for (double cru : client_reactor_utilizations) { + sout << cru << ","; + } std::cout << sout.str() << std::endl; } @@ -468,14 +867,14 @@ static seastar::future<> run( std::chrono::duration elapsed_d = 0s; unsigned ops = 0u; unsigned sampled_count = 0u; - double total_lat_s = 0.0; + double sampled_total_lat_s = 0.0; for (const auto& summary: summaries) { elapsed_d += (summary.finish_time - summary.start_time); ops += (summary.received_count - summary.start_count); sampled_count += summary.sampled_count; - total_lat_s += summary.total_lat_s; + sampled_total_lat_s += summary.sampled_total_lat_s; } - double elapsed_s = elapsed_d.count() / jobs; + double elapsed_s = elapsed_d.count() / (num_clients * num_conns); double iops = ops / elapsed_s; std::ostringstream sout; sout << "--------------" @@ -486,7 +885,7 @@ static seastar::future<> run( << std::setw(6) << "-" << std::setw(8) << iops << std::setw(8) << iops * bytes_of_block / 1048576 - << std::setw(8) << (total_lat_s / sampled_count * 1000) + << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000) << "\n"; std::cout << sout.str() << std::endl; } @@ -495,10 +894,20 @@ static seastar::future<> run( seastar::future<> report_period(TimerReport& report) { return container().invoke_on_all([&report] (auto& client) { if (client.is_active()) { - PeriodStats& snap = report.get_snap_by_job(client.sid); - client.period_stats.reset(client.conn_stats.received_count, - &snap); - snap.depth = client.get_current_depth(); + for (unsigned i = 0; i < client.num_conns; ++i) { + auto &conn_state = client.conn_states[i]; + PeriodStats& snap = report.get_snap(client.id, i); + conn_state.period_stats.reset_period( + conn_state.conn_stats.received_count, + client.nr_depth - conn_state.get_current_units(), + snap); + } + report.set_client_reactor_utilization(client.id, get_reactor_utilization()); + } + if (client.server_sid.has_value() && + seastar::this_shard_id() == *client.server_sid) { + assert(!client.is_active()); + report.set_server_reactor_utilization(get_reactor_utilization()); } }).then([&report] { report.report_period(); @@ -508,9 +917,11 @@ static seastar::future<> run( seastar::future<> report_summary(TimerReport& report) { return container().invoke_on_all([&report] (auto& client) { if (client.is_active()) { - ConnStats& summary = report.get_summary_by_job(client.sid); - summary = client.conn_stats; - summary.finish_time = mono_clock::now(); + for (unsigned i = 0; i < client.num_conns; ++i) { + auto &conn_state = client.conn_states[i]; + ConnStats& 
summary = report.get_summary(client.id, i); + summary.prepare_summary(conn_state.conn_stats); + } } }).then([&report] { report.report_summary(); @@ -519,10 +930,13 @@ static seastar::future<> run( public: seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) { - logger().info("[all clients]: start sending MOSDOps from {} clients", jobs); + logger().info("[all clients]: start sending MOSDOps from {} clients * {} conns", + num_clients, num_conns); return container().invoke_on_all([] (auto& client) { if (client.is_active()) { - client.do_dispatch_messages(client.active_conn.get()); + for (unsigned i = 0; i < client.num_conns; ++i) { + client.do_dispatch_messages(i); + } } }).then([ramptime] { logger().info("[all clients]: ramping up {} seconds...", ramptime); @@ -530,14 +944,18 @@ static seastar::future<> run( }).then([this] { return container().invoke_on_all([] (auto& client) { if (client.is_active()) { - client.conn_stats.start(); - client.period_stats.reset(client.conn_stats.received_count); + for (unsigned i = 0; i < client.num_conns; ++i) { + auto &conn_state = client.conn_states[i]; + conn_state.conn_stats.start_collect(); + conn_state.period_stats.start_collect(conn_state.conn_stats.received_count); + } } }); }).then([this, msgtime] { logger().info("[all clients]: reporting {} seconds...\n", msgtime); return seastar::do_with( - TimerReport(jobs, msgtime, msg_len), [this] (auto& report) { + TimerReport(num_clients, num_conns, msgtime, msg_len), + [this](auto& report) { report.report_header(); return seastar::do_until( [&report] { return report.should_stop(); }, @@ -567,9 +985,11 @@ static seastar::future<> run( } private: - seastar::future<> send_msg(crimson::net::Connection* conn) { + seastar::future<> send_msg(ConnState &conn_state) { ceph_assert(seastar::this_shard_id() == sid); - return depth.wait(1).then([this, conn] { + conn_state.sent_count += 1; + return conn_state.depth.wait(1 + ).then([this, &conn_state] { const static pg_t pgid; const static object_locator_t oloc; const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), @@ -579,89 +999,132 @@ static seastar::future<> run( bufferlist data(msg_data); m->write(0, msg_len, data); // use tid as the identity of each round - m->set_tid(sent_count); + m->set_tid(conn_state.sent_count); // sample message latency - if (sent_count % SAMPLE_RATE == 0) { - auto index = sent_count % time_msgs_sent.size(); - ceph_assert(time_msgs_sent[index] == mono_clock::zero()); - time_msgs_sent[index] = mono_clock::now(); + if (unlikely(conn_state.sent_count % SAMPLE_RATE == 0)) { + auto index = conn_state.sent_count % conn_state.time_msgs_sent.size(); + ceph_assert(conn_state.time_msgs_sent[index] == + lowres_clock_t::time_point::min()); + conn_state.time_msgs_sent[index] = lowres_clock_t::now(); } - return conn->send(std::move(m)); + return conn_state.active_conn->send(std::move(m)); }); } class DepthBroken: public std::exception {}; - seastar::future<> stop_dispatch_messages() { - stop_send = true; - depth.broken(DepthBroken()); - return stopped_send_promise.get_future(); + seastar::future stop_dispatch_messages(unsigned i) { + auto &conn_state = conn_states[i]; + conn_state.stop_send = true; + conn_state.depth.broken(DepthBroken()); + return conn_state.stopped_send_promise.get_future(); } - void do_dispatch_messages(crimson::net::Connection* conn) { + void do_dispatch_messages(unsigned i) { ceph_assert(seastar::this_shard_id() == sid); - ceph_assert(sent_count == 0); - conn_stats.start_time = mono_clock::now(); 
+ auto &conn_state = conn_states[i]; + ceph_assert(conn_state.sent_count == 0); + conn_state.conn_stats.start_time = mono_clock::now(); // forwarded to stopped_send_promise (void) seastar::do_until( - [this] { return stop_send; }, - [this, conn] { - sent_count += 1; - return send_msg(conn); - } + [&conn_state] { return conn_state.stop_send; }, + [this, &conn_state] { return send_msg(conn_state); } ).handle_exception_type([] (const DepthBroken& e) { // ok, stopped by stop_dispatch_messages() - }).then([this, conn] { - std::chrono::duration dur_conn = conn_stats.connected_time - conn_stats.connecting_time; - std::chrono::duration dur_msg = mono_clock::now() - conn_stats.start_time; - unsigned ops = conn_stats.received_count - conn_stats.start_count; - logger().info("{}: stopped sending OSDOPs.\n" - "{}(depth={}):\n" - " connect time: {}s\n" - " messages received: {}\n" - " messaging time: {}s\n" - " latency: {}ms\n" - " IOPS: {}\n" - " throughput: {}MB/s\n", - *conn, - lname, - nr_depth, - dur_conn.count(), - ops, - dur_msg.count(), - conn_stats.total_lat_s / conn_stats.sampled_count * 1000, - ops / dur_msg.count(), - ops / dur_msg.count() * msg_len / 1048576); - stopped_send_promise.set_value(); + }).then([this, &conn_state, i] { + std::string name = get_name(i); + logger().info("{} {}: stopped sending OSDOPs", + name, *conn_state.active_conn); + + std::chrono::duration dur_conn = + conn_state.conn_stats.connected_time - + conn_state.conn_stats.connecting_time; + std::chrono::duration dur_msg = + mono_clock::now() - conn_state.conn_stats.start_time; + unsigned ops = + conn_state.conn_stats.received_count - + conn_state.conn_stats.start_count; + + JobReport stats; + stats.name = name; + stats.depth = nr_depth; + stats.connect_time_s = dur_conn.count(); + stats.total_msgs = ops; + stats.messaging_time_s = dur_msg.count(); + stats.latency_ms = + conn_state.conn_stats.sampled_total_lat_s / + conn_state.conn_stats.sampled_count * 1000; + stats.iops = ops / dur_msg.count(); + stats.throughput_mbps = ops / dur_msg.count() * msg_len / 1048576; + + conn_state.stopped_send_promise.set_value(stats); }); } }; }; + std::optional server_sid; + bool server_needs_report = false; + if (mode == perf_mode_t::both) { + ceph_assert(server_conf.is_fixed_cpu == true); + server_sid = server_conf.core; + } else if (mode == perf_mode_t::server) { + server_needs_report = true; + } return seastar::when_all( - test_state::Server::create(server_conf.core, server_conf.block_size), - create_sharded(client_conf.jobs, client_conf.block_size, client_conf.depth), - crimson::common::sharded_conf().start(EntityName{}, std::string_view{"ceph"}).then([] { - return crimson::common::local_conf().start(); - }).then([crc_enabled] { - return crimson::common::local_conf().set_val( - "ms_crc_data", crc_enabled ? 
"true" : "false"); - }) + seastar::futurize_invoke([mode, server_conf, server_needs_report] { + if (mode == perf_mode_t::client) { + return seastar::make_ready_future(nullptr); + } else { + return create_sharded( + server_conf.core, + server_conf.block_size, + server_needs_report); + } + }), + seastar::futurize_invoke([mode, client_conf, server_sid] { + if (mode == perf_mode_t::server) { + return seastar::make_ready_future(nullptr); + } else { + unsigned nonce_base = ceph::util::generate_random_number(); + logger().info("client nonce_base={}", nonce_base); + return create_sharded( + client_conf.num_clients, + client_conf.num_conns, + client_conf.block_size, + client_conf.depth, + nonce_base, + server_sid); + } + }), + crimson::common::sharded_conf().start( + EntityName{}, std::string_view{"ceph"} + ).then([] { + return crimson::common::local_conf().start(); + }).then([crc_enabled] { + return crimson::common::local_conf().set_val( + "ms_crc_data", crc_enabled ? "true" : "false"); + }) ).then([=](auto&& ret) { - auto fp_server = std::move(std::get<0>(ret).get0()); + auto server = std::move(std::get<0>(ret).get0()); auto client = std::move(std::get<1>(ret).get0()); - test_state::Server* server = fp_server.get(); + // reserve core 0 for potentially better performance if (mode == perf_mode_t::both) { - logger().info("\nperf settings:\n {}\n {}\n", - client_conf.str(), server_conf.str()); - ceph_assert(seastar::smp::count >= 1+client_conf.jobs); - ceph_assert(client_conf.jobs > 0); - ceph_assert(seastar::smp::count >= 1+server_conf.core); - ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs); + logger().info("\nperf settings:\n smp={}\n {}\n {}\n", + seastar::smp::count, client_conf.str(), server_conf.str()); + if (client_conf.skip_core_0) { + ceph_assert(seastar::smp::count > client_conf.num_clients); + } else { + ceph_assert(seastar::smp::count >= client_conf.num_clients); + } + ceph_assert(client_conf.num_clients > 0); + ceph_assert(seastar::smp::count > server_conf.core + client_conf.num_clients); return seastar::when_all_succeed( - server->init(server_conf.addr), + // it is not reasonable to allow server/client to shared cores for + // performance benchmarking purposes. 
+ server->init(server_conf.addr, server_conf.is_fixed_cpu), client->init() ).then_unpack([client, addr = client_conf.server_addr] { return client->connect_wait_verify(addr); @@ -670,13 +1133,18 @@ static seastar::future<> run( return client->dispatch_with_timer(ramptime, msgtime); }).then([client] { return client->shutdown(); - }).then([server, fp_server = std::move(fp_server)] () mutable { - return server->shutdown().then([cleanup = std::move(fp_server)] {}); + }).then([server] { + return server->shutdown(); }); } else if (mode == perf_mode_t::client) { - logger().info("\nperf settings:\n {}\n", client_conf.str()); - ceph_assert(seastar::smp::count >= 1+client_conf.jobs); - ceph_assert(client_conf.jobs > 0); + logger().info("\nperf settings:\n smp={}\n {}\n", + seastar::smp::count, client_conf.str()); + if (client_conf.skip_core_0) { + ceph_assert(seastar::smp::count > client_conf.num_clients); + } else { + ceph_assert(seastar::smp::count >= client_conf.num_clients); + } + ceph_assert(client_conf.num_clients > 0); return client->init( ).then([client, addr = client_conf.server_addr] { return client->connect_wait_verify(addr); @@ -687,15 +1155,15 @@ static seastar::future<> run( return client->shutdown(); }); } else { // mode == perf_mode_t::server - ceph_assert(seastar::smp::count >= 1+server_conf.core); - logger().info("\nperf settings:\n {}\n", server_conf.str()); - return server->init(server_conf.addr - // dispatch ops - ).then([server] { - return server->wait(); - // shutdown - }).then([server, fp_server = std::move(fp_server)] () mutable { - return server->shutdown().then([cleanup = std::move(fp_server)] {}); + ceph_assert(seastar::smp::count > server_conf.core); + logger().info("\nperf settings:\n smp={}\n {}\n", + seastar::smp::count, server_conf.str()); + return seastar::async([server, server_conf] { + // FIXME: SIGINT is not received by stop_signal + seastar_apps_lib::stop_signal should_stop; + server->init(server_conf.addr, server_conf.is_fixed_cpu).get(); + should_stop.wait().get(); + server->shutdown().get(); }); } }).finally([] { @@ -711,21 +1179,27 @@ int main(int argc, char** argv) app.add_options() ("mode", bpo::value()->default_value(0), "0: both, 1:client, 2:server") - ("addr", bpo::value()->default_value("v2:127.0.0.1:9010"), + ("server-addr", bpo::value()->default_value("v2:127.0.0.1:9010"), "server address(only support msgr v2 protocol)") ("ramptime", bpo::value()->default_value(5), "seconds of client ramp-up time") ("msgtime", bpo::value()->default_value(15), "seconds of client messaging time") - ("jobs", bpo::value()->default_value(1), - "number of client jobs (messengers)") - ("cbs", bpo::value()->default_value(4096), + ("clients", bpo::value()->default_value(1), + "number of client messengers") + ("conns-per-client", bpo::value()->default_value(1), + "number of connections per client") + ("client-bs", bpo::value()->default_value(4096), "client block size") ("depth", bpo::value()->default_value(512), - "client io depth") - ("core", bpo::value()->default_value(0), - "server running core") - ("sbs", bpo::value()->default_value(0), + "client io depth per job") + ("client-skip-core-0", bpo::value()->default_value(true), + "client skip core 0") + ("server-fixed-cpu", bpo::value()->default_value(true), + "server is in the fixed cpu mode, non-fixed doesn't support the mode both") + ("server-core", bpo::value()->default_value(1), + "server messenger running core") + ("server-bs", bpo::value()->default_value(0), "server block size") ("crc-enabled", 
bpo::value<bool>()->default_value(false), "enable CRC checks");
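
A minimal sketch (not part of the patch) of the per-connection flow control introduced above: each ConnState owns a seastar::semaphore sized to the io depth, which is decremented before every send, replenished from ms_dispatch() when the reply arrives, and broken() on shutdown to unblock the sender. The DepthLimiter/send_one/on_reply names below are hypothetical; the patch keeps these members directly in ConnState.

#include <cstddef>
#include <exception>
#include <utility>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>

// Thrown into waiters on shutdown, mirroring the patch's DepthBroken.
struct DepthBroken : std::exception {};

struct DepthLimiter {
  seastar::semaphore depth;

  explicit DepthLimiter(std::size_t nr_depth) : depth{nr_depth} {}

  // Sending loop: block until fewer than nr_depth messages are in flight,
  // then perform one send (do_send must return seastar::future<>).
  template <typename SendFunc>
  seastar::future<> send_one(SendFunc&& do_send) {
    return depth.wait(1).then(std::forward<SendFunc>(do_send));
  }

  // Called from ms_dispatch() when a reply arrives.
  void on_reply() {
    depth.signal(1);
  }

  // Shutdown: pending wait(1) calls fail with DepthBroken, which the
  // sending loop catches to terminate.
  void stop() {
    depth.broken(DepthBroken());
  }

  // In-flight message count, as sampled by the periodic report.
  std::size_t inflight(std::size_t nr_depth) const {
    return nr_depth - depth.current();
  }
};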
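
The patch also raises SAMPLE_RATE from 7 to 256 and switches the sampled timestamps from mono_clock to seastar::lowres_system_clock, so only one in every 256 tids pays for a clock read and the slot array (sized to the io depth) holds at most depth outstanding samples. A self-contained sketch of that scheme, using a hypothetical LatencySampler wrapper (the real fields live in ConnState, ConnStats and PeriodStats):

#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <vector>
#include <seastar/core/lowres_clock.hh>

using lowres_clock_t = seastar::lowres_system_clock;

struct LatencySampler {
  static constexpr unsigned SAMPLE_RATE = 256;
  std::vector<lowres_clock_t::time_point> sent;
  double sampled_total_lat_s = 0.0;
  unsigned sampled_count = 0;

  explicit LatencySampler(std::size_t depth)
    : sent(depth, lowres_clock_t::time_point::min()) {}

  // Record a timestamp only for every SAMPLE_RATE-th tid.
  void on_send(uint64_t tid) {
    if (tid % SAMPLE_RATE == 0) {
      sent[tid % sent.size()] = lowres_clock_t::now();
    }
  }

  // Match the reply against the recorded timestamp and accumulate latency.
  void on_reply(uint64_t tid) {
    if (tid % SAMPLE_RATE == 0) {
      auto &t = sent[tid % sent.size()];
      assert(t != lowres_clock_t::time_point::min());
      std::chrono::duration<double> lat = lowres_clock_t::now() - t;
      sampled_total_lat_s += lat.count();
      ++sampled_count;
      t = lowres_clock_t::time_point::min();
    }
  }

  double mean_latency_ms() const {
    return sampled_count ? sampled_total_lat_s / sampled_count * 1000 : 0.0;
  }
};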
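
get_reactor_utilization(), added near the top of the file, looks up the "reactor_utilization" gauge in seastar::metrics::impl::get_value_map(); both the server report loop and the client TimerReport print it each period. A rough usage sketch of polling it on a 2-second timer, assuming the helper is visible in the same file and that do_until/sleep come from <seastar/core/loop.hh> and <seastar/core/sleep.hh> (header paths vary across Seastar versions):

#include <chrono>
#include <iostream>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/sleep.hh>

// Added near the top of perf_crimson_msgr.cc by this patch.
double get_reactor_utilization();

// Print the local reactor's utilization every 2 seconds until `stopped`
// becomes true; the patch's Server::start_report() follows the same shape
// but additionally aggregates per-shard message counts and sizes.
seastar::future<> report_utilization(const bool &stopped) {
  using namespace std::chrono_literals;
  return seastar::do_until(
    [&stopped] { return stopped; },
    [] {
      return seastar::sleep(2s).then([] {
        std::cout << "reactor_utilization: "
                  << get_reactor_utilization() << std::endl;
      });
    });
}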