]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2015 Haomai Wang | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include <stdlib.h> | |
18 | #include <stdint.h> | |
19 | #include <string> | |
20 | #include <unistd.h> | |
21 | #include <iostream> | |
22 | ||
23 | using namespace std; | |
24 | ||
7c673cae FG |
25 | #include "common/ceph_argparse.h" |
26 | #include "common/debug.h" | |
27 | #include "common/Cycles.h" | |
28 | #include "global/global_init.h" | |
29 | #include "msg/Messenger.h" | |
30 | #include "messages/MOSDOp.h" | |
31 | ||
31f18b77 FG |
32 | #include <atomic> |
33 | ||
7c673cae FG |
34 | class MessengerClient { |
35 | class ClientThread; | |
36 | class ClientDispatcher : public Dispatcher { | |
37 | uint64_t think_time; | |
38 | ClientThread *thread; | |
39 | ||
40 | public: | |
41 | ClientDispatcher(uint64_t delay, ClientThread *t): Dispatcher(g_ceph_context), think_time(delay), thread(t) {} | |
42 | bool ms_can_fast_dispatch_any() const override { return true; } | |
43 | bool ms_can_fast_dispatch(const Message *m) const override { | |
44 | switch (m->get_type()) { | |
45 | case CEPH_MSG_OSD_OPREPLY: | |
46 | return true; | |
47 | default: | |
48 | return false; | |
49 | } | |
50 | } | |
51 | ||
52 | void ms_handle_fast_connect(Connection *con) override {} | |
53 | void ms_handle_fast_accept(Connection *con) override {} | |
54 | bool ms_dispatch(Message *m) override { return true; } | |
55 | void ms_fast_dispatch(Message *m) override; | |
56 | bool ms_handle_reset(Connection *con) override { return true; } | |
57 | void ms_handle_remote_reset(Connection *con) override {} | |
58 | bool ms_handle_refused(Connection *con) override { return false; } | |
59 | bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, | |
60 | bufferlist& authorizer, bufferlist& authorizer_reply, | |
61 | bool& isvalid, CryptoKey& session_key) override { | |
62 | isvalid = true; | |
63 | return true; | |
64 | } | |
65 | }; | |
66 | ||
67 | class ClientThread : public Thread { | |
68 | Messenger *msgr; | |
69 | int concurrent; | |
70 | ConnectionRef conn; | |
31f18b77 | 71 | std::atomic<unsigned> client_inc = { 0 }; |
7c673cae FG |
72 | object_t oid; |
73 | object_locator_t oloc; | |
74 | pg_t pgid; | |
75 | int msg_len; | |
76 | bufferlist data; | |
77 | int ops; | |
78 | ClientDispatcher dispatcher; | |
79 | ||
80 | public: | |
81 | Mutex lock; | |
82 | Cond cond; | |
83 | uint64_t inflight; | |
84 | ||
85 | ClientThread(Messenger *m, int c, ConnectionRef con, int len, int ops, int think_time_us): | |
31f18b77 | 86 | msgr(m), concurrent(c), conn(con), oid("object-name"), oloc(1, 1), msg_len(len), ops(ops), |
224ce89b | 87 | dispatcher(think_time_us, this), lock("MessengerBenchmark::ClientThread::lock"), inflight(0) { |
7c673cae FG |
88 | m->add_dispatcher_head(&dispatcher); |
89 | bufferptr ptr(msg_len); | |
90 | memset(ptr.c_str(), 0, msg_len); | |
91 | data.append(ptr); | |
92 | } | |
93 | void *entry() override { | |
94 | lock.Lock(); | |
95 | for (int i = 0; i < ops; ++i) { | |
96 | if (inflight > uint64_t(concurrent)) { | |
97 | cond.Wait(lock); | |
98 | } | |
99 | hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(), | |
100 | oloc.nspace); | |
101 | spg_t spgid(pgid); | |
31f18b77 | 102 | MOSDOp *m = new MOSDOp(client_inc, 0, hobj, spgid, 0, 0, 0); |
7c673cae FG |
103 | m->write(0, msg_len, data); |
104 | inflight++; | |
105 | conn->send_message(m); | |
106 | //cerr << __func__ << " send m=" << m << std::endl; | |
107 | } | |
108 | lock.Unlock(); | |
109 | msgr->shutdown(); | |
110 | return 0; | |
111 | } | |
112 | }; | |
113 | ||
114 | string type; | |
115 | string serveraddr; | |
116 | int think_time_us; | |
117 | vector<Messenger*> msgrs; | |
118 | vector<ClientThread*> clients; | |
119 | ||
120 | public: | |
121 | MessengerClient(string t, string addr, int delay): | |
122 | type(t), serveraddr(addr), think_time_us(delay) { | |
123 | } | |
124 | ~MessengerClient() { | |
125 | for (uint64_t i = 0; i < clients.size(); ++i) | |
126 | delete clients[i]; | |
127 | for (uint64_t i = 0; i < msgrs.size(); ++i) { | |
128 | msgrs[i]->shutdown(); | |
129 | msgrs[i]->wait(); | |
130 | } | |
131 | } | |
132 | void ready(int c, int jobs, int ops, int msg_len) { | |
133 | entity_addr_t addr; | |
134 | addr.parse(serveraddr.c_str()); | |
135 | addr.set_nonce(0); | |
136 | for (int i = 0; i < jobs; ++i) { | |
137 | Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i, 0); | |
138 | msgr->set_default_policy(Messenger::Policy::lossless_client(0)); | |
139 | entity_inst_t inst(entity_name_t::OSD(0), addr); | |
140 | ConnectionRef conn = msgr->get_connection(inst); | |
141 | ClientThread *t = new ClientThread(msgr, c, conn, msg_len, ops, think_time_us); | |
142 | msgrs.push_back(msgr); | |
143 | clients.push_back(t); | |
144 | msgr->start(); | |
145 | } | |
146 | usleep(1000*1000); | |
147 | } | |
148 | void start() { | |
149 | for (uint64_t i = 0; i < clients.size(); ++i) | |
150 | clients[i]->create("client"); | |
151 | for (uint64_t i = 0; i < msgrs.size(); ++i) | |
152 | msgrs[i]->wait(); | |
153 | } | |
154 | }; | |
155 | ||
156 | void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message *m) { | |
157 | usleep(think_time); | |
158 | m->put(); | |
159 | Mutex::Locker l(thread->lock); | |
160 | thread->inflight--; | |
161 | thread->cond.Signal(); | |
162 | } | |
163 | ||
164 | ||
/**
 * Prints the command-line synopsis and a description of every positional
 * argument to standard error.
 *
 * @param name  program name shown in the synopsis line
 */
void usage(const std::string &name) {
  // One entry per positional argument, in command-line order.
  static const char *const descriptions[] = {
    " [server ip:port]: connect to the ip:port pair",
    " [numjobs]: how much client threads spawned and do benchmark",
    " [concurrency]: the max inflight messages(like iodepth in fio)",
    " [ios]: how much messages sent for each client",
    " [thinktime]: sleep time when do fast dispatching(match client logic)",
    " [msg length]: message data bytes",
  };

  std::cerr << "Usage: " << name
            << " [server ip:port] [numjobs] [concurrency] [ios] [thinktime us] [msg length]"
            << std::endl;
  for (const char *text : descriptions) {
    std::cerr << text << std::endl;
  }
}
174 | ||
175 | int main(int argc, char **argv) | |
176 | { | |
177 | vector<const char*> args; | |
178 | argv_to_vec(argc, (const char **)argv, args); | |
179 | ||
180 | auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, | |
181 | CODE_ENVIRONMENT_UTILITY, 0); | |
182 | common_init_finish(g_ceph_context); | |
183 | g_ceph_context->_conf->apply_changes(NULL); | |
184 | ||
185 | if (args.size() < 6) { | |
186 | usage(argv[0]); | |
187 | return 1; | |
188 | } | |
189 | ||
190 | int numjobs = atoi(args[1]); | |
191 | int concurrent = atoi(args[2]); | |
192 | int ios = atoi(args[3]); | |
193 | int think_time = atoi(args[4]); | |
194 | int len = atoi(args[5]); | |
195 | ||
196 | std::string public_msgr_type = g_ceph_context->_conf->ms_public_type.empty() ? g_ceph_context->_conf->get_val<std::string>("ms_type") : g_ceph_context->_conf->ms_public_type; | |
197 | ||
198 | cerr << " using ms-public-type " << public_msgr_type << std::endl; | |
199 | cerr << " server ip:port " << args[0] << std::endl; | |
200 | cerr << " numjobs " << numjobs << std::endl; | |
201 | cerr << " concurrency " << concurrent << std::endl; | |
202 | cerr << " ios " << ios << std::endl; | |
203 | cerr << " thinktime(us) " << think_time << std::endl; | |
204 | cerr << " message data bytes " << len << std::endl; | |
205 | ||
206 | MessengerClient client(public_msgr_type, args[0], think_time); | |
207 | ||
208 | client.ready(concurrent, numjobs, ios, len); | |
209 | Cycles::init(); | |
210 | uint64_t start = Cycles::rdtsc(); | |
211 | client.start(); | |
212 | uint64_t stop = Cycles::rdtsc(); | |
213 | cerr << " Total op " << ios << " run time " << Cycles::to_microseconds(stop - start) << "us." << std::endl; | |
214 | ||
215 | return 0; | |
216 | } |