]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/tools/perf_crimson_msgr.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / tools / perf_crimson_msgr.cc
index ef5602b0f27bf8b8ed2c9b31c1a6099024406495..aa5753442e28e44ff32d093226fc02feb8b0ddf7 100644 (file)
@@ -2,19 +2,22 @@
 // vim: ts=8 sw=2 smarttab
 
 #include <map>
-#include <random>
 #include <boost/program_options.hpp>
+#include <boost/iterator/counting_iterator.hpp>
 
 #include <seastar/core/app-template.hh>
 #include <seastar/core/do_with.hh>
 #include <seastar/core/future-util.hh>
+#include <seastar/core/lowres_clock.hh>
 #include <seastar/core/reactor.hh>
 #include <seastar/core/sleep.hh>
 #include <seastar/core/semaphore.hh>
 #include <seastar/core/smp.hh>
+#include <seastar/core/thread.hh>
 
 #include "common/ceph_time.h"
 #include "messages/MOSDOp.h"
+#include "include/random.h"
 
 #include "crimson/auth/DummyAuth.h"
 #include "crimson/common/log.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/Dispatcher.h"
 #include "crimson/net/Messenger.h"
+#include "crimson/osd/stop_signal.h"
 
 using namespace std;
 using namespace std::chrono_literals;
 
+using lowres_clock_t = seastar::lowres_system_clock;
+
 namespace bpo = boost::program_options;
 
 namespace {
@@ -54,6 +60,19 @@ seastar::future<T*> create_sharded(Args... args) {
   });
 }
 
+double get_reactor_utilization() {
+  auto &value_map = seastar::metrics::impl::get_value_map();
+  auto found = value_map.find("reactor_utilization");
+  assert(found != value_map.end());
+  auto &[full_name, metric_family] = *found;
+  std::ignore = full_name;
+  assert(metric_family.size() == 1);
+  const auto& [labels, metric] = *metric_family.begin();
+  std::ignore = labels;
+  auto value = (*metric)();
+  return value.ui();
+}
+
 enum class perf_mode_t {
   both,
   client,
@@ -65,8 +84,10 @@ struct client_config {
   unsigned block_size;
   unsigned ramptime;
   unsigned msgtime;
-  unsigned jobs;
+  unsigned num_clients;
+  unsigned num_conns;
   unsigned depth;
+  bool skip_core_0;
 
   std::string str() const {
     std::ostringstream out;
@@ -74,8 +95,10 @@ struct client_config {
         << "](bs=" << block_size
         << ", ramptime=" << ramptime
         << ", msgtime=" << msgtime
-        << ", jobs=" << jobs
+        << ", num_clients=" << num_clients
+        << ", num_conns=" << num_conns
         << ", depth=" << depth
+        << ", skip_core_0=" << skip_core_0
         << ")";
     return out.str();
   }
@@ -83,16 +106,19 @@ struct client_config {
   static client_config load(bpo::variables_map& options) {
     client_config conf;
     entity_addr_t addr;
-    ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
+    ceph_assert(addr.parse(options["server-addr"].as<std::string>().c_str(), nullptr));
     ceph_assert_always(addr.is_msgr2());
 
     conf.server_addr = addr;
-    conf.block_size = options["cbs"].as<unsigned>();
+    conf.block_size = options["client-bs"].as<unsigned>();
     conf.ramptime = options["ramptime"].as<unsigned>();
     conf.msgtime = options["msgtime"].as<unsigned>();
-    conf.jobs = options["jobs"].as<unsigned>();
+    conf.num_clients = options["clients"].as<unsigned>();
+    ceph_assert_always(conf.num_clients > 0);
+    conf.num_conns = options["conns-per-client"].as<unsigned>();
+    ceph_assert_always(conf.num_conns > 0);
     conf.depth = options["depth"].as<unsigned>();
-    ceph_assert(conf.depth % conf.jobs == 0);
+    conf.skip_core_0 = options["client-skip-core-0"].as<bool>();
     return conf;
   }
 };
@@ -100,12 +126,14 @@ struct client_config {
 struct server_config {
   entity_addr_t addr;
   unsigned block_size;
+  bool is_fixed_cpu;
   unsigned core;
 
   std::string str() const {
     std::ostringstream out;
     out << "server[" << addr
         << "](bs=" << block_size
+        << ", is_fixed_cpu=" << is_fixed_cpu
         << ", core=" << core
         << ")";
     return out.str();
@@ -114,17 +142,18 @@ struct server_config {
   static server_config load(bpo::variables_map& options) {
     server_config conf;
     entity_addr_t addr;
-    ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
+    ceph_assert(addr.parse(options["server-addr"].as<std::string>().c_str(), nullptr));
     ceph_assert_always(addr.is_msgr2());
 
     conf.addr = addr;
-    conf.block_size = options["sbs"].as<unsigned>();
-    conf.core = options["core"].as<unsigned>();
+    conf.block_size = options["server-bs"].as<unsigned>();
+    conf.is_fixed_cpu = options["server-fixed-cpu"].as<bool>();
+    conf.core = options["server-core"].as<unsigned>();
     return conf;
   }
 };
 
-const unsigned SAMPLE_RATE = 7;
+const unsigned SAMPLE_RATE = 256;
 
 static seastar::future<> run(
     perf_mode_t mode,
@@ -133,30 +162,68 @@ static seastar::future<> run(
     bool crc_enabled)
 {
   struct test_state {
-    struct Server;
-    using ServerFRef = seastar::foreign_ptr<std::unique_ptr<Server>>;
-
     struct Server final
-        : public crimson::net::Dispatcher {
+        : public crimson::net::Dispatcher,
+          public seastar::peering_sharded_service<Server> {
+      // available only in msgr_sid
       crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
       const seastar::shard_id msgr_sid;
       std::string lname;
+
+      bool is_fixed_cpu = true;
+      bool is_stopped = false;
+      std::optional<seastar::future<>> fut_report;
+
+      unsigned conn_count = 0;
+      unsigned msg_count = 0;
+      MessageRef last_msg;
+
+      // available in all shards
       unsigned msg_len;
       bufferlist msg_data;
 
-      Server(unsigned msg_len)
-        : msgr_sid{seastar::this_shard_id()},
+      Server(seastar::shard_id msgr_sid, unsigned msg_len, bool needs_report)
+        : msgr_sid{msgr_sid},
           msg_len{msg_len} {
-        lname = "server#";
-        lname += std::to_string(msgr_sid);
+        lname = fmt::format("server@{}", msgr_sid);
         msg_data.append_zero(msg_len);
+
+        if (seastar::this_shard_id() == msgr_sid &&
+            needs_report) {
+          start_report();
+        }
+      }
+
+      void ms_handle_connect(
+          crimson::net::ConnectionRef,
+          seastar::shard_id) override {
+        ceph_abort("impossible, server won't connect");
+      }
+
+      void ms_handle_accept(
+          crimson::net::ConnectionRef,
+          seastar::shard_id new_shard,
+          bool is_replace) override {
+        ceph_assert_always(new_shard == seastar::this_shard_id());
+        auto &server = container().local();
+        ++server.conn_count;
+      }
+
+      void ms_handle_reset(
+          crimson::net::ConnectionRef,
+          bool) override {
+        auto &server = container().local();
+        --server.conn_count;
       }
 
       std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef c, MessageRef m) override {
+        assert(c->get_shard_id() == seastar::this_shard_id());
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
 
+        auto &server = container().local();
+
         // server replies with MOSDOp to generate server-side write workload
         const static pg_t pgid;
         const static object_locator_t oloc;
@@ -164,22 +231,32 @@ static seastar::future<> run(
                                     pgid.pool(), oloc.nspace);
         static spg_t spgid(pgid);
         auto rep = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
-        bufferlist data(msg_data);
-        rep->write(0, msg_len, data);
+        bufferlist data(server.msg_data);
+        rep->write(0, server.msg_len, data);
         rep->set_tid(m->get_tid());
+        ++server.msg_count;
         std::ignore = c->send(std::move(rep));
+
+        if (server.msg_count % 16 == 0) {
+          server.last_msg = std::move(m);
+        }
         return {seastar::now()};
       }
 
-      seastar::future<> init(const entity_addr_t& addr) {
-        return seastar::smp::submit_to(msgr_sid, [addr, this] {
+      seastar::future<> init(const entity_addr_t& addr, bool is_fixed_cpu) {
+        return container().invoke_on(
+            msgr_sid, [addr, is_fixed_cpu](auto &server) {
           // server msgr is always with nonce 0
-          msgr = crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid), lname, 0);
-          msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
-          msgr->set_auth_client(&dummy_auth);
-          msgr->set_auth_server(&dummy_auth);
-          return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
-            return msgr->start({this});
+          server.msgr = crimson::net::Messenger::create(
+              entity_name_t::OSD(server.msgr_sid),
+              server.lname, 0, is_fixed_cpu);
+          server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+          server.msgr->set_auth_client(&server.dummy_auth);
+          server.msgr->set_auth_server(&server.dummy_auth);
+          server.is_fixed_cpu = is_fixed_cpu;
+          return server.msgr->bind(entity_addrvec_t{addr}
+          ).safe_then([&server] {
+            return server.msgr->start({&server});
           }, crimson::net::Messenger::bind_ertr::all_same_way(
               [addr] (const std::error_code& e) {
             logger().error("Server: "
@@ -188,25 +265,161 @@ static seastar::future<> run(
           }));
         });
       }
+
       seastar::future<> shutdown() {
         logger().info("{} shutdown...", lname);
-        return seastar::smp::submit_to(msgr_sid, [this] {
-          ceph_assert(msgr);
-          msgr->stop();
-          return msgr->shutdown();
+        return container().invoke_on(
+            msgr_sid, [](auto &server) {
+          server.is_stopped = true;
+          ceph_assert(server.msgr);
+          server.msgr->stop();
+          return server.msgr->shutdown(
+          ).then([&server] {
+            if (server.fut_report.has_value()) {
+              return std::move(server.fut_report.value());
+            } else {
+              return seastar::now();
+            }
+          });
         });
       }
-      seastar::future<> wait() {
-        return seastar::smp::submit_to(msgr_sid, [this] {
-          ceph_assert(msgr);
-          return msgr->wait();
-        });
+
+    private:
+      struct ShardReport {
+        unsigned msg_count = 0;
+
+        // per-interval metrics
+        double reactor_utilization;
+        unsigned conn_count = 0;
+        int msg_size = 0;
+        unsigned msg_count_interval = 0;
+      };
+
+      // should not be called frequently to impact performance
+      void get_report(ShardReport& last) {
+        unsigned last_msg_count = last.msg_count;
+        int msg_size = -1;
+        if (last_msg) {
+          auto msg = boost::static_pointer_cast<MOSDOp>(last_msg);
+          msg->finish_decode();
+          ceph_assert_always(msg->ops.size() == 1);
+          msg_size = msg->ops[0].op.extent.length;
+          last_msg.reset();
+        }
+
+        last.msg_count = msg_count;
+        last.reactor_utilization = get_reactor_utilization();
+        last.conn_count = conn_count;
+        last.msg_size = msg_size;
+        last.msg_count_interval = msg_count - last_msg_count;
       }
 
-      static seastar::future<ServerFRef> create(seastar::shard_id msgr_sid, unsigned msg_len) {
-        return seastar::smp::submit_to(msgr_sid, [msg_len] {
-          return seastar::make_foreign(std::make_unique<Server>(msg_len));
-        });
+      struct TimerReport {
+        unsigned elapsed = 0u;
+        mono_time start_time = mono_clock::zero();
+        std::vector<ShardReport> reports;
+
+        TimerReport(unsigned shards) : reports(shards) {}
+      };
+
+      void start_report() {
+        seastar::promise<> pr_report;
+        fut_report = pr_report.get_future();
+        seastar::do_with(
+            TimerReport(seastar::smp::count),
+            [this](auto &report) {
+          return seastar::do_until(
+            [this] { return is_stopped; },
+            [&report, this] {
+              return seastar::sleep(2s
+              ).then([&report, this] {
+                report.elapsed += 2;
+                if (is_fixed_cpu) {
+                  return seastar::smp::submit_to(msgr_sid,
+                      [&report, this] {
+                    auto &server = container().local();
+                    server.get_report(report.reports[seastar::this_shard_id()]);
+                  }).then([&report, this] {
+                    auto now = mono_clock::now();
+                    auto prv = report.start_time;
+                    report.start_time = now;
+                    if (prv == mono_clock::zero()) {
+                      // cannot compute duration
+                      return;
+                    }
+                    std::chrono::duration<double> duration_d = now - prv;
+                    double duration = duration_d.count();
+                    auto &ireport = report.reports[msgr_sid];
+                    double iops = ireport.msg_count_interval / duration;
+                    double throughput_MB = -1;
+                    if (ireport.msg_size >= 0) {
+                      throughput_MB = iops * ireport.msg_size / 1048576;
+                    }
+                    std::ostringstream sout;
+                    sout << setfill(' ')
+                         << report.elapsed
+                         << "(" << std::setw(5) << duration << ") "
+                         << std::setw(9) << iops << "IOPS "
+                         << std::setw(8) << throughput_MB << "MiB/s "
+                         << ireport.reactor_utilization
+                         << "(" << ireport.conn_count << ")";
+                    std::cout << sout.str() << std::endl;
+                  });
+                } else {
+                  return seastar::smp::invoke_on_all([&report, this] {
+                    auto &server = container().local();
+                    server.get_report(report.reports[seastar::this_shard_id()]);
+                  }).then([&report, this] {
+                    auto now = mono_clock::now();
+                    auto prv = report.start_time;
+                    report.start_time = now;
+                    if (prv == mono_clock::zero()) {
+                      // cannot compute duration
+                      return;
+                    }
+                    std::chrono::duration<double> duration_d = now - prv;
+                    double duration = duration_d.count();
+                    unsigned num_msgs = 0;
+                    // -1 means unavailable, -2 means mismatch
+                    int msg_size = -1;
+                    for (auto &i : report.reports) {
+                      if (i.msg_size >= 0) {
+                        if (msg_size == -2) {
+                          // pass
+                        } else if (msg_size == -1) {
+                          msg_size = i.msg_size;
+                        } else {
+                          if (msg_size != i.msg_size) {
+                            msg_size = -2;
+                          }
+                        }
+                      }
+                      num_msgs += i.msg_count_interval;
+                    }
+                    double iops = num_msgs / duration;
+                    double throughput_MB = msg_size;
+                    if (msg_size >= 0) {
+                      throughput_MB = iops * msg_size / 1048576;
+                    }
+                    std::ostringstream sout;
+                    sout << setfill(' ')
+                         << report.elapsed
+                         << "(" << std::setw(5) << duration << ") "
+                         << std::setw(9) << iops << "IOPS "
+                         << std::setw(8) << throughput_MB << "MiB/s ";
+                    for (auto &i : report.reports) {
+                      sout << i.reactor_utilization
+                           << "(" << i.conn_count << ") ";
+                    }
+                    std::cout << sout.str() << std::endl;
+                  });
+                }
+              });
+            }
+          );
+        }).then([this] {
+          logger().info("report is stopped!");
+        }).forward_to(std::move(pr_report));
       }
     };
 
@@ -223,106 +436,212 @@ static seastar::future<> run(
         unsigned start_count = 0u;
 
         unsigned sampled_count = 0u;
-        double total_lat_s = 0.0;
+        double sampled_total_lat_s = 0.0;
 
         // for reporting only
         mono_time finish_time = mono_clock::zero();
 
-        void start() {
+        void start_connecting() {
+          connecting_time = mono_clock::now();
+        }
+
+        void finish_connecting() {
+          ceph_assert_always(connected_time == mono_clock::zero());
+          connected_time = mono_clock::now();
+        }
+
+        void start_collect() {
+          ceph_assert_always(connected_time != mono_clock::zero());
           start_time = mono_clock::now();
           start_count = received_count;
           sampled_count = 0u;
-          total_lat_s = 0.0;
+          sampled_total_lat_s = 0.0;
           finish_time = mono_clock::zero();
         }
+
+        void prepare_summary(const ConnStats &current) {
+          *this = current;
+          finish_time = mono_clock::now();
+        }
       };
-      ConnStats conn_stats;
 
       struct PeriodStats {
         mono_time start_time = mono_clock::zero();
         unsigned start_count = 0u;
         unsigned sampled_count = 0u;
-        double total_lat_s = 0.0;
+        double sampled_total_lat_s = 0.0;
 
         // for reporting only
         mono_time finish_time = mono_clock::zero();
         unsigned finish_count = 0u;
         unsigned depth = 0u;
 
-        void reset(unsigned received_count, PeriodStats* snap = nullptr) {
-          if (snap) {
-            snap->start_time = start_time;
-            snap->start_count = start_count;
-            snap->sampled_count = sampled_count;
-            snap->total_lat_s = total_lat_s;
-            snap->finish_time = mono_clock::now();
-            snap->finish_count = received_count;
-          }
+        void start_collect(unsigned received_count) {
           start_time = mono_clock::now();
           start_count = received_count;
           sampled_count = 0u;
-          total_lat_s = 0.0;
+          sampled_total_lat_s = 0.0;
+        }
+
+        void reset_period(
+            unsigned received_count, unsigned _depth, PeriodStats &snapshot) {
+          snapshot.start_time = start_time;
+          snapshot.start_count = start_count;
+          snapshot.sampled_count = sampled_count;
+          snapshot.sampled_total_lat_s = sampled_total_lat_s;
+          snapshot.finish_time = mono_clock::now();
+          snapshot.finish_count = received_count;
+          snapshot.depth = _depth;
+
+          start_collect(received_count);
+        }
+      };
+
+      struct JobReport {
+        std::string name;
+        unsigned depth = 0;
+        double connect_time_s = 0;
+        unsigned total_msgs = 0;
+        double messaging_time_s = 0;
+        double latency_ms = 0;
+        double iops = 0;
+        double throughput_mbps = 0;
+
+        void account(const JobReport &stats) {
+          depth += stats.depth;
+          connect_time_s += stats.connect_time_s;
+          total_msgs += stats.total_msgs;
+          messaging_time_s += stats.messaging_time_s;
+          latency_ms += stats.latency_ms;
+          iops += stats.iops;
+          throughput_mbps += stats.throughput_mbps;
+        }
+
+        void report() const {
+          auto str = fmt::format(
+            "{}(depth={}):\n"
+            "  connect time: {:08f}s\n"
+            "  messages received: {}\n"
+            "  messaging time: {:08f}s\n"
+            "  latency: {:08f}ms\n"
+            "  IOPS: {:08f}\n"
+            "  out throughput: {:08f}MB/s",
+            name, depth, connect_time_s,
+            total_msgs, messaging_time_s,
+            latency_ms, iops,
+            throughput_mbps);
+          std::cout << str << std::endl;
+        }
+      };
+
+      struct ConnectionPriv : public crimson::net::Connection::user_private_t {
+        unsigned index;
+        ConnectionPriv(unsigned i) : index{i} {}
+      };
+
+      struct ConnState {
+        crimson::net::MessengerRef msgr;
+        ConnStats conn_stats;
+        PeriodStats period_stats;
+        seastar::semaphore depth;
+        std::vector<lowres_clock_t::time_point> time_msgs_sent;
+        unsigned sent_count = 0u;
+        crimson::net::ConnectionRef active_conn;
+        bool stop_send = false;
+        seastar::promise<JobReport> stopped_send_promise;
+
+        ConnState(std::size_t _depth)
+          : depth{_depth},
+            time_msgs_sent{_depth, lowres_clock_t::time_point::min()} {}
+
+        unsigned get_current_units() const {
+          ceph_assert(depth.available_units() >= 0);
+          return depth.current();
+        }
+
+        seastar::future<JobReport> stop_dispatch_messages() {
+          stop_send = true;
+          depth.broken(DepthBroken());
+          return stopped_send_promise.get_future();
         }
       };
-      PeriodStats period_stats;
 
       const seastar::shard_id sid;
-      std::string lname;
+      const unsigned id;
+      const std::optional<unsigned> server_sid;
 
-      const unsigned jobs;
-      crimson::net::MessengerRef msgr;
+      const unsigned num_clients;
+      const unsigned num_conns;
       const unsigned msg_len;
       bufferlist msg_data;
       const unsigned nr_depth;
-      seastar::semaphore depth;
-      std::vector<mono_time> time_msgs_sent;
+      const unsigned nonce_base;
       crimson::auth::DummyAuthClientServer dummy_auth;
 
-      unsigned sent_count = 0u;
-      crimson::net::ConnectionRef active_conn = nullptr;
+      std::vector<ConnState> conn_states;
 
-      bool stop_send = false;
-      seastar::promise<> stopped_send_promise;
-
-      Client(unsigned jobs, unsigned msg_len, unsigned depth)
+      Client(unsigned num_clients,
+             unsigned num_conns,
+             unsigned msg_len,
+             unsigned _depth,
+             unsigned nonce_base,
+             std::optional<unsigned> server_sid)
         : sid{seastar::this_shard_id()},
-          jobs{jobs},
+          id{sid + num_clients - seastar::smp::count},
+          server_sid{server_sid},
+          num_clients{num_clients},
+          num_conns{num_conns},
           msg_len{msg_len},
-          nr_depth{depth/jobs},
-          depth{nr_depth},
-          time_msgs_sent{depth/jobs, mono_clock::zero()} {
-        lname = "client#";
-        lname += std::to_string(sid);
+          nr_depth{_depth},
+          nonce_base{nonce_base} {
+        if (is_active()) {
+          for (unsigned i = 0; i < num_conns; ++i) {
+            conn_states.emplace_back(nr_depth);
+          }
+        }
         msg_data.append_zero(msg_len);
       }
 
-      unsigned get_current_depth() const {
-        ceph_assert(depth.available_units() >= 0);
-        return nr_depth - depth.current();
+      std::string get_name(unsigned i) {
+        return fmt::format("client{}Conn{}@{}", id, i, sid);
       }
 
-      void ms_handle_connect(crimson::net::ConnectionRef conn) override {
-        conn_stats.connected_time = mono_clock::now();
+      void ms_handle_connect(
+          crimson::net::ConnectionRef conn,
+          seastar::shard_id prv_shard) override {
+        ceph_assert_always(prv_shard == seastar::this_shard_id());
+        assert(is_active());
+        unsigned index = static_cast<ConnectionPriv&>(conn->get_user_private()).index;
+        auto &conn_state = conn_states[index];
+        conn_state.conn_stats.finish_connecting();
       }
+
       std::optional<seastar::future<>> ms_dispatch(
-          crimson::net::ConnectionRef, MessageRef m) override {
+          crimson::net::ConnectionRef conn, MessageRef m) override {
+        assert(is_active());
         // server replies with MOSDOp to generate server-side write workload
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
 
+        unsigned index = static_cast<ConnectionPriv&>(conn->get_user_private()).index;
+        assert(index < num_conns);
+        auto &conn_state = conn_states[index];
+
         auto msg_id = m->get_tid();
         if (msg_id % SAMPLE_RATE == 0) {
-          auto index = msg_id % time_msgs_sent.size();
-          ceph_assert(time_msgs_sent[index] != mono_clock::zero());
-          std::chrono::duration<double> cur_latency = mono_clock::now() - time_msgs_sent[index];
-          conn_stats.total_lat_s += cur_latency.count();
-          ++(conn_stats.sampled_count);
-          period_stats.total_lat_s += cur_latency.count();
-          ++(period_stats.sampled_count);
-          time_msgs_sent[index] = mono_clock::zero();
+          auto msg_index = msg_id % conn_state.time_msgs_sent.size();
+          ceph_assert(conn_state.time_msgs_sent[msg_index] !=
+              lowres_clock_t::time_point::min());
+          std::chrono::duration<double> cur_latency =
+              lowres_clock_t::now() - conn_state.time_msgs_sent[msg_index];
+          conn_state.conn_stats.sampled_total_lat_s += cur_latency.count();
+          ++(conn_state.conn_stats.sampled_count);
+          conn_state.period_stats.sampled_total_lat_s += cur_latency.count();
+          ++(conn_state.period_stats.sampled_count);
+          conn_state.time_msgs_sent[msg_index] = lowres_clock_t::time_point::min();
         }
 
-        ++(conn_stats.received_count);
-        depth.signal(1);
+        ++(conn_state.conn_stats.received_count);
+        conn_state.depth.signal(1);
 
         return {seastar::now()};
       }
@@ -330,49 +649,115 @@ static seastar::future<> run(
       // should start messenger at this shard?
       bool is_active() {
         ceph_assert(seastar::this_shard_id() == sid);
-        return sid != 0 && sid <= jobs;
+        return sid + num_clients >= seastar::smp::count;
       }
 
       seastar::future<> init() {
-        return container().invoke_on_all([] (auto& client) {
+        return container().invoke_on_all([](auto& client) {
           if (client.is_active()) {
-            client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid);
-            client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
-            client.msgr->set_auth_client(&client.dummy_auth);
-            client.msgr->set_auth_server(&client.dummy_auth);
-            return client.msgr->start({&client});
+            return seastar::do_for_each(
+                boost::make_counting_iterator(0u),
+                boost::make_counting_iterator(client.num_conns),
+                [&client](auto i) {
+              auto &conn_state = client.conn_states[i];
+              std::string name = client.get_name(i);
+              conn_state.msgr = crimson::net::Messenger::create(
+                  entity_name_t::OSD(client.id * client.num_conns + i),
+                  name, client.nonce_base + client.id * client.num_conns + i, true);
+              conn_state.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+              conn_state.msgr->set_auth_client(&client.dummy_auth);
+              conn_state.msgr->set_auth_server(&client.dummy_auth);
+              return conn_state.msgr->start({&client});
+            });
           }
           return seastar::now();
         });
       }
 
       seastar::future<> shutdown() {
-        return container().invoke_on_all([] (auto& client) {
-          if (client.is_active()) {
-            logger().info("{} shutdown...", client.lname);
-            ceph_assert(client.msgr);
-            client.msgr->stop();
-            return client.msgr->shutdown().then([&client] {
-              return client.stop_dispatch_messages();
+        return seastar::do_with(
+            std::vector<JobReport>(num_clients * num_conns),
+            [this](auto &all_stats) {
+          return container().invoke_on_all([&all_stats](auto& client) {
+            if (!client.is_active()) {
+              return seastar::now();
+            }
+
+            return seastar::parallel_for_each(
+                boost::make_counting_iterator(0u),
+                boost::make_counting_iterator(client.num_conns),
+                [&all_stats, &client](auto i) {
+              logger().info("{} shutdown...", client.get_name(i));
+              auto &conn_state = client.conn_states[i];
+              return conn_state.stop_dispatch_messages(
+              ).then([&all_stats, &client, i](auto stats) {
+                all_stats[client.id * client.num_conns + i] = stats;
+              });
+            }).then([&client] {
+              return seastar::do_for_each(
+                  boost::make_counting_iterator(0u),
+                  boost::make_counting_iterator(client.num_conns),
+                  [&client](auto i) {
+                auto &conn_state = client.conn_states[i];
+                ceph_assert(conn_state.msgr);
+                conn_state.msgr->stop();
+                return conn_state.msgr->shutdown();
+              });
             });
-          }
-          return seastar::now();
+          }).then([&all_stats, this] {
+            auto nr_jobs = all_stats.size();
+            JobReport summary;
+            std::vector<JobReport> clients(num_clients);
+
+            for (unsigned i = 0; i < nr_jobs; ++i) {
+              auto &stats = all_stats[i];
+              stats.report();
+              clients[i / num_conns].account(stats);
+              summary.account(stats);
+            }
+
+            std::cout << std::endl;
+            std::cout << "per client:" << std::endl;
+            for (unsigned i = 0; i < num_clients; ++i) {
+              auto &stats = clients[i];
+              stats.name = fmt::format("client{}", i);
+              stats.connect_time_s /= num_conns;
+              stats.messaging_time_s /= num_conns;
+              stats.latency_ms /= num_conns;
+              stats.report();
+            }
+
+            std::cout << std::endl;
+            summary.name = fmt::format("all", nr_jobs);
+            summary.connect_time_s /= nr_jobs;
+            summary.messaging_time_s /= nr_jobs;
+            summary.latency_ms /= nr_jobs;
+            summary.report();
+          });
         });
       }
 
       seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) {
-        return container().invoke_on_all([peer_addr] (auto& client) {
-          // start clients in active cores (#1 ~ #jobs)
+        return container().invoke_on_all([peer_addr](auto& client) {
+          // start clients in active cores
           if (client.is_active()) {
-            mono_time start_time = mono_clock::now();
-            client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+            for (unsigned i = 0; i < client.num_conns; ++i) {
+              auto &conn_state = client.conn_states[i];
+              conn_state.conn_stats.start_connecting();
+              conn_state.active_conn = conn_state.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+              conn_state.active_conn->set_user_private(
+                  std::make_unique<ConnectionPriv>(i));
+            }
             // make sure handshake won't hurt the performance
-            return seastar::sleep(1s).then([&client, start_time] {
-              if (client.conn_stats.connected_time == mono_clock::zero()) {
-                logger().error("\n{} not connected after 1s!\n", client.lname);
-                ceph_assert(false);
+            return seastar::sleep(1s).then([&client] {
+              for (unsigned i = 0; i < client.num_conns; ++i) {
+                auto &conn_state = client.conn_states[i];
+                if (conn_state.conn_stats.connected_time == mono_clock::zero()) {
+                  logger().error("\n{} not connected after 1s!\n",
+                                 client.get_name(i));
+                  ceph_assert(false);
+                }
               }
-              client.conn_stats.connecting_time = start_time;
             });
           }
           return seastar::now();
@@ -382,34 +767,43 @@ static seastar::future<> run(
      private:
       class TimerReport {
        private:
-        const unsigned jobs;
+        const unsigned num_clients;
+        const unsigned num_conns;
         const unsigned msgtime;
         const unsigned bytes_of_block;
 
         unsigned elapsed = 0u;
-        std::vector<mono_time> start_times;
         std::vector<PeriodStats> snaps;
         std::vector<ConnStats> summaries;
+        std::vector<double> client_reactor_utilizations;
+        std::optional<double> server_reactor_utilization;
 
        public:
-        TimerReport(unsigned jobs, unsigned msgtime, unsigned bs)
-          : jobs{jobs},
+        TimerReport(unsigned num_clients, unsigned num_conns, unsigned msgtime, unsigned bs)
+          : num_clients{num_clients},
+            num_conns{num_conns},
             msgtime{msgtime},
             bytes_of_block{bs},
-            start_times{jobs, mono_clock::zero()},
-            snaps{jobs},
-            summaries{jobs} {}
+            snaps{num_clients * num_conns},
+            summaries{num_clients * num_conns},
+            client_reactor_utilizations(num_clients) {}
 
         unsigned get_elapsed() const { return elapsed; }
 
-        PeriodStats& get_snap_by_job(seastar::shard_id sid) {
-          ceph_assert(sid >= 1 && sid <= jobs);
-          return snaps[sid - 1];
+        PeriodStats& get_snap(unsigned client_id, unsigned i) {
+          return snaps[client_id * num_conns + i];
         }
 
-        ConnStats& get_summary_by_job(seastar::shard_id sid) {
-          ceph_assert(sid >= 1 && sid <= jobs);
-          return summaries[sid - 1];
+        ConnStats& get_summary(unsigned client_id, unsigned i) {
+          return summaries[client_id * num_conns + i];
+        }
+
+        void set_client_reactor_utilization(unsigned client_id, double ru) {
+          client_reactor_utilizations[client_id] = ru;
+        }
+
+        void set_server_reactor_utilization(double ru) {
+          server_reactor_utilization = ru;
         }
 
         bool should_stop() const {
@@ -422,45 +816,50 @@ static seastar::future<> run(
           });
         }
 
-        void report_header() {
+        void report_header() const {
           std::ostringstream sout;
           sout << std::setfill(' ')
-               << std::setw(7) << "sec"
-               << std::setw(6) << "depth"
-               << std::setw(8) << "IOPS"
-               << std::setw(8) << "MB/s"
-               << std::setw(8) << "lat(ms)";
+               << std::setw(6) << "sec"
+               << std::setw(7) << "depth"
+               << std::setw(10) << "IOPS"
+               << std::setw(9) << "MB/s"
+               << std::setw(9) << "lat(ms)";
           std::cout << sout.str() << std::endl;
         }
 
         void report_period() {
-          if (elapsed == 1) {
-            // init this->start_times at the first period
-            for (unsigned i=0; i<jobs; ++i) {
-              start_times[i] = snaps[i].start_time;
-            }
-          }
           std::chrono::duration<double> elapsed_d = 0s;
           unsigned depth = 0u;
           unsigned ops = 0u;
           unsigned sampled_count = 0u;
-          double total_lat_s = 0.0;
+          double sampled_total_lat_s = 0.0;
           for (const auto& snap: snaps) {
             elapsed_d += (snap.finish_time - snap.start_time);
             depth += snap.depth;
             ops += (snap.finish_count - snap.start_count);
             sampled_count += snap.sampled_count;
-            total_lat_s += snap.total_lat_s;
+            sampled_total_lat_s += snap.sampled_total_lat_s;
           }
-          double elapsed_s = elapsed_d.count() / jobs;
+          double elapsed_s = elapsed_d.count() / (num_clients * num_conns);
           double iops = ops/elapsed_s;
           std::ostringstream sout;
           sout << setfill(' ')
-               << std::setw(7) << elapsed_s
+               << std::setw(5) << elapsed_s
+               << " "
                << std::setw(6) << depth
-               << std::setw(8) << iops
+               << " "
+               << std::setw(9) << iops
+               << " "
                << std::setw(8) << iops * bytes_of_block / 1048576
-               << std::setw(8) << (total_lat_s / sampled_count * 1000);
+               << " "
+               << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000)
+               << " -- ";
+          if (server_reactor_utilization.has_value()) {
+            sout << *server_reactor_utilization << " -- ";
+          }
+          for (double cru : client_reactor_utilizations) {
+            sout << cru << ",";
+          }
           std::cout << sout.str() << std::endl;
         }
 
@@ -468,14 +867,14 @@ static seastar::future<> run(
           std::chrono::duration<double> elapsed_d = 0s;
           unsigned ops = 0u;
           unsigned sampled_count = 0u;
-          double total_lat_s = 0.0;
+          double sampled_total_lat_s = 0.0;
           for (const auto& summary: summaries) {
             elapsed_d += (summary.finish_time - summary.start_time);
             ops += (summary.received_count - summary.start_count);
             sampled_count += summary.sampled_count;
-            total_lat_s += summary.total_lat_s;
+            sampled_total_lat_s += summary.sampled_total_lat_s;
           }
-          double elapsed_s = elapsed_d.count() / jobs;
+          double elapsed_s = elapsed_d.count() / (num_clients * num_conns);
           double iops = ops / elapsed_s;
           std::ostringstream sout;
           sout << "--------------"
@@ -486,7 +885,7 @@ static seastar::future<> run(
                << std::setw(6) << "-"
                << std::setw(8) << iops
                << std::setw(8) << iops * bytes_of_block / 1048576
-               << std::setw(8) << (total_lat_s / sampled_count * 1000)
+               << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000)
                << "\n";
           std::cout << sout.str() << std::endl;
         }
@@ -495,10 +894,20 @@ static seastar::future<> run(
       seastar::future<> report_period(TimerReport& report) {
         return container().invoke_on_all([&report] (auto& client) {
           if (client.is_active()) {
-            PeriodStats& snap = report.get_snap_by_job(client.sid);
-            client.period_stats.reset(client.conn_stats.received_count,
-                                      &snap);
-            snap.depth = client.get_current_depth();
+            for (unsigned i = 0; i < client.num_conns; ++i) {
+              auto &conn_state = client.conn_states[i];
+              PeriodStats& snap = report.get_snap(client.id, i);
+              conn_state.period_stats.reset_period(
+                  conn_state.conn_stats.received_count,
+                  client.nr_depth - conn_state.get_current_units(),
+                  snap);
+            }
+            report.set_client_reactor_utilization(client.id, get_reactor_utilization());
+          }
+          if (client.server_sid.has_value() &&
+              seastar::this_shard_id() == *client.server_sid) {
+            assert(!client.is_active());
+            report.set_server_reactor_utilization(get_reactor_utilization());
           }
         }).then([&report] {
           report.report_period();
@@ -508,9 +917,11 @@ static seastar::future<> run(
       seastar::future<> report_summary(TimerReport& report) {
         return container().invoke_on_all([&report] (auto& client) {
           if (client.is_active()) {
-            ConnStats& summary = report.get_summary_by_job(client.sid);
-            summary = client.conn_stats;
-            summary.finish_time = mono_clock::now();
+            for (unsigned i = 0; i < client.num_conns; ++i) {
+              auto &conn_state = client.conn_states[i];
+              ConnStats& summary = report.get_summary(client.id, i);
+              summary.prepare_summary(conn_state.conn_stats);
+            }
           }
         }).then([&report] {
           report.report_summary();
@@ -519,10 +930,13 @@ static seastar::future<> run(
 
      public:
       seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) {
-        logger().info("[all clients]: start sending MOSDOps from {} clients", jobs);
+        logger().info("[all clients]: start sending MOSDOps from {} clients * {} conns",
+                      num_clients, num_conns);
         return container().invoke_on_all([] (auto& client) {
           if (client.is_active()) {
-            client.do_dispatch_messages(client.active_conn.get());
+            for (unsigned i = 0; i < client.num_conns; ++i) {
+              client.do_dispatch_messages(i);
+            }
           }
         }).then([ramptime] {
           logger().info("[all clients]: ramping up {} seconds...", ramptime);
@@ -530,14 +944,18 @@ static seastar::future<> run(
         }).then([this] {
           return container().invoke_on_all([] (auto& client) {
             if (client.is_active()) {
-              client.conn_stats.start();
-              client.period_stats.reset(client.conn_stats.received_count);
+              for (unsigned i = 0; i < client.num_conns; ++i) {
+                auto &conn_state = client.conn_states[i];
+                conn_state.conn_stats.start_collect();
+                conn_state.period_stats.start_collect(conn_state.conn_stats.received_count);
+              }
             }
           });
         }).then([this, msgtime] {
           logger().info("[all clients]: reporting {} seconds...\n", msgtime);
           return seastar::do_with(
-              TimerReport(jobs, msgtime, msg_len), [this] (auto& report) {
+              TimerReport(num_clients, num_conns, msgtime, msg_len),
+              [this](auto& report) {
             report.report_header();
             return seastar::do_until(
               [&report] { return report.should_stop(); },
@@ -567,9 +985,11 @@ static seastar::future<> run(
       }
 
      private:
-      seastar::future<> send_msg(crimson::net::Connection* conn) {
+      seastar::future<> send_msg(ConnState &conn_state) {
         ceph_assert(seastar::this_shard_id() == sid);
-        return depth.wait(1).then([this, conn] {
+        conn_state.sent_count += 1;
+        return conn_state.depth.wait(1
+        ).then([this, &conn_state] {
           const static pg_t pgid;
           const static object_locator_t oloc;
           const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
@@ -579,89 +999,132 @@ static seastar::future<> run(
           bufferlist data(msg_data);
           m->write(0, msg_len, data);
           // use tid as the identity of each round
-          m->set_tid(sent_count);
+          m->set_tid(conn_state.sent_count);
 
           // sample message latency
-          if (sent_count % SAMPLE_RATE == 0) {
-            auto index = sent_count % time_msgs_sent.size();
-            ceph_assert(time_msgs_sent[index] == mono_clock::zero());
-            time_msgs_sent[index] = mono_clock::now();
+          if (unlikely(conn_state.sent_count % SAMPLE_RATE == 0)) {
+            auto index = conn_state.sent_count % conn_state.time_msgs_sent.size();
+            ceph_assert(conn_state.time_msgs_sent[index] ==
+                        lowres_clock_t::time_point::min());
+            conn_state.time_msgs_sent[index] = lowres_clock_t::now();
           }
 
-          return conn->send(std::move(m));
+          return conn_state.active_conn->send(std::move(m));
         });
       }
 
       class DepthBroken: public std::exception {};
 
-      seastar::future<> stop_dispatch_messages() {
-        stop_send = true;
-        depth.broken(DepthBroken());
-        return stopped_send_promise.get_future();
+      seastar::future<JobReport> stop_dispatch_messages(unsigned i) {
+        auto &conn_state = conn_states[i];
+        conn_state.stop_send = true;
+        conn_state.depth.broken(DepthBroken());
+        return conn_state.stopped_send_promise.get_future();
       }
 
-      void do_dispatch_messages(crimson::net::Connection* conn) {
+      void do_dispatch_messages(unsigned i) {
         ceph_assert(seastar::this_shard_id() == sid);
-        ceph_assert(sent_count == 0);
-        conn_stats.start_time = mono_clock::now();
+        auto &conn_state = conn_states[i];
+        ceph_assert(conn_state.sent_count == 0);
+        conn_state.conn_stats.start_time = mono_clock::now();
         // forwarded to stopped_send_promise
         (void) seastar::do_until(
-          [this] { return stop_send; },
-          [this, conn] {
-            sent_count += 1;
-            return send_msg(conn);
-          }
+          [&conn_state] { return conn_state.stop_send; },
+          [this, &conn_state] { return send_msg(conn_state); }
         ).handle_exception_type([] (const DepthBroken& e) {
           // ok, stopped by stop_dispatch_messages()
-        }).then([this, conn] {
-          std::chrono::duration<double> dur_conn = conn_stats.connected_time - conn_stats.connecting_time;
-          std::chrono::duration<double> dur_msg = mono_clock::now() - conn_stats.start_time;
-          unsigned ops = conn_stats.received_count - conn_stats.start_count;
-          logger().info("{}: stopped sending OSDOPs.\n"
-                        "{}(depth={}):\n"
-                        "  connect time: {}s\n"
-                        "  messages received: {}\n"
-                        "  messaging time: {}s\n"
-                        "  latency: {}ms\n"
-                        "  IOPS: {}\n"
-                        "  throughput: {}MB/s\n",
-                        *conn,
-                        lname,
-                        nr_depth,
-                        dur_conn.count(),
-                        ops,
-                        dur_msg.count(),
-                        conn_stats.total_lat_s / conn_stats.sampled_count * 1000,
-                        ops / dur_msg.count(),
-                        ops / dur_msg.count() * msg_len / 1048576);
-          stopped_send_promise.set_value();
+        }).then([this, &conn_state, i] {
+          std::string name = get_name(i);
+          logger().info("{} {}: stopped sending OSDOPs",
+                        name, *conn_state.active_conn);
+
+          std::chrono::duration<double> dur_conn =
+              conn_state.conn_stats.connected_time -
+              conn_state.conn_stats.connecting_time;
+          std::chrono::duration<double> dur_msg =
+              mono_clock::now() - conn_state.conn_stats.start_time;
+          unsigned ops =
+              conn_state.conn_stats.received_count -
+              conn_state.conn_stats.start_count;
+
+          JobReport stats;
+          stats.name = name;
+          stats.depth = nr_depth;
+          stats.connect_time_s = dur_conn.count();
+          stats.total_msgs = ops;
+          stats.messaging_time_s = dur_msg.count();
+          stats.latency_ms =
+              conn_state.conn_stats.sampled_total_lat_s /
+              conn_state.conn_stats.sampled_count * 1000;
+          stats.iops = ops / dur_msg.count();
+          stats.throughput_mbps = ops / dur_msg.count() * msg_len / 1048576;
+
+          conn_state.stopped_send_promise.set_value(stats);
         });
       }
     };
   };
 
+  std::optional<unsigned> server_sid;
+  bool server_needs_report = false;
+  if (mode == perf_mode_t::both) {
+    ceph_assert(server_conf.is_fixed_cpu == true);
+    server_sid = server_conf.core;
+  } else if (mode == perf_mode_t::server) {
+    server_needs_report = true;
+  }
   return seastar::when_all(
-      test_state::Server::create(server_conf.core, server_conf.block_size),
-      create_sharded<test_state::Client>(client_conf.jobs, client_conf.block_size, client_conf.depth),
-      crimson::common::sharded_conf().start(EntityName{}, std::string_view{"ceph"}).then([] {
-        return crimson::common::local_conf().start();
-      }).then([crc_enabled] {
-        return crimson::common::local_conf().set_val(
-            "ms_crc_data", crc_enabled ? "true" : "false");
-      })
+    seastar::futurize_invoke([mode, server_conf, server_needs_report] {
+      if (mode == perf_mode_t::client) {
+        return seastar::make_ready_future<test_state::Server*>(nullptr);
+      } else {
+        return create_sharded<test_state::Server>(
+          server_conf.core,
+          server_conf.block_size,
+          server_needs_report);
+      }
+    }),
+    seastar::futurize_invoke([mode, client_conf, server_sid] {
+      if (mode == perf_mode_t::server) {
+        return seastar::make_ready_future<test_state::Client*>(nullptr);
+      } else {
+        unsigned nonce_base = ceph::util::generate_random_number<unsigned>();
+        logger().info("client nonce_base={}", nonce_base);
+        return create_sharded<test_state::Client>(
+          client_conf.num_clients,
+          client_conf.num_conns,
+          client_conf.block_size,
+          client_conf.depth,
+          nonce_base,
+          server_sid);
+      }
+    }),
+    crimson::common::sharded_conf().start(
+      EntityName{}, std::string_view{"ceph"}
+    ).then([] {
+      return crimson::common::local_conf().start();
+    }).then([crc_enabled] {
+      return crimson::common::local_conf().set_val(
+          "ms_crc_data", crc_enabled ? "true" : "false");
+    })
   ).then([=](auto&& ret) {
-    auto fp_server = std::move(std::get<0>(ret).get0());
+    auto server = std::move(std::get<0>(ret).get0());
     auto client = std::move(std::get<1>(ret).get0());
-    test_state::Server* server = fp_server.get();
+    // reserve core 0 for potentially better performance
     if (mode == perf_mode_t::both) {
-      logger().info("\nperf settings:\n  {}\n  {}\n",
-                    client_conf.str(), server_conf.str());
-      ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
-      ceph_assert(client_conf.jobs > 0);
-      ceph_assert(seastar::smp::count >= 1+server_conf.core);
-      ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs);
+      logger().info("\nperf settings:\n  smp={}\n  {}\n  {}\n",
+                    seastar::smp::count, client_conf.str(), server_conf.str());
+      if (client_conf.skip_core_0) {
+        ceph_assert(seastar::smp::count > client_conf.num_clients);
+      } else {
+        ceph_assert(seastar::smp::count >= client_conf.num_clients);
+      }
+      ceph_assert(client_conf.num_clients > 0);
+      ceph_assert(seastar::smp::count > server_conf.core + client_conf.num_clients);
       return seastar::when_all_succeed(
-        server->init(server_conf.addr),
+        // it is not reasonable to allow server/client to shared cores for
+        // performance benchmarking purposes.
+        server->init(server_conf.addr, server_conf.is_fixed_cpu),
         client->init()
       ).then_unpack([client, addr = client_conf.server_addr] {
         return client->connect_wait_verify(addr);
@@ -670,13 +1133,18 @@ static seastar::future<> run(
         return client->dispatch_with_timer(ramptime, msgtime);
       }).then([client] {
         return client->shutdown();
-      }).then([server, fp_server = std::move(fp_server)] () mutable {
-        return server->shutdown().then([cleanup = std::move(fp_server)] {});
+      }).then([server] {
+        return server->shutdown();
       });
     } else if (mode == perf_mode_t::client) {
-      logger().info("\nperf settings:\n  {}\n", client_conf.str());
-      ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
-      ceph_assert(client_conf.jobs > 0);
+      logger().info("\nperf settings:\n  smp={}\n  {}\n",
+                    seastar::smp::count, client_conf.str());
+      if (client_conf.skip_core_0) {
+        ceph_assert(seastar::smp::count > client_conf.num_clients);
+      } else {
+        ceph_assert(seastar::smp::count >= client_conf.num_clients);
+      }
+      ceph_assert(client_conf.num_clients > 0);
       return client->init(
       ).then([client, addr = client_conf.server_addr] {
         return client->connect_wait_verify(addr);
@@ -687,15 +1155,15 @@ static seastar::future<> run(
         return client->shutdown();
       });
     } else { // mode == perf_mode_t::server
-      ceph_assert(seastar::smp::count >= 1+server_conf.core);
-      logger().info("\nperf settings:\n  {}\n", server_conf.str());
-      return server->init(server_conf.addr
-      // dispatch ops
-      ).then([server] {
-        return server->wait();
-      // shutdown
-      }).then([server, fp_server = std::move(fp_server)] () mutable {
-        return server->shutdown().then([cleanup = std::move(fp_server)] {});
+      ceph_assert(seastar::smp::count > server_conf.core);
+      logger().info("\nperf settings:\n  smp={}\n  {}\n",
+                    seastar::smp::count, server_conf.str());
+      return seastar::async([server, server_conf] {
+        // FIXME: SIGINT is not received by stop_signal
+        seastar_apps_lib::stop_signal should_stop;
+        server->init(server_conf.addr, server_conf.is_fixed_cpu).get();
+        should_stop.wait().get();
+        server->shutdown().get();
       });
     }
   }).finally([] {
@@ -711,21 +1179,27 @@ int main(int argc, char** argv)
   app.add_options()
     ("mode", bpo::value<unsigned>()->default_value(0),
      "0: both, 1:client, 2:server")
-    ("addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
+    ("server-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
      "server address(only support msgr v2 protocol)")
     ("ramptime", bpo::value<unsigned>()->default_value(5),
      "seconds of client ramp-up time")
     ("msgtime", bpo::value<unsigned>()->default_value(15),
      "seconds of client messaging time")
-    ("jobs", bpo::value<unsigned>()->default_value(1),
-     "number of client jobs (messengers)")
-    ("cbs", bpo::value<unsigned>()->default_value(4096),
+    ("clients", bpo::value<unsigned>()->default_value(1),
+     "number of client messengers")
+    ("conns-per-client", bpo::value<unsigned>()->default_value(1),
+     "number of connections per client")
+    ("client-bs", bpo::value<unsigned>()->default_value(4096),
      "client block size")
     ("depth", bpo::value<unsigned>()->default_value(512),
-     "client io depth")
-    ("core", bpo::value<unsigned>()->default_value(0),
-     "server running core")
-    ("sbs", bpo::value<unsigned>()->default_value(0),
+     "client io depth per job")
+    ("client-skip-core-0", bpo::value<bool>()->default_value(true),
+     "client skip core 0")
+    ("server-fixed-cpu", bpo::value<bool>()->default_value(true),
+     "server is in the fixed cpu mode, non-fixed doesn't support the mode both")
+    ("server-core", bpo::value<unsigned>()->default_value(1),
+     "server messenger running core")
+    ("server-bs", bpo::value<unsigned>()->default_value(0),
      "server block size")
     ("crc-enabled", bpo::value<bool>()->default_value(false),
      "enable CRC checks");