1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2009 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
13 * Series of functions to test your rados installation. Note
14 * that this code is not terribly robust -- for instance, if you
15 * try to bench on a pool you don't have permission to access,
16 * it will just loop forever.
18 #include "include/compat.h"
20 #include "common/Cond.h"
21 #include "obj_bencher.h"
// Metadata object name used when the caller passes an empty run name.
23 const std::string BENCH_LASTRUN_METADATA
= "benchmark_last_metadata";
// Leading component of every generated object prefix; clean_up() rejects
// user-supplied prefixes that do not start with it.
24 const std::string BENCH_PREFIX
= "benchmark_data";
// Hostname buffer, filled on first use by generate_object_prefix_nopid().
25 static char cached_hostname
[30] = {0};
// Streams "<BENCH_PREFIX>_<hostname>" into oss. The hostname is looked up only
// while cached_hostname is still empty, so later calls reuse the cached value.
// NOTE(review): the statement returning the string is not visible in this listing.
28 static std::string
generate_object_prefix_nopid() {
29 if (cached_hostname
[0] == 0) {
// Pass one byte less than the buffer size and terminate explicitly, so the
// buffer ends in NUL even if gethostname() truncates the name.
30 gethostname(cached_hostname
, sizeof(cached_hostname
)-1);
31 cached_hostname
[sizeof(cached_hostname
)-1] = 0;
34 std::ostringstream oss
;
35 oss
<< BENCH_PREFIX
<< "_" << cached_hostname
;
// Streams the host-level prefix followed by "_<cached_pid>" into oss.
// NOTE(review): only the getpid() assignment to cached_pid is visible here; how
// the pid argument (default 0) selects or overrides the cached value is not
// shown in this listing -- confirm against the full file.
39 static std::string
generate_object_prefix(int pid
= 0) {
43 cached_pid
= getpid();
45 std::ostringstream oss
;
46 oss
<< generate_object_prefix_nopid() << "_" << cached_pid
;
// Streams the name of benchmark object number objnum into oss as
// "<generate_object_prefix(pid)>_object<objnum>".
// NOTE(review): the statement returning the string is not visible in this listing.
50 static std::string
generate_object_name(int objnum
, int pid
= 0)
52 std::ostringstream oss
;
53 oss
<< generate_object_prefix(pid
) << "_object" << objnum
;
// Overwrites the first `length` bytes of data->object_contents with 'z'.
57 static void sanitize_object_contents (bench_data
*data
, size_t length
) {
58 memset(data
->object_contents
, 'z', length
);
// Streams t.localtime(os) followed by a single space and returns that stream.
// NOTE(review): the lines around this return are missing from the listing, so
// any condition guarding it is not visible here.
61 ostream
& ObjBencher::out(ostream
& os
, utime_t
& t
)
64 return t
.localtime(os
) << " ";
// Convenience overload: takes the current time from ceph_clock_now() and
// forwards to out(os, t). A named local is used because that overload takes
// its utime_t by non-const reference.
69 ostream
& ObjBencher::out(ostream
& os
)
71 utime_t cur_time
= ceph_clock_now();
72 return out(os
, cur_time
);
// Thread entry point (the bench routines pass it to pthread_create with the
// ObjBencher as argument). Reports progress from bencher->data either as a
// text table on cout or through bencher->formatter, and records bandwidth/IOPS
// samples in data.history for the final statistics.
// NOTE(review): the loop header and several branch conditions are not visible
// in this listing; comments below describe only the visible statements.
75 void *ObjBencher::status_printer(void *_bencher
) {
76 ObjBencher
*bencher
= static_cast<ObjBencher
*>(_bencher
);
77 bench_data
& data
= bencher
->data
;
78 Formatter
*formatter
= bencher
->formatter
;
79 ostream
*outstream
= bencher
->outstream
;
82 int previous_writes
= 0;
83 int cycleSinceChange
= 0;
87 ONE_SECOND
.set_from_double(1.0);
90 formatter
->open_array_section("datas");
92 utime_t cur_time
= ceph_clock_now();
// Every 20th tick in plain-text mode: print the latency summary and repeat
// the column headers.
94 if (i
% 20 == 0 && !formatter
) {
96 cur_time
.localtime(cout
) << " min lat: " << data
.min_latency
97 << " max lat: " << data
.max_latency
98 << " avg lat: " << data
.avg_latency
<< std::endl
;
99 //I'm naughty and don't reset the fill
100 bencher
->out(cout
, cur_time
) << setfill(' ')
102 << setw(8) << "Cur ops"
103 << setw(10) << "started"
104 << setw(10) << "finished"
105 << setw(10) << "avg MB/s"
106 << setw(10) << "cur MB/s"
107 << setw(12) << "last lat(s)"
108 << setw(12) << "avg lat(s)" << std::endl
;
110 if (cycleSinceChange
)
111 bandwidth
= (double)(data
.finished
- previous_writes
)
// Track the running min/max bandwidth and keep the sample; the bench routines
// later pass data.history.bandwidth to vec_stddev().
118 if (!std::isnan(bandwidth
) && bandwidth
> -1) {
119 if (bandwidth
> data
.idata
.max_bandwidth
)
120 data
.idata
.max_bandwidth
= bandwidth
;
121 if (bandwidth
< data
.idata
.min_bandwidth
)
122 data
.idata
.min_bandwidth
= bandwidth
;
124 data
.history
.bandwidth
.push_back(bandwidth
);
127 if (cycleSinceChange
)
128 iops
= (double)(data
.finished
- previous_writes
)
// Same bookkeeping for IOPS.
133 if (!std::isnan(iops
) && iops
> -1) {
134 if (iops
> data
.idata
.max_iops
)
135 data
.idata
.max_iops
= iops
;
136 if (iops
< data
.idata
.min_iops
)
137 data
.idata
.min_iops
= iops
;
139 data
.history
.iops
.push_back(iops
);
143 formatter
->open_object_section("data");
// Average bandwidth since data.start_time, converted to MB/s.
145 double avg_bandwidth
= (double) (data
.op_size
) * (data
.finished
)
146 / (double)(cur_time
- data
.start_time
) / (1024*1024);
// Progress since the last tick: remember the new finished count and reset
// the stall counter.
147 if (previous_writes
!= data
.finished
) {
148 previous_writes
= data
.finished
;
149 cycleSinceChange
= 0;
151 bencher
->out(cout
, cur_time
)
154 << ' ' << setw(7) << data
.in_flight
155 << ' ' << setw(9) << data
.started
156 << ' ' << setw(9) << data
.finished
157 << ' ' << setw(9) << avg_bandwidth
158 << ' ' << setw(9) << bandwidth
159 << ' ' << setw(11) << (double)data
.cur_latency
160 << ' ' << setw(11) << data
.avg_latency
<< std::endl
;
162 formatter
->dump_format("sec", "%d", i
);
163 formatter
->dump_format("cur_ops", "%d", data
.in_flight
);
164 formatter
->dump_format("started", "%d", data
.started
);
165 formatter
->dump_format("finished", "%d", data
.finished
);
166 formatter
->dump_format("avg_bw", "%f", avg_bandwidth
);
167 formatter
->dump_format("cur_bw", "%f", bandwidth
);
168 formatter
->dump_format("last_lat", "%f", (double)data
.cur_latency
);
169 formatter
->dump_format("avg_lat", "%f", data
.avg_latency
);
// Row variant that prints '0' for current bandwidth and '-' for last latency.
// NOTE(review): presumably the no-progress branch; its condition is not
// visible in this listing.
174 bencher
->out(cout
, cur_time
)
177 << ' ' << setw(7) << data
.in_flight
178 << ' ' << setw(9) << data
.started
179 << ' ' << setw(9) << data
.finished
180 << ' ' << setw(9) << avg_bandwidth
181 << ' ' << setw(9) << '0'
182 << ' ' << setw(11) << '-'
183 << ' '<< setw(11) << data
.avg_latency
<< std::endl
;
185 formatter
->dump_format("sec", "%d", i
);
186 formatter
->dump_format("cur_ops", "%d", data
.in_flight
);
187 formatter
->dump_format("started", "%d", data
.started
);
188 formatter
->dump_format("finished", "%d", data
.finished
);
189 formatter
->dump_format("avg_bw", "%f", avg_bandwidth
);
// NOTE(review): the literal 0 below is an int while the format is "%f"; if
// dump_format is printf-style variadic this should be 0.0 -- confirm.
190 formatter
->dump_format("cur_bw", "%f", 0);
191 formatter
->dump_format("last_lat", "%f", 0);
192 formatter
->dump_format("avg_lat", "%f", data
.avg_latency
);
196 formatter
->close_section(); // data
197 formatter
->flush(*outstream
);
// Wait on cond with bencher->lock for ONE_SECOND (set from 1.0 above).
201 cond
.WaitInterval(bencher
->lock
, ONE_SECOND
);
204 formatter
->close_section(); //datas
205 bencher
->lock
.Unlock();
// Runs one benchmark: fills in the shared bench state in `data`, dispatches to
// write_bench / seq_read_bench / rand_read_bench according to `operation`, and
// after a write run with `cleanup` set removes the objects and the metadata.
// An empty run_name selects BENCH_LASTRUN_METADATA as the metadata object.
// NOTE(review): several lines (declarations, error branches, the `out` label)
// are not visible in this listing.
209 int ObjBencher::aio_bench(
210 int operation
, int secondsToRun
,
212 uint64_t op_size
, uint64_t object_size
,
213 unsigned max_objects
,
214 bool cleanup
, bool hints
,
215 const std::string
& run_name
, bool no_verify
) {
217 if (concurrentios
<= 0)
225 // default metadata object is used if user does not specify one
226 const std::string run_name_meta
= (run_name
.empty() ? BENCH_LASTRUN_METADATA
: run_name
);
228 //get data from previous write run, if available
229 if (operation
!= OP_WRITE
) {
230 uint64_t prev_op_size
, prev_object_size
;
231 r
= fetch_bench_metadata(run_name_meta
, &prev_op_size
, &prev_object_size
,
232 &num_objects
, &prevPid
);
235 cerr
<< "Must write data before running a read benchmark!" << std::endl
;
// For non-write runs the sizes recorded by the previous write replace the
// caller-supplied ones.
238 object_size
= prev_object_size
;
239 op_size
= prev_op_size
;
// Scratch buffer of op_size bytes; released with delete[] at the end of
// this function.
242 char* contentsChars
= new char[op_size
];
246 data
.object_size
= object_size
;
247 data
.op_size
= op_size
;
251 data
.min_latency
= 9999.0; // this better be higher than initial latency!
252 data
.max_latency
= 0;
253 data
.avg_latency
= 0;
254 data
.object_contents
= contentsChars
;
257 //fill in contentsChars deterministically so we can check returns
258 sanitize_object_contents(&data
, data
.op_size
);
261 formatter
->open_object_section("bench");
263 if (OP_WRITE
== operation
) {
264 r
= write_bench(secondsToRun
, concurrentios
, run_name_meta
, max_objects
);
265 if (r
!= 0) goto out
;
267 else if (OP_SEQ_READ
== operation
) {
268 r
= seq_read_bench(secondsToRun
, num_objects
, concurrentios
, prevPid
, no_verify
);
269 if (r
!= 0) goto out
;
271 else if (OP_RAND_READ
== operation
) {
272 r
= rand_read_bench(secondsToRun
, num_objects
, concurrentios
, prevPid
, no_verify
);
273 if (r
!= 0) goto out
;
// After a write run with cleanup requested, re-read the metadata that
// write_bench stored to learn which objects to remove.
276 if (OP_WRITE
== operation
&& cleanup
) {
277 r
= fetch_bench_metadata(run_name_meta
, &op_size
, &object_size
,
278 &num_objects
, &prevPid
);
281 cerr
<< "Should never happen: bench metadata missing for current run!" << std::endl
;
285 data
.start_time
= ceph_clock_now();
286 out(cout
) << "Cleaning up (deleting benchmark objects)" << std::endl
;
288 r
= clean_up(num_objects
, prevPid
, concurrentios
);
289 if (r
!= 0) goto out
;
291 runtime
= ceph_clock_now() - data
.start_time
;
292 out(cout
) << "Clean up completed and total clean up time :" << runtime
<< std::endl
;
// Finally remove the metadata object itself.
295 r
= sync_remove(run_name_meta
);
296 if (r
!= 0) goto out
;
301 formatter
->close_section(); // bench
302 formatter
->flush(*outstream
);
303 *outstream
<< std::endl
;
305 delete[] contentsChars
;
// Stores the given mutex pointer in the `lock` member; explicit, so a Mutex*
// is never converted implicitly.
310 explicit lock_cond(Mutex
*_lock
) : lock(_lock
) {}
// Completion callback handed to create_completion() by the bench routines;
// `arg` is the address of the lock_cond they pass alongside it.
// NOTE(review): what is done with lc is not visible in this listing.
315 void _aio_cb(void *cb
, void *arg
) {
316 struct lock_cond
*lc
= (struct lock_cond
*)arg
;
// Spread statistic over v, computed in two passes over the vector: the second
// pass takes each element's deviation from `mean`, and the accumulated value
// is divided by (size - 1).
// NOTE(review): the accumulation statements, any final square root and any
// guard for vectors with fewer than two elements are not visible in this
// listing; v.size() - 1 is 0 for a single element and wraps for an empty
// vector (size() is unsigned) -- confirm a guard exists.
323 static T
vec_stddev(vector
<T
>& v
)
330 typename vector
<T
>::iterator iter
;
331 for (iter
= v
.begin(); iter
!= v
.end(); ++iter
) {
338 for (iter
= v
.begin(); iter
!= v
.end(); ++iter
) {
339 T dev
= *iter
- mean
;
343 stddev
/= (v
.size() - 1);
// Reads the metadata object `metadata_file` and decodes the run parameters
// into the four output pointers.
// NOTE(review): the error checks after sync_read and the return statements
// are not visible in this listing.
347 int ObjBencher::fetch_bench_metadata(const std::string
& metadata_file
,
348 uint64_t *op_size
, uint64_t* object_size
,
349 int* num_objects
, int* prevPid
) {
351 bufferlist object_data
;
353 r
= sync_read(metadata_file
, object_data
,
354 sizeof(int) * 2 + sizeof(size_t) * 2);
356 // treat an empty file as a file that does not exist
// Decode order matches the encode order in write_bench: object_size,
// num_objects, pid, then op_size.
362 bufferlist::iterator p
= object_data
.begin();
363 ::decode(*object_size
, p
);
364 ::decode(*num_objects
, p
);
365 ::decode(*prevPid
, p
);
367 ::decode(*op_size
, p
);
// NOTE(review): the condition choosing between decoding op_size and this
// fallback to object_size is not visible here -- presumably it covers
// metadata that carries no op_size field; confirm.
369 *op_size
= *object_size
;
// Write benchmark. Issues asynchronous writes of data.op_size bytes through
// `concurrentios` completion slots, starts a status_printer thread, then
// prints/dumps summary statistics and stores the run metadata (object_size,
// num_objects, pid, op_size) under run_name_meta for later read or cleanup.
// NOTE(review): many lines (locking, error branches, loop bodies, returns) are
// not visible in this listing; comments describe visible statements only.
375 int ObjBencher::write_bench(int secondsToRun
,
376 int concurrentios
, const string
& run_name_meta
,
377 unsigned max_objects
) {
378 if (concurrentios
<= 0)
382 out(cout
) << "Maintaining " << concurrentios
<< " concurrent writes of "
383 << data
.op_size
<< " bytes to objects of size "
384 << data
.object_size
<< " for up to "
385 << secondsToRun
<< " seconds or "
386 << max_objects
<< " objects"
389 formatter
->dump_format("concurrent_ios", "%d", concurrentios
);
390 formatter
->dump_format("object_size", "%d", data
.object_size
);
391 formatter
->dump_format("op_size", "%d", data
.op_size
);
392 formatter
->dump_format("seconds_to_run", "%d", secondsToRun
);
393 formatter
->dump_format("max_objects", "%d", max_objects
);
395 bufferlist
* newContents
= 0;
397 std::string prefix
= generate_object_prefix();
399 out(cout
) << "Object prefix: " << prefix
<< std::endl
;
401 formatter
->dump_string("object_prefix", prefix
);
403 std::vector
<string
> name(concurrentios
);
// NOTE(review): array sized by a runtime int (variable-length array), which
// is a compiler extension rather than standard C++.
405 bufferlist
* contents
[concurrentios
];
406 double total_latency
= 0;
407 std::vector
<utime_t
> start_times(concurrentios
);
// Number of op_size writes that make up one object.
415 unsigned writes_per_object
= 1;
417 writes_per_object
= data
.object_size
/ data
.op_size
;
419 r
= completions_init(concurrentios
);
421 //set up writes so I can start them together
422 for (int i
= 0; i
<concurrentios
; ++i
) {
423 name
[i
] = generate_object_name(i
/ writes_per_object
);
424 contents
[i
] = new bufferlist();
// Stamp the shared contents buffer with the op index, then copy it into
// this slot's bufferlist.
425 snprintf(data
.object_contents
, data
.op_size
, "I'm the %16dth op!", i
);
426 contents
[i
]->append(data
.object_contents
, data
.op_size
);
// Start the progress-reporting thread.
429 pthread_t print_thread
;
431 pthread_create(&print_thread
, NULL
, ObjBencher::status_printer
, (void *)this);
432 ceph_pthread_setname(print_thread
, "write_stat");
435 data
.start_time
= ceph_clock_now();
// Issue the initial batch: op i goes to offset op_size * (i % writes_per_object)
// within its object.
437 for (int i
= 0; i
<concurrentios
; ++i
) {
438 start_times
[i
] = ceph_clock_now();
439 r
= create_completion(i
, _aio_cb
, (void *)&lc
);
442 r
= aio_write(name
[i
], i
, *contents
[i
], data
.op_size
,
443 data
.op_size
* (i
% writes_per_object
));
444 if (r
< 0) { //naughty, doesn't clean up heap
453 //keep on adding new writes as old ones complete until we've passed minimum time
457 //don't need locking for reads because other thread doesn't write
459 runtime
.set_from_double(secondsToRun
);
460 stopTime
= data
.start_time
+ runtime
;
// secondsToRun == 0 disables the time limit.
463 while (!secondsToRun
|| ceph_clock_now() < stopTime
) {
468 if (completion_is_done(slot
)) {
473 if (slot
== concurrentios
) {
476 } while (slot
!= old_slot
);
482 //create new contents and name on the heap, and fill them
483 newName
= generate_object_name(data
.started
/ writes_per_object
);
484 newContents
= contents
[slot
];
485 snprintf(newContents
->c_str(), data
.op_size
, "I'm the %16dth op!", data
.started
);
486 // we wrote to buffer, going around internal crc cache, so invalidate it now.
487 newContents
->invalidate_crc();
489 completion_wait(slot
);
491 r
= completion_ret(slot
);
// Latency bookkeeping for the write that just completed in this slot.
496 data
.cur_latency
= ceph_clock_now() - start_times
[slot
];
497 data
.history
.latency
.push_back(data
.cur_latency
);
498 total_latency
+= data
.cur_latency
;
499 if( data
.cur_latency
> data
.max_latency
) data
.max_latency
= data
.cur_latency
;
500 if (data
.cur_latency
< data
.min_latency
) data
.min_latency
= data
.cur_latency
;
502 data
.avg_latency
= total_latency
/ data
.finished
;
505 release_completion(slot
);
506 timePassed
= ceph_clock_now() - data
.start_time
;
508 //write new stuff to backend
509 start_times
[slot
] = ceph_clock_now();
510 r
= create_completion(slot
, _aio_cb
, &lc
);
513 r
= aio_write(newName
, slot
, *newContents
, data
.op_size
,
514 data
.op_size
* (data
.started
% writes_per_object
));
515 if (r
< 0) {//naughty; doesn't clean up heap space.
518 name
[slot
] = newName
;
523 data
.started
>= (int)((data
.object_size
* max_objects
+ data
.op_size
- 1) /
// Drain: wait for every write that was started but has not finished yet.
529 while (data
.finished
< data
.started
) {
530 slot
= data
.finished
% concurrentios
;
531 completion_wait(slot
);
533 r
= completion_ret(slot
);
538 data
.cur_latency
= ceph_clock_now() - start_times
[slot
];
539 data
.history
.latency
.push_back(data
.cur_latency
);
540 total_latency
+= data
.cur_latency
;
541 if (data
.cur_latency
> data
.max_latency
) data
.max_latency
= data
.cur_latency
;
542 if (data
.cur_latency
< data
.min_latency
) data
.min_latency
= data
.cur_latency
;
544 data
.avg_latency
= total_latency
/ data
.finished
;
547 release_completion(slot
);
548 delete contents
[slot
];
552 timePassed
= ceph_clock_now() - data
.start_time
;
557 pthread_join(print_thread
, NULL
);
560 bandwidth
= ((double)data
.finished
)*((double)data
.op_size
)/(double)timePassed
;
561 bandwidth
= bandwidth
/(1024*1024); // we want it in MB/sec
// Human-readable summary.
564 out(cout
) << "Total time run: " << timePassed
<< std::endl
565 << "Total writes made: " << data
.finished
<< std::endl
566 << "Write size: " << data
.op_size
<< std::endl
567 << "Object size: " << data
.object_size
<< std::endl
568 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth
<< std::endl
569 << "Stddev Bandwidth: " << vec_stddev(data
.history
.bandwidth
) << std::endl
570 << "Max bandwidth (MB/sec): " << data
.idata
.max_bandwidth
<< std::endl
571 << "Min bandwidth (MB/sec): " << data
.idata
.min_bandwidth
<< std::endl
572 << "Average IOPS: " << (int)(data
.finished
/timePassed
) << std::endl
573 << "Stddev IOPS: " << vec_stddev(data
.history
.iops
) << std::endl
574 << "Max IOPS: " << data
.idata
.max_iops
<< std::endl
575 << "Min IOPS: " << data
.idata
.min_iops
<< std::endl
576 << "Average Latency(s): " << data
.avg_latency
<< std::endl
577 << "Stddev Latency(s): " << vec_stddev(data
.history
.latency
) << std::endl
578 << "Max latency(s): " << data
.max_latency
<< std::endl
579 << "Min latency(s): " << data
.min_latency
<< std::endl
;
// The same figures through the formatter.
581 formatter
->dump_format("total_time_run", "%f", (double)timePassed
);
582 formatter
->dump_format("total_writes_made", "%d", data
.finished
);
583 formatter
->dump_format("write_size", "%d", data
.op_size
);
584 formatter
->dump_format("object_size", "%d", data
.object_size
);
585 formatter
->dump_format("bandwidth", "%f", bandwidth
);
586 formatter
->dump_format("stddev_bandwidth", "%f", vec_stddev(data
.history
.bandwidth
));
587 formatter
->dump_format("max_bandwidth", "%f", data
.idata
.max_bandwidth
);
588 formatter
->dump_format("min_bandwidth", "%f", data
.idata
.min_bandwidth
);
589 formatter
->dump_format("average_iops", "%d", (int)(data
.finished
/timePassed
));
590 formatter
->dump_format("stddev_iops", "%d", vec_stddev(data
.history
.iops
));
591 formatter
->dump_format("max_iops", "%d", data
.idata
.max_iops
);
592 formatter
->dump_format("min_iops", "%d", data
.idata
.min_iops
);
593 formatter
->dump_format("average_latency", "%f", data
.avg_latency
);
594 formatter
->dump_format("stddev_latency", "%f", vec_stddev(data
.history
.latency
));
595 formatter
->dump_format("max_latency", "%f", data
.max_latency
);
596 formatter
->dump_format("min_latency", "%f", data
.min_latency
);
598 //write object size/number data for read benchmarks
599 ::encode(data
.object_size
, b_write
);
// Round up: a partially written last object still counts as an object.
600 num_objects
= (data
.finished
+ writes_per_object
- 1) / writes_per_object
;
601 ::encode(num_objects
, b_write
);
602 ::encode(getpid(), b_write
);
603 ::encode(data
.op_size
, b_write
);
605 // persist meta-data for further cleanup or read
// NOTE(review): the length passed is sizeof(int)*3, while four values were
// encoded above and fetch_bench_metadata reads
// sizeof(int) * 2 + sizeof(size_t) * 2 bytes -- confirm the intended size.
606 sync_write(run_name_meta
, b_write
, sizeof(int)*3);
609 for (int i
= 0; i
< concurrentios
; i
++)
619 pthread_join(print_thread
, NULL
);
620 for (int i
= 0; i
< concurrentios
; i
++)
// Sequential read benchmark over the objects written by the run identified by
// `pid`. Issues asynchronous reads of data.op_size bytes through
// `concurrentios` slots, compares each buffer with the expected
// "I'm the ...th op!" payload, and prints/dumps summary statistics.
// NOTE(review): many lines (locking, error branches, the use of no_verify,
// counters, returns) are not visible in this listing.
626 int ObjBencher::seq_read_bench(int seconds_to_run
, int num_objects
, int concurrentios
, int pid
, bool no_verify
) {
629 if (concurrentios
<= 0)
632 std::vector
<string
> name(concurrentios
);
// NOTE(review): these two arrays are sized by a runtime int (variable-length
// arrays), a compiler extension rather than standard C++.
634 bufferlist
* contents
[concurrentios
];
635 int index
[concurrentios
];
638 std::vector
<utime_t
> start_times(concurrentios
);
640 time_to_run
.set_from_double(seconds_to_run
);
641 double total_latency
= 0;
644 sanitize_object_contents(&data
, data
.op_size
); //clean it up once; subsequent
645 //changes will be safe because string length should remain the same
647 unsigned writes_per_object
= 1;
649 writes_per_object
= data
.object_size
/ data
.op_size
;
651 r
= completions_init(concurrentios
);
655 //set up initial reads
656 for (int i
= 0; i
< concurrentios
; ++i
) {
657 name
[i
] = generate_object_name(i
/ writes_per_object
, pid
);
658 contents
[i
] = new bufferlist();
663 data
.start_time
= ceph_clock_now();
// Start the progress-reporting thread.
666 pthread_t print_thread
;
667 pthread_create(&print_thread
, NULL
, status_printer
, (void *)this);
668 ceph_pthread_setname(print_thread
, "seq_read_stat");
670 utime_t finish_time
= data
.start_time
+ time_to_run
;
671 //start initial reads
672 for (int i
= 0; i
< concurrentios
; ++i
) {
674 start_times
[i
] = ceph_clock_now();
675 create_completion(i
, _aio_cb
, (void *)&lc
);
676 r
= aio_read(name
[i
], i
, contents
[i
], data
.op_size
,
677 data
.op_size
* (i
% writes_per_object
));
678 if (r
< 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
679 cerr
<< "r = " << r
<< std::endl
;
688 //keep on adding new reads as old ones complete
690 bufferlist
*cur_contents
;
// Continue while time remains (seconds_to_run == 0 means no limit) and
// data.started is still below num_objects.
693 while ((!seconds_to_run
|| ceph_clock_now() < finish_time
) &&
694 num_objects
> data
.started
) {
700 if (completion_is_done(slot
)) {
705 if (slot
== concurrentios
) {
708 } while (slot
!= old_slot
);
715 // calculate latency here, so memcmp doesn't inflate it
716 data
.cur_latency
= ceph_clock_now() - start_times
[slot
];
718 cur_contents
= contents
[slot
];
719 int current_index
= index
[slot
];
721 // invalidate internal crc cache
722 cur_contents
->invalidate_crc();
// Regenerate the expected payload for this op index and compare it with
// what was read.
725 snprintf(data
.object_contents
, data
.op_size
, "I'm the %16dth op!", current_index
);
726 if ( (cur_contents
->length() != data
.op_size
) ||
727 (memcmp(data
.object_contents
, cur_contents
->c_str(), data
.op_size
) != 0) ) {
728 cerr
<< name
[slot
] << " is not correct!" << std::endl
;
733 newName
= generate_object_name(data
.started
/ writes_per_object
, pid
);
734 index
[slot
] = data
.started
;
736 completion_wait(slot
);
738 r
= completion_ret(slot
);
740 cerr
<< "read got " << r
<< std::endl
;
744 total_latency
+= data
.cur_latency
;
745 if (data
.cur_latency
> data
.max_latency
) data
.max_latency
= data
.cur_latency
;
746 if (data
.cur_latency
< data
.min_latency
) data
.min_latency
= data
.cur_latency
;
748 data
.avg_latency
= total_latency
/ data
.finished
;
751 release_completion(slot
);
753 //start new read and check data if requested
754 start_times
[slot
] = ceph_clock_now();
755 create_completion(slot
, _aio_cb
, (void *)&lc
);
756 r
= aio_read(newName
, slot
, contents
[slot
], data
.op_size
,
757 data
.op_size
* (data
.started
% writes_per_object
));
765 name
[slot
] = newName
;
768 //wait for final reads to complete
769 while (data
.finished
< data
.started
) {
770 slot
= data
.finished
% concurrentios
;
771 completion_wait(slot
);
773 r
= completion_ret(slot
);
775 cerr
<< "read got " << r
<< std::endl
;
779 data
.cur_latency
= ceph_clock_now() - start_times
[slot
];
780 total_latency
+= data
.cur_latency
;
781 if (data
.cur_latency
> data
.max_latency
) data
.max_latency
= data
.cur_latency
;
782 if (data
.cur_latency
< data
.min_latency
) data
.min_latency
= data
.cur_latency
;
784 data
.avg_latency
= total_latency
/ data
.finished
;
786 release_completion(slot
);
// Verify the drained read against the payload expected for its op index.
788 snprintf(data
.object_contents
, data
.op_size
, "I'm the %16dth op!", index
[slot
]);
790 if ((contents
[slot
]->length() != data
.op_size
) ||
791 (memcmp(data
.object_contents
, contents
[slot
]->c_str(), data
.op_size
) != 0)) {
792 cerr
<< name
[slot
] << " is not correct!" << std::endl
;
798 delete contents
[slot
];
801 runtime
= ceph_clock_now() - data
.start_time
;
806 pthread_join(print_thread
, NULL
);
809 bandwidth
= ((double)data
.finished
)*((double)data
.op_size
)/(double)runtime
;
810 bandwidth
= bandwidth
/(1024*1024); // we want it in MB/sec
// Human-readable summary.
813 out(cout
) << "Total time run: " << runtime
<< std::endl
814 << "Total reads made: " << data
.finished
<< std::endl
815 << "Read size: " << data
.op_size
<< std::endl
816 << "Object size: " << data
.object_size
<< std::endl
817 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth
<< std::endl
818 << "Average IOPS: " << (int)(data
.finished
/runtime
) << std::endl
819 << "Stddev IOPS: " << vec_stddev(data
.history
.iops
) << std::endl
820 << "Max IOPS: " << data
.idata
.max_iops
<< std::endl
821 << "Min IOPS: " << data
.idata
.min_iops
<< std::endl
822 << "Average Latency(s): " << data
.avg_latency
<< std::endl
823 << "Max latency(s): " << data
.max_latency
<< std::endl
824 << "Min latency(s): " << data
.min_latency
<< std::endl
;
// The same figures through the formatter.
826 formatter
->dump_format("total_time_run", "%f", (double)runtime
);
827 formatter
->dump_format("total_reads_made", "%d", data
.finished
);
828 formatter
->dump_format("read_size", "%d", data
.op_size
);
829 formatter
->dump_format("object_size", "%d", data
.object_size
);
830 formatter
->dump_format("bandwidth", "%f", bandwidth
);
831 formatter
->dump_format("average_iops", "%d", (int)(data
.finished
/runtime
));
832 formatter
->dump_format("stddev_iops", "%d", vec_stddev(data
.history
.iops
));
833 formatter
->dump_format("max_iops", "%d", data
.idata
.max_iops
);
834 formatter
->dump_format("min_iops", "%d", data
.idata
.min_iops
);
835 formatter
->dump_format("average_latency", "%f", data
.avg_latency
);
836 formatter
->dump_format("max_latency", "%f", data
.max_latency
);
837 formatter
->dump_format("min_latency", "%f", data
.min_latency
);
// A non-zero error count is reported as -EIO.
842 return (errors
> 0 ? -EIO
: 0);
848 pthread_join(print_thread
, NULL
);
// Random read benchmark over the objects written by the run identified by
// `pid`. Same slot scheme as seq_read_bench, but each new read targets
// rand() % num_objects and the main loop is bounded only by time.
// NOTE(review): many lines (locking, error branches, the use of no_verify,
// counters, returns) are not visible in this listing.
852 int ObjBencher::rand_read_bench(int seconds_to_run
, int num_objects
, int concurrentios
, int pid
, bool no_verify
)
856 if (concurrentios
<= 0)
859 std::vector
<string
> name(concurrentios
);
// NOTE(review): these two arrays are sized by a runtime int (variable-length
// arrays), a compiler extension rather than standard C++.
861 bufferlist
* contents
[concurrentios
];
862 int index
[concurrentios
];
865 std::vector
<utime_t
> start_times(concurrentios
);
867 time_to_run
.set_from_double(seconds_to_run
);
868 double total_latency
= 0;
871 sanitize_object_contents(&data
, data
.op_size
); //clean it up once; subsequent
872 //changes will be safe because string length should remain the same
874 unsigned writes_per_object
= 1;
876 writes_per_object
= data
.object_size
/ data
.op_size
;
880 r
= completions_init(concurrentios
);
884 //set up initial reads
885 for (int i
= 0; i
< concurrentios
; ++i
) {
886 name
[i
] = generate_object_name(i
/ writes_per_object
, pid
);
887 contents
[i
] = new bufferlist();
892 data
.start_time
= ceph_clock_now();
// Start the progress-reporting thread.
895 pthread_t print_thread
;
896 pthread_create(&print_thread
, NULL
, status_printer
, (void *)this);
897 ceph_pthread_setname(print_thread
, "rand_read_stat");
899 utime_t finish_time
= data
.start_time
+ time_to_run
;
900 //start initial reads
901 for (int i
= 0; i
< concurrentios
; ++i
) {
903 start_times
[i
] = ceph_clock_now();
904 create_completion(i
, _aio_cb
, (void *)&lc
);
905 r
= aio_read(name
[i
], i
, contents
[i
], data
.op_size
,
906 data
.op_size
* (i
% writes_per_object
));
907 if (r
< 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
908 cerr
<< "r = " << r
<< std::endl
;
917 //keep on adding new reads as old ones complete
919 bufferlist
*cur_contents
;
// Time-bounded only; seconds_to_run == 0 makes the condition always true.
923 while ((!seconds_to_run
|| ceph_clock_now() < finish_time
)) {
929 if (completion_is_done(slot
)) {
934 if (slot
== concurrentios
) {
937 } while (slot
!= old_slot
);
944 // calculate latency here, so memcmp doesn't inflate it
945 data
.cur_latency
= ceph_clock_now() - start_times
[slot
];
949 int current_index
= index
[slot
];
950 cur_contents
= contents
[slot
];
951 completion_wait(slot
);
953 r
= completion_ret(slot
);
955 cerr
<< "read got " << r
<< std::endl
;
960 total_latency
+= data
.cur_latency
;
961 if (data
.cur_latency
> data
.max_latency
) data
.max_latency
= data
.cur_latency
;
962 if (data
.cur_latency
< data
.min_latency
) data
.min_latency
= data
.cur_latency
;
964 data
.avg_latency
= total_latency
/ data
.finished
;
// Regenerate the expected payload for this op index and compare it with
// what was read.
969 snprintf(data
.object_contents
, data
.op_size
, "I'm the %16dth op!", current_index
);
970 if ((cur_contents
->length() != data
.op_size
) ||
971 (memcmp(data
.object_contents
, cur_contents
->c_str(), data
.op_size
) != 0)) {
972 cerr
<< name
[slot
] << " is not correct!" << std::endl
;
// Pick the next target at random; the id selects both the object
// (id / writes_per_object) and the offset within it (id % writes_per_object).
977 rand_id
= rand() % num_objects
;
978 newName
= generate_object_name(rand_id
/ writes_per_object
, pid
);
979 index
[slot
] = rand_id
;
980 release_completion(slot
);
982 // invalidate internal crc cache
983 cur_contents
->invalidate_crc();
985 //start new read and check data if requested
986 start_times
[slot
] = ceph_clock_now();
987 create_completion(slot
, _aio_cb
, (void *)&lc
);
988 r
= aio_read(newName
, slot
, contents
[slot
], data
.op_size
,
989 data
.op_size
* (rand_id
% writes_per_object
));
997 name
[slot
] = newName
;
1001 //wait for final reads to complete
1002 while (data
.finished
< data
.started
) {
1003 slot
= data
.finished
% concurrentios
;
1004 completion_wait(slot
);
1006 r
= completion_ret(slot
);
1008 cerr
<< "read got " << r
<< std::endl
;
1012 data
.cur_latency
= ceph_clock_now() - start_times
[slot
];
1013 total_latency
+= data
.cur_latency
;
1014 if (data
.cur_latency
> data
.max_latency
) data
.max_latency
= data
.cur_latency
;
1015 if (data
.cur_latency
< data
.min_latency
) data
.min_latency
= data
.cur_latency
;
1017 data
.avg_latency
= total_latency
/ data
.finished
;
1019 release_completion(slot
);
// Verify the drained read against the payload expected for its op index.
1021 snprintf(data
.object_contents
, data
.op_size
, "I'm the %16dth op!", index
[slot
]);
1023 if ((contents
[slot
]->length() != data
.op_size
) ||
1024 (memcmp(data
.object_contents
, contents
[slot
]->c_str(), data
.op_size
) != 0)) {
1025 cerr
<< name
[slot
] << " is not correct!" << std::endl
;
1031 delete contents
[slot
];
1034 runtime
= ceph_clock_now() - data
.start_time
;
1039 pthread_join(print_thread
, NULL
);
1042 bandwidth
= ((double)data
.finished
)*((double)data
.op_size
)/(double)runtime
;
1043 bandwidth
= bandwidth
/(1024*1024); // we want it in MB/sec
// Human-readable summary.
1046 out(cout
) << "Total time run: " << runtime
<< std::endl
1047 << "Total reads made: " << data
.finished
<< std::endl
1048 << "Read size: " << data
.op_size
<< std::endl
1049 << "Object size: " << data
.object_size
<< std::endl
1050 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth
<< std::endl
1051 << "Average IOPS: " << (int)(data
.finished
/runtime
) << std::endl
1052 << "Stddev IOPS: " << vec_stddev(data
.history
.iops
) << std::endl
1053 << "Max IOPS: " << data
.idata
.max_iops
<< std::endl
1054 << "Min IOPS: " << data
.idata
.min_iops
<< std::endl
1055 << "Average Latency(s): " << data
.avg_latency
<< std::endl
1056 << "Max latency(s): " << data
.max_latency
<< std::endl
1057 << "Min latency(s): " << data
.min_latency
<< std::endl
;
// The same figures through the formatter.
1059 formatter
->dump_format("total_time_run", "%f", (double)runtime
);
1060 formatter
->dump_format("total_reads_made", "%d", data
.finished
);
1061 formatter
->dump_format("read_size", "%d", data
.op_size
);
1062 formatter
->dump_format("object_size", "%d", data
.object_size
);
1063 formatter
->dump_format("bandwidth", "%f", bandwidth
);
1064 formatter
->dump_format("average_iops", "%d", (int)(data
.finished
/runtime
));
1065 formatter
->dump_format("stddev_iops", "%d", vec_stddev(data
.history
.iops
));
1066 formatter
->dump_format("max_iops", "%d", data
.idata
.max_iops
);
1067 formatter
->dump_format("min_iops", "%d", data
.idata
.min_iops
);
1068 formatter
->dump_format("average_latency", "%f", data
.avg_latency
);
1069 formatter
->dump_format("max_latency", "%f", data
.max_latency
);
1070 formatter
->dump_format("min_latency", "%f", data
.min_latency
);
// A non-zero error count is reported as -EIO.
1074 return (errors
> 0 ? -EIO
: 0);
1080 pthread_join(print_thread
, NULL
);
// Prefix-based cleanup. Scans the store for the metadata object and for
// objects whose names start with `prefix` (orig_prefix, or the host-level
// default when it is empty). Namespaces that hold matching objects but no
// metadata object are cleaned with clean_up_slow(); the visible code also
// reads the stored metadata, removes the numbered objects via
// clean_up(num_objects, prevPid, concurrentios) and deletes the metadata.
// NOTE(review): loop headers, namespace switching, error branches and returns
// are not all visible in this listing.
1084 int ObjBencher::clean_up(const std::string
& orig_prefix
, int concurrentios
, const std::string
& run_name
) {
1086 uint64_t op_size
, object_size
;
1090 // default meta object if user does not specify one
1091 const std::string run_name_meta
= (run_name
.empty() ? BENCH_LASTRUN_METADATA
: run_name
);
1092 const std::string prefix
= (orig_prefix
.empty() ? generate_object_prefix_nopid() : orig_prefix
);
// Refuse prefixes that do not start with BENCH_PREFIX.
1094 if (prefix
.substr(0, BENCH_PREFIX
.length()) != BENCH_PREFIX
) {
1095 cerr
<< "Specified --prefix invalid, it must begin with \"" << BENCH_PREFIX
<< "\"" << std::endl
;
1099 std::list
<Object
> unfiltered_objects
;
1100 std::set
<std::string
> meta_namespaces
, all_namespaces
;
1102 // If caller set all_nspaces this will be searching
1103 // across multiple namespaces.
// Fetch the listing in batches of 20.
1105 bool objects_remain
= get_objects(&unfiltered_objects
, 20);
1106 if (!objects_remain
)
// For each listed object, i->first is compared against names and i->second
// is what gets collected into the namespace sets.
1109 std::list
<Object
>::const_iterator i
= unfiltered_objects
.begin();
1110 for ( ; i
!= unfiltered_objects
.end(); ++i
) {
1111 if (i
->first
== run_name_meta
) {
1112 meta_namespaces
.insert(i
->second
);
1114 if (i
->first
.substr(0, prefix
.length()) == prefix
) {
1115 all_namespaces
.insert(i
->second
);
1120 std::set
<std::string
>::const_iterator i
= all_namespaces
.begin();
1121 for ( ; i
!= all_namespaces
.end(); ++i
) {
1124 // if no metadata file found we should try to do a linear search on the prefix
1125 if (meta_namespaces
.find(*i
) == meta_namespaces
.end()) {
1126 int r
= clean_up_slow(prefix
, concurrentios
);
1128 cerr
<< "clean_up_slow error r= " << r
<< std::endl
;
1134 r
= fetch_bench_metadata(run_name_meta
, &op_size
, &object_size
, &num_objects
, &prevPid
);
1139 r
= clean_up(num_objects
, prevPid
, concurrentios
);
1140 if (r
!= 0) return r
;
1142 r
= sync_remove(run_name_meta
);
1143 if (r
!= 0) return r
;
// Metadata-driven cleanup: removes the numbered objects
// generate_object_name(0..num_objects-1, prevPid) through up to
// `concurrentios` asynchronous remove slots, then prints how many were
// removed. A completion result of -ENOENT is not treated as an error.
// NOTE(review): locking, counters, error branches and returns are not all
// visible in this listing.
1149 int ObjBencher::clean_up(int num_objects
, int prevPid
, int concurrentios
) {
1150 lock_cond
lc(&lock
);
1152 if (concurrentios
<= 0)
1155 std::vector
<string
> name(concurrentios
);
1156 std::string newName
;
1168 // don't start more completions than files
1169 if (num_objects
== 0) {
1171 } else if (num_objects
< concurrentios
) {
1172 concurrentios
= num_objects
;
1175 r
= completions_init(concurrentios
);
1179 //set up initial removes
1180 for (int i
= 0; i
< concurrentios
; ++i
) {
1181 name
[i
] = generate_object_name(i
, prevPid
);
1184 //start initial removes
1185 for (int i
= 0; i
< concurrentios
; ++i
) {
1186 create_completion(i
, _aio_cb
, (void *)&lc
);
1187 r
= aio_remove(name
[i
], i
);
1188 if (r
< 0) { //naughty, doesn't clean up heap
1189 cerr
<< "r = " << r
<< std::endl
;
1198 //keep on adding new removes as old ones complete
1199 while (data
.started
< num_objects
) {
1201 int old_slot
= slot
;
1205 if (completion_is_done(slot
)) {
1210 if (slot
== concurrentios
) {
1213 } while (slot
!= old_slot
);
// The next object to remove is numbered by the count of removes started.
1220 newName
= generate_object_name(data
.started
, prevPid
);
1221 completion_wait(slot
);
1223 r
= completion_ret(slot
);
1224 if (r
!= 0 && r
!= -ENOENT
) { // file does not exist
1225 cerr
<< "remove got " << r
<< std::endl
;
1232 release_completion(slot
);
1234 //start new remove and check data if requested
1235 create_completion(slot
, _aio_cb
, (void *)&lc
);
1236 r
= aio_remove(newName
, slot
);
1244 name
[slot
] = newName
;
1247 //wait for final removes to complete
1248 while (data
.finished
< data
.started
) {
1249 slot
= data
.finished
% concurrentios
;
1250 completion_wait(slot
);
1252 r
= completion_ret(slot
);
1253 if (r
!= 0 && r
!= -ENOENT
) { // file does not exist
1254 cerr
<< "remove got " << r
<< std::endl
;
1260 release_completion(slot
);
1270 out(cout
) << "Removed " << data
.finished
<< " object" << (data
.finished
!= 1 ? "s" : "") << std::endl
;
1282 * Return objects from the datastore which match a prefix.
1284 * Clears the list and populates it with any objects which match the
1285 * prefix. The list is guaranteed to have at least one item when the
1286 * function returns true.
1288 * @param prefix the prefix to match against
1289 * @param objects [out] return list of objects
1290 * @returns true if there are any objects in the store which match
1291 * the prefix, false if there are no more
// Fills *objects with listed objects whose name (i->first) starts with
// `prefix`, pulling the listing in batches of 20 until at least one match has
// been collected or get_objects() reports that nothing remains.
// NOTE(review): the statement that clears the list and the return statements
// are not visible in this listing.
1293 bool ObjBencher::more_objects_matching_prefix(const std::string
& prefix
, std::list
<Object
>* objects
) {
1294 std::list
<Object
> unfiltered_objects
;
// Keep fetching batches while no match has been collected yet.
1298 while (objects
->empty()) {
1299 bool objects_remain
= get_objects(&unfiltered_objects
, 20);
1300 if (!objects_remain
)
1303 std::list
<Object
>::const_iterator i
= unfiltered_objects
.begin();
1304 for ( ; i
!= unfiltered_objects
.end(); ++i
) {
1305 if (i
->first
.substr(0, prefix
.length()) == prefix
) {
1306 objects
->push_back(*i
);
// Cleanup without metadata: repeatedly asks more_objects_matching_prefix() for
// objects named with `prefix` and removes them through up to `concurrentios`
// asynchronous remove slots, switching namespace per object (the Object's
// .second) before each remove of its name (.first). A completion result of
// -ENOENT is not treated as an error.
// NOTE(review): locking, counters, error branches and returns are not all
// visible in this listing.
1314 int ObjBencher::clean_up_slow(const std::string
& prefix
, int concurrentios
) {
1315 lock_cond
lc(&lock
);
1317 if (concurrentios
<= 0)
1320 std::vector
<Object
> name(concurrentios
);
1325 std::list
<Object
> objects
;
1326 bool objects_remain
= true;
1335 out(cout
) << "Warning: using slow linear search" << std::endl
;
1337 r
= completions_init(concurrentios
);
1341 //set up initial removes
1342 for (int i
= 0; i
< concurrentios
; ++i
) {
1343 if (objects
.empty()) {
1344 // if there are fewer objects than concurrent ios, don't generate extras
1345 bool objects_found
= more_objects_matching_prefix(prefix
, &objects
);
1346 if (!objects_found
) {
1348 objects_remain
= false;
1353 name
[i
] = objects
.front();
1354 objects
.pop_front();
1357 //start initial removes
1358 for (int i
= 0; i
< concurrentios
; ++i
) {
1359 create_completion(i
, _aio_cb
, (void *)&lc
);
1360 set_namespace(name
[i
].second
);
1361 r
= aio_remove(name
[i
].first
, i
);
1362 if (r
< 0) { //naughty, doesn't clean up heap
1363 cerr
<< "r = " << r
<< std::endl
;
1372 //keep on adding new removes as old ones complete
1373 while (objects_remain
) {
1375 int old_slot
= slot
;
1379 if (completion_is_done(slot
)) {
1384 if (slot
== concurrentios
) {
1387 } while (slot
!= old_slot
);
1395 // get more objects if necessary
1396 if (objects
.empty()) {
1397 objects_remain
= more_objects_matching_prefix(prefix
, &objects
);
1398 // quit if there are no more
1399 if (!objects_remain
) {
1404 // get the next object
1405 newName
= objects
.front();
1406 objects
.pop_front();
1408 completion_wait(slot
);
1410 r
= completion_ret(slot
);
1411 if (r
!= 0 && r
!= -ENOENT
) { // file does not exist
1412 cerr
<< "remove got " << r
<< std::endl
;
1419 release_completion(slot
);
1421 //start new remove and check data if requested
1422 create_completion(slot
, _aio_cb
, (void *)&lc
);
1423 set_namespace(newName
.second
);
1424 r
= aio_remove(newName
.first
, slot
);
1432 name
[slot
] = newName
;
1435 //wait for final removes to complete
1436 while (data
.finished
< data
.started
) {
1437 slot
= data
.finished
% concurrentios
;
1438 completion_wait(slot
);
1440 r
= completion_ret(slot
);
1441 if (r
!= 0 && r
!= -ENOENT
) { // file does not exist
1442 cerr
<< "remove got " << r
<< std::endl
;
1448 release_completion(slot
);
1458 out(cout
) << "Removed " << data
.finished
<< " object" << (data
.finished
!= 1 ? "s" : "") << std::endl
;