1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2015 Haomai Wang
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
25 #include "common/ceph_argparse.h"
26 #include "common/debug.h"
27 #include "common/Cycles.h"
28 #include "global/global_init.h"
29 #include "msg/Messenger.h"
30 #include "messages/MOSDOp.h"
// Benchmark client: holds the messengers and client threads that ready()
// creates (see the msgrs / clients members further down).
34 class MessengerClient
{
// Dispatcher (derives from Dispatcher) that is embedded by value in each
// ClientThread and handed to that thread's messenger.
36 class ClientDispatcher
: public Dispatcher
{
41 ClientDispatcher(uint64_t delay
, ClientThread
*t
): Dispatcher(g_ceph_context
), think_time(delay
), thread(t
) {}
42 bool ms_can_fast_dispatch_any() const override
{ return true; }
// Per-message fast-dispatch predicate: switches on m->get_type().
// CEPH_MSG_OSD_OPREPLY is the only case label visible in this view.
// NOTE(review): the case bodies and the end of the switch are missing from
// this view, so the value returned for each message type is not documented
// here - confirm against the full file.
43 bool ms_can_fast_dispatch(const Message
*m
) const override
{
44 switch (m
->get_type()) {
45 case CEPH_MSG_OSD_OPREPLY
:
52 void ms_handle_fast_connect(Connection
*con
) override
{}
53 void ms_handle_fast_accept(Connection
*con
) override
{}
54 bool ms_dispatch(Message
*m
) override
{ return true; }
55 void ms_fast_dispatch(Message
*m
) override
;
56 bool ms_handle_reset(Connection
*con
) override
{ return true; }
57 void ms_handle_remote_reset(Connection
*con
) override
{}
58 bool ms_handle_refused(Connection
*con
) override
{ return false; }
// Authorizer-verification override. Receives the connection, peer type and
// protocol by value/pointer, and the authorizer buffer, reply buffer, isvalid
// flag and session key by non-const reference.
// NOTE(review): only the signature and opening brace are visible in this
// view; what the body writes to isvalid and what it returns is not shown.
59 bool ms_verify_authorizer(Connection
*con
, int peer_type
, int protocol
,
60 bufferlist
& authorizer
, bufferlist
& authorizer_reply
,
61 bool& isvalid
, CryptoKey
& session_key
) override
{
// Worker thread type (derives from Thread). Its constructor and entry()
// follow below.
// NOTE(review): several member declarations used by the constructor
// (msgr, concurrent, conn, oid, msg_len, lock, inflight, ...) are not visible
// in this view.
67 class ClientThread
: public Thread
{
// Atomic counter starting at 0; passed as the first argument of each MOSDOp
// constructed in entry().
71 std::atomic
<unsigned> client_inc
= { 0 };
// Object locator; constructed as oloc(1, 1) by the constructor, and its key
// is used when building the hobject_t in entry().
73 object_locator_t oloc
;
// Dispatcher held by value; the constructor registers its address with the
// messenger via add_dispatcher_head().
78 ClientDispatcher dispatcher
;
// Sets up one benchmark thread.
//   m             - messenger; stored in msgr and given this thread's dispatcher
//   c             - stored in concurrent (compared against inflight in entry())
//   con           - connection stored in conn; entry() sends on it
//   len           - stored in msg_len; size of the zero-filled buffer built below
//   ops           - stored in the ops member; loop count used by entry()
//   think_time_us - forwarded to the embedded ClientDispatcher
// oid is fixed to "object-name", oloc to (1, 1), and inflight starts at 0.
85 ClientThread(Messenger
*m
, int c
, ConnectionRef con
, int len
, int ops
, int think_time_us
):
86 msgr(m
), concurrent(c
), conn(con
), oid("object-name"), oloc(1, 1), msg_len(len
), ops(ops
),
87 dispatcher(think_time_us
, this), lock("MessengerBenchmark::ClientThread::lock"), inflight(0) {
// Register this thread's dispatcher with the messenger.
88 m
->add_dispatcher_head(&dispatcher
);
// Allocate a msg_len-byte buffer and zero it.
// NOTE(review): the remainder of the constructor body (including whatever
// consumes ptr) is not visible in this view.
89 bufferptr
ptr(msg_len
);
90 memset(ptr
.c_str(), 0, msg_len
);
// Thread body: iterates ops times; each visible iteration builds an hobject_t
// from oid / oloc / pgid, allocates an MOSDOp, attaches a write of msg_len
// bytes taken from data, and sends it on conn.
// NOTE(review): several statements of this function are missing from this
// view (e.g. the body of the inflight check, the end of the hobject_t
// constructor call, the declaration of spgid, and everything after the loop),
// so locking, inflight accounting and the return value are not documented.
93 void *entry() override
{
95 for (int i
= 0; i
< ops
; ++i
) {
// Check taken when inflight exceeds concurrent; the guarded statement is not
// visible here.
96 if (inflight
> uint64_t(concurrent
)) {
99 hobject_t
hobj(oid
, oloc
.key
, CEPH_NOSNAP
, pgid
.ps(), pgid
.pool(),
// New request message, allocated with new and handed to send_message() below.
102 MOSDOp
*m
= new MOSDOp(client_inc
, 0, hobj
, spgid
, 0, 0, 0);
103 m
->write(0, msg_len
, data
);
105 conn
->send_message(m
);
106 //cerr << __func__ << " send m=" << m << std::endl;
// Messengers created by ready(), one pushed per job.
117 vector
<Messenger
*> msgrs
;
// Client threads created by ready() with new, one pushed per job.
118 vector
<ClientThread
*> clients
;
// Constructor: copies t into type, addr into serveraddr and delay into
// think_time_us.
121 MessengerClient(string t
, string addr
, int delay
):
122 type(t
), serveraddr(addr
), think_time_us(delay
) {
// NOTE(review): lines are missing between the constructor's opening brace and
// the loops below; these loops may belong to a different member (e.g. a
// destructor) whose header is not visible - confirm against the full file.
// First loop walks clients; its body is not visible in this view.
125 for (uint64_t i
= 0; i
< clients
.size(); ++i
)
// Second loop calls shutdown() on every messenger; any further statements in
// the loop body are not visible.
127 for (uint64_t i
= 0; i
< msgrs
.size(); ++i
) {
128 msgrs
[i
]->shutdown();
// Prepares the benchmark: one messenger + connection + ClientThread per job.
//   c       - forwarded to each ClientThread as its c argument
//   jobs    - number of messenger/thread pairs to create
//   ops     - forwarded to each ClientThread as its ops argument
//   msg_len - forwarded to each ClientThread as its len argument
// NOTE(review): some lines of this function are missing from this view
// (e.g. the declaration of addr and the end of the loop/function).
132 void ready(int c
, int jobs
, int ops
, int msg_len
) {
// Parse the server address string given to the constructor.
134 addr
.parse(serveraddr
.c_str());
136 for (int i
= 0; i
< jobs
; ++i
) {
// getpid()+i gives every job's messenger a different value for this argument.
137 Messenger
*msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::CLIENT(0), "client", getpid()+i
, 0);
138 msgr
->set_default_policy(Messenger::Policy::lossless_client(0));
// Every job connects to the same peer: OSD(0) at the parsed address.
139 entity_inst_t
inst(entity_name_t::OSD(0), addr
);
140 ConnectionRef conn
= msgr
->get_connection(inst
);
141 ClientThread
*t
= new ClientThread(msgr
, c
, conn
, msg_len
, ops
, think_time_us
);
// Keep both so they can be iterated later (see the loops over msgrs/clients).
142 msgrs
.push_back(msgr
);
143 clients
.push_back(t
);
// NOTE(review): lines are missing before the loops below; they may belong to
// a separate member function whose header is not visible - confirm.
// Starts every client thread under the name "client".
149 for (uint64_t i
= 0; i
< clients
.size(); ++i
)
150 clients
[i
]->create("client");
// Loop over all messengers; its body is not visible in this view.
151 for (uint64_t i
= 0; i
< msgrs
.size(); ++i
)
// Out-of-line definition of the fast-dispatch handler. In the visible part it
// takes the owning thread's lock (scoped Mutex::Locker) and signals the
// thread's condition variable.
// NOTE(review): other statements of this body (anything done with m,
// think_time or inflight, and the closing brace) are not visible in this view.
156 void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message
*m
) {
159 Mutex::Locker
l(thread
->lock
);
161 thread
->cond
.Signal();
// Prints the command-line help to cerr: a synopsis line followed by one line
// per positional argument.
//   name - text printed right after "Usage: " in the synopsis line
// NOTE(review): the closing brace of this function is not visible in this view.
165 void usage(const string
&name
) {
166 cerr
<< "Usage: " << name
<< " [server ip:port] [numjobs] [concurrency] [ios] [thinktime us] [msg length]" << std::endl
;
167 cerr
<< " [server ip:port]: connect to the ip:port pair" << std::endl
;
168 cerr
<< " [numjobs]: how much client threads spawned and do benchmark" << std::endl
;
169 cerr
<< " [concurrency]: the max inflight messages(like iodepth in fio)" << std::endl
;
170 cerr
<< " [ios]: how much messages sent for each client" << std::endl
;
171 cerr
<< " [thinktime]: sleep time when do fast dispatching(match client logic)" << std::endl
;
172 cerr
<< " [msg length]: message data bytes" << std::endl
;
// Program entry: initialises the Ceph context, reads six positional
// arguments (server address, numjobs, concurrency, ios, think time, message
// length), echoes the configuration to cerr, prepares a MessengerClient and
// reports the elapsed time between two Cycles::rdtsc() readings.
// NOTE(review): several lines are missing from this view - the opening brace,
// the body of the argument-count check, the call that actually runs the
// benchmark between the two rdtsc() readings, and the return statement.
175 int main(int argc
, char **argv
)
// Collect argv into args, then initialise Ceph as a client-type utility.
177 vector
<const char*> args
;
178 argv_to_vec(argc
, (const char **)argv
, args
);
180 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
181 CODE_ENVIRONMENT_UTILITY
, 0);
182 common_init_finish(g_ceph_context
);
183 g_ceph_context
->_conf
->apply_changes(NULL
);
// At least six remaining arguments are required; the handling of the failure
// case is not visible here.
185 if (args
.size() < 6) {
// args[0] is the server address (used below); args[1..5] are converted with
// atoi, which performs no error reporting.
190 int numjobs
= atoi(args
[1]);
191 int concurrent
= atoi(args
[2]);
192 int ios
= atoi(args
[3]);
193 int think_time
= atoi(args
[4]);
194 int len
= atoi(args
[5]);
// Messenger type: ms_public_type when it is non-empty, otherwise the
// "ms_type" config value.
196 std::string public_msgr_type
= g_ceph_context
->_conf
->ms_public_type
.empty() ? g_ceph_context
->_conf
->get_val
<std::string
>("ms_type") : g_ceph_context
->_conf
->ms_public_type
;
// Echo the effective configuration.
198 cerr
<< " using ms-public-type " << public_msgr_type
<< std::endl
;
199 cerr
<< " server ip:port " << args
[0] << std::endl
;
200 cerr
<< " numjobs " << numjobs
<< std::endl
;
201 cerr
<< " concurrency " << concurrent
<< std::endl
;
202 cerr
<< " ios " << ios
<< std::endl
;
203 cerr
<< " thinktime(us) " << think_time
<< std::endl
;
204 cerr
<< " message data bytes " << len
<< std::endl
;
206 MessengerClient
client(public_msgr_type
, args
[0], think_time
);
// Argument order matches ready(c, jobs, ops, msg_len).
208 client
.ready(concurrent
, numjobs
, ios
, len
);
// Timestamp pair around the benchmark run (the run call itself is not
// visible in this view).
210 uint64_t start
= Cycles::rdtsc();
212 uint64_t stop
= Cycles::rdtsc();
// NOTE(review): "Total op" prints ios as given on the command line, which the
// usage text describes as a per-client count; it is not multiplied by numjobs.
213 cerr
<< " Total op " << ios
<< " run time " << Cycles::to_microseconds(stop
- start
) << "us." << std::endl
;