]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/objectstore_bench.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / objectstore_bench.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <chrono>
5 #include <cassert>
6 #include <condition_variable>
7 #include <memory>
8 #include <mutex>
9 #include <thread>
10
11 #include "os/ObjectStore.h"
12
13 #include "global/global_init.h"
14
15 #include "common/strtol.h"
16 #include "common/ceph_argparse.h"
17
18 #define dout_context g_ceph_context
19 #define dout_subsys ceph_subsys_filestore
20
21 using namespace std;
22
23 static void usage()
24 {
25 cout << "usage: ceph_objectstore_bench [flags]\n"
26 " --size\n"
27 " total size in bytes\n"
28 " --block-size\n"
29 " block size in bytes for each write\n"
30 " --repeats\n"
31 " number of times to repeat the write cycle\n"
32 " --threads\n"
33 " number of threads to carry out this workload\n"
34 " --multi-object\n"
35 " have each thread write to a separate object\n" << std::endl;
36 generic_server_usage();
37 }
38
39 // helper class for bytes with units
struct byte_units {
  // Raw byte count held by this wrapper.
  size_t v;

  // Deliberately implicit so a plain integer can be used wherever a
  // byte_units is expected (e.g. `byte_units total = a * b;`).
  // cppcheck-suppress noExplicitConstructor
  byte_units(size_t v) : v{v} {}

  // Implicit conversion back to the raw byte count, so the value can take
  // part in ordinary integer arithmetic and comparisons.
  operator size_t() const { return v; }

  // Parse `val` into this object; defined out of line.  Returns true on
  // success; on failure a message is left in *err.
  bool parse(const std::string &val, std::string *err);
};
49
50 bool byte_units::parse(const std::string &val, std::string *err)
51 {
52 v = strict_iecstrtoll(val, err);
53 return err->empty();
54 }
55
56 std::ostream& operator<<(std::ostream &out, const byte_units &amount)
57 {
58 static const char* units[] = { "B", "KB", "MB", "GB", "TB", "PB", "EB" };
59 static const int max_units = sizeof(units)/sizeof(*units);
60
61 int unit = 0;
62 auto v = amount.v;
63 while (v >= 1024 && unit < max_units) {
64 // preserve significant bytes
65 if (v < 1048576 && (v % 1024 != 0))
66 break;
67 v >>= 10;
68 unit++;
69 }
70 return out << v << ' ' << units[unit];
71 }
72
73 struct Config {
74 byte_units size;
75 byte_units block_size;
76 int repeats;
77 int threads;
78 bool multi_object;
79 Config()
80 : size(1048576), block_size(4096),
81 repeats(1), threads(1),
82 multi_object(false) {}
83 };
84
85 class C_NotifyCond : public Context {
86 std::mutex *mutex;
87 std::condition_variable *cond;
88 bool *done;
89 public:
90 C_NotifyCond(std::mutex *mutex, std::condition_variable *cond, bool *done)
91 : mutex(mutex), cond(cond), done(done) {}
92 void finish(int r) override {
93 std::lock_guard<std::mutex> lock(*mutex);
94 *done = true;
95 cond->notify_one();
96 }
97 };
98
99 void osbench_worker(ObjectStore *os, const Config &cfg,
100 const coll_t cid, const ghobject_t oid,
101 uint64_t starting_offset)
102 {
103 bufferlist data;
104 data.append(buffer::create(cfg.block_size));
105
106 dout(0) << "Writing " << cfg.size
107 << " in blocks of " << cfg.block_size << dendl;
108
109 ceph_assert(starting_offset < cfg.size);
110 ceph_assert(starting_offset % cfg.block_size == 0);
111
112 ObjectStore::CollectionHandle ch = os->open_collection(cid);
113 ceph_assert(ch);
114
115 for (int i = 0; i < cfg.repeats; ++i) {
116 uint64_t offset = starting_offset;
117 size_t len = cfg.size;
118
119 vector<ObjectStore::Transaction> tls;
120
121 std::cout << "Write cycle " << i << std::endl;
122 while (len) {
123 size_t count = len < cfg.block_size ? len : (size_t)cfg.block_size;
124
125 auto t = new ObjectStore::Transaction;
126 t->write(cid, oid, offset, count, data);
127 tls.push_back(std::move(*t));
128 delete t;
129
130 offset += count;
131 if (offset > cfg.size)
132 offset -= cfg.size;
133 len -= count;
134 }
135
136 // set up the finisher
137 std::mutex mutex;
138 std::condition_variable cond;
139 bool done = false;
140
141 tls.back().register_on_commit(new C_NotifyCond(&mutex, &cond, &done));
142 os->queue_transactions(ch, tls);
143
144 std::unique_lock<std::mutex> lock(mutex);
145 cond.wait(lock, [&done](){ return done; });
146 lock.unlock();
147 }
148 }
149
150 int main(int argc, const char *argv[])
151 {
152 // command-line arguments
153 auto args = argv_to_vec(argc, argv);
154
155 if (args.empty()) {
156 cerr << argv[0] << ": -h or --help for usage" << std::endl;
157 exit(1);
158 }
159 if (ceph_argparse_need_usage(args)) {
160 usage();
161 exit(0);
162 }
163
164 auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD,
165 CODE_ENVIRONMENT_UTILITY,
166 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
167
168 Config cfg;
169 std::string val;
170 vector<const char*>::iterator i = args.begin();
171 while (i != args.end()) {
172 if (ceph_argparse_double_dash(args, i))
173 break;
174
175 if (ceph_argparse_witharg(args, i, &val, "--size", (char*)nullptr)) {
176 std::string err;
177 if (!cfg.size.parse(val, &err)) {
178 derr << "error parsing size: " << err << dendl;
179 exit(1);
180 }
181 } else if (ceph_argparse_witharg(args, i, &val, "--block-size", (char*)nullptr)) {
182 std::string err;
183 if (!cfg.block_size.parse(val, &err)) {
184 derr << "error parsing block-size: " << err << dendl;
185 exit(1);
186 }
187 } else if (ceph_argparse_witharg(args, i, &val, "--repeats", (char*)nullptr)) {
188 cfg.repeats = atoi(val.c_str());
189 } else if (ceph_argparse_witharg(args, i, &val, "--threads", (char*)nullptr)) {
190 cfg.threads = atoi(val.c_str());
191 } else if (ceph_argparse_flag(args, i, "--multi-object", (char*)nullptr)) {
192 cfg.multi_object = true;
193 } else {
194 derr << "Error: can't understand argument: " << *i << "\n" << dendl;
195 exit(1);
196 }
197 }
198
199 common_init_finish(g_ceph_context);
200
201 // create object store
202 dout(0) << "objectstore " << g_conf()->osd_objectstore << dendl;
203 dout(0) << "data " << g_conf()->osd_data << dendl;
204 dout(0) << "journal " << g_conf()->osd_journal << dendl;
205 dout(0) << "size " << cfg.size << dendl;
206 dout(0) << "block-size " << cfg.block_size << dendl;
207 dout(0) << "repeats " << cfg.repeats << dendl;
208 dout(0) << "threads " << cfg.threads << dendl;
209
210 auto os =
211 ObjectStore::create(g_ceph_context,
212 g_conf()->osd_objectstore,
213 g_conf()->osd_data,
214 g_conf()->osd_journal);
215
216 //Checking data folder: create if needed or error if it's not empty
217 DIR *dir = ::opendir(g_conf()->osd_data.c_str());
218 if (!dir) {
219 std::string cmd("mkdir -p ");
220 cmd+=g_conf()->osd_data;
221 int r = ::system( cmd.c_str() );
222 if( r<0 ){
223 derr << "Failed to create data directory, ret = " << r << dendl;
224 return 1;
225 }
226 }
227 else {
228 bool non_empty = readdir(dir) != NULL && readdir(dir) != NULL && readdir(dir) != NULL;
229 if( non_empty ){
230 derr << "Data directory '"<<g_conf()->osd_data<<"' isn't empty, please clean it first."<< dendl;
231 return 1;
232 }
233 }
234 ::closedir(dir);
235
236 //Create folders for journal if needed
237 string journal_base = g_conf()->osd_journal.substr(0, g_conf()->osd_journal.rfind('/'));
238 struct stat sb;
239 if (stat(journal_base.c_str(), &sb) != 0 ){
240 std::string cmd("mkdir -p ");
241 cmd+=journal_base;
242 int r = ::system( cmd.c_str() );
243 if( r<0 ){
244 derr << "Failed to create journal directory, ret = " << r << dendl;
245 return 1;
246 }
247 }
248
249 if (!os) {
250 derr << "bad objectstore type " << g_conf()->osd_objectstore << dendl;
251 return 1;
252 }
253 if (os->mkfs() < 0) {
254 derr << "mkfs failed" << dendl;
255 return 1;
256 }
257 if (os->mount() < 0) {
258 derr << "mount failed" << dendl;
259 return 1;
260 }
261
262 dout(10) << "created objectstore " << os.get() << dendl;
263
264 // create a collection
265 spg_t pg;
266 const coll_t cid(pg);
267 ObjectStore::CollectionHandle ch = os->create_new_collection(cid);
268 {
269 ObjectStore::Transaction t;
270 t.create_collection(cid, 0);
271 os->queue_transaction(ch, std::move(t));
272 }
273
274 // create the objects
275 std::vector<ghobject_t> oids;
276 if (cfg.multi_object) {
277 oids.reserve(cfg.threads);
278 for (int i = 0; i < cfg.threads; i++) {
279 std::stringstream oss;
280 oss << "osbench-thread-" << i;
281 oids.emplace_back(hobject_t(sobject_t(oss.str(), CEPH_NOSNAP)));
282
283 ObjectStore::Transaction t;
284 t.touch(cid, oids[i]);
285 int r = os->queue_transaction(ch, std::move(t));
286 ceph_assert(r == 0);
287 }
288 } else {
289 oids.emplace_back(hobject_t(sobject_t("osbench", CEPH_NOSNAP)));
290
291 ObjectStore::Transaction t;
292 t.touch(cid, oids.back());
293 int r = os->queue_transaction(ch, std::move(t));
294 ceph_assert(r == 0);
295 }
296
297 // run the worker threads
298 std::vector<std::thread> workers;
299 workers.reserve(cfg.threads);
300
301 using namespace std::chrono;
302 auto t1 = high_resolution_clock::now();
303 for (int i = 0; i < cfg.threads; i++) {
304 const auto &oid = cfg.multi_object ? oids[i] : oids[0];
305 workers.emplace_back(osbench_worker, os.get(), std::ref(cfg),
306 cid, oid, i * cfg.size / cfg.threads);
307 }
308 for (auto &worker : workers)
309 worker.join();
310 auto t2 = high_resolution_clock::now();
311 workers.clear();
312
313 auto duration = duration_cast<microseconds>(t2 - t1);
314 byte_units total = cfg.size * cfg.repeats * cfg.threads;
315 byte_units rate = (1000000LL * total) / duration.count();
316 size_t iops = (1000000LL * total / cfg.block_size) / duration.count();
317 dout(0) << "Wrote " << total << " in "
318 << duration.count() << "us, at a rate of " << rate << "/s and "
319 << iops << " iops" << dendl;
320
321 // remove the objects
322 ObjectStore::Transaction t;
323 for (const auto &oid : oids)
324 t.remove(cid, oid);
325 os->queue_transaction(ch, std::move(t));
326
327 os->umount();
328 return 0;
329 }