1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2015 Haomai Wang
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
25 #include "common/ceph_argparse.h"
26 #include "common/debug.h"
27 #include "common/Cycles.h"
28 #include "global/global_init.h"
29 #include "msg/Messenger.h"
30 #include "messages/MOSDOp.h"
34 class MessengerClient
{
36 class ClientDispatcher
: public Dispatcher
{
41 ClientDispatcher(uint64_t delay
, ClientThread
*t
): Dispatcher(g_ceph_context
), think_time(delay
), thread(t
) {}
42 bool ms_can_fast_dispatch_any() const override
{ return true; }
43 bool ms_can_fast_dispatch(const Message
*m
) const override
{
44 switch (m
->get_type()) {
45 case CEPH_MSG_OSD_OPREPLY
:
52 void ms_handle_fast_connect(Connection
*con
) override
{}
53 void ms_handle_fast_accept(Connection
*con
) override
{}
54 bool ms_dispatch(Message
*m
) override
{ return true; }
55 void ms_fast_dispatch(Message
*m
) override
;
56 bool ms_handle_reset(Connection
*con
) override
{ return true; }
57 void ms_handle_remote_reset(Connection
*con
) override
{}
58 bool ms_handle_refused(Connection
*con
) override
{ return false; }
59 bool ms_verify_authorizer(Connection
*con
, int peer_type
, int protocol
,
60 bufferlist
& authorizer
, bufferlist
& authorizer_reply
,
61 bool& isvalid
, CryptoKey
& session_key
,
62 std::unique_ptr
<AuthAuthorizerChallenge
> *challenge
) override
{
68 class ClientThread
: public Thread
{
72 std::atomic
<unsigned> client_inc
= { 0 };
74 object_locator_t oloc
;
79 ClientDispatcher dispatcher
;
86 ClientThread(Messenger
*m
, int c
, ConnectionRef con
, int len
, int ops
, int think_time_us
):
87 msgr(m
), concurrent(c
), conn(con
), oid("object-name"), oloc(1, 1), msg_len(len
), ops(ops
),
88 dispatcher(think_time_us
, this), lock("MessengerBenchmark::ClientThread::lock"), inflight(0) {
89 m
->add_dispatcher_head(&dispatcher
);
90 bufferptr
ptr(msg_len
);
91 memset(ptr
.c_str(), 0, msg_len
);
94 void *entry() override
{
96 for (int i
= 0; i
< ops
; ++i
) {
97 if (inflight
> uint64_t(concurrent
)) {
100 hobject_t
hobj(oid
, oloc
.key
, CEPH_NOSNAP
, pgid
.ps(), pgid
.pool(),
103 MOSDOp
*m
= new MOSDOp(client_inc
, 0, hobj
, spgid
, 0, 0, 0);
104 m
->write(0, msg_len
, data
);
106 conn
->send_message(m
);
107 //cerr << __func__ << " send m=" << m << std::endl;
118 vector
<Messenger
*> msgrs
;
119 vector
<ClientThread
*> clients
;
122 MessengerClient(string t
, string addr
, int delay
):
123 type(t
), serveraddr(addr
), think_time_us(delay
) {
126 for (uint64_t i
= 0; i
< clients
.size(); ++i
)
128 for (uint64_t i
= 0; i
< msgrs
.size(); ++i
) {
129 msgrs
[i
]->shutdown();
133 void ready(int c
, int jobs
, int ops
, int msg_len
) {
135 addr
.parse(serveraddr
.c_str());
137 for (int i
= 0; i
< jobs
; ++i
) {
138 Messenger
*msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::CLIENT(0), "client", getpid()+i
, 0);
139 msgr
->set_default_policy(Messenger::Policy::lossless_client(0));
140 entity_inst_t
inst(entity_name_t::OSD(0), addr
);
141 ConnectionRef conn
= msgr
->get_connection(inst
);
142 ClientThread
*t
= new ClientThread(msgr
, c
, conn
, msg_len
, ops
, think_time_us
);
143 msgrs
.push_back(msgr
);
144 clients
.push_back(t
);
150 for (uint64_t i
= 0; i
< clients
.size(); ++i
)
151 clients
[i
]->create("client");
152 for (uint64_t i
= 0; i
< msgrs
.size(); ++i
)
157 void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message
*m
) {
160 Mutex::Locker
l(thread
->lock
);
162 thread
->cond
.Signal();
166 void usage(const string
&name
) {
167 cerr
<< "Usage: " << name
<< " [server ip:port] [numjobs] [concurrency] [ios] [thinktime us] [msg length]" << std::endl
;
168 cerr
<< " [server ip:port]: connect to the ip:port pair" << std::endl
;
169 cerr
<< " [numjobs]: how much client threads spawned and do benchmark" << std::endl
;
170 cerr
<< " [concurrency]: the max inflight messages(like iodepth in fio)" << std::endl
;
171 cerr
<< " [ios]: how much messages sent for each client" << std::endl
;
172 cerr
<< " [thinktime]: sleep time when do fast dispatching(match client logic)" << std::endl
;
173 cerr
<< " [msg length]: message data bytes" << std::endl
;
176 int main(int argc
, char **argv
)
178 vector
<const char*> args
;
179 argv_to_vec(argc
, (const char **)argv
, args
);
181 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
182 CODE_ENVIRONMENT_UTILITY
, 0);
183 common_init_finish(g_ceph_context
);
184 g_ceph_context
->_conf
->apply_changes(NULL
);
186 if (args
.size() < 6) {
191 int numjobs
= atoi(args
[1]);
192 int concurrent
= atoi(args
[2]);
193 int ios
= atoi(args
[3]);
194 int think_time
= atoi(args
[4]);
195 int len
= atoi(args
[5]);
197 std::string public_msgr_type
= g_ceph_context
->_conf
->ms_public_type
.empty() ? g_ceph_context
->_conf
->get_val
<std::string
>("ms_type") : g_ceph_context
->_conf
->ms_public_type
;
199 cerr
<< " using ms-public-type " << public_msgr_type
<< std::endl
;
200 cerr
<< " server ip:port " << args
[0] << std::endl
;
201 cerr
<< " numjobs " << numjobs
<< std::endl
;
202 cerr
<< " concurrency " << concurrent
<< std::endl
;
203 cerr
<< " ios " << ios
<< std::endl
;
204 cerr
<< " thinktime(us) " << think_time
<< std::endl
;
205 cerr
<< " message data bytes " << len
<< std::endl
;
207 MessengerClient
client(public_msgr_type
, args
[0], think_time
);
209 client
.ready(concurrent
, numjobs
, ios
, len
);
211 uint64_t start
= Cycles::rdtsc();
213 uint64_t stop
= Cycles::rdtsc();
214 cerr
<< " Total op " << ios
<< " run time " << Cycles::to_microseconds(stop
- start
) << "us." << std::endl
;