1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2015 Haomai Wang
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
25 #include "common/ceph_argparse.h"
26 #include "common/debug.h"
27 #include "common/Cycles.h"
28 #include "global/global_init.h"
29 #include "msg/Messenger.h"
30 #include "messages/MOSDOp.h"
31 #include "auth/DummyAuth.h"
35 class MessengerClient
{
37 class ClientDispatcher
: public Dispatcher
{
42 ClientDispatcher(uint64_t delay
, ClientThread
*t
): Dispatcher(g_ceph_context
), think_time(delay
), thread(t
) {}
43 bool ms_can_fast_dispatch_any() const override
{ return true; }
44 bool ms_can_fast_dispatch(const Message
*m
) const override
{
45 switch (m
->get_type()) {
46 case CEPH_MSG_OSD_OPREPLY
:
53 void ms_handle_fast_connect(Connection
*con
) override
{}
54 void ms_handle_fast_accept(Connection
*con
) override
{}
55 bool ms_dispatch(Message
*m
) override
{ return true; }
56 void ms_fast_dispatch(Message
*m
) override
;
57 bool ms_handle_reset(Connection
*con
) override
{ return true; }
58 void ms_handle_remote_reset(Connection
*con
) override
{}
59 bool ms_handle_refused(Connection
*con
) override
{ return false; }
60 int ms_handle_authentication(Connection
*con
) override
{
65 class ClientThread
: public Thread
{
69 std::atomic
<unsigned> client_inc
= { 0 };
71 object_locator_t oloc
;
76 ClientDispatcher dispatcher
;
79 ceph::mutex lock
= ceph::make_mutex("MessengerBenchmark::ClientThread::lock");
80 ceph::condition_variable cond
;
83 ClientThread(Messenger
*m
, int c
, ConnectionRef con
, int len
, int ops
, int think_time_us
):
84 msgr(m
), concurrent(c
), conn(con
), oid("object-name"), oloc(1, 1), msg_len(len
), ops(ops
),
85 dispatcher(think_time_us
, this), inflight(0) {
86 m
->add_dispatcher_head(&dispatcher
);
87 bufferptr
ptr(msg_len
);
88 memset(ptr
.c_str(), 0, msg_len
);
91 void *entry() override
{
92 std::unique_lock locker
{lock
};
93 for (int i
= 0; i
< ops
; ++i
) {
94 if (inflight
> uint64_t(concurrent
)) {
97 hobject_t
hobj(oid
, oloc
.key
, CEPH_NOSNAP
, pgid
.ps(), pgid
.pool(),
100 MOSDOp
*m
= new MOSDOp(client_inc
, 0, hobj
, spgid
, 0, 0, 0);
101 bufferlist
msg_data(data
);
102 m
->write(0, msg_len
, msg_data
);
104 conn
->send_message(m
);
105 //cerr << __func__ << " send m=" << m << std::endl;
116 vector
<Messenger
*> msgrs
;
117 vector
<ClientThread
*> clients
;
118 DummyAuthClientServer dummy_auth
;
121 MessengerClient(const string
&t
, const string
&addr
, int delay
):
122 type(t
), serveraddr(addr
), think_time_us(delay
),
123 dummy_auth(g_ceph_context
) {
126 for (uint64_t i
= 0; i
< clients
.size(); ++i
)
128 for (uint64_t i
= 0; i
< msgrs
.size(); ++i
) {
129 msgrs
[i
]->shutdown();
133 void ready(int c
, int jobs
, int ops
, int msg_len
) {
135 addr
.parse(serveraddr
.c_str());
137 dummy_auth
.auth_registry
.refresh_config();
138 for (int i
= 0; i
< jobs
; ++i
) {
139 Messenger
*msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::CLIENT(0), "client", getpid()+i
);
140 msgr
->set_default_policy(Messenger::Policy::lossless_client(0));
141 msgr
->set_auth_client(&dummy_auth
);
143 entity_addrvec_t
addrs(addr
);
144 ConnectionRef conn
= msgr
->connect_to_osd(addrs
);
145 ClientThread
*t
= new ClientThread(msgr
, c
, conn
, msg_len
, ops
, think_time_us
);
146 msgrs
.push_back(msgr
);
147 clients
.push_back(t
);
152 for (uint64_t i
= 0; i
< clients
.size(); ++i
)
153 clients
[i
]->create("client");
154 for (uint64_t i
= 0; i
< msgrs
.size(); ++i
)
159 void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message
*m
) {
162 std::lock_guard l
{thread
->lock
};
164 thread
->cond
.notify_all();
168 void usage(const string
&name
) {
169 cout
<< "Usage: " << name
<< " [server ip:port] [numjobs] [concurrency] [ios] [thinktime us] [msg length]" << std::endl
;
170 cout
<< " [server ip:port]: connect to the ip:port pair" << std::endl
;
171 cout
<< " [numjobs]: how much client threads spawned and do benchmark" << std::endl
;
172 cout
<< " [concurrency]: the max inflight messages(like iodepth in fio)" << std::endl
;
173 cout
<< " [ios]: how much messages sent for each client" << std::endl
;
174 cout
<< " [thinktime]: sleep time when do fast dispatching(match client logic)" << std::endl
;
175 cout
<< " [msg length]: message data bytes" << std::endl
;
178 int main(int argc
, char **argv
)
180 auto args
= argv_to_vec(argc
, argv
);
182 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
183 CODE_ENVIRONMENT_UTILITY
,
184 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
185 common_init_finish(g_ceph_context
);
186 g_ceph_context
->_conf
.apply_changes(nullptr);
188 if (args
.size() < 6) {
193 int numjobs
= atoi(args
[1]);
194 int concurrent
= atoi(args
[2]);
195 int ios
= atoi(args
[3]);
196 int think_time
= atoi(args
[4]);
197 int len
= atoi(args
[5]);
199 std::string public_msgr_type
= g_ceph_context
->_conf
->ms_public_type
.empty() ? g_ceph_context
->_conf
.get_val
<std::string
>("ms_type") : g_ceph_context
->_conf
->ms_public_type
;
201 cout
<< " using ms-public-type " << public_msgr_type
<< std::endl
;
202 cout
<< " server ip:port " << args
[0] << std::endl
;
203 cout
<< " numjobs " << numjobs
<< std::endl
;
204 cout
<< " concurrency " << concurrent
<< std::endl
;
205 cout
<< " ios " << ios
<< std::endl
;
206 cout
<< " thinktime(us) " << think_time
<< std::endl
;
207 cout
<< " message data bytes " << len
<< std::endl
;
209 MessengerClient
client(public_msgr_type
, args
[0], think_time
);
211 client
.ready(concurrent
, numjobs
, ios
, len
);
213 uint64_t start
= Cycles::rdtsc();
215 uint64_t stop
= Cycles::rdtsc();
216 cout
<< " Total op " << (ios
* numjobs
) << " run time " << Cycles::to_microseconds(stop
- start
) << "us." << std::endl
;