]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/kv_store_bench.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / kv_store_bench.cc
1 /*
2 * KvStoreBench.cc
3 *
4 * Created on: Aug 23, 2012
5 * Author: eleanor
6 */
7
#include "test/kv_store_bench.h"
#include "key_value_store/key_value_structure.h"
#include "key_value_store/kv_flat_btree_async.h"
#include "include/rados/librados.hpp"
#include "test/omap_bench.h"
#include "common/ceph_argparse.h"


#include <cerrno>
#include <string>
#include <climits>
#include <iostream>
#include <sstream>
#include <cmath>
21
22 KvStoreBench::KvStoreBench()
23 : entries(30),
24 ops(100),
25 clients(5),
26 key_size(5),
27 val_size(7),
28 max_ops_in_flight(8),
29 clear_first(false),
30 k(2),
31 cache_size(10),
32 cache_refresh(1),
33 client_name("admin"),
34 verbose(false),
35 kvs(NULL),
36 ops_in_flight(0),
37 rados_id("admin"),
38 pool_name("rbd"),
39 io_ctx_ready(false)
40 {
41 probs[25] = 'i';
42 probs[50] = 'u';
43 probs[75] = 'd';
44 probs[100] = 'r';
45 }
46
47 KvStoreBench::~KvStoreBench()
48 {
49 if (io_ctx_ready) {
50 librados::ObjectWriteOperation owo;
51 owo.remove();
52 io_ctx.operate(client_name + ".done-setting", &owo);
53 }
54 delete kvs;
55 }
56
57 int KvStoreBench::setup(int argc, const char** argv) {
58 auto args = argv_to_vec(argc, argv);
59 srand(time(NULL));
60
61 stringstream help;
62 help
63 << "Usage: KvStoreBench [options]\n"
64 << "Generate latency and throughput statistics for the key value store\n"
65 << "\n"
66 << "There are two sets of options - workload options affect the kind of\n"
67 << "test to run, while algorithm options affect how the key value\n"
68 << "store handles the workload.\n"
69 << "\n"
70 << "There are about entries / k objects in the store to begin with.\n"
71 << "Higher k values reduce the likelihood of splits and the likelihood\n"
72 << "multiple writers simultaneously faling to write because an object \n"
73 << "is full, but having a high k also means there will be more object\n"
74 << "contention.\n"
75 << "\n"
76 << "WORKLOAD OPTIONS\n"
77 << " --name <client name> client name (default admin)\n"
78 << " --entries <number> number of key/value pairs to store initially\n"
79 << " (default " << entries << ")\n"
80 << " --ops <number> number of operations to run\n"
81 << " --keysize <number> number of characters per key (default " << key_size << ")\n"
82 << " --valsize <number> number of characters per value (default " << val_size << ")\n"
83 << " -t <number> number of operations in flight concurrently\n"
84 << " (default " << max_ops_in_flight << ")\n"
85 << " --clients <number> tells this instance how many total clients are. Note that\n"
86 << " changing this does not change the number of clients."
87 << " -d <insert> <update> <delete> <read> percent (1-100) of operations that should be of each type\n"
88 << " (default 25 25 25 25)\n"
89 << " -r <number> random seed to use (default time(0))\n"
90 << "ALGORITHM OPTIONS\n"
91 << " --kval k, where each object has a number of entries\n"
92 << " >= k and <= 2k.\n"
93 << " --cache-size number of index entries to keep in cache\n"
94 << " (default " << cache_size << ")\n"
95 << " --cache-refresh percent (1-100) of cache-size to read each \n"
96 << " time the index is read\n"
97 << "OTHER OPTIONS\n"
98 << " --verbosity-on display debug output\n"
99 << " --clear-first delete all existing objects in the pool before running tests\n";
100 for (unsigned i = 0; i < args.size(); i++) {
101 if(i < args.size() - 1) {
102 if (strcmp(args[i], "--ops") == 0) {
103 ops = atoi(args[i+1]);
104 } else if (strcmp(args[i], "--entries") == 0) {
105 entries = atoi(args[i+1]);
106 } else if (strcmp(args[i], "--kval") == 0) {
107 k = atoi(args[i+1]);
108 } else if (strcmp(args[i], "--keysize") == 0) {
109 key_size = atoi(args[i+1]);
110 } else if (strcmp(args[i], "--valsize") == 0) {
111 val_size = atoi(args[i+1]);
112 } else if (strcmp(args[i], "--cache-size") == 0) {
113 cache_size = atoi(args[i+1]);
114 } else if (strcmp(args[i], "--cache-refresh") == 0) {
115 auto temp = atoi(args[i+1]);
116 assert (temp != 0);
117 cache_refresh = 100 / (double)temp;
118 } else if (strcmp(args[i], "-t") == 0) {
119 max_ops_in_flight = atoi(args[i+1]);
120 } else if (strcmp(args[i], "--clients") == 0) {
121 clients = atoi(args[i+1]);
122 } else if (strcmp(args[i], "-d") == 0) {
123 if (i + 4 >= args.size()) {
124 cout << "Invalid arguments after -d: there must be 4 of them."
125 << std::endl;
126 continue;
127 } else {
128 probs.clear();
129 int sum = atoi(args[i + 1]);
130 probs[sum] = 'i';
131 sum += atoi(args[i + 2]);
132 probs[sum] = 'u';
133 sum += atoi(args[i + 3]);
134 probs[sum] = 'd';
135 sum += atoi(args[i + 4]);
136 probs[sum] = 'r';
137 if (sum != 100) {
138 cout << "Invalid arguments after -d: they must add to 100."
139 << std::endl;
140 }
141 }
142 } else if (strcmp(args[i], "--name") == 0) {
143 client_name = args[i+1];
144 } else if (strcmp(args[i], "-r") == 0) {
145 srand(atoi(args[i+1]));
146 }
147 } else if (strcmp(args[i], "--verbosity-on") == 0) {
148 verbose = true;
149 } else if (strcmp(args[i], "--clear-first") == 0) {
150 clear_first = true;
151 } else if (strcmp(args[i], "--help") == 0) {
152 cout << help.str() << std::endl;
153 exit(1);
154 }
155 }
156
157 KvFlatBtreeAsync * kvba = new KvFlatBtreeAsync(k, client_name, cache_size,
158 cache_refresh, verbose);
159 kvs = kvba;
160
161 int r = rados.init(rados_id.c_str());
162 if (r < 0) {
163 cout << "error during init" << std::endl;
164 return r;
165 }
166 r = rados.conf_parse_argv(argc, argv);
167 if (r < 0) {
168 cout << "error during parsing args" << std::endl;
169 return r;
170 }
171 r = rados.conf_parse_env(NULL);
172 if (r < 0) {
173 cout << "error during parsing env" << std::endl;
174 return r;
175 }
176 r = rados.conf_read_file(NULL);
177 if (r < 0) {
178 cout << "error during read file" << std::endl;
179 return r;
180 }
181 r = rados.connect();
182 if (r < 0) {
183 cout << "error during connect: " << r << std::endl;
184 return r;
185 }
186 r = rados.ioctx_create(pool_name.c_str(), io_ctx);
187 if (r < 0) {
188 cout << "error creating io ctx" << std::endl;
189 rados.shutdown();
190 return r;
191 }
192 io_ctx_ready = true;
193
194 if (clear_first) {
195 librados::NObjectIterator it;
196 for (it = io_ctx.nobjects_begin(); it != io_ctx.nobjects_end(); ++it) {
197 librados::ObjectWriteOperation rm;
198 rm.remove();
199 io_ctx.operate(it->get_oid(), &rm);
200 }
201 }
202
203 int err = kvs->setup(argc, argv);
204 if (err < 0 && err != -17) {
205 cout << "error during setup of kvs: " << err << std::endl;
206 return err;
207 }
208
209 return 0;
210 }
211
212 string KvStoreBench::random_string(int len) {
213 string ret;
214 string alphanum = "0123456789"
215 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
216 "abcdefghijklmnopqrstuvwxyz";
217 for (int i = 0; i < len; ++i) {
218 ret.push_back(alphanum[rand() % (alphanum.size() - 1)]);
219 }
220
221 return ret;
222 }
223
224 pair<string, bufferlist> KvStoreBench::rand_distr(bool new_elem) {
225 pair<string, bufferlist> ret;
226 if (new_elem) {
227 ret = make_pair(random_string(key_size),
228 KvFlatBtreeAsync::to_bl(random_string(val_size)));
229 key_set.insert(ret.first);
230 } else {
231 if (key_set.size() == 0) {
232 return make_pair("",KvFlatBtreeAsync::to_bl(""));
233 }
234 string get_string = random_string(key_size);
235 std::set<string>::iterator it = key_set.lower_bound(get_string);
236 if (it == key_set.end()) {
237 ret.first = *(key_set.rbegin());
238 } else {
239 ret.first = *it;
240 }
241 ret.second = KvFlatBtreeAsync::to_bl(random_string(val_size));
242 }
243 return ret;
244 }
245
246 int KvStoreBench::test_random_insertions() {
247 int err;
248 if (entries == 0) {
249 return 0;
250 }
251 stringstream prev_ss;
252 prev_ss << (atoi(client_name.c_str()) - 1);
253 string prev_rid = prev_ss.str();
254 stringstream last_ss;
255 if (client_name.size() > 1) {
256 last_ss << client_name.substr(0,client_name.size() - 2);
257 }
258 last_ss << clients - 1;
259 string last_rid = client_name == "admin" ? "admin" : last_ss.str();
260
261 map<string, bufferlist> big_map;
262 for (int i = 0; i < entries; i++) {
263 bufferlist bfr;
264 bfr.append(random_string(7));
265 big_map[random_string(5)] = bfr;
266 }
267
268 uint64_t uint;
269 time_t t;
270 if (client_name[client_name.size() - 1] != '0' && client_name != "admin") {
271 do {
272 librados::ObjectReadOperation oro;
273 oro.stat(&uint, &t, &err);
274 err = io_ctx.operate(prev_rid + ".done-setting", &oro, NULL);
275 if (verbose) cout << "reading " << prev_rid << ": err = " << err
276 << std::endl;
277 } while (err != 0);
278 cout << "detected " << prev_rid << ".done-setting" << std::endl;
279 }
280
281 cout << "testing random insertions";
282 err = kvs->set_many(big_map);
283 if (err < 0) {
284 cout << "error setting things" << std::endl;
285 return err;
286 }
287
288 librados::ObjectWriteOperation owo;
289 owo.create(true);
290 io_ctx.operate(client_name + ".done-setting", &owo);
291 cout << "created " << client_name + ".done-setting. waiting for "
292 << last_rid << ".done-setting" << std::endl;
293
294 do {
295 librados::ObjectReadOperation oro;
296 oro.stat(&uint, &t, &err);
297 err = io_ctx.operate(last_rid + ".done-setting", &oro, NULL);
298 } while (err != 0);
299 cout << "detected " << last_rid << ".done-setting" << std::endl;
300
301 return err;
302 }
303
304 void KvStoreBench::aio_callback_timed(int * err, void *arg) {
305 timed_args *args = reinterpret_cast<timed_args *>(arg);
306 ceph::mutex * ops_in_flight_lock = &args->kvsb->ops_in_flight_lock;
307 ceph::mutex * data_lock = &args->kvsb->data_lock;
308 ceph::condition_variable * op_avail = &args->kvsb->op_avail;
309 int *ops_in_flight = &args->kvsb->ops_in_flight;
310 if (*err < 0 && *err != -61) {
311 cerr << "Error during " << args->op << " operation: " << *err << std::endl;
312 }
313
314 args->sw.stop_time();
315 double time = args->sw.get_time();
316 args->sw.clear();
317
318 data_lock->lock();
319 //latency
320 args->kvsb->data.latency_jf.open_object_section("latency");
321 args->kvsb->data.latency_jf.dump_float(string(1, args->op).c_str(),
322 time);
323 args->kvsb->data.latency_jf.close_section();
324
325 //throughput
326 args->kvsb->data.throughput_jf.open_object_section("throughput");
327 args->kvsb->data.throughput_jf.dump_unsigned(string(1, args->op).c_str(),
328 ceph_clock_now());
329 args->kvsb->data.throughput_jf.close_section();
330
331 data_lock->unlock();
332
333 ops_in_flight_lock->lock();
334 (*ops_in_flight)--;
335 op_avail->notify_all();
336 ops_in_flight_lock->unlock();
337
338 delete args;
339 }
340
341 int KvStoreBench::test_teuthology_aio(next_gen_t distr,
342 const map<int, char> &probs)
343 {
344 int err = 0;
345 cout << "inserting initial entries..." << std::endl;
346 err = test_random_insertions();
347 if (err < 0) {
348 return err;
349 }
350 cout << "finished inserting initial entries. Waiting 10 seconds for everyone"
351 << " to catch up..." << std::endl;
352
353 sleep(10);
354
355 cout << "done waiting. Starting random operations..." << std::endl;
356
357 std::unique_lock l{ops_in_flight_lock};
358 for (int i = 0; i < ops; i++) {
359 ceph_assert(ops_in_flight <= max_ops_in_flight);
360 if (ops_in_flight == max_ops_in_flight) {
361 op_avail.wait(l);
362 ceph_assert(ops_in_flight < max_ops_in_flight);
363 }
364 cout << "\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t" << i + 1 << " / "
365 << ops << std::endl;
366 timed_args * cb_args = new timed_args(this);
367 pair<string, bufferlist> kv;
368 int random = (rand() % 100);
369 cb_args->op = probs.lower_bound(random)->second;
370 switch (cb_args->op) {
371 case 'i':
372 kv = (((KvStoreBench *)this)->*distr)(true);
373 if (kv.first == "") {
374 i--;
375 delete cb_args;
376 continue;
377 }
378 ops_in_flight++;
379 cb_args->sw.start_time();
380 kvs->aio_set(kv.first, kv.second, false, aio_callback_timed,
381 cb_args, &cb_args->err);
382 break;
383 case 'u':
384 kv = (((KvStoreBench *)this)->*distr)(false);
385 if (kv.first == "") {
386 i--;
387 delete cb_args;
388 continue;
389 }
390 ops_in_flight++;
391 cb_args->sw.start_time();
392 kvs->aio_set(kv.first, kv.second, true, aio_callback_timed,
393 cb_args, &cb_args->err);
394 break;
395 case 'd':
396 kv = (((KvStoreBench *)this)->*distr)(false);
397 if (kv.first == "") {
398 i--;
399 delete cb_args;
400 continue;
401 }
402 key_set.erase(kv.first);
403 ops_in_flight++;
404 cb_args->sw.start_time();
405 kvs->aio_remove(kv.first, aio_callback_timed, cb_args, &cb_args->err);
406 break;
407 case 'r':
408 kv = (((KvStoreBench *)this)->*distr)(false);
409 if (kv.first == "") {
410 i--;
411 delete cb_args;
412 continue;
413 }
414 ops_in_flight++;
415 cb_args->sw.start_time();
416 kvs->aio_get(kv.first, &cb_args->val, aio_callback_timed,
417 cb_args, &cb_args->err);
418 break;
419 default:
420 // shouldn't happen here
421 assert(false);
422 }
423
424 }
425
426 op_avail.wait(l, [this] { return ops_in_flight <= 0; });
427
428 print_time_data();
429 return err;
430 }
431
432 int KvStoreBench::test_teuthology_sync(next_gen_t distr,
433 const map<int, char> &probs)
434 {
435 int err = 0;
436 err = test_random_insertions();
437 if (err < 0) {
438 return err;
439 }
440 sleep(10);
441 for (int i = 0; i < ops; i++) {
442 StopWatch sw;
443 pair<char, double> d;
444 cout << "\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t" << i + 1 << " / "
445 << ops << std::endl;
446 pair<string, bufferlist> kv;
447 int random = (rand() % 100);
448 d.first = probs.lower_bound(random)->second;
449 switch (d.first) {
450 case 'i':
451 kv = (((KvStoreBench *)this)->*distr)(true);
452 if (kv.first == "") {
453 i--;
454 continue;
455 }
456 sw.start_time();
457 err = kvs->set(kv.first, kv.second, true);
458 sw.stop_time();
459 if (err < 0) {
460 cout << "Error setting " << kv << ": " << err << std::endl;
461 return err;
462 }
463 break;
464 case 'u':
465 kv = (((KvStoreBench *)this)->*distr)(false);
466 if (kv.first == "") {
467 i--;
468 continue;
469 }
470 sw.start_time();
471 err = kvs->set(kv.first, kv.second, true);
472 sw.stop_time();
473 if (err < 0 && err != -61) {
474 cout << "Error updating " << kv << ": " << err << std::endl;
475 return err;
476 }
477 break;
478 case 'd':
479 kv = (((KvStoreBench *)this)->*distr)(false);
480 if (kv.first == "") {
481 i--;
482 continue;
483 }
484 key_set.erase(kv.first);
485 sw.start_time();
486 err = kvs->remove(kv.first);
487 sw.stop_time();
488 if (err < 0 && err != -61) {
489 cout << "Error removing " << kv << ": " << err << std::endl;
490 return err;
491 }
492 break;
493 case 'r':
494 kv = (((KvStoreBench *)this)->*distr)(false);
495 if (kv.first == "") {
496 i--;
497 continue;
498 }
499 bufferlist val;
500 sw.start_time();
501 err = kvs->get(kv.first, &kv.second);
502 sw.stop_time();
503 if (err < 0 && err != -61) {
504 cout << "Error getting " << kv << ": " << err << std::endl;
505 return err;
506 }
507 break;
508 }
509
510 double time = sw.get_time();
511 d.second = time;
512 sw.clear();
513 //latency
514 data.latency_jf.open_object_section("latency");
515 data.latency_jf.dump_float(string(1, d.first).c_str(),
516 time);
517 data.latency_jf.close_section();
518 }
519
520 print_time_data();
521 return err;
522 }
523
524 void KvStoreBench::print_time_data() {
525 cout << "========================================================\n";
526 cout << "latency:" << std::endl;
527 data.latency_jf.flush(cout);
528 cout << std::endl;
529 cout << "throughput:" << std::endl;
530 data.throughput_jf.flush(cout);
531 cout << "\n========================================================"
532 << std::endl;
533 }
534
535 int KvStoreBench::teuthology_tests() {
536 int err = 0;
537 if (max_ops_in_flight > 1) {
538 err = test_teuthology_aio(&KvStoreBench::rand_distr, probs);
539 } else {
540 err = test_teuthology_sync(&KvStoreBench::rand_distr, probs);
541 }
542 return err;
543 }
544
545 int main(int argc, const char** argv) {
546 KvStoreBench kvsb;
547 int err = kvsb.setup(argc, argv);
548 if (err == 0) cout << "setup successful" << std::endl;
549 else{
550 cout << "error " << err << std::endl;
551 return err;
552 }
553 err = kvsb.teuthology_tests();
554 if (err < 0) return err;
555 return 0;
556 };