]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <map> | |
5 | #include <random> | |
6 | #include <boost/program_options.hpp> | |
7 | ||
8 | #include <seastar/core/app-template.hh> | |
9 | #include <seastar/core/do_with.hh> | |
10 | #include <seastar/core/future-util.hh> | |
11 | #include <seastar/core/reactor.hh> | |
12 | #include <seastar/core/sleep.hh> | |
13 | #include <seastar/core/semaphore.hh> | |
14 | ||
15 | #include "common/ceph_time.h" | |
16 | #include "messages/MOSDOp.h" | |
17 | #include "messages/MOSDOpReply.h" | |
18 | ||
19 | #include "crimson/common/log.h" | |
20 | #include "crimson/net/Connection.h" | |
21 | #include "crimson/net/Dispatcher.h" | |
22 | #include "crimson/net/Messenger.h" | |
23 | ||
24 | namespace bpo = boost::program_options; | |
25 | ||
26 | namespace { | |
27 | ||
28 | template<typename Message> | |
29 | using Ref = boost::intrusive_ptr<Message>; | |
30 | ||
31 | seastar::logger& logger() { | |
32 | return ceph::get_logger(ceph_subsys_ms); | |
33 | } | |
34 | ||
35 | ||
// Which side(s) of the benchmark this process runs.
// The numeric values are spelled out because main() converts the integer
// given to --mode ("0: both, 1:client, 2:server") with a static_cast.
enum class perf_mode_t {
  both = 0,
  client = 1,
  server = 2
};
41 | ||
// File-local random source: `rd` supplies the seed, `rng` is the engine
// the client samples its keepalive distribution from. Both already have
// internal linkage through the enclosing anonymous namespace.
std::random_device rd;
std::default_random_engine rng(rd());
44 | ||
45 | static seastar::future<> run(unsigned rounds, | |
46 | double keepalive_ratio, | |
47 | int bs, | |
48 | int depth, | |
49 | std::string addr, | |
50 | perf_mode_t mode) | |
51 | { | |
52 | struct test_state { | |
53 | struct Server final | |
54 | : public ceph::net::Dispatcher, | |
55 | public seastar::peering_sharded_service<Server> { | |
56 | ceph::net::Messenger *msgr = nullptr; | |
57 | ||
58 | Dispatcher* get_local_shard() override { | |
59 | return &(container().local()); | |
60 | } | |
61 | seastar::future<> stop() { | |
62 | return seastar::make_ready_future<>(); | |
63 | } | |
64 | seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, | |
65 | MessageRef m) override { | |
66 | ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); | |
67 | // reply | |
68 | Ref<MOSDOp> req = boost::static_pointer_cast<MOSDOp>(m); | |
69 | req->finish_decode(); | |
70 | return c->send(MessageRef{ new MOSDOpReply(req.get(), 0, 0, 0, false), false }); | |
71 | } | |
72 | ||
73 | seastar::future<> init(const entity_name_t& name, | |
74 | const std::string& lname, | |
75 | const uint64_t nonce, | |
76 | const entity_addr_t& addr) { | |
77 | auto&& fut = ceph::net::Messenger::create(name, lname, nonce, 1); | |
78 | return fut.then([this, addr](ceph::net::Messenger *messenger) { | |
79 | return container().invoke_on_all([messenger](auto& server) { | |
80 | server.msgr = messenger->get_local_shard(); | |
81 | server.msgr->set_crc_header(); | |
82 | }).then([messenger, addr] { | |
83 | return messenger->bind(entity_addrvec_t{addr}); | |
84 | }).then([this, messenger] { | |
85 | return messenger->start(this); | |
86 | }); | |
87 | }); | |
88 | } | |
89 | seastar::future<> shutdown() { | |
90 | ceph_assert(msgr); | |
91 | return msgr->shutdown(); | |
92 | } | |
93 | }; | |
94 | ||
95 | struct Client final | |
96 | : public ceph::net::Dispatcher, | |
97 | public seastar::peering_sharded_service<Client> { | |
98 | ||
99 | struct PingSession : public seastar::enable_shared_from_this<PingSession> { | |
100 | unsigned count = 0u; | |
101 | mono_time connected_time; | |
102 | mono_time finish_time; | |
103 | }; | |
104 | using PingSessionRef = seastar::shared_ptr<PingSession>; | |
105 | ||
106 | unsigned rounds; | |
107 | std::bernoulli_distribution keepalive_dist; | |
108 | ceph::net::Messenger *msgr = nullptr; | |
109 | std::map<ceph::net::Connection*, seastar::promise<>> pending_conns; | |
110 | std::map<ceph::net::ConnectionRef, PingSessionRef> sessions; | |
111 | int msg_len; | |
112 | bufferlist msg_data; | |
113 | seastar::semaphore depth; | |
114 | ||
115 | Client(unsigned rounds, double keepalive_ratio, int msg_len, int depth) | |
116 | : rounds(rounds), | |
117 | keepalive_dist(std::bernoulli_distribution{keepalive_ratio}), | |
118 | depth(depth) { | |
119 | bufferptr ptr(msg_len); | |
120 | memset(ptr.c_str(), 0, msg_len); | |
121 | msg_data.append(ptr); | |
122 | } | |
123 | ||
124 | PingSessionRef find_session(ceph::net::ConnectionRef c) { | |
125 | auto found = sessions.find(c); | |
126 | if (found == sessions.end()) { | |
127 | ceph_assert(false); | |
128 | } | |
129 | return found->second; | |
130 | } | |
131 | ||
132 | Dispatcher* get_local_shard() override { | |
133 | return &(container().local()); | |
134 | } | |
135 | seastar::future<> stop() { | |
136 | return seastar::now(); | |
137 | } | |
138 | seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override { | |
139 | logger().info("{}: connected to {}", *conn, conn->get_peer_addr()); | |
140 | auto session = seastar::make_shared<PingSession>(); | |
141 | auto [i, added] = sessions.emplace(conn, session); | |
142 | std::ignore = i; | |
143 | ceph_assert(added); | |
144 | session->connected_time = mono_clock::now(); | |
145 | return seastar::now(); | |
146 | } | |
147 | seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, | |
148 | MessageRef m) override { | |
149 | ceph_assert(m->get_type() == CEPH_MSG_OSD_OPREPLY); | |
150 | depth.signal(1); | |
151 | auto session = find_session(c); | |
152 | ++(session->count); | |
153 | ||
154 | if (session->count == rounds) { | |
155 | logger().info("{}: finished receiving {} OPREPLYs", *c.get(), session->count); | |
156 | session->finish_time = mono_clock::now(); | |
157 | return container().invoke_on_all([conn = c.get()](auto &client) { | |
158 | auto found = client.pending_conns.find(conn); | |
159 | ceph_assert(found != client.pending_conns.end()); | |
160 | found->second.set_value(); | |
161 | }); | |
162 | } else { | |
163 | return seastar::now(); | |
164 | } | |
165 | } | |
166 | ||
167 | seastar::future<> init(const entity_name_t& name, | |
168 | const std::string& lname, | |
169 | const uint64_t nonce) { | |
170 | return ceph::net::Messenger::create(name, lname, nonce, 2) | |
171 | .then([this](ceph::net::Messenger *messenger) { | |
172 | return container().invoke_on_all([messenger](auto& client) { | |
173 | client.msgr = messenger->get_local_shard(); | |
174 | client.msgr->set_crc_header(); | |
175 | }).then([this, messenger] { | |
176 | return messenger->start(this); | |
177 | }); | |
178 | }); | |
179 | } | |
180 | ||
181 | seastar::future<> shutdown() { | |
182 | ceph_assert(msgr); | |
183 | return msgr->shutdown(); | |
184 | } | |
185 | ||
186 | seastar::future<> dispatch_messages(const entity_addr_t& peer_addr, bool foreign_dispatch=true) { | |
187 | mono_time start_time = mono_clock::now(); | |
188 | return msgr->connect(peer_addr, entity_name_t::TYPE_OSD) | |
189 | .then([this, foreign_dispatch, start_time](auto conn) { | |
190 | return seastar::futurize_apply([this, conn, foreign_dispatch] { | |
191 | if (foreign_dispatch) { | |
192 | return do_dispatch_messages(&**conn); | |
193 | } else { | |
194 | // NOTE: this could be faster if we don't switch cores in do_dispatch_messages(). | |
195 | return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) { | |
196 | return client.do_dispatch_messages(conn); | |
197 | }); | |
198 | } | |
199 | }).finally([this, conn, start_time] { | |
200 | return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) { | |
201 | auto session = client.find_session((*conn)->shared_from_this()); | |
202 | std::chrono::duration<double> dur_handshake = session->connected_time - start_time; | |
203 | std::chrono::duration<double> dur_messaging = session->finish_time - session->connected_time; | |
204 | logger().info("{}: handshake {}, messaging {}", | |
205 | **conn, dur_handshake.count(), dur_messaging.count()); | |
206 | }); | |
207 | }); | |
208 | }); | |
209 | } | |
210 | ||
211 | private: | |
212 | seastar::future<> send_msg(ceph::net::Connection* conn) { | |
213 | return depth.wait(1).then([this, conn] { | |
214 | const static pg_t pgid; | |
215 | const static object_locator_t oloc; | |
216 | const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), | |
217 | pgid.pool(), oloc.nspace); | |
218 | static spg_t spgid(pgid); | |
219 | MOSDOp *m = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0); | |
220 | bufferlist data(msg_data); | |
221 | m->write(0, msg_len, data); | |
222 | MessageRef msg = {m, false}; | |
223 | return conn->send(msg); | |
224 | }); | |
225 | } | |
226 | ||
227 | seastar::future<> do_dispatch_messages(ceph::net::Connection* conn) { | |
228 | return container().invoke_on_all([conn](auto& client) { | |
229 | auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>()); | |
230 | std::ignore = i; | |
231 | ceph_assert(added); | |
232 | }).then([this, conn] { | |
233 | return seastar::do_with(0u, 0u, | |
234 | [this, conn](auto &count_ping, auto &count_keepalive) { | |
235 | return seastar::do_until( | |
236 | [this, conn, &count_ping, &count_keepalive] { | |
237 | bool stop = (count_ping == rounds); | |
238 | if (stop) { | |
239 | logger().info("{}: finished sending {} OSDOPs with {} keepalives", | |
240 | *conn, count_ping, count_keepalive); | |
241 | } | |
242 | return stop; | |
243 | }, | |
244 | [this, conn, &count_ping, &count_keepalive] { | |
245 | return seastar::repeat([this, conn, &count_ping, &count_keepalive] { | |
246 | if (keepalive_dist(rng)) { | |
247 | return conn->keepalive() | |
248 | .then([&count_keepalive] { | |
249 | count_keepalive += 1; | |
250 | return seastar::make_ready_future<seastar::stop_iteration>( | |
251 | seastar::stop_iteration::no); | |
252 | }); | |
253 | } else { | |
254 | return send_msg(conn) | |
255 | .then([&count_ping] { | |
256 | count_ping += 1; | |
257 | return seastar::make_ready_future<seastar::stop_iteration>( | |
258 | seastar::stop_iteration::yes); | |
259 | }); | |
260 | } | |
261 | }); | |
262 | }).then([this, conn] { | |
263 | auto found = pending_conns.find(conn); | |
264 | return found->second.get_future(); | |
265 | }); | |
266 | }); | |
267 | }); | |
268 | } | |
269 | }; | |
270 | }; | |
271 | ||
272 | return seastar::when_all_succeed( | |
273 | ceph::net::create_sharded<test_state::Server>(), | |
274 | ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio, bs, depth)) | |
275 | .then([rounds, keepalive_ratio, addr, mode](test_state::Server *server, | |
276 | test_state::Client *client) { | |
277 | entity_addr_t target_addr; | |
278 | target_addr.parse(addr.c_str(), nullptr); | |
279 | target_addr.set_type(entity_addr_t::TYPE_LEGACY); | |
280 | if (mode == perf_mode_t::both) { | |
281 | return seastar::when_all_succeed( | |
282 | server->init(entity_name_t::OSD(0), "server", 0, target_addr), | |
283 | client->init(entity_name_t::OSD(1), "client", 0)) | |
284 | // dispatch pingpoing | |
285 | .then([client, target_addr] { | |
286 | return client->dispatch_messages(target_addr, false); | |
287 | // shutdown | |
288 | }).finally([client] { | |
289 | logger().info("client shutdown..."); | |
290 | return client->shutdown(); | |
291 | }).finally([server] { | |
292 | logger().info("server shutdown..."); | |
293 | return server->shutdown(); | |
294 | }); | |
295 | } else if (mode == perf_mode_t::client) { | |
296 | return client->init(entity_name_t::OSD(1), "client", 0) | |
297 | // dispatch pingpoing | |
298 | .then([client, target_addr] { | |
299 | return client->dispatch_messages(target_addr, false); | |
300 | // shutdown | |
301 | }).finally([client] { | |
302 | logger().info("client shutdown..."); | |
303 | return client->shutdown(); | |
304 | }); | |
305 | } else { // mode == perf_mode_t::server | |
306 | return server->init(entity_name_t::OSD(0), "server", 0, target_addr) | |
307 | // dispatch pingpoing | |
308 | .then([server] { | |
309 | return server->msgr->wait(); | |
310 | // shutdown | |
311 | }).finally([server] { | |
312 | logger().info("server shutdown..."); | |
313 | return server->shutdown(); | |
314 | }); | |
315 | } | |
316 | }); | |
317 | } | |
318 | ||
319 | } | |
320 | ||
321 | int main(int argc, char** argv) | |
322 | { | |
323 | seastar::app_template app; | |
324 | app.add_options() | |
325 | ("addr", bpo::value<std::string>()->default_value("0.0.0.0:9010"), | |
326 | "start server") | |
327 | ("mode", bpo::value<int>()->default_value(0), | |
328 | "0: both, 1:client, 2:server") | |
329 | ("rounds", bpo::value<unsigned>()->default_value(65536), | |
330 | "number of messaging rounds") | |
331 | ("keepalive-ratio", bpo::value<double>()->default_value(0), | |
332 | "ratio of keepalive in ping messages") | |
333 | ("bs", bpo::value<int>()->default_value(4096), | |
334 | "block size") | |
335 | ("depth", bpo::value<int>()->default_value(512), | |
336 | "io depth"); | |
337 | return app.run(argc, argv, [&app] { | |
338 | auto&& config = app.configuration(); | |
339 | auto rounds = config["rounds"].as<unsigned>(); | |
340 | auto keepalive_ratio = config["keepalive-ratio"].as<double>(); | |
341 | auto bs = config["bs"].as<int>(); | |
342 | auto depth = config["depth"].as<int>(); | |
343 | auto addr = config["addr"].as<std::string>(); | |
344 | auto mode = config["mode"].as<int>(); | |
345 | logger().info("\nsettings:\n addr={}\n mode={}\n rounds={}\n keepalive-ratio={}\n bs={}\n depth={}", | |
346 | addr, mode, rounds, keepalive_ratio, bs, depth); | |
347 | ceph_assert(mode >= 0 && mode <= 2); | |
348 | auto _mode = static_cast<perf_mode_t>(mode); | |
349 | return run(rounds, keepalive_ratio, bs, depth, addr, _mode) | |
350 | .then([] { | |
351 | std::cout << "successful" << std::endl; | |
352 | }).handle_exception([] (auto eptr) { | |
353 | std::cout << "failed" << std::endl; | |
354 | return seastar::make_exception_future<>(eptr); | |
355 | }); | |
356 | }); | |
357 | } |