// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
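//
// perf_crimson_msgr: standalone performance test for the crimson messenger.
// A sharded Client pumps MOSDOp messages at a sharded Server, which answers
// each one with an MOSDOpReply; the client reports handshake and messaging
// durations once the configured number of rounds has completed.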

#include <map>
#include <random>
#include <boost/program_options.hpp>

#include <seastar/core/app-template.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/semaphore.hh>

#include "common/ceph_time.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"

#include "crimson/common/log.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Messenger.h"

namespace bpo = boost::program_options;

namespace {

template<typename Message>
using Ref = boost::intrusive_ptr<Message>;

seastar::logger& logger() {
  return ceph::get_logger(ceph_subsys_ms);
}

enum class perf_mode_t {
  both,
  client,
  server
};

static std::random_device rd;
static std::default_random_engine rng{rd()};

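// Create a sharded Server and Client, connect them according to `mode`, and
// drive `rounds` MOSDOp/MOSDOpReply exchanges of `bs` bytes each, with at
// most `depth` requests in flight.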
static seastar::future<> run(unsigned rounds,
                             double keepalive_ratio,
                             int bs,
                             int depth,
                             std::string addr,
                             perf_mode_t mode)
{
  struct test_state {
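    // Server: a sharded Dispatcher that acknowledges every incoming MOSDOp
    // with an MOSDOpReply on the same connection.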
    struct Server final
        : public ceph::net::Dispatcher,
          public seastar::peering_sharded_service<Server> {
      ceph::net::Messenger *msgr = nullptr;

      Dispatcher* get_local_shard() override {
        return &(container().local());
      }
      seastar::future<> stop() {
        return seastar::make_ready_future<>();
      }
      seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
                                    MessageRef m) override {
        ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
        // reply
        Ref<MOSDOp> req = boost::static_pointer_cast<MOSDOp>(m);
        req->finish_decode();
        return c->send(MessageRef{ new MOSDOpReply(req.get(), 0, 0, 0, false), false });
      }

      seastar::future<> init(const entity_name_t& name,
                             const std::string& lname,
                             const uint64_t nonce,
                             const entity_addr_t& addr) {
        auto&& fut = ceph::net::Messenger::create(name, lname, nonce, 1);
        return fut.then([this, addr](ceph::net::Messenger *messenger) {
          return container().invoke_on_all([messenger](auto& server) {
            server.msgr = messenger->get_local_shard();
            server.msgr->set_crc_header();
          }).then([messenger, addr] {
            return messenger->bind(entity_addrvec_t{addr});
          }).then([this, messenger] {
            return messenger->start(this);
          });
        });
      }
      seastar::future<> shutdown() {
        ceph_assert(msgr);
        return msgr->shutdown();
      }
    };

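    // Client: opens a connection per dispatch_messages() call and tracks the
    // per-connection round count and timing in a PingSession.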
    struct Client final
        : public ceph::net::Dispatcher,
          public seastar::peering_sharded_service<Client> {

      struct PingSession : public seastar::enable_shared_from_this<PingSession> {
        unsigned count = 0u;
        mono_time connected_time;
        mono_time finish_time;
      };
      using PingSessionRef = seastar::shared_ptr<PingSession>;

      unsigned rounds;
      std::bernoulli_distribution keepalive_dist;
      ceph::net::Messenger *msgr = nullptr;
      std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
      std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
      int msg_len;
      bufferlist msg_data;
      seastar::semaphore depth;

      Client(unsigned rounds, double keepalive_ratio, int msg_len, int depth)
        : rounds(rounds),
          keepalive_dist(std::bernoulli_distribution{keepalive_ratio}),
          msg_len(msg_len),
          depth(depth) {
        bufferptr ptr(msg_len);
        memset(ptr.c_str(), 0, msg_len);
        msg_data.append(ptr);
      }

      PingSessionRef find_session(ceph::net::ConnectionRef c) {
        auto found = sessions.find(c);
        if (found == sessions.end()) {
          ceph_assert(false);
        }
        return found->second;
      }

      Dispatcher* get_local_shard() override {
        return &(container().local());
      }
      seastar::future<> stop() {
        return seastar::now();
      }
      seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
        logger().info("{}: connected to {}", *conn, conn->get_peer_addr());
        auto session = seastar::make_shared<PingSession>();
        auto [i, added] = sessions.emplace(conn, session);
        std::ignore = i;
        ceph_assert(added);
        session->connected_time = mono_clock::now();
        return seastar::now();
      }
      seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
                                    MessageRef m) override {
        ceph_assert(m->get_type() == CEPH_MSG_OSD_OPREPLY);
        depth.signal(1);
        auto session = find_session(c);
        ++(session->count);

        if (session->count == rounds) {
          logger().info("{}: finished receiving {} OPREPLYs", *c.get(), session->count);
          session->finish_time = mono_clock::now();
          return container().invoke_on_all([conn = c.get()](auto &client) {
            auto found = client.pending_conns.find(conn);
            ceph_assert(found != client.pending_conns.end());
            found->second.set_value();
          });
        } else {
          return seastar::now();
        }
      }

      seastar::future<> init(const entity_name_t& name,
                             const std::string& lname,
                             const uint64_t nonce) {
        return ceph::net::Messenger::create(name, lname, nonce, 2)
          .then([this](ceph::net::Messenger *messenger) {
            return container().invoke_on_all([messenger](auto& client) {
              client.msgr = messenger->get_local_shard();
              client.msgr->set_crc_header();
            }).then([this, messenger] {
              return messenger->start(this);
            });
          });
      }

      seastar::future<> shutdown() {
        ceph_assert(msgr);
        return msgr->shutdown();
      }

      seastar::future<> dispatch_messages(const entity_addr_t& peer_addr, bool foreign_dispatch=true) {
        mono_time start_time = mono_clock::now();
        return msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
          .then([this, foreign_dispatch, start_time](auto conn) {
            return seastar::futurize_apply([this, conn, foreign_dispatch] {
              if (foreign_dispatch) {
                return do_dispatch_messages(&**conn);
              } else {
                // NOTE: this could be faster if we don't switch cores in do_dispatch_messages().
                return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
                  return client.do_dispatch_messages(conn);
                });
              }
            }).finally([this, conn, start_time] {
              return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) {
                auto session = client.find_session((*conn)->shared_from_this());
                std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
                std::chrono::duration<double> dur_messaging = session->finish_time - session->connected_time;
                logger().info("{}: handshake {}, messaging {}",
                              **conn, dur_handshake.count(), dur_messaging.count());
              });
            });
          });
      }

     private:
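      // Sends are flow-controlled by the `depth` semaphore: send_msg() waits
      // for one unit before sending and ms_dispatch() returns it when the
      // matching MOSDOpReply arrives, so at most `depth` ops are in flight.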
      seastar::future<> send_msg(ceph::net::Connection* conn) {
        return depth.wait(1).then([this, conn] {
          const static pg_t pgid;
          const static object_locator_t oloc;
          const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
                                      pgid.pool(), oloc.nspace);
          static spg_t spgid(pgid);
          MOSDOp *m = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
          bufferlist data(msg_data);
          m->write(0, msg_len, data);
          MessageRef msg = {m, false};
          return conn->send(msg);
        });
      }

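      // Register a completion promise for this connection on every shard,
      // then keep issuing OSD ops (interleaved with keepalives) until
      // `rounds` ops have been sent; the promise is fulfilled by ms_dispatch()
      // once all replies have been received.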
      seastar::future<> do_dispatch_messages(ceph::net::Connection* conn) {
        return container().invoke_on_all([conn](auto& client) {
          auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
          std::ignore = i;
          ceph_assert(added);
        }).then([this, conn] {
          return seastar::do_with(0u, 0u,
            [this, conn](auto &count_ping, auto &count_keepalive) {
              return seastar::do_until(
                [this, conn, &count_ping, &count_keepalive] {
                  bool stop = (count_ping == rounds);
                  if (stop) {
                    logger().info("{}: finished sending {} OSDOPs with {} keepalives",
                                  *conn, count_ping, count_keepalive);
                  }
                  return stop;
                },
                [this, conn, &count_ping, &count_keepalive] {
                  return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
                    if (keepalive_dist(rng)) {
                      return conn->keepalive()
                        .then([&count_keepalive] {
                          count_keepalive += 1;
                          return seastar::make_ready_future<seastar::stop_iteration>(
                            seastar::stop_iteration::no);
                        });
                    } else {
                      return send_msg(conn)
                        .then([&count_ping] {
                          count_ping += 1;
                          return seastar::make_ready_future<seastar::stop_iteration>(
                            seastar::stop_iteration::yes);
                        });
                    }
                  });
                }).then([this, conn] {
                  auto found = pending_conns.find(conn);
                  return found->second.get_future();
                });
            });
        });
      }
    };
  };

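  // Instantiate the sharded server and client services, then run them in the
  // requested mode and shut them down once messaging completes.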
  return seastar::when_all_succeed(
      ceph::net::create_sharded<test_state::Server>(),
      ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio, bs, depth))
    .then([rounds, keepalive_ratio, addr, mode](test_state::Server *server,
                                                test_state::Client *client) {
      entity_addr_t target_addr;
      target_addr.parse(addr.c_str(), nullptr);
      target_addr.set_type(entity_addr_t::TYPE_LEGACY);
      if (mode == perf_mode_t::both) {
        return seastar::when_all_succeed(
            server->init(entity_name_t::OSD(0), "server", 0, target_addr),
            client->init(entity_name_t::OSD(1), "client", 0))
          // dispatch pingpong
          .then([client, target_addr] {
            return client->dispatch_messages(target_addr, false);
          // shutdown
          }).finally([client] {
            logger().info("client shutdown...");
            return client->shutdown();
          }).finally([server] {
            logger().info("server shutdown...");
            return server->shutdown();
          });
      } else if (mode == perf_mode_t::client) {
        return client->init(entity_name_t::OSD(1), "client", 0)
          // dispatch pingpong
          .then([client, target_addr] {
            return client->dispatch_messages(target_addr, false);
          // shutdown
          }).finally([client] {
            logger().info("client shutdown...");
            return client->shutdown();
          });
      } else { // mode == perf_mode_t::server
        return server->init(entity_name_t::OSD(0), "server", 0, target_addr)
          // dispatch pingpong
          .then([server] {
            return server->msgr->wait();
          // shutdown
          }).finally([server] {
            logger().info("server shutdown...");
            return server->shutdown();
          });
      }
    });
}

} // anonymous namespace

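// Example invocations (the binary name is assumed to match this source file):
//   perf_crimson_msgr --mode=2 --addr=0.0.0.0:9010       # server only
//   perf_crimson_msgr --mode=1 --addr=<server-ip>:9010   # client only
//   perf_crimson_msgr --mode=0                           # client and server in one process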
int main(int argc, char** argv)
{
  seastar::app_template app;
  app.add_options()
    ("addr", bpo::value<std::string>()->default_value("0.0.0.0:9010"),
     "server address")
    ("mode", bpo::value<int>()->default_value(0),
     "0: both, 1: client, 2: server")
    ("rounds", bpo::value<unsigned>()->default_value(65536),
     "number of messaging rounds")
    ("keepalive-ratio", bpo::value<double>()->default_value(0),
     "ratio of keepalive in ping messages")
    ("bs", bpo::value<int>()->default_value(4096),
     "block size")
    ("depth", bpo::value<int>()->default_value(512),
     "io depth");
  return app.run(argc, argv, [&app] {
    auto&& config = app.configuration();
    auto rounds = config["rounds"].as<unsigned>();
    auto keepalive_ratio = config["keepalive-ratio"].as<double>();
    auto bs = config["bs"].as<int>();
    auto depth = config["depth"].as<int>();
    auto addr = config["addr"].as<std::string>();
    auto mode = config["mode"].as<int>();
    logger().info("\nsettings:\n addr={}\n mode={}\n rounds={}\n keepalive-ratio={}\n bs={}\n depth={}",
                  addr, mode, rounds, keepalive_ratio, bs, depth);
    ceph_assert(mode >= 0 && mode <= 2);
    auto _mode = static_cast<perf_mode_t>(mode);
    return run(rounds, keepalive_ratio, bs, depth, addr, _mode)
      .then([] {
        std::cout << "successful" << std::endl;
      }).handle_exception([] (auto eptr) {
        std::cout << "failed" << std::endl;
        return seastar::make_exception_future<>(eptr);
      });
  });
}