]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2015 Haomai Wang | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include <stdlib.h> | |
18 | #include <stdint.h> | |
19 | #include <string> | |
20 | #include <unistd.h> | |
21 | #include <iostream> | |
22 | ||
23 | using namespace std; | |
24 | ||
7c673cae FG |
25 | #include "common/ceph_argparse.h" |
26 | #include "common/debug.h" | |
27 | #include "common/Cycles.h" | |
28 | #include "global/global_init.h" | |
29 | #include "msg/Messenger.h" | |
30 | #include "messages/MOSDOp.h" | |
31 | ||
31f18b77 FG |
32 | #include <atomic> |
33 | ||
7c673cae FG |
34 | class MessengerClient { |
35 | class ClientThread; | |
36 | class ClientDispatcher : public Dispatcher { | |
37 | uint64_t think_time; | |
38 | ClientThread *thread; | |
39 | ||
40 | public: | |
41 | ClientDispatcher(uint64_t delay, ClientThread *t): Dispatcher(g_ceph_context), think_time(delay), thread(t) {} | |
42 | bool ms_can_fast_dispatch_any() const override { return true; } | |
43 | bool ms_can_fast_dispatch(const Message *m) const override { | |
44 | switch (m->get_type()) { | |
45 | case CEPH_MSG_OSD_OPREPLY: | |
46 | return true; | |
47 | default: | |
48 | return false; | |
49 | } | |
50 | } | |
51 | ||
52 | void ms_handle_fast_connect(Connection *con) override {} | |
53 | void ms_handle_fast_accept(Connection *con) override {} | |
54 | bool ms_dispatch(Message *m) override { return true; } | |
55 | void ms_fast_dispatch(Message *m) override; | |
56 | bool ms_handle_reset(Connection *con) override { return true; } | |
57 | void ms_handle_remote_reset(Connection *con) override {} | |
58 | bool ms_handle_refused(Connection *con) override { return false; } | |
59 | bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, | |
60 | bufferlist& authorizer, bufferlist& authorizer_reply, | |
28e407b8 AA |
61 | bool& isvalid, CryptoKey& session_key, |
62 | std::unique_ptr<AuthAuthorizerChallenge> *challenge) override { | |
7c673cae FG |
63 | isvalid = true; |
64 | return true; | |
65 | } | |
66 | }; | |
67 | ||
68 | class ClientThread : public Thread { | |
69 | Messenger *msgr; | |
70 | int concurrent; | |
71 | ConnectionRef conn; | |
31f18b77 | 72 | std::atomic<unsigned> client_inc = { 0 }; |
7c673cae FG |
73 | object_t oid; |
74 | object_locator_t oloc; | |
75 | pg_t pgid; | |
76 | int msg_len; | |
77 | bufferlist data; | |
78 | int ops; | |
79 | ClientDispatcher dispatcher; | |
80 | ||
81 | public: | |
82 | Mutex lock; | |
83 | Cond cond; | |
84 | uint64_t inflight; | |
85 | ||
86 | ClientThread(Messenger *m, int c, ConnectionRef con, int len, int ops, int think_time_us): | |
31f18b77 | 87 | msgr(m), concurrent(c), conn(con), oid("object-name"), oloc(1, 1), msg_len(len), ops(ops), |
224ce89b | 88 | dispatcher(think_time_us, this), lock("MessengerBenchmark::ClientThread::lock"), inflight(0) { |
7c673cae FG |
89 | m->add_dispatcher_head(&dispatcher); |
90 | bufferptr ptr(msg_len); | |
91 | memset(ptr.c_str(), 0, msg_len); | |
92 | data.append(ptr); | |
93 | } | |
94 | void *entry() override { | |
95 | lock.Lock(); | |
96 | for (int i = 0; i < ops; ++i) { | |
97 | if (inflight > uint64_t(concurrent)) { | |
98 | cond.Wait(lock); | |
99 | } | |
100 | hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(), | |
101 | oloc.nspace); | |
102 | spg_t spgid(pgid); | |
31f18b77 | 103 | MOSDOp *m = new MOSDOp(client_inc, 0, hobj, spgid, 0, 0, 0); |
7c673cae FG |
104 | m->write(0, msg_len, data); |
105 | inflight++; | |
106 | conn->send_message(m); | |
107 | //cerr << __func__ << " send m=" << m << std::endl; | |
108 | } | |
109 | lock.Unlock(); | |
110 | msgr->shutdown(); | |
111 | return 0; | |
112 | } | |
113 | }; | |
114 | ||
115 | string type; | |
116 | string serveraddr; | |
117 | int think_time_us; | |
118 | vector<Messenger*> msgrs; | |
119 | vector<ClientThread*> clients; | |
120 | ||
121 | public: | |
122 | MessengerClient(string t, string addr, int delay): | |
123 | type(t), serveraddr(addr), think_time_us(delay) { | |
124 | } | |
125 | ~MessengerClient() { | |
126 | for (uint64_t i = 0; i < clients.size(); ++i) | |
127 | delete clients[i]; | |
128 | for (uint64_t i = 0; i < msgrs.size(); ++i) { | |
129 | msgrs[i]->shutdown(); | |
130 | msgrs[i]->wait(); | |
131 | } | |
132 | } | |
133 | void ready(int c, int jobs, int ops, int msg_len) { | |
134 | entity_addr_t addr; | |
135 | addr.parse(serveraddr.c_str()); | |
136 | addr.set_nonce(0); | |
137 | for (int i = 0; i < jobs; ++i) { | |
138 | Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i, 0); | |
139 | msgr->set_default_policy(Messenger::Policy::lossless_client(0)); | |
140 | entity_inst_t inst(entity_name_t::OSD(0), addr); | |
141 | ConnectionRef conn = msgr->get_connection(inst); | |
142 | ClientThread *t = new ClientThread(msgr, c, conn, msg_len, ops, think_time_us); | |
143 | msgrs.push_back(msgr); | |
144 | clients.push_back(t); | |
145 | msgr->start(); | |
146 | } | |
147 | usleep(1000*1000); | |
148 | } | |
149 | void start() { | |
150 | for (uint64_t i = 0; i < clients.size(); ++i) | |
151 | clients[i]->create("client"); | |
152 | for (uint64_t i = 0; i < msgrs.size(); ++i) | |
153 | msgrs[i]->wait(); | |
154 | } | |
155 | }; | |
156 | ||
157 | void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message *m) { | |
158 | usleep(think_time); | |
159 | m->put(); | |
160 | Mutex::Locker l(thread->lock); | |
161 | thread->inflight--; | |
162 | thread->cond.Signal(); | |
163 | } | |
164 | ||
165 | ||
166 | void usage(const string &name) { | |
167 | cerr << "Usage: " << name << " [server ip:port] [numjobs] [concurrency] [ios] [thinktime us] [msg length]" << std::endl; | |
168 | cerr << " [server ip:port]: connect to the ip:port pair" << std::endl; | |
169 | cerr << " [numjobs]: how much client threads spawned and do benchmark" << std::endl; | |
170 | cerr << " [concurrency]: the max inflight messages(like iodepth in fio)" << std::endl; | |
171 | cerr << " [ios]: how much messages sent for each client" << std::endl; | |
172 | cerr << " [thinktime]: sleep time when do fast dispatching(match client logic)" << std::endl; | |
173 | cerr << " [msg length]: message data bytes" << std::endl; | |
174 | } | |
175 | ||
176 | int main(int argc, char **argv) | |
177 | { | |
178 | vector<const char*> args; | |
179 | argv_to_vec(argc, (const char **)argv, args); | |
180 | ||
181 | auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, | |
182 | CODE_ENVIRONMENT_UTILITY, 0); | |
183 | common_init_finish(g_ceph_context); | |
184 | g_ceph_context->_conf->apply_changes(NULL); | |
185 | ||
186 | if (args.size() < 6) { | |
187 | usage(argv[0]); | |
188 | return 1; | |
189 | } | |
190 | ||
191 | int numjobs = atoi(args[1]); | |
192 | int concurrent = atoi(args[2]); | |
193 | int ios = atoi(args[3]); | |
194 | int think_time = atoi(args[4]); | |
195 | int len = atoi(args[5]); | |
196 | ||
197 | std::string public_msgr_type = g_ceph_context->_conf->ms_public_type.empty() ? g_ceph_context->_conf->get_val<std::string>("ms_type") : g_ceph_context->_conf->ms_public_type; | |
198 | ||
199 | cerr << " using ms-public-type " << public_msgr_type << std::endl; | |
200 | cerr << " server ip:port " << args[0] << std::endl; | |
201 | cerr << " numjobs " << numjobs << std::endl; | |
202 | cerr << " concurrency " << concurrent << std::endl; | |
203 | cerr << " ios " << ios << std::endl; | |
204 | cerr << " thinktime(us) " << think_time << std::endl; | |
205 | cerr << " message data bytes " << len << std::endl; | |
206 | ||
207 | MessengerClient client(public_msgr_type, args[0], think_time); | |
208 | ||
209 | client.ready(concurrent, numjobs, ios, len); | |
210 | Cycles::init(); | |
211 | uint64_t start = Cycles::rdtsc(); | |
212 | client.start(); | |
213 | uint64_t stop = Cycles::rdtsc(); | |
214 | cerr << " Total op " << ios << " run time " << Cycles::to_microseconds(stop - start) << "us." << std::endl; | |
215 | ||
216 | return 0; | |
217 | } |