]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/msgr/perf_msgr_client.cc
update sources to v12.1.1
[ceph.git] / ceph / src / test / msgr / perf_msgr_client.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Haomai Wang
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#include <stdlib.h>
18#include <stdint.h>
19#include <string>
20#include <unistd.h>
21#include <iostream>
22
23using namespace std;
24
7c673cae
FG
25#include "common/ceph_argparse.h"
26#include "common/debug.h"
27#include "common/Cycles.h"
28#include "global/global_init.h"
29#include "msg/Messenger.h"
30#include "messages/MOSDOp.h"
31
31f18b77
FG
32#include <atomic>
33
7c673cae
FG
34class MessengerClient {
35 class ClientThread;
36 class ClientDispatcher : public Dispatcher {
37 uint64_t think_time;
38 ClientThread *thread;
39
40 public:
41 ClientDispatcher(uint64_t delay, ClientThread *t): Dispatcher(g_ceph_context), think_time(delay), thread(t) {}
42 bool ms_can_fast_dispatch_any() const override { return true; }
43 bool ms_can_fast_dispatch(const Message *m) const override {
44 switch (m->get_type()) {
45 case CEPH_MSG_OSD_OPREPLY:
46 return true;
47 default:
48 return false;
49 }
50 }
51
52 void ms_handle_fast_connect(Connection *con) override {}
53 void ms_handle_fast_accept(Connection *con) override {}
54 bool ms_dispatch(Message *m) override { return true; }
55 void ms_fast_dispatch(Message *m) override;
56 bool ms_handle_reset(Connection *con) override { return true; }
57 void ms_handle_remote_reset(Connection *con) override {}
58 bool ms_handle_refused(Connection *con) override { return false; }
59 bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
60 bufferlist& authorizer, bufferlist& authorizer_reply,
61 bool& isvalid, CryptoKey& session_key) override {
62 isvalid = true;
63 return true;
64 }
65 };
66
67 class ClientThread : public Thread {
68 Messenger *msgr;
69 int concurrent;
70 ConnectionRef conn;
31f18b77 71 std::atomic<unsigned> client_inc = { 0 };
7c673cae
FG
72 object_t oid;
73 object_locator_t oloc;
74 pg_t pgid;
75 int msg_len;
76 bufferlist data;
77 int ops;
78 ClientDispatcher dispatcher;
79
80 public:
81 Mutex lock;
82 Cond cond;
83 uint64_t inflight;
84
85 ClientThread(Messenger *m, int c, ConnectionRef con, int len, int ops, int think_time_us):
31f18b77 86 msgr(m), concurrent(c), conn(con), oid("object-name"), oloc(1, 1), msg_len(len), ops(ops),
224ce89b 87 dispatcher(think_time_us, this), lock("MessengerBenchmark::ClientThread::lock"), inflight(0) {
7c673cae
FG
88 m->add_dispatcher_head(&dispatcher);
89 bufferptr ptr(msg_len);
90 memset(ptr.c_str(), 0, msg_len);
91 data.append(ptr);
92 }
93 void *entry() override {
94 lock.Lock();
95 for (int i = 0; i < ops; ++i) {
96 if (inflight > uint64_t(concurrent)) {
97 cond.Wait(lock);
98 }
99 hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(),
100 oloc.nspace);
101 spg_t spgid(pgid);
31f18b77 102 MOSDOp *m = new MOSDOp(client_inc, 0, hobj, spgid, 0, 0, 0);
7c673cae
FG
103 m->write(0, msg_len, data);
104 inflight++;
105 conn->send_message(m);
106 //cerr << __func__ << " send m=" << m << std::endl;
107 }
108 lock.Unlock();
109 msgr->shutdown();
110 return 0;
111 }
112 };
113
114 string type;
115 string serveraddr;
116 int think_time_us;
117 vector<Messenger*> msgrs;
118 vector<ClientThread*> clients;
119
120 public:
121 MessengerClient(string t, string addr, int delay):
122 type(t), serveraddr(addr), think_time_us(delay) {
123 }
124 ~MessengerClient() {
125 for (uint64_t i = 0; i < clients.size(); ++i)
126 delete clients[i];
127 for (uint64_t i = 0; i < msgrs.size(); ++i) {
128 msgrs[i]->shutdown();
129 msgrs[i]->wait();
130 }
131 }
132 void ready(int c, int jobs, int ops, int msg_len) {
133 entity_addr_t addr;
134 addr.parse(serveraddr.c_str());
135 addr.set_nonce(0);
136 for (int i = 0; i < jobs; ++i) {
137 Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i, 0);
138 msgr->set_default_policy(Messenger::Policy::lossless_client(0));
139 entity_inst_t inst(entity_name_t::OSD(0), addr);
140 ConnectionRef conn = msgr->get_connection(inst);
141 ClientThread *t = new ClientThread(msgr, c, conn, msg_len, ops, think_time_us);
142 msgrs.push_back(msgr);
143 clients.push_back(t);
144 msgr->start();
145 }
146 usleep(1000*1000);
147 }
148 void start() {
149 for (uint64_t i = 0; i < clients.size(); ++i)
150 clients[i]->create("client");
151 for (uint64_t i = 0; i < msgrs.size(); ++i)
152 msgrs[i]->wait();
153 }
154};
155
156void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message *m) {
157 usleep(think_time);
158 m->put();
159 Mutex::Locker l(thread->lock);
160 thread->inflight--;
161 thread->cond.Signal();
162}
163
164
/**
 * Print the command-line synopsis and a one-line description of every
 * positional argument to stderr.
 *
 * @param name  program name shown in the "Usage:" line
 */
void usage(const std::string &name) {
  // One help line per positional argument, in command-line order.
  static const char *const option_help[] = {
    " [server ip:port]: connect to the ip:port pair",
    " [numjobs]: how much client threads spawned and do benchmark",
    " [concurrency]: the max inflight messages(like iodepth in fio)",
    " [ios]: how much messages sent for each client",
    " [thinktime]: sleep time when do fast dispatching(match client logic)",
    " [msg length]: message data bytes",
  };

  std::cerr << "Usage: " << name
            << " [server ip:port] [numjobs] [concurrency] [ios] [thinktime us] [msg length]"
            << std::endl;
  for (const char *help_line : option_help) {
    std::cerr << help_line << std::endl;
  }
}
174
175int main(int argc, char **argv)
176{
177 vector<const char*> args;
178 argv_to_vec(argc, (const char **)argv, args);
179
180 auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
181 CODE_ENVIRONMENT_UTILITY, 0);
182 common_init_finish(g_ceph_context);
183 g_ceph_context->_conf->apply_changes(NULL);
184
185 if (args.size() < 6) {
186 usage(argv[0]);
187 return 1;
188 }
189
190 int numjobs = atoi(args[1]);
191 int concurrent = atoi(args[2]);
192 int ios = atoi(args[3]);
193 int think_time = atoi(args[4]);
194 int len = atoi(args[5]);
195
196 std::string public_msgr_type = g_ceph_context->_conf->ms_public_type.empty() ? g_ceph_context->_conf->get_val<std::string>("ms_type") : g_ceph_context->_conf->ms_public_type;
197
198 cerr << " using ms-public-type " << public_msgr_type << std::endl;
199 cerr << " server ip:port " << args[0] << std::endl;
200 cerr << " numjobs " << numjobs << std::endl;
201 cerr << " concurrency " << concurrent << std::endl;
202 cerr << " ios " << ios << std::endl;
203 cerr << " thinktime(us) " << think_time << std::endl;
204 cerr << " message data bytes " << len << std::endl;
205
206 MessengerClient client(public_msgr_type, args[0], think_time);
207
208 client.ready(concurrent, numjobs, ios, len);
209 Cycles::init();
210 uint64_t start = Cycles::rdtsc();
211 client.start();
212 uint64_t stop = Cycles::rdtsc();
213 cerr << " Total op " << ios << " run time " << Cycles::to_microseconds(stop - start) << "us." << std::endl;
214
215 return 0;
216}