]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/msgr/perf_msgr_client.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / test / msgr / perf_msgr_client.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Haomai Wang
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#include <stdlib.h>
18#include <stdint.h>
19#include <string>
20#include <unistd.h>
21#include <iostream>
22
23using namespace std;
24
7c673cae
FG
25#include "common/ceph_argparse.h"
26#include "common/debug.h"
27#include "common/Cycles.h"
28#include "global/global_init.h"
29#include "msg/Messenger.h"
30#include "messages/MOSDOp.h"
9f95a23c 31#include "auth/DummyAuth.h"
7c673cae 32
31f18b77
FG
33#include <atomic>
34
7c673cae
FG
35class MessengerClient {
36 class ClientThread;
37 class ClientDispatcher : public Dispatcher {
38 uint64_t think_time;
39 ClientThread *thread;
40
41 public:
42 ClientDispatcher(uint64_t delay, ClientThread *t): Dispatcher(g_ceph_context), think_time(delay), thread(t) {}
43 bool ms_can_fast_dispatch_any() const override { return true; }
44 bool ms_can_fast_dispatch(const Message *m) const override {
45 switch (m->get_type()) {
46 case CEPH_MSG_OSD_OPREPLY:
47 return true;
48 default:
49 return false;
50 }
51 }
52
53 void ms_handle_fast_connect(Connection *con) override {}
54 void ms_handle_fast_accept(Connection *con) override {}
55 bool ms_dispatch(Message *m) override { return true; }
56 void ms_fast_dispatch(Message *m) override;
57 bool ms_handle_reset(Connection *con) override { return true; }
58 void ms_handle_remote_reset(Connection *con) override {}
59 bool ms_handle_refused(Connection *con) override { return false; }
aee94f69 60 int ms_handle_fast_authentication(Connection *con) override {
11fdf7f2 61 return 1;
7c673cae
FG
62 }
63 };
64
65 class ClientThread : public Thread {
66 Messenger *msgr;
67 int concurrent;
68 ConnectionRef conn;
31f18b77 69 std::atomic<unsigned> client_inc = { 0 };
7c673cae
FG
70 object_t oid;
71 object_locator_t oloc;
72 pg_t pgid;
73 int msg_len;
74 bufferlist data;
75 int ops;
76 ClientDispatcher dispatcher;
77
78 public:
9f95a23c
TL
79 ceph::mutex lock = ceph::make_mutex("MessengerBenchmark::ClientThread::lock");
80 ceph::condition_variable cond;
7c673cae
FG
81 uint64_t inflight;
82
83 ClientThread(Messenger *m, int c, ConnectionRef con, int len, int ops, int think_time_us):
31f18b77 84 msgr(m), concurrent(c), conn(con), oid("object-name"), oloc(1, 1), msg_len(len), ops(ops),
9f95a23c 85 dispatcher(think_time_us, this), inflight(0) {
7c673cae
FG
86 m->add_dispatcher_head(&dispatcher);
87 bufferptr ptr(msg_len);
88 memset(ptr.c_str(), 0, msg_len);
89 data.append(ptr);
90 }
91 void *entry() override {
9f95a23c 92 std::unique_lock locker{lock};
7c673cae
FG
93 for (int i = 0; i < ops; ++i) {
94 if (inflight > uint64_t(concurrent)) {
9f95a23c 95 cond.wait(locker);
7c673cae
FG
96 }
97 hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(),
98 oloc.nspace);
99 spg_t spgid(pgid);
31f18b77 100 MOSDOp *m = new MOSDOp(client_inc, 0, hobj, spgid, 0, 0, 0);
11fdf7f2
TL
101 bufferlist msg_data(data);
102 m->write(0, msg_len, msg_data);
7c673cae
FG
103 inflight++;
104 conn->send_message(m);
105 //cerr << __func__ << " send m=" << m << std::endl;
106 }
9f95a23c 107 locker.unlock();
7c673cae
FG
108 msgr->shutdown();
109 return 0;
110 }
111 };
112
113 string type;
114 string serveraddr;
115 int think_time_us;
116 vector<Messenger*> msgrs;
117 vector<ClientThread*> clients;
9f95a23c 118 DummyAuthClientServer dummy_auth;
7c673cae
FG
119
120 public:
11fdf7f2 121 MessengerClient(const string &t, const string &addr, int delay):
9f95a23c
TL
122 type(t), serveraddr(addr), think_time_us(delay),
123 dummy_auth(g_ceph_context) {
7c673cae
FG
124 }
125 ~MessengerClient() {
126 for (uint64_t i = 0; i < clients.size(); ++i)
127 delete clients[i];
128 for (uint64_t i = 0; i < msgrs.size(); ++i) {
129 msgrs[i]->shutdown();
130 msgrs[i]->wait();
131 }
132 }
133 void ready(int c, int jobs, int ops, int msg_len) {
134 entity_addr_t addr;
135 addr.parse(serveraddr.c_str());
136 addr.set_nonce(0);
9f95a23c 137 dummy_auth.auth_registry.refresh_config();
7c673cae 138 for (int i = 0; i < jobs; ++i) {
f67539c2 139 Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i);
7c673cae 140 msgr->set_default_policy(Messenger::Policy::lossless_client(0));
9f95a23c 141 msgr->set_auth_client(&dummy_auth);
11fdf7f2
TL
142 msgr->start();
143 entity_addrvec_t addrs(addr);
144 ConnectionRef conn = msgr->connect_to_osd(addrs);
7c673cae
FG
145 ClientThread *t = new ClientThread(msgr, c, conn, msg_len, ops, think_time_us);
146 msgrs.push_back(msgr);
147 clients.push_back(t);
7c673cae
FG
148 }
149 usleep(1000*1000);
150 }
151 void start() {
152 for (uint64_t i = 0; i < clients.size(); ++i)
153 clients[i]->create("client");
154 for (uint64_t i = 0; i < msgrs.size(); ++i)
155 msgrs[i]->wait();
156 }
157};
158
159void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message *m) {
160 usleep(think_time);
161 m->put();
9f95a23c 162 std::lock_guard l{thread->lock};
7c673cae 163 thread->inflight--;
9f95a23c 164 thread->cond.notify_all();
7c673cae
FG
165}
166
167
/**
 * Print the command-line synopsis followed by a one-line description of each
 * positional argument to stdout.
 *
 * @param name program name shown in the synopsis line
 */
void usage(const std::string &name) {
  static const char *const arg_help[] = {
    " [server ip:port]: connect to the ip:port pair",
    " [numjobs]: how much client threads spawned and do benchmark",
    " [concurrency]: the max inflight messages(like iodepth in fio)",
    " [ios]: how much messages sent for each client",
    " [thinktime]: sleep time when do fast dispatching(match client logic)",
    " [msg length]: message data bytes",
  };

  std::cout << "Usage: " << name
            << " [server ip:port] [numjobs] [concurrency] [ios] [thinktime us] [msg length]"
            << std::endl;
  for (const char *help_line : arg_help) {
    std::cout << help_line << std::endl;
  }
}
177
178int main(int argc, char **argv)
179{
20effc67 180 auto args = argv_to_vec(argc, argv);
7c673cae
FG
181
182 auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
11fdf7f2
TL
183 CODE_ENVIRONMENT_UTILITY,
184 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
7c673cae 185 common_init_finish(g_ceph_context);
11fdf7f2 186 g_ceph_context->_conf.apply_changes(nullptr);
7c673cae
FG
187
188 if (args.size() < 6) {
189 usage(argv[0]);
190 return 1;
191 }
192
193 int numjobs = atoi(args[1]);
194 int concurrent = atoi(args[2]);
195 int ios = atoi(args[3]);
196 int think_time = atoi(args[4]);
197 int len = atoi(args[5]);
198
11fdf7f2 199 std::string public_msgr_type = g_ceph_context->_conf->ms_public_type.empty() ? g_ceph_context->_conf.get_val<std::string>("ms_type") : g_ceph_context->_conf->ms_public_type;
7c673cae 200
f67539c2
TL
201 cout << " using ms-public-type " << public_msgr_type << std::endl;
202 cout << " server ip:port " << args[0] << std::endl;
203 cout << " numjobs " << numjobs << std::endl;
204 cout << " concurrency " << concurrent << std::endl;
205 cout << " ios " << ios << std::endl;
206 cout << " thinktime(us) " << think_time << std::endl;
207 cout << " message data bytes " << len << std::endl;
7c673cae
FG
208
209 MessengerClient client(public_msgr_type, args[0], think_time);
210
211 client.ready(concurrent, numjobs, ios, len);
212 Cycles::init();
213 uint64_t start = Cycles::rdtsc();
214 client.start();
215 uint64_t stop = Cycles::rdtsc();
f67539c2 216 cout << " Total op " << (ios * numjobs) << " run time " << Cycles::to_microseconds(stop - start) << "us." << std::endl;
7c673cae
FG
217
218 return 0;
219}