// vim: ts=8 sw=2 smarttab
#include <map>
-#include <random>
#include <boost/program_options.hpp>
+#include <boost/iterator/counting_iterator.hpp>
#include <seastar/core/app-template.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/future-util.hh>
+#include <seastar/core/lowres_clock.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/smp.hh>
+#include <seastar/core/thread.hh>
#include "common/ceph_time.h"
#include "messages/MOSDOp.h"
+#include "include/random.h"
#include "crimson/auth/DummyAuth.h"
#include "crimson/common/log.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Messenger.h"
+#include "crimson/osd/stop_signal.h"
using namespace std;
using namespace std::chrono_literals;
+using lowres_clock_t = seastar::lowres_system_clock;
+
namespace bpo = boost::program_options;
namespace {
});
}
+double get_reactor_utilization() {
+ auto &value_map = seastar::metrics::impl::get_value_map();
+ auto found = value_map.find("reactor_utilization");
+ assert(found != value_map.end());
+ auto &[full_name, metric_family] = *found;
+ std::ignore = full_name;
+ assert(metric_family.size() == 1);
+ const auto& [labels, metric] = *metric_family.begin();
+ std::ignore = labels;
+ auto value = (*metric)();
+ return value.ui();
+}
+
enum class perf_mode_t {
both,
client,
unsigned block_size;
unsigned ramptime;
unsigned msgtime;
- unsigned jobs;
+ unsigned num_clients;
+ unsigned num_conns;
unsigned depth;
+ bool skip_core_0;
std::string str() const {
std::ostringstream out;
<< "](bs=" << block_size
<< ", ramptime=" << ramptime
<< ", msgtime=" << msgtime
- << ", jobs=" << jobs
+ << ", num_clients=" << num_clients
+ << ", num_conns=" << num_conns
<< ", depth=" << depth
+ << ", skip_core_0=" << skip_core_0
<< ")";
return out.str();
}
static client_config load(bpo::variables_map& options) {
client_config conf;
entity_addr_t addr;
- ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
+ ceph_assert(addr.parse(options["server-addr"].as<std::string>().c_str(), nullptr));
ceph_assert_always(addr.is_msgr2());
conf.server_addr = addr;
- conf.block_size = options["cbs"].as<unsigned>();
+ conf.block_size = options["client-bs"].as<unsigned>();
conf.ramptime = options["ramptime"].as<unsigned>();
conf.msgtime = options["msgtime"].as<unsigned>();
- conf.jobs = options["jobs"].as<unsigned>();
+ conf.num_clients = options["clients"].as<unsigned>();
+ ceph_assert_always(conf.num_clients > 0);
+ conf.num_conns = options["conns-per-client"].as<unsigned>();
+ ceph_assert_always(conf.num_conns > 0);
conf.depth = options["depth"].as<unsigned>();
- ceph_assert(conf.depth % conf.jobs == 0);
+ conf.skip_core_0 = options["client-skip-core-0"].as<bool>();
return conf;
}
};
struct server_config {
entity_addr_t addr;
unsigned block_size;
+ bool is_fixed_cpu;
unsigned core;
std::string str() const {
std::ostringstream out;
out << "server[" << addr
<< "](bs=" << block_size
+ << ", is_fixed_cpu=" << is_fixed_cpu
<< ", core=" << core
<< ")";
return out.str();
static server_config load(bpo::variables_map& options) {
server_config conf;
entity_addr_t addr;
- ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
+ ceph_assert(addr.parse(options["server-addr"].as<std::string>().c_str(), nullptr));
ceph_assert_always(addr.is_msgr2());
conf.addr = addr;
- conf.block_size = options["sbs"].as<unsigned>();
- conf.core = options["core"].as<unsigned>();
+ conf.block_size = options["server-bs"].as<unsigned>();
+ conf.is_fixed_cpu = options["server-fixed-cpu"].as<bool>();
+ conf.core = options["server-core"].as<unsigned>();
return conf;
}
};
-const unsigned SAMPLE_RATE = 7;
+const unsigned SAMPLE_RATE = 256;
static seastar::future<> run(
perf_mode_t mode,
bool crc_enabled)
{
struct test_state {
- struct Server;
- using ServerFRef = seastar::foreign_ptr<std::unique_ptr<Server>>;
-
struct Server final
- : public crimson::net::Dispatcher {
+ : public crimson::net::Dispatcher,
+ public seastar::peering_sharded_service<Server> {
+ // available only in msgr_sid
crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
const seastar::shard_id msgr_sid;
std::string lname;
+
+ bool is_fixed_cpu = true;
+ bool is_stopped = false;
+ std::optional<seastar::future<>> fut_report;
+
+ unsigned conn_count = 0;
+ unsigned msg_count = 0;
+ MessageRef last_msg;
+
+ // available in all shards
unsigned msg_len;
bufferlist msg_data;
- Server(unsigned msg_len)
- : msgr_sid{seastar::this_shard_id()},
+ Server(seastar::shard_id msgr_sid, unsigned msg_len, bool needs_report)
+ : msgr_sid{msgr_sid},
msg_len{msg_len} {
- lname = "server#";
- lname += std::to_string(msgr_sid);
+ lname = fmt::format("server@{}", msgr_sid);
msg_data.append_zero(msg_len);
+
+ if (seastar::this_shard_id() == msgr_sid &&
+ needs_report) {
+ start_report();
+ }
+ }
+
+ void ms_handle_connect(
+ crimson::net::ConnectionRef,
+ seastar::shard_id) override {
+ ceph_abort("impossible, server won't connect");
+ }
+
+ void ms_handle_accept(
+ crimson::net::ConnectionRef,
+ seastar::shard_id new_shard,
+ bool is_replace) override {
+ ceph_assert_always(new_shard == seastar::this_shard_id());
+ auto &server = container().local();
+ ++server.conn_count;
+ }
+
+ void ms_handle_reset(
+ crimson::net::ConnectionRef,
+ bool) override {
+ auto &server = container().local();
+ --server.conn_count;
}
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
+ assert(c->get_shard_id() == seastar::this_shard_id());
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+ auto &server = container().local();
+
// server replies with MOSDOp to generate server-side write workload
const static pg_t pgid;
const static object_locator_t oloc;
pgid.pool(), oloc.nspace);
static spg_t spgid(pgid);
auto rep = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
- bufferlist data(msg_data);
- rep->write(0, msg_len, data);
+ bufferlist data(server.msg_data);
+ rep->write(0, server.msg_len, data);
rep->set_tid(m->get_tid());
+ ++server.msg_count;
std::ignore = c->send(std::move(rep));
+
+ if (server.msg_count % 16 == 0) {
+ server.last_msg = std::move(m);
+ }
return {seastar::now()};
}
- seastar::future<> init(const entity_addr_t& addr) {
- return seastar::smp::submit_to(msgr_sid, [addr, this] {
+ seastar::future<> init(const entity_addr_t& addr, bool is_fixed_cpu) {
+ return container().invoke_on(
+ msgr_sid, [addr, is_fixed_cpu](auto &server) {
// server msgr is always with nonce 0
- msgr = crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid), lname, 0);
- msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
- msgr->set_auth_client(&dummy_auth);
- msgr->set_auth_server(&dummy_auth);
- return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
- return msgr->start({this});
+ server.msgr = crimson::net::Messenger::create(
+ entity_name_t::OSD(server.msgr_sid),
+ server.lname, 0, is_fixed_cpu);
+ server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+ server.msgr->set_auth_client(&server.dummy_auth);
+ server.msgr->set_auth_server(&server.dummy_auth);
+ server.is_fixed_cpu = is_fixed_cpu;
+ return server.msgr->bind(entity_addrvec_t{addr}
+ ).safe_then([&server] {
+ return server.msgr->start({&server});
}, crimson::net::Messenger::bind_ertr::all_same_way(
[addr] (const std::error_code& e) {
logger().error("Server: "
}));
});
}
+
seastar::future<> shutdown() {
logger().info("{} shutdown...", lname);
- return seastar::smp::submit_to(msgr_sid, [this] {
- ceph_assert(msgr);
- msgr->stop();
- return msgr->shutdown();
+ return container().invoke_on(
+ msgr_sid, [](auto &server) {
+ server.is_stopped = true;
+ ceph_assert(server.msgr);
+ server.msgr->stop();
+ return server.msgr->shutdown(
+ ).then([&server] {
+ if (server.fut_report.has_value()) {
+ return std::move(server.fut_report.value());
+ } else {
+ return seastar::now();
+ }
+ });
});
}
- seastar::future<> wait() {
- return seastar::smp::submit_to(msgr_sid, [this] {
- ceph_assert(msgr);
- return msgr->wait();
- });
+
+ private:
+ struct ShardReport {
+ unsigned msg_count = 0;
+
+ // per-interval metrics
+ double reactor_utilization;
+ unsigned conn_count = 0;
+ int msg_size = 0;
+ unsigned msg_count_interval = 0;
+ };
+
+ // should not be called frequently to impact performance
+ void get_report(ShardReport& last) {
+ unsigned last_msg_count = last.msg_count;
+ int msg_size = -1;
+ if (last_msg) {
+ auto msg = boost::static_pointer_cast<MOSDOp>(last_msg);
+ msg->finish_decode();
+ ceph_assert_always(msg->ops.size() == 1);
+ msg_size = msg->ops[0].op.extent.length;
+ last_msg.reset();
+ }
+
+ last.msg_count = msg_count;
+ last.reactor_utilization = get_reactor_utilization();
+ last.conn_count = conn_count;
+ last.msg_size = msg_size;
+ last.msg_count_interval = msg_count - last_msg_count;
}
- static seastar::future<ServerFRef> create(seastar::shard_id msgr_sid, unsigned msg_len) {
- return seastar::smp::submit_to(msgr_sid, [msg_len] {
- return seastar::make_foreign(std::make_unique<Server>(msg_len));
- });
+ struct TimerReport {
+ unsigned elapsed = 0u;
+ mono_time start_time = mono_clock::zero();
+ std::vector<ShardReport> reports;
+
+ TimerReport(unsigned shards) : reports(shards) {}
+ };
+
+ void start_report() {
+ seastar::promise<> pr_report;
+ fut_report = pr_report.get_future();
+ seastar::do_with(
+ TimerReport(seastar::smp::count),
+ [this](auto &report) {
+ return seastar::do_until(
+ [this] { return is_stopped; },
+ [&report, this] {
+ return seastar::sleep(2s
+ ).then([&report, this] {
+ report.elapsed += 2;
+ if (is_fixed_cpu) {
+ return seastar::smp::submit_to(msgr_sid,
+ [&report, this] {
+ auto &server = container().local();
+ server.get_report(report.reports[seastar::this_shard_id()]);
+ }).then([&report, this] {
+ auto now = mono_clock::now();
+ auto prv = report.start_time;
+ report.start_time = now;
+ if (prv == mono_clock::zero()) {
+ // cannot compute duration
+ return;
+ }
+ std::chrono::duration<double> duration_d = now - prv;
+ double duration = duration_d.count();
+ auto &ireport = report.reports[msgr_sid];
+ double iops = ireport.msg_count_interval / duration;
+ double throughput_MB = -1;
+ if (ireport.msg_size >= 0) {
+ throughput_MB = iops * ireport.msg_size / 1048576;
+ }
+ std::ostringstream sout;
+ sout << setfill(' ')
+ << report.elapsed
+ << "(" << std::setw(5) << duration << ") "
+ << std::setw(9) << iops << "IOPS "
+ << std::setw(8) << throughput_MB << "MiB/s "
+ << ireport.reactor_utilization
+ << "(" << ireport.conn_count << ")";
+ std::cout << sout.str() << std::endl;
+ });
+ } else {
+ return seastar::smp::invoke_on_all([&report, this] {
+ auto &server = container().local();
+ server.get_report(report.reports[seastar::this_shard_id()]);
+ }).then([&report, this] {
+ auto now = mono_clock::now();
+ auto prv = report.start_time;
+ report.start_time = now;
+ if (prv == mono_clock::zero()) {
+ // cannot compute duration
+ return;
+ }
+ std::chrono::duration<double> duration_d = now - prv;
+ double duration = duration_d.count();
+ unsigned num_msgs = 0;
+ // -1 means unavailable, -2 means mismatch
+ int msg_size = -1;
+ for (auto &i : report.reports) {
+ if (i.msg_size >= 0) {
+ if (msg_size == -2) {
+ // pass
+ } else if (msg_size == -1) {
+ msg_size = i.msg_size;
+ } else {
+ if (msg_size != i.msg_size) {
+ msg_size = -2;
+ }
+ }
+ }
+ num_msgs += i.msg_count_interval;
+ }
+ double iops = num_msgs / duration;
+ double throughput_MB = msg_size;
+ if (msg_size >= 0) {
+ throughput_MB = iops * msg_size / 1048576;
+ }
+ std::ostringstream sout;
+ sout << setfill(' ')
+ << report.elapsed
+ << "(" << std::setw(5) << duration << ") "
+ << std::setw(9) << iops << "IOPS "
+ << std::setw(8) << throughput_MB << "MiB/s ";
+ for (auto &i : report.reports) {
+ sout << i.reactor_utilization
+ << "(" << i.conn_count << ") ";
+ }
+ std::cout << sout.str() << std::endl;
+ });
+ }
+ });
+ }
+ );
+ }).then([this] {
+ logger().info("report is stopped!");
+ }).forward_to(std::move(pr_report));
}
};
unsigned start_count = 0u;
unsigned sampled_count = 0u;
- double total_lat_s = 0.0;
+ double sampled_total_lat_s = 0.0;
// for reporting only
mono_time finish_time = mono_clock::zero();
- void start() {
+ void start_connecting() {
+ connecting_time = mono_clock::now();
+ }
+
+ void finish_connecting() {
+ ceph_assert_always(connected_time == mono_clock::zero());
+ connected_time = mono_clock::now();
+ }
+
+ void start_collect() {
+ ceph_assert_always(connected_time != mono_clock::zero());
start_time = mono_clock::now();
start_count = received_count;
sampled_count = 0u;
- total_lat_s = 0.0;
+ sampled_total_lat_s = 0.0;
finish_time = mono_clock::zero();
}
+
+ void prepare_summary(const ConnStats ¤t) {
+ *this = current;
+ finish_time = mono_clock::now();
+ }
};
- ConnStats conn_stats;
struct PeriodStats {
mono_time start_time = mono_clock::zero();
unsigned start_count = 0u;
unsigned sampled_count = 0u;
- double total_lat_s = 0.0;
+ double sampled_total_lat_s = 0.0;
// for reporting only
mono_time finish_time = mono_clock::zero();
unsigned finish_count = 0u;
unsigned depth = 0u;
- void reset(unsigned received_count, PeriodStats* snap = nullptr) {
- if (snap) {
- snap->start_time = start_time;
- snap->start_count = start_count;
- snap->sampled_count = sampled_count;
- snap->total_lat_s = total_lat_s;
- snap->finish_time = mono_clock::now();
- snap->finish_count = received_count;
- }
+ void start_collect(unsigned received_count) {
start_time = mono_clock::now();
start_count = received_count;
sampled_count = 0u;
- total_lat_s = 0.0;
+ sampled_total_lat_s = 0.0;
+ }
+
+ void reset_period(
+ unsigned received_count, unsigned _depth, PeriodStats &snapshot) {
+ snapshot.start_time = start_time;
+ snapshot.start_count = start_count;
+ snapshot.sampled_count = sampled_count;
+ snapshot.sampled_total_lat_s = sampled_total_lat_s;
+ snapshot.finish_time = mono_clock::now();
+ snapshot.finish_count = received_count;
+ snapshot.depth = _depth;
+
+ start_collect(received_count);
+ }
+ };
+
+ struct JobReport {
+ std::string name;
+ unsigned depth = 0;
+ double connect_time_s = 0;
+ unsigned total_msgs = 0;
+ double messaging_time_s = 0;
+ double latency_ms = 0;
+ double iops = 0;
+ double throughput_mbps = 0;
+
+ void account(const JobReport &stats) {
+ depth += stats.depth;
+ connect_time_s += stats.connect_time_s;
+ total_msgs += stats.total_msgs;
+ messaging_time_s += stats.messaging_time_s;
+ latency_ms += stats.latency_ms;
+ iops += stats.iops;
+ throughput_mbps += stats.throughput_mbps;
+ }
+
+ void report() const {
+ auto str = fmt::format(
+ "{}(depth={}):\n"
+ " connect time: {:08f}s\n"
+ " messages received: {}\n"
+ " messaging time: {:08f}s\n"
+ " latency: {:08f}ms\n"
+ " IOPS: {:08f}\n"
+ " out throughput: {:08f}MB/s",
+ name, depth, connect_time_s,
+ total_msgs, messaging_time_s,
+ latency_ms, iops,
+ throughput_mbps);
+ std::cout << str << std::endl;
+ }
+ };
+
+ struct ConnectionPriv : public crimson::net::Connection::user_private_t {
+ unsigned index;
+ ConnectionPriv(unsigned i) : index{i} {}
+ };
+
+ struct ConnState {
+ crimson::net::MessengerRef msgr;
+ ConnStats conn_stats;
+ PeriodStats period_stats;
+ seastar::semaphore depth;
+ std::vector<lowres_clock_t::time_point> time_msgs_sent;
+ unsigned sent_count = 0u;
+ crimson::net::ConnectionRef active_conn;
+ bool stop_send = false;
+ seastar::promise<JobReport> stopped_send_promise;
+
+ ConnState(std::size_t _depth)
+ : depth{_depth},
+ time_msgs_sent{_depth, lowres_clock_t::time_point::min()} {}
+
+ unsigned get_current_units() const {
+ ceph_assert(depth.available_units() >= 0);
+ return depth.current();
+ }
+
+ seastar::future<JobReport> stop_dispatch_messages() {
+ stop_send = true;
+ depth.broken(DepthBroken());
+ return stopped_send_promise.get_future();
}
};
- PeriodStats period_stats;
const seastar::shard_id sid;
- std::string lname;
+ const unsigned id;
+ const std::optional<unsigned> server_sid;
- const unsigned jobs;
- crimson::net::MessengerRef msgr;
+ const unsigned num_clients;
+ const unsigned num_conns;
const unsigned msg_len;
bufferlist msg_data;
const unsigned nr_depth;
- seastar::semaphore depth;
- std::vector<mono_time> time_msgs_sent;
+ const unsigned nonce_base;
crimson::auth::DummyAuthClientServer dummy_auth;
- unsigned sent_count = 0u;
- crimson::net::ConnectionRef active_conn = nullptr;
+ std::vector<ConnState> conn_states;
- bool stop_send = false;
- seastar::promise<> stopped_send_promise;
-
- Client(unsigned jobs, unsigned msg_len, unsigned depth)
+ Client(unsigned num_clients,
+ unsigned num_conns,
+ unsigned msg_len,
+ unsigned _depth,
+ unsigned nonce_base,
+ std::optional<unsigned> server_sid)
: sid{seastar::this_shard_id()},
- jobs{jobs},
+ id{sid + num_clients - seastar::smp::count},
+ server_sid{server_sid},
+ num_clients{num_clients},
+ num_conns{num_conns},
msg_len{msg_len},
- nr_depth{depth/jobs},
- depth{nr_depth},
- time_msgs_sent{depth/jobs, mono_clock::zero()} {
- lname = "client#";
- lname += std::to_string(sid);
+ nr_depth{_depth},
+ nonce_base{nonce_base} {
+ if (is_active()) {
+ for (unsigned i = 0; i < num_conns; ++i) {
+ conn_states.emplace_back(nr_depth);
+ }
+ }
msg_data.append_zero(msg_len);
}
- unsigned get_current_depth() const {
- ceph_assert(depth.available_units() >= 0);
- return nr_depth - depth.current();
+ std::string get_name(unsigned i) {
+ return fmt::format("client{}Conn{}@{}", id, i, sid);
}
- void ms_handle_connect(crimson::net::ConnectionRef conn) override {
- conn_stats.connected_time = mono_clock::now();
+ void ms_handle_connect(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id prv_shard) override {
+ ceph_assert_always(prv_shard == seastar::this_shard_id());
+ assert(is_active());
+ unsigned index = static_cast<ConnectionPriv&>(conn->get_user_private()).index;
+ auto &conn_state = conn_states[index];
+ conn_state.conn_stats.finish_connecting();
}
+
std::optional<seastar::future<>> ms_dispatch(
- crimson::net::ConnectionRef, MessageRef m) override {
+ crimson::net::ConnectionRef conn, MessageRef m) override {
+ assert(is_active());
// server replies with MOSDOp to generate server-side write workload
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+ unsigned index = static_cast<ConnectionPriv&>(conn->get_user_private()).index;
+ assert(index < num_conns);
+ auto &conn_state = conn_states[index];
+
auto msg_id = m->get_tid();
if (msg_id % SAMPLE_RATE == 0) {
- auto index = msg_id % time_msgs_sent.size();
- ceph_assert(time_msgs_sent[index] != mono_clock::zero());
- std::chrono::duration<double> cur_latency = mono_clock::now() - time_msgs_sent[index];
- conn_stats.total_lat_s += cur_latency.count();
- ++(conn_stats.sampled_count);
- period_stats.total_lat_s += cur_latency.count();
- ++(period_stats.sampled_count);
- time_msgs_sent[index] = mono_clock::zero();
+ auto msg_index = msg_id % conn_state.time_msgs_sent.size();
+ ceph_assert(conn_state.time_msgs_sent[msg_index] !=
+ lowres_clock_t::time_point::min());
+ std::chrono::duration<double> cur_latency =
+ lowres_clock_t::now() - conn_state.time_msgs_sent[msg_index];
+ conn_state.conn_stats.sampled_total_lat_s += cur_latency.count();
+ ++(conn_state.conn_stats.sampled_count);
+ conn_state.period_stats.sampled_total_lat_s += cur_latency.count();
+ ++(conn_state.period_stats.sampled_count);
+ conn_state.time_msgs_sent[msg_index] = lowres_clock_t::time_point::min();
}
- ++(conn_stats.received_count);
- depth.signal(1);
+ ++(conn_state.conn_stats.received_count);
+ conn_state.depth.signal(1);
return {seastar::now()};
}
// should start messenger at this shard?
bool is_active() {
ceph_assert(seastar::this_shard_id() == sid);
- return sid != 0 && sid <= jobs;
+ return sid + num_clients >= seastar::smp::count;
}
seastar::future<> init() {
- return container().invoke_on_all([] (auto& client) {
+ return container().invoke_on_all([](auto& client) {
if (client.is_active()) {
- client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid);
- client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
- client.msgr->set_auth_client(&client.dummy_auth);
- client.msgr->set_auth_server(&client.dummy_auth);
- return client.msgr->start({&client});
+ return seastar::do_for_each(
+ boost::make_counting_iterator(0u),
+ boost::make_counting_iterator(client.num_conns),
+ [&client](auto i) {
+ auto &conn_state = client.conn_states[i];
+ std::string name = client.get_name(i);
+ conn_state.msgr = crimson::net::Messenger::create(
+ entity_name_t::OSD(client.id * client.num_conns + i),
+ name, client.nonce_base + client.id * client.num_conns + i, true);
+ conn_state.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+ conn_state.msgr->set_auth_client(&client.dummy_auth);
+ conn_state.msgr->set_auth_server(&client.dummy_auth);
+ return conn_state.msgr->start({&client});
+ });
}
return seastar::now();
});
}
seastar::future<> shutdown() {
- return container().invoke_on_all([] (auto& client) {
- if (client.is_active()) {
- logger().info("{} shutdown...", client.lname);
- ceph_assert(client.msgr);
- client.msgr->stop();
- return client.msgr->shutdown().then([&client] {
- return client.stop_dispatch_messages();
+ return seastar::do_with(
+ std::vector<JobReport>(num_clients * num_conns),
+ [this](auto &all_stats) {
+ return container().invoke_on_all([&all_stats](auto& client) {
+ if (!client.is_active()) {
+ return seastar::now();
+ }
+
+ return seastar::parallel_for_each(
+ boost::make_counting_iterator(0u),
+ boost::make_counting_iterator(client.num_conns),
+ [&all_stats, &client](auto i) {
+ logger().info("{} shutdown...", client.get_name(i));
+ auto &conn_state = client.conn_states[i];
+ return conn_state.stop_dispatch_messages(
+ ).then([&all_stats, &client, i](auto stats) {
+ all_stats[client.id * client.num_conns + i] = stats;
+ });
+ }).then([&client] {
+ return seastar::do_for_each(
+ boost::make_counting_iterator(0u),
+ boost::make_counting_iterator(client.num_conns),
+ [&client](auto i) {
+ auto &conn_state = client.conn_states[i];
+ ceph_assert(conn_state.msgr);
+ conn_state.msgr->stop();
+ return conn_state.msgr->shutdown();
+ });
});
- }
- return seastar::now();
+ }).then([&all_stats, this] {
+ auto nr_jobs = all_stats.size();
+ JobReport summary;
+ std::vector<JobReport> clients(num_clients);
+
+ for (unsigned i = 0; i < nr_jobs; ++i) {
+ auto &stats = all_stats[i];
+ stats.report();
+ clients[i / num_conns].account(stats);
+ summary.account(stats);
+ }
+
+ std::cout << std::endl;
+ std::cout << "per client:" << std::endl;
+ for (unsigned i = 0; i < num_clients; ++i) {
+ auto &stats = clients[i];
+ stats.name = fmt::format("client{}", i);
+ stats.connect_time_s /= num_conns;
+ stats.messaging_time_s /= num_conns;
+ stats.latency_ms /= num_conns;
+ stats.report();
+ }
+
+ std::cout << std::endl;
+ summary.name = fmt::format("all", nr_jobs);
+ summary.connect_time_s /= nr_jobs;
+ summary.messaging_time_s /= nr_jobs;
+ summary.latency_ms /= nr_jobs;
+ summary.report();
+ });
});
}
seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) {
- return container().invoke_on_all([peer_addr] (auto& client) {
- // start clients in active cores (#1 ~ #jobs)
+ return container().invoke_on_all([peer_addr](auto& client) {
+ // start clients in active cores
if (client.is_active()) {
- mono_time start_time = mono_clock::now();
- client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+ for (unsigned i = 0; i < client.num_conns; ++i) {
+ auto &conn_state = client.conn_states[i];
+ conn_state.conn_stats.start_connecting();
+ conn_state.active_conn = conn_state.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+ conn_state.active_conn->set_user_private(
+ std::make_unique<ConnectionPriv>(i));
+ }
// make sure handshake won't hurt the performance
- return seastar::sleep(1s).then([&client, start_time] {
- if (client.conn_stats.connected_time == mono_clock::zero()) {
- logger().error("\n{} not connected after 1s!\n", client.lname);
- ceph_assert(false);
+ return seastar::sleep(1s).then([&client] {
+ for (unsigned i = 0; i < client.num_conns; ++i) {
+ auto &conn_state = client.conn_states[i];
+ if (conn_state.conn_stats.connected_time == mono_clock::zero()) {
+ logger().error("\n{} not connected after 1s!\n",
+ client.get_name(i));
+ ceph_assert(false);
+ }
}
- client.conn_stats.connecting_time = start_time;
});
}
return seastar::now();
private:
class TimerReport {
private:
- const unsigned jobs;
+ const unsigned num_clients;
+ const unsigned num_conns;
const unsigned msgtime;
const unsigned bytes_of_block;
unsigned elapsed = 0u;
- std::vector<mono_time> start_times;
std::vector<PeriodStats> snaps;
std::vector<ConnStats> summaries;
+ std::vector<double> client_reactor_utilizations;
+ std::optional<double> server_reactor_utilization;
public:
- TimerReport(unsigned jobs, unsigned msgtime, unsigned bs)
- : jobs{jobs},
+ TimerReport(unsigned num_clients, unsigned num_conns, unsigned msgtime, unsigned bs)
+ : num_clients{num_clients},
+ num_conns{num_conns},
msgtime{msgtime},
bytes_of_block{bs},
- start_times{jobs, mono_clock::zero()},
- snaps{jobs},
- summaries{jobs} {}
+ snaps{num_clients * num_conns},
+ summaries{num_clients * num_conns},
+ client_reactor_utilizations(num_clients) {}
unsigned get_elapsed() const { return elapsed; }
- PeriodStats& get_snap_by_job(seastar::shard_id sid) {
- ceph_assert(sid >= 1 && sid <= jobs);
- return snaps[sid - 1];
+ PeriodStats& get_snap(unsigned client_id, unsigned i) {
+ return snaps[client_id * num_conns + i];
}
- ConnStats& get_summary_by_job(seastar::shard_id sid) {
- ceph_assert(sid >= 1 && sid <= jobs);
- return summaries[sid - 1];
+ ConnStats& get_summary(unsigned client_id, unsigned i) {
+ return summaries[client_id * num_conns + i];
+ }
+
+ void set_client_reactor_utilization(unsigned client_id, double ru) {
+ client_reactor_utilizations[client_id] = ru;
+ }
+
+ void set_server_reactor_utilization(double ru) {
+ server_reactor_utilization = ru;
}
bool should_stop() const {
});
}
- void report_header() {
+ void report_header() const {
std::ostringstream sout;
sout << std::setfill(' ')
- << std::setw(7) << "sec"
- << std::setw(6) << "depth"
- << std::setw(8) << "IOPS"
- << std::setw(8) << "MB/s"
- << std::setw(8) << "lat(ms)";
+ << std::setw(6) << "sec"
+ << std::setw(7) << "depth"
+ << std::setw(10) << "IOPS"
+ << std::setw(9) << "MB/s"
+ << std::setw(9) << "lat(ms)";
std::cout << sout.str() << std::endl;
}
void report_period() {
- if (elapsed == 1) {
- // init this->start_times at the first period
- for (unsigned i=0; i<jobs; ++i) {
- start_times[i] = snaps[i].start_time;
- }
- }
std::chrono::duration<double> elapsed_d = 0s;
unsigned depth = 0u;
unsigned ops = 0u;
unsigned sampled_count = 0u;
- double total_lat_s = 0.0;
+ double sampled_total_lat_s = 0.0;
for (const auto& snap: snaps) {
elapsed_d += (snap.finish_time - snap.start_time);
depth += snap.depth;
ops += (snap.finish_count - snap.start_count);
sampled_count += snap.sampled_count;
- total_lat_s += snap.total_lat_s;
+ sampled_total_lat_s += snap.sampled_total_lat_s;
}
- double elapsed_s = elapsed_d.count() / jobs;
+ double elapsed_s = elapsed_d.count() / (num_clients * num_conns);
double iops = ops/elapsed_s;
std::ostringstream sout;
sout << setfill(' ')
- << std::setw(7) << elapsed_s
+ << std::setw(5) << elapsed_s
+ << " "
<< std::setw(6) << depth
- << std::setw(8) << iops
+ << " "
+ << std::setw(9) << iops
+ << " "
<< std::setw(8) << iops * bytes_of_block / 1048576
- << std::setw(8) << (total_lat_s / sampled_count * 1000);
+ << " "
+ << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000)
+ << " -- ";
+ if (server_reactor_utilization.has_value()) {
+ sout << *server_reactor_utilization << " -- ";
+ }
+ for (double cru : client_reactor_utilizations) {
+ sout << cru << ",";
+ }
std::cout << sout.str() << std::endl;
}
std::chrono::duration<double> elapsed_d = 0s;
unsigned ops = 0u;
unsigned sampled_count = 0u;
- double total_lat_s = 0.0;
+ double sampled_total_lat_s = 0.0;
for (const auto& summary: summaries) {
elapsed_d += (summary.finish_time - summary.start_time);
ops += (summary.received_count - summary.start_count);
sampled_count += summary.sampled_count;
- total_lat_s += summary.total_lat_s;
+ sampled_total_lat_s += summary.sampled_total_lat_s;
}
- double elapsed_s = elapsed_d.count() / jobs;
+ double elapsed_s = elapsed_d.count() / (num_clients * num_conns);
double iops = ops / elapsed_s;
std::ostringstream sout;
sout << "--------------"
<< std::setw(6) << "-"
<< std::setw(8) << iops
<< std::setw(8) << iops * bytes_of_block / 1048576
- << std::setw(8) << (total_lat_s / sampled_count * 1000)
+ << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000)
<< "\n";
std::cout << sout.str() << std::endl;
}
seastar::future<> report_period(TimerReport& report) {
return container().invoke_on_all([&report] (auto& client) {
if (client.is_active()) {
- PeriodStats& snap = report.get_snap_by_job(client.sid);
- client.period_stats.reset(client.conn_stats.received_count,
- &snap);
- snap.depth = client.get_current_depth();
+ for (unsigned i = 0; i < client.num_conns; ++i) {
+ auto &conn_state = client.conn_states[i];
+ PeriodStats& snap = report.get_snap(client.id, i);
+ conn_state.period_stats.reset_period(
+ conn_state.conn_stats.received_count,
+ client.nr_depth - conn_state.get_current_units(),
+ snap);
+ }
+ report.set_client_reactor_utilization(client.id, get_reactor_utilization());
+ }
+ if (client.server_sid.has_value() &&
+ seastar::this_shard_id() == *client.server_sid) {
+ assert(!client.is_active());
+ report.set_server_reactor_utilization(get_reactor_utilization());
}
}).then([&report] {
report.report_period();
seastar::future<> report_summary(TimerReport& report) {
return container().invoke_on_all([&report] (auto& client) {
if (client.is_active()) {
- ConnStats& summary = report.get_summary_by_job(client.sid);
- summary = client.conn_stats;
- summary.finish_time = mono_clock::now();
+ for (unsigned i = 0; i < client.num_conns; ++i) {
+ auto &conn_state = client.conn_states[i];
+ ConnStats& summary = report.get_summary(client.id, i);
+ summary.prepare_summary(conn_state.conn_stats);
+ }
}
}).then([&report] {
report.report_summary();
public:
seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) {
- logger().info("[all clients]: start sending MOSDOps from {} clients", jobs);
+ logger().info("[all clients]: start sending MOSDOps from {} clients * {} conns",
+ num_clients, num_conns);
return container().invoke_on_all([] (auto& client) {
if (client.is_active()) {
- client.do_dispatch_messages(client.active_conn.get());
+ for (unsigned i = 0; i < client.num_conns; ++i) {
+ client.do_dispatch_messages(i);
+ }
}
}).then([ramptime] {
logger().info("[all clients]: ramping up {} seconds...", ramptime);
}).then([this] {
return container().invoke_on_all([] (auto& client) {
if (client.is_active()) {
- client.conn_stats.start();
- client.period_stats.reset(client.conn_stats.received_count);
+ for (unsigned i = 0; i < client.num_conns; ++i) {
+ auto &conn_state = client.conn_states[i];
+ conn_state.conn_stats.start_collect();
+ conn_state.period_stats.start_collect(conn_state.conn_stats.received_count);
+ }
}
});
}).then([this, msgtime] {
logger().info("[all clients]: reporting {} seconds...\n", msgtime);
return seastar::do_with(
- TimerReport(jobs, msgtime, msg_len), [this] (auto& report) {
+ TimerReport(num_clients, num_conns, msgtime, msg_len),
+ [this](auto& report) {
report.report_header();
return seastar::do_until(
[&report] { return report.should_stop(); },
}
private:
- seastar::future<> send_msg(crimson::net::Connection* conn) {
+ seastar::future<> send_msg(ConnState &conn_state) {
ceph_assert(seastar::this_shard_id() == sid);
- return depth.wait(1).then([this, conn] {
+ conn_state.sent_count += 1;
+ return conn_state.depth.wait(1
+ ).then([this, &conn_state] {
const static pg_t pgid;
const static object_locator_t oloc;
const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
bufferlist data(msg_data);
m->write(0, msg_len, data);
// use tid as the identity of each round
- m->set_tid(sent_count);
+ m->set_tid(conn_state.sent_count);
// sample message latency
- if (sent_count % SAMPLE_RATE == 0) {
- auto index = sent_count % time_msgs_sent.size();
- ceph_assert(time_msgs_sent[index] == mono_clock::zero());
- time_msgs_sent[index] = mono_clock::now();
+ if (unlikely(conn_state.sent_count % SAMPLE_RATE == 0)) {
+ auto index = conn_state.sent_count % conn_state.time_msgs_sent.size();
+ ceph_assert(conn_state.time_msgs_sent[index] ==
+ lowres_clock_t::time_point::min());
+ conn_state.time_msgs_sent[index] = lowres_clock_t::now();
}
- return conn->send(std::move(m));
+ return conn_state.active_conn->send(std::move(m));
});
}
class DepthBroken: public std::exception {};
- seastar::future<> stop_dispatch_messages() {
- stop_send = true;
- depth.broken(DepthBroken());
- return stopped_send_promise.get_future();
+ seastar::future<JobReport> stop_dispatch_messages(unsigned i) {
+ auto &conn_state = conn_states[i];
+ conn_state.stop_send = true;
+ conn_state.depth.broken(DepthBroken());
+ return conn_state.stopped_send_promise.get_future();
}
- void do_dispatch_messages(crimson::net::Connection* conn) {
+ void do_dispatch_messages(unsigned i) {
ceph_assert(seastar::this_shard_id() == sid);
- ceph_assert(sent_count == 0);
- conn_stats.start_time = mono_clock::now();
+ auto &conn_state = conn_states[i];
+ ceph_assert(conn_state.sent_count == 0);
+ conn_state.conn_stats.start_time = mono_clock::now();
// forwarded to stopped_send_promise
(void) seastar::do_until(
- [this] { return stop_send; },
- [this, conn] {
- sent_count += 1;
- return send_msg(conn);
- }
+ [&conn_state] { return conn_state.stop_send; },
+ [this, &conn_state] { return send_msg(conn_state); }
).handle_exception_type([] (const DepthBroken& e) {
// ok, stopped by stop_dispatch_messages()
- }).then([this, conn] {
- std::chrono::duration<double> dur_conn = conn_stats.connected_time - conn_stats.connecting_time;
- std::chrono::duration<double> dur_msg = mono_clock::now() - conn_stats.start_time;
- unsigned ops = conn_stats.received_count - conn_stats.start_count;
- logger().info("{}: stopped sending OSDOPs.\n"
- "{}(depth={}):\n"
- " connect time: {}s\n"
- " messages received: {}\n"
- " messaging time: {}s\n"
- " latency: {}ms\n"
- " IOPS: {}\n"
- " throughput: {}MB/s\n",
- *conn,
- lname,
- nr_depth,
- dur_conn.count(),
- ops,
- dur_msg.count(),
- conn_stats.total_lat_s / conn_stats.sampled_count * 1000,
- ops / dur_msg.count(),
- ops / dur_msg.count() * msg_len / 1048576);
- stopped_send_promise.set_value();
+ }).then([this, &conn_state, i] {
+ std::string name = get_name(i);
+ logger().info("{} {}: stopped sending OSDOPs",
+ name, *conn_state.active_conn);
+
+ std::chrono::duration<double> dur_conn =
+ conn_state.conn_stats.connected_time -
+ conn_state.conn_stats.connecting_time;
+ std::chrono::duration<double> dur_msg =
+ mono_clock::now() - conn_state.conn_stats.start_time;
+ unsigned ops =
+ conn_state.conn_stats.received_count -
+ conn_state.conn_stats.start_count;
+
+ JobReport stats;
+ stats.name = name;
+ stats.depth = nr_depth;
+ stats.connect_time_s = dur_conn.count();
+ stats.total_msgs = ops;
+ stats.messaging_time_s = dur_msg.count();
+ stats.latency_ms =
+ conn_state.conn_stats.sampled_total_lat_s /
+ conn_state.conn_stats.sampled_count * 1000;
+ stats.iops = ops / dur_msg.count();
+ stats.throughput_mbps = ops / dur_msg.count() * msg_len / 1048576;
+
+ conn_state.stopped_send_promise.set_value(stats);
});
}
};
};
+ std::optional<unsigned> server_sid;
+ bool server_needs_report = false;
+ if (mode == perf_mode_t::both) {
+ ceph_assert(server_conf.is_fixed_cpu == true);
+ server_sid = server_conf.core;
+ } else if (mode == perf_mode_t::server) {
+ server_needs_report = true;
+ }
return seastar::when_all(
- test_state::Server::create(server_conf.core, server_conf.block_size),
- create_sharded<test_state::Client>(client_conf.jobs, client_conf.block_size, client_conf.depth),
- crimson::common::sharded_conf().start(EntityName{}, std::string_view{"ceph"}).then([] {
- return crimson::common::local_conf().start();
- }).then([crc_enabled] {
- return crimson::common::local_conf().set_val(
- "ms_crc_data", crc_enabled ? "true" : "false");
- })
+ seastar::futurize_invoke([mode, server_conf, server_needs_report] {
+ if (mode == perf_mode_t::client) {
+ return seastar::make_ready_future<test_state::Server*>(nullptr);
+ } else {
+ return create_sharded<test_state::Server>(
+ server_conf.core,
+ server_conf.block_size,
+ server_needs_report);
+ }
+ }),
+ seastar::futurize_invoke([mode, client_conf, server_sid] {
+ if (mode == perf_mode_t::server) {
+ return seastar::make_ready_future<test_state::Client*>(nullptr);
+ } else {
+ unsigned nonce_base = ceph::util::generate_random_number<unsigned>();
+ logger().info("client nonce_base={}", nonce_base);
+ return create_sharded<test_state::Client>(
+ client_conf.num_clients,
+ client_conf.num_conns,
+ client_conf.block_size,
+ client_conf.depth,
+ nonce_base,
+ server_sid);
+ }
+ }),
+ crimson::common::sharded_conf().start(
+ EntityName{}, std::string_view{"ceph"}
+ ).then([] {
+ return crimson::common::local_conf().start();
+ }).then([crc_enabled] {
+ return crimson::common::local_conf().set_val(
+ "ms_crc_data", crc_enabled ? "true" : "false");
+ })
).then([=](auto&& ret) {
- auto fp_server = std::move(std::get<0>(ret).get0());
+ auto server = std::move(std::get<0>(ret).get0());
auto client = std::move(std::get<1>(ret).get0());
- test_state::Server* server = fp_server.get();
+ // reserve core 0 for potentially better performance
if (mode == perf_mode_t::both) {
- logger().info("\nperf settings:\n {}\n {}\n",
- client_conf.str(), server_conf.str());
- ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
- ceph_assert(client_conf.jobs > 0);
- ceph_assert(seastar::smp::count >= 1+server_conf.core);
- ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs);
+ logger().info("\nperf settings:\n smp={}\n {}\n {}\n",
+ seastar::smp::count, client_conf.str(), server_conf.str());
+ if (client_conf.skip_core_0) {
+ ceph_assert(seastar::smp::count > client_conf.num_clients);
+ } else {
+ ceph_assert(seastar::smp::count >= client_conf.num_clients);
+ }
+ ceph_assert(client_conf.num_clients > 0);
+ ceph_assert(seastar::smp::count > server_conf.core + client_conf.num_clients);
return seastar::when_all_succeed(
- server->init(server_conf.addr),
+ // it is not reasonable to allow server/client to shared cores for
+ // performance benchmarking purposes.
+ server->init(server_conf.addr, server_conf.is_fixed_cpu),
client->init()
).then_unpack([client, addr = client_conf.server_addr] {
return client->connect_wait_verify(addr);
return client->dispatch_with_timer(ramptime, msgtime);
}).then([client] {
return client->shutdown();
- }).then([server, fp_server = std::move(fp_server)] () mutable {
- return server->shutdown().then([cleanup = std::move(fp_server)] {});
+ }).then([server] {
+ return server->shutdown();
});
} else if (mode == perf_mode_t::client) {
- logger().info("\nperf settings:\n {}\n", client_conf.str());
- ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
- ceph_assert(client_conf.jobs > 0);
+ logger().info("\nperf settings:\n smp={}\n {}\n",
+ seastar::smp::count, client_conf.str());
+ if (client_conf.skip_core_0) {
+ ceph_assert(seastar::smp::count > client_conf.num_clients);
+ } else {
+ ceph_assert(seastar::smp::count >= client_conf.num_clients);
+ }
+ ceph_assert(client_conf.num_clients > 0);
return client->init(
).then([client, addr = client_conf.server_addr] {
return client->connect_wait_verify(addr);
return client->shutdown();
});
} else { // mode == perf_mode_t::server
- ceph_assert(seastar::smp::count >= 1+server_conf.core);
- logger().info("\nperf settings:\n {}\n", server_conf.str());
- return server->init(server_conf.addr
- // dispatch ops
- ).then([server] {
- return server->wait();
- // shutdown
- }).then([server, fp_server = std::move(fp_server)] () mutable {
- return server->shutdown().then([cleanup = std::move(fp_server)] {});
+ ceph_assert(seastar::smp::count > server_conf.core);
+ logger().info("\nperf settings:\n smp={}\n {}\n",
+ seastar::smp::count, server_conf.str());
+ return seastar::async([server, server_conf] {
+ // FIXME: SIGINT is not received by stop_signal
+ seastar_apps_lib::stop_signal should_stop;
+ server->init(server_conf.addr, server_conf.is_fixed_cpu).get();
+ should_stop.wait().get();
+ server->shutdown().get();
});
}
}).finally([] {
app.add_options()
("mode", bpo::value<unsigned>()->default_value(0),
"0: both, 1:client, 2:server")
- ("addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
+ ("server-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
"server address(only support msgr v2 protocol)")
("ramptime", bpo::value<unsigned>()->default_value(5),
"seconds of client ramp-up time")
("msgtime", bpo::value<unsigned>()->default_value(15),
"seconds of client messaging time")
- ("jobs", bpo::value<unsigned>()->default_value(1),
- "number of client jobs (messengers)")
- ("cbs", bpo::value<unsigned>()->default_value(4096),
+ ("clients", bpo::value<unsigned>()->default_value(1),
+ "number of client messengers")
+ ("conns-per-client", bpo::value<unsigned>()->default_value(1),
+ "number of connections per client")
+ ("client-bs", bpo::value<unsigned>()->default_value(4096),
"client block size")
("depth", bpo::value<unsigned>()->default_value(512),
- "client io depth")
- ("core", bpo::value<unsigned>()->default_value(0),
- "server running core")
- ("sbs", bpo::value<unsigned>()->default_value(0),
+ "client io depth per job")
+ ("client-skip-core-0", bpo::value<bool>()->default_value(true),
+ "client skip core 0")
+ ("server-fixed-cpu", bpo::value<bool>()->default_value(true),
+ "server is in the fixed cpu mode, non-fixed doesn't support the mode both")
+ ("server-core", bpo::value<unsigned>()->default_value(1),
+ "server messenger running core")
+ ("server-bs", bpo::value<unsigned>()->default_value(0),
"server block size")
("crc-enabled", bpo::value<bool>()->default_value(false),
"enable CRC checks");