]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2015 Haomai Wang | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include <stdlib.h> | |
18 | #include <stdint.h> | |
19 | #include <string> | |
20 | #include <unistd.h> | |
21 | #include <iostream> | |
22 | ||
23 | using namespace std; | |
24 | ||
7c673cae FG |
25 | #include "common/ceph_argparse.h" |
26 | #include "common/debug.h" | |
27 | #include "common/Cycles.h" | |
28 | #include "global/global_init.h" | |
29 | #include "msg/Messenger.h" | |
30 | #include "messages/MOSDOp.h" | |
9f95a23c | 31 | #include "auth/DummyAuth.h" |
7c673cae | 32 | |
31f18b77 FG |
33 | #include <atomic> |
34 | ||
7c673cae FG |
35 | class MessengerClient { |
36 | class ClientThread; | |
37 | class ClientDispatcher : public Dispatcher { | |
38 | uint64_t think_time; | |
39 | ClientThread *thread; | |
40 | ||
41 | public: | |
42 | ClientDispatcher(uint64_t delay, ClientThread *t): Dispatcher(g_ceph_context), think_time(delay), thread(t) {} | |
43 | bool ms_can_fast_dispatch_any() const override { return true; } | |
44 | bool ms_can_fast_dispatch(const Message *m) const override { | |
45 | switch (m->get_type()) { | |
46 | case CEPH_MSG_OSD_OPREPLY: | |
47 | return true; | |
48 | default: | |
49 | return false; | |
50 | } | |
51 | } | |
52 | ||
53 | void ms_handle_fast_connect(Connection *con) override {} | |
54 | void ms_handle_fast_accept(Connection *con) override {} | |
55 | bool ms_dispatch(Message *m) override { return true; } | |
56 | void ms_fast_dispatch(Message *m) override; | |
57 | bool ms_handle_reset(Connection *con) override { return true; } | |
58 | void ms_handle_remote_reset(Connection *con) override {} | |
59 | bool ms_handle_refused(Connection *con) override { return false; } | |
11fdf7f2 TL |
60 | int ms_handle_authentication(Connection *con) override { |
61 | return 1; | |
7c673cae FG |
62 | } |
63 | }; | |
64 | ||
65 | class ClientThread : public Thread { | |
66 | Messenger *msgr; | |
67 | int concurrent; | |
68 | ConnectionRef conn; | |
31f18b77 | 69 | std::atomic<unsigned> client_inc = { 0 }; |
7c673cae FG |
70 | object_t oid; |
71 | object_locator_t oloc; | |
72 | pg_t pgid; | |
73 | int msg_len; | |
74 | bufferlist data; | |
75 | int ops; | |
76 | ClientDispatcher dispatcher; | |
77 | ||
78 | public: | |
9f95a23c TL |
79 | ceph::mutex lock = ceph::make_mutex("MessengerBenchmark::ClientThread::lock"); |
80 | ceph::condition_variable cond; | |
7c673cae FG |
81 | uint64_t inflight; |
82 | ||
83 | ClientThread(Messenger *m, int c, ConnectionRef con, int len, int ops, int think_time_us): | |
31f18b77 | 84 | msgr(m), concurrent(c), conn(con), oid("object-name"), oloc(1, 1), msg_len(len), ops(ops), |
9f95a23c | 85 | dispatcher(think_time_us, this), inflight(0) { |
7c673cae FG |
86 | m->add_dispatcher_head(&dispatcher); |
87 | bufferptr ptr(msg_len); | |
88 | memset(ptr.c_str(), 0, msg_len); | |
89 | data.append(ptr); | |
90 | } | |
91 | void *entry() override { | |
9f95a23c | 92 | std::unique_lock locker{lock}; |
7c673cae FG |
93 | for (int i = 0; i < ops; ++i) { |
94 | if (inflight > uint64_t(concurrent)) { | |
9f95a23c | 95 | cond.wait(locker); |
7c673cae FG |
96 | } |
97 | hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(), | |
98 | oloc.nspace); | |
99 | spg_t spgid(pgid); | |
31f18b77 | 100 | MOSDOp *m = new MOSDOp(client_inc, 0, hobj, spgid, 0, 0, 0); |
11fdf7f2 TL |
101 | bufferlist msg_data(data); |
102 | m->write(0, msg_len, msg_data); | |
7c673cae FG |
103 | inflight++; |
104 | conn->send_message(m); | |
105 | //cerr << __func__ << " send m=" << m << std::endl; | |
106 | } | |
9f95a23c | 107 | locker.unlock(); |
7c673cae FG |
108 | msgr->shutdown(); |
109 | return 0; | |
110 | } | |
111 | }; | |
112 | ||
113 | string type; | |
114 | string serveraddr; | |
115 | int think_time_us; | |
116 | vector<Messenger*> msgrs; | |
117 | vector<ClientThread*> clients; | |
9f95a23c | 118 | DummyAuthClientServer dummy_auth; |
7c673cae FG |
119 | |
120 | public: | |
11fdf7f2 | 121 | MessengerClient(const string &t, const string &addr, int delay): |
9f95a23c TL |
122 | type(t), serveraddr(addr), think_time_us(delay), |
123 | dummy_auth(g_ceph_context) { | |
7c673cae FG |
124 | } |
125 | ~MessengerClient() { | |
126 | for (uint64_t i = 0; i < clients.size(); ++i) | |
127 | delete clients[i]; | |
128 | for (uint64_t i = 0; i < msgrs.size(); ++i) { | |
129 | msgrs[i]->shutdown(); | |
130 | msgrs[i]->wait(); | |
131 | } | |
132 | } | |
133 | void ready(int c, int jobs, int ops, int msg_len) { | |
134 | entity_addr_t addr; | |
135 | addr.parse(serveraddr.c_str()); | |
136 | addr.set_nonce(0); | |
9f95a23c | 137 | dummy_auth.auth_registry.refresh_config(); |
7c673cae FG |
138 | for (int i = 0; i < jobs; ++i) { |
139 | Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i, 0); | |
140 | msgr->set_default_policy(Messenger::Policy::lossless_client(0)); | |
9f95a23c | 141 | msgr->set_auth_client(&dummy_auth); |
11fdf7f2 TL |
142 | msgr->start(); |
143 | entity_addrvec_t addrs(addr); | |
144 | ConnectionRef conn = msgr->connect_to_osd(addrs); | |
7c673cae FG |
145 | ClientThread *t = new ClientThread(msgr, c, conn, msg_len, ops, think_time_us); |
146 | msgrs.push_back(msgr); | |
147 | clients.push_back(t); | |
7c673cae FG |
148 | } |
149 | usleep(1000*1000); | |
150 | } | |
151 | void start() { | |
152 | for (uint64_t i = 0; i < clients.size(); ++i) | |
153 | clients[i]->create("client"); | |
154 | for (uint64_t i = 0; i < msgrs.size(); ++i) | |
155 | msgrs[i]->wait(); | |
156 | } | |
157 | }; | |
158 | ||
159 | void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message *m) { | |
160 | usleep(think_time); | |
161 | m->put(); | |
9f95a23c | 162 | std::lock_guard l{thread->lock}; |
7c673cae | 163 | thread->inflight--; |
9f95a23c | 164 | thread->cond.notify_all(); |
7c673cae FG |
165 | } |
166 | ||
167 | ||
168 | void usage(const string &name) { | |
169 | cerr << "Usage: " << name << " [server ip:port] [numjobs] [concurrency] [ios] [thinktime us] [msg length]" << std::endl; | |
170 | cerr << " [server ip:port]: connect to the ip:port pair" << std::endl; | |
171 | cerr << " [numjobs]: how much client threads spawned and do benchmark" << std::endl; | |
172 | cerr << " [concurrency]: the max inflight messages(like iodepth in fio)" << std::endl; | |
173 | cerr << " [ios]: how much messages sent for each client" << std::endl; | |
174 | cerr << " [thinktime]: sleep time when do fast dispatching(match client logic)" << std::endl; | |
175 | cerr << " [msg length]: message data bytes" << std::endl; | |
176 | } | |
177 | ||
178 | int main(int argc, char **argv) | |
179 | { | |
180 | vector<const char*> args; | |
181 | argv_to_vec(argc, (const char **)argv, args); | |
182 | ||
183 | auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, | |
11fdf7f2 TL |
184 | CODE_ENVIRONMENT_UTILITY, |
185 | CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); | |
7c673cae | 186 | common_init_finish(g_ceph_context); |
11fdf7f2 | 187 | g_ceph_context->_conf.apply_changes(nullptr); |
7c673cae FG |
188 | |
189 | if (args.size() < 6) { | |
190 | usage(argv[0]); | |
191 | return 1; | |
192 | } | |
193 | ||
194 | int numjobs = atoi(args[1]); | |
195 | int concurrent = atoi(args[2]); | |
196 | int ios = atoi(args[3]); | |
197 | int think_time = atoi(args[4]); | |
198 | int len = atoi(args[5]); | |
199 | ||
11fdf7f2 | 200 | std::string public_msgr_type = g_ceph_context->_conf->ms_public_type.empty() ? g_ceph_context->_conf.get_val<std::string>("ms_type") : g_ceph_context->_conf->ms_public_type; |
7c673cae FG |
201 | |
202 | cerr << " using ms-public-type " << public_msgr_type << std::endl; | |
203 | cerr << " server ip:port " << args[0] << std::endl; | |
204 | cerr << " numjobs " << numjobs << std::endl; | |
205 | cerr << " concurrency " << concurrent << std::endl; | |
206 | cerr << " ios " << ios << std::endl; | |
207 | cerr << " thinktime(us) " << think_time << std::endl; | |
208 | cerr << " message data bytes " << len << std::endl; | |
209 | ||
210 | MessengerClient client(public_msgr_type, args[0], think_time); | |
211 | ||
212 | client.ready(concurrent, numjobs, ios, len); | |
213 | Cycles::init(); | |
214 | uint64_t start = Cycles::rdtsc(); | |
215 | client.start(); | |
216 | uint64_t stop = Cycles::rdtsc(); | |
217 | cerr << " Total op " << ios << " run time " << Cycles::to_microseconds(stop - start) << "us." << std::endl; | |
218 | ||
219 | return 0; | |
220 | } |