1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2009 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
13 * Series of functions to test your rados installation. Notice
14 * that this code is not terribly robust -- for instance, if you
15 * try and bench on a pool you don't have permission to access
16 * it will just loop forever.
18 #include "include/compat.h"
20 #include "common/ceph_mutex.h"
21 #include "common/Clock.h"
22 #include "obj_bencher.h"
// File-scope using-declarations and the naming constants shared by the
// benchmark helpers below.
28 using std::setprecision
;
31 using std::unique_lock
;
32 using std::unique_ptr
;
// Name of the metadata object used when the caller passes an empty run name
// (see the run_name.empty() fallback in aio_bench and clean_up).
34 const std::string BENCH_LASTRUN_METADATA
= "benchmark_last_metadata";
// Common prefix of every benchmark object name; clean_up rejects a
// user-supplied prefix that does not start with it.
35 const std::string BENCH_PREFIX
= "benchmark_data";
// printf-style template for object names. generate_object_name_fast fills it
// with the host name (%s), the pid (%d) and the object number (%d).
36 const std::string BENCH_OBJ_NAME
= BENCH_PREFIX
+ "_%s_%d_object%d";
// Host name cache, zero-initialised; a first byte of 0 is treated as
// "not fetched yet" by the helpers below.
38 static char cached_hostname
[30] = {0};
// Builds the pid-less object prefix by streaming BENCH_PREFIX, "_" and the
// cached host name into an ostringstream.
// On first use (cached_hostname[0] == 0) the host name is fetched with
// gethostname(), which is handed sizeof(cached_hostname)-1 bytes; the last
// byte of the buffer is then forced to 0.
// NOTE(review): the gethostname() return value is not checked here.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the function returns oss.str() -- confirm.
41 static std::string
generate_object_prefix_nopid() {
42 if (cached_hostname
[0] == 0) {
43 gethostname(cached_hostname
, sizeof(cached_hostname
)-1);
44 cached_hostname
[sizeof(cached_hostname
)-1] = 0;
47 std::ostringstream oss
;
48 oss
<< BENCH_PREFIX
<< "_" << cached_hostname
;
// Builds the per-process object prefix: the result of
// generate_object_prefix_nopid(), then "_", then cached_pid.
// pid defaults to 0.
// NOTE(review): the lines that decide when cached_pid is overwritten (between
// the signature and the getpid() assignment) and the return statement are not
// visible in this excerpt; presumably a non-zero pid takes precedence over
// getpid() -- confirm.
52 static std::string
generate_object_prefix(int pid
= 0) {
56 cached_pid
= getpid();
58 std::ostringstream oss
;
59 oss
<< generate_object_prefix_nopid() << "_" << cached_pid
;
// Formats the name of benchmark object number objnum with a single
// snprintf() call using the BENCH_OBJ_NAME template (host name, pid, object
// number). pid defaults to 0.
// NOTE(review): the declaration of the name buffer and the lines selecting
// cached_pid from pid are not visible in this excerpt.
63 // this is 8x faster than previous impl based on chained, deduped functions call
64 static std::string
generate_object_name_fast(int objnum
, int pid
= 0)
// Lazily fetch and NUL-terminate the host name, same scheme as in
// generate_object_prefix_nopid().
66 if (cached_hostname
[0] == 0) {
67 gethostname(cached_hostname
, sizeof(cached_hostname
)-1);
68 cached_hostname
[sizeof(cached_hostname
)-1] = 0;
74 cached_pid
= getpid();
77 int n
= snprintf(&name
[0], sizeof(name
), BENCH_OBJ_NAME
.c_str(), cached_hostname
, cached_pid
, objnum
);
// n must be positive and strictly smaller than the buffer size, i.e. the
// formatted name was neither empty/failed nor truncated.
78 ceph_assert(n
> 0 && n
< (int)sizeof(name
));
// Construct the result from exactly the n characters snprintf reported.
79 return std::string(&name
[0], (size_t)n
);
// Fills the first length bytes of data->object_contents with the
// character 'z'.
// The callers in this file pass data.op_size as length.
82 static void sanitize_object_contents (bench_data
*data
, size_t length
) {
83 // FIPS zeroization audit 20191115: this memset is not security related.
84 memset(data
->object_contents
, 'z', length
);
// Stream helper used as a line prefix for console output: the visible branch
// writes the local-time form of t to os followed by a single space and
// returns the resulting stream.
// NOTE(review): the condition guarding this return and the alternative
// return are not visible in this excerpt.
87 ostream
& ObjBencher::out(ostream
& os
, utime_t
& t
)
90 return t
.localtime(os
) << " ";
// Convenience overload: takes the current time from ceph_clock_now() and
// forwards to out(os, t).
95 ostream
& ObjBencher::out(ostream
& os
)
97 utime_t cur_time
= ceph_clock_now();
98 return out(os
, cur_time
);
// Thread entry point handed to pthread_create() by the bench functions.
// _bencher is the ObjBencher instance passed as void*.
// The function takes bencher->lock through a unique_lock, waits on a
// function-local condition variable for ONE_SECOND per cycle, prints or dumps
// a progress row each cycle and maintains the running bandwidth / IOPS
// statistics stored in data.idata.
// NOTE(review): many lines of this function (loop header, else branches,
// "if (formatter)" guards, counter increments, declarations of i, bandwidth
// and iops, final return) are not visible in this excerpt.
101 void *ObjBencher::status_printer(void *_bencher
) {
102 ObjBencher
*bencher
= static_cast<ObjBencher
*>(_bencher
);
103 bench_data
& data
= bencher
->data
;
104 Formatter
*formatter
= bencher
->formatter
;
105 ostream
*outstream
= bencher
->outstream
;
// cond is local to this function; it is only used for the timed wait below.
106 ceph::condition_variable cond
;
// previous_writes: value of data.finished at the last cycle that saw progress.
// cycleSinceChange: divisor for the per-cycle rates; reset to 0 on progress.
108 int previous_writes
= 0;
109 int cycleSinceChange
= 0;
112 mono_clock::duration ONE_SECOND
= std::chrono::seconds(1);
113 std::unique_lock locker
{bencher
->lock
};
115 formatter
->open_array_section("datas");
117 mono_time cur_time
= mono_clock::now();
118 utime_t t
= ceph_clock_now();
// Every 20th cycle in plain-text mode: emit the latency summary fragment and
// re-print the column header row.
120 if (i
% 20 == 0 && !formatter
) {
123 << " min lat: " << data
.min_latency
124 << " max lat: " << data
.max_latency
125 << " avg lat: " << data
.avg_latency
<< std::endl
;
126 //I'm naughty and don't reset the fill
127 bencher
->out(cout
, t
) << setfill(' ')
129 << setw(8) << "Cur ops"
130 << setw(10) << "started"
131 << setw(10) << "finished"
132 << setw(10) << "avg MB/s"
133 << setw(10) << "cur MB/s"
134 << setw(12) << "last lat(s)"
135 << setw(12) << "avg lat(s)" << std::endl
;
// Current bandwidth is derived from the ops finished since previous_writes;
// the remaining factors of the expression are not visible in this excerpt.
137 if (cycleSinceChange
)
138 bandwidth
= (double)(data
.finished
- previous_writes
)
// Only samples that are not NaN and greater than -1 update the bandwidth
// statistics: max, min, sample count, running mean and the accumulated
// delta * (sample - updated mean) term that the bench functions later
// divide by (cycles - 1) and pass to std::sqrt.
145 if (!std::isnan(bandwidth
) && bandwidth
> -1) {
146 if (bandwidth
> data
.idata
.max_bandwidth
)
147 data
.idata
.max_bandwidth
= bandwidth
;
148 if (bandwidth
< data
.idata
.min_bandwidth
)
149 data
.idata
.min_bandwidth
= bandwidth
;
151 ++data
.idata
.bandwidth_cycles
;
152 double delta
= bandwidth
- data
.idata
.avg_bandwidth
;
153 data
.idata
.avg_bandwidth
+= delta
/ data
.idata
.bandwidth_cycles
;
154 data
.idata
.bandwidth_diff_sum
+= delta
* (bandwidth
- data
.idata
.avg_bandwidth
);
// Same scheme for IOPS.
// NOTE(review): the declaration of iops is not visible here; if it is an
// integer type the (double) cast is truncated on assignment and the
// std::isnan() check can never be true -- confirm.
157 if (cycleSinceChange
)
158 iops
= (double)(data
.finished
- previous_writes
)
163 if (!std::isnan(iops
) && iops
> -1) {
164 if (iops
> data
.idata
.max_iops
)
165 data
.idata
.max_iops
= iops
;
166 if (iops
< data
.idata
.min_iops
)
167 data
.idata
.min_iops
= iops
;
169 ++data
.idata
.iops_cycles
;
170 double delta
= iops
- data
.idata
.avg_iops
;
171 data
.idata
.avg_iops
+= delta
/ data
.idata
.iops_cycles
;
172 data
.idata
.iops_diff_sum
+= delta
* (iops
- data
.idata
.avg_iops
);
176 formatter
->open_object_section("data");
178 // elapsed will be in seconds, by default
179 std::chrono::duration
<double> elapsed
= cur_time
- data
.start_time
;
// Average bandwidth in MiB per second over the whole run so far:
// op_size * finished / elapsed seconds / (1024*1024).
180 double avg_bandwidth
= (double) (data
.op_size
) * (data
.finished
)
181 / elapsed
.count() / (1024*1024);
// Progress since the last recorded count: remember the new count, restart the
// cycle counter and report a row that includes current bandwidth and the
// latency of the most recent op.
182 if (previous_writes
!= data
.finished
) {
183 previous_writes
= data
.finished
;
184 cycleSinceChange
= 0;
186 bencher
->out(cout
, t
)
189 << ' ' << setw(7) << data
.in_flight
190 << ' ' << setw(9) << data
.started
191 << ' ' << setw(9) << data
.finished
192 << ' ' << setw(9) << avg_bandwidth
193 << ' ' << setw(9) << bandwidth
194 << ' ' << setw(11) << (double)data
.cur_latency
.count()
195 << ' ' << setw(11) << data
.avg_latency
<< std::endl
;
197 formatter
->dump_format("sec", "%d", i
);
198 formatter
->dump_format("cur_ops", "%d", data
.in_flight
);
199 formatter
->dump_format("started", "%d", data
.started
);
200 formatter
->dump_format("finished", "%d", data
.finished
);
201 formatter
->dump_format("avg_bw", "%f", avg_bandwidth
);
202 formatter
->dump_format("cur_bw", "%f", bandwidth
);
203 formatter
->dump_format("last_lat", "%f", (double)data
.cur_latency
.count());
204 formatter
->dump_format("avg_lat", "%f", data
.avg_latency
);
// Second reporting variant (presumably the no-progress branch; the "else" is
// not visible): current bandwidth is shown as '0' and last latency as '-'.
209 bencher
->out(cout
, t
)
212 << ' ' << setw(7) << data
.in_flight
213 << ' ' << setw(9) << data
.started
214 << ' ' << setw(9) << data
.finished
215 << ' ' << setw(9) << avg_bandwidth
216 << ' ' << setw(9) << '0'
217 << ' ' << setw(11) << '-'
218 << ' '<< setw(11) << data
.avg_latency
<< std::endl
;
220 formatter
->dump_format("sec", "%d", i
);
221 formatter
->dump_format("cur_ops", "%d", data
.in_flight
);
222 formatter
->dump_format("started", "%d", data
.started
);
223 formatter
->dump_format("finished", "%d", data
.finished
);
224 formatter
->dump_format("avg_bw", "%f", avg_bandwidth
);
// NOTE(review): the two calls below pass the int literal 0 with a "%f"
// format. If dump_format forwards its arguments printf-style this is a type
// mismatch (0.0 would be the matching literal) -- confirm.
225 formatter
->dump_format("cur_bw", "%f", 0);
226 formatter
->dump_format("last_lat", "%f", 0);
227 formatter
->dump_format("avg_lat", "%f", data
.avg_latency
);
231 formatter
->close_section(); // data
232 formatter
->flush(*outstream
);
// Timed wait on the local condition variable; waiting on locker releases
// bencher->lock for the duration of the wait.
236 cond
.wait_for(locker
, ONE_SECOND
);
239 formatter
->close_section(); //datas
// Fallback: set both min_iops and max_iops to finished / total runtime.
// NOTE(review): the condition guarding this fallback is not visible here.
241 std::chrono::duration
<double> runtime
= mono_clock::now() - data
.start_time
;
242 data
.idata
.min_iops
= data
.idata
.max_iops
= data
.finished
/ runtime
.count();
// Top-level benchmark driver. Depending on operation it runs write_bench,
// seq_read_bench or rand_read_bench, and for a write benchmark with cleanup
// set it afterwards removes the benchmark objects and the metadata object.
// run_name selects the metadata object; an empty name falls back to
// BENCH_LASTRUN_METADATA.
// NOTE(review): one parameter line (between secondsToRun and op_size), the
// declarations of r, num_ops, num_objects and prev_pid, the "out" label and
// the return statements are not visible in this excerpt.
247 int ObjBencher::aio_bench(
248 int operation
, int secondsToRun
,
250 uint64_t op_size
, uint64_t object_size
,
251 unsigned max_objects
,
252 bool cleanup
, bool hints
,
253 const std::string
& run_name
, bool reuse_bench
, bool no_verify
) {
// Guard against a non-positive concurrency (the guarded statement is not
// visible here).
255 if (concurrentios
<= 0)
262 std::chrono::duration
<double> timePassed
;
264 // default metadata object is used if user does not specify one
265 const std::string run_name_meta
= (run_name
.empty() ? BENCH_LASTRUN_METADATA
: run_name
);
267 //get data from previous write run, if available
// For read benchmarks, or a write benchmark with reuse_bench, the sizes
// recorded by the previous run replace the op_size/object_size arguments.
268 if (operation
!= OP_WRITE
|| reuse_bench
) {
269 uint64_t prev_op_size
, prev_object_size
;
270 r
= fetch_bench_metadata(run_name_meta
, &prev_op_size
, &prev_object_size
,
271 &num_ops
, &num_objects
, &prev_pid
);
275 cerr
<< "Must write data before using reuse_bench for a write benchmark!" << std::endl
;
277 cerr
<< "Must write data before running a read benchmark!" << std::endl
;
281 object_size
= prev_object_size
;
282 op_size
= prev_op_size
;
// Scratch buffer of op_size bytes, stored in data.object_contents below and
// released with delete[] near the end of this function.
// NOTE(review): raw new[] combined with "goto out" error paths; whether every
// path reaches the delete[] cannot be confirmed from the visible lines.
285 char* contentsChars
= new char[op_size
];
289 data
.object_size
= object_size
;
290 data
.op_size
= op_size
;
// Reset the latency statistics before the run.
294 data
.min_latency
= 9999.0; // this better be higher than initial latency!
295 data
.max_latency
= 0;
296 data
.avg_latency
= 0;
297 data
.latency_diff_sum
= 0;
298 data
.object_contents
= contentsChars
;
301 //fill in contentsChars deterministically so we can check returns
302 sanitize_object_contents(&data
, data
.op_size
);
305 formatter
->open_object_section("bench");
// Dispatch on the requested operation; any non-zero result jumps to "out".
307 if (OP_WRITE
== operation
) {
308 r
= write_bench(secondsToRun
, concurrentios
, run_name_meta
, max_objects
, prev_pid
);
309 if (r
!= 0) goto out
;
311 else if (OP_SEQ_READ
== operation
) {
312 r
= seq_read_bench(secondsToRun
, num_ops
, num_objects
, concurrentios
, prev_pid
, no_verify
);
313 if (r
!= 0) goto out
;
315 else if (OP_RAND_READ
== operation
) {
316 r
= rand_read_bench(secondsToRun
, num_ops
, num_objects
, concurrentios
, prev_pid
, no_verify
);
317 if (r
!= 0) goto out
;
// Optional cleanup after a write benchmark: re-read the metadata that
// write_bench just stored, remove the objects, report the elapsed cleanup
// time and finally remove the metadata object itself.
320 if (OP_WRITE
== operation
&& cleanup
) {
321 r
= fetch_bench_metadata(run_name_meta
, &op_size
, &object_size
,
322 &num_ops
, &num_objects
, &prev_pid
);
325 cerr
<< "Should never happen: bench metadata missing for current run!" << std::endl
;
329 data
.start_time
= mono_clock::now();
330 out(cout
) << "Cleaning up (deleting benchmark objects)" << std::endl
;
332 r
= clean_up(num_objects
, prev_pid
, concurrentios
);
333 if (r
!= 0) goto out
;
335 timePassed
= mono_clock::now() - data
.start_time
;
336 out(cout
) << "Clean up completed and total clean up time :" << timePassed
.count() << std::endl
;
339 r
= sync_remove(run_name_meta
);
340 if (r
!= 0) goto out
;
345 formatter
->close_section(); // bench
346 formatter
->flush(*outstream
);
347 *outstream
<< std::endl
;
349 delete[] contentsChars
;
// Fragment of the lock_cond helper: pairs a pointer to a ceph::mutex with a
// condition variable. The bench functions pass a lock_cond as the callback
// argument to create_completion(), and _aio_cb() notifies cond.
// NOTE(review): the struct header and the lock member declaration are not
// visible in this excerpt.
354 explicit lock_cond(ceph::mutex
*_lock
) : lock(_lock
) {}
356 ceph::condition_variable cond
;
// Completion callback registered through create_completion().
// arg is the lock_cond supplied at registration; every waiter on its
// condition variable is woken. cb is not used by the visible lines.
// NOTE(review): one or more lines around the notify (for example locking of
// lc->lock) are not visible in this excerpt.
359 void _aio_cb(void *cb
, void *arg
) {
360 struct lock_cond
*lc
= (struct lock_cond
*)arg
;
362 lc
->cond
.notify_all();
// Reads the metadata object metadata_file and decodes the parameters of a
// previous write run into the output pointers (op_size, object_size, num_ops,
// num_objects, prevPid).
// NOTE(review): only the decode of *object_size is visible in this excerpt;
// the remaining decode calls, the error handling and the return statements
// are missing from view.
366 int ObjBencher::fetch_bench_metadata(const std::string
& metadata_file
,
367 uint64_t *op_size
, uint64_t* object_size
,
368 int* num_ops
, int* num_objects
, int* prevPid
) {
370 bufferlist object_data
;
// NOTE(review): the requested length uses sizeof(size_t) for the two size
// fields while the outputs are uint64_t; these differ where size_t is
// narrower than 64 bits -- confirm how sync_read uses the length.
372 r
= sync_read(metadata_file
, object_data
,
373 sizeof(int) * 2 + sizeof(size_t) * 2);
375 // treat an empty file as a file that does not exist
381 auto p
= object_data
.cbegin();
382 decode(*object_size
, p
);
// Fallback: op_size takes the object size (the guarding condition is not
// visible here).
388 *op_size
= *object_size
;
390 unsigned ops_per_object
= 1;
391 // make sure *op_size value is reasonable
392 if (*op_size
> 0 && *object_size
> *op_size
) {
393 ops_per_object
= *object_size
/ *op_size
;
// Number of objects = num_ops divided by ops_per_object, rounded up.
395 *num_objects
= (*num_ops
+ ops_per_object
- 1) / ops_per_object
;
// Write benchmark: keeps concurrentios asynchronous writes of data.op_size
// bytes in flight, replacing each completed write with a new one, then prints
// and dumps summary statistics and stores the run metadata in run_name_meta.
// A status thread running status_printer is started with pthread_create()
// and joined with pthread_join().
// NOTE(review): many lines (declarations, error branches, loop headers,
// counter updates, labels and returns) are not visible in this excerpt.
400 int ObjBencher::write_bench(int secondsToRun
,
401 int concurrentios
, const string
& run_name_meta
,
402 unsigned max_objects
, int prev_pid
) {
403 if (concurrentios
<= 0)
407 out(cout
) << "Maintaining " << concurrentios
<< " concurrent writes of "
408 << data
.op_size
<< " bytes to objects of size "
409 << data
.object_size
<< " for up to "
410 << secondsToRun
<< " seconds or "
411 << max_objects
<< " objects"
// NOTE(review): "%d" is used below for data.object_size and data.op_size;
// aio_bench assigns uint64_t values to those fields -- confirm that
// dump_format handles the width.
414 formatter
->dump_format("concurrent_ios", "%d", concurrentios
);
415 formatter
->dump_format("object_size", "%d", data
.object_size
);
416 formatter
->dump_format("op_size", "%d", data
.op_size
);
417 formatter
->dump_format("seconds_to_run", "%d", secondsToRun
);
418 formatter
->dump_format("max_objects", "%d", max_objects
);
420 bufferlist
* newContents
= 0;
// A non-zero prev_pid is passed on to generate_object_prefix; otherwise the
// default argument is used.
422 std::string prefix
= prev_pid
? generate_object_prefix(prev_pid
) : generate_object_prefix();
424 out(cout
) << "Object prefix: " << prefix
<< std::endl
;
426 formatter
->dump_string("object_prefix", prefix
);
428 std::vector
<string
> name(concurrentios
);
// NOTE(review): array whose size is a runtime value (variable-length array),
// which is a compiler extension rather than standard C++.
430 unique_ptr
<bufferlist
> contents
[concurrentios
];
434 double total_latency
= 0;
435 std::vector
<mono_time
> start_times(concurrentios
);
437 std::chrono::duration
<double> timePassed
;
// Number of ops that make up one object (the guarding condition is not
// visible here).
439 unsigned writes_per_object
= 1;
441 writes_per_object
= data
.object_size
/ data
.op_size
;
443 r
= completions_init(concurrentios
);
445 //set up writes so I can start them together
// Each op buffer is op_size bytes of data.object_contents, whose beginning
// is overwritten with a text tag carrying the op index.
446 for (int i
= 0; i
<concurrentios
; ++i
) {
447 name
[i
] = generate_object_name_fast(i
/ writes_per_object
);
448 contents
[i
] = std::make_unique
<bufferlist
>();
449 snprintf(data
.object_contents
, data
.op_size
, "I'm the %16dth op!", i
);
450 contents
[i
]->append(data
.object_contents
, data
.op_size
);
453 pthread_t print_thread
;
455 pthread_create(&print_thread
, NULL
, ObjBencher::status_printer
, (void *)this);
456 ceph_pthread_setname(print_thread
, "write_stat");
457 std::unique_lock locker
{lock
};
459 data
.start_time
= mono_clock::now();
// Issue the initial batch: one completion and one aio_write per slot, at
// offset op_size * (i % writes_per_object) within the object.
461 for (int i
= 0; i
<concurrentios
; ++i
) {
462 start_times
[i
] = mono_clock::now();
463 r
= create_completion(i
, _aio_cb
, (void *)&lc
);
466 r
= aio_write(name
[i
], i
, *contents
[i
], data
.op_size
,
467 data
.op_size
* (i
% writes_per_object
));
477 //keep on adding new writes as old ones complete until we've passed minimum time
480 //don't need locking for reads because other thread doesn't write
482 stopTime
= data
.start_time
+ std::chrono::seconds(secondsToRun
);
// Main loop: runs while some started op has not finished. Slots are scanned
// for a completed op, wrapping at concurrentios; lc.cond is waited on when
// the scan finds nothing (surrounding control flow not fully visible).
485 while (data
.finished
< data
.started
) {
490 if (completion_is_done(slot
)) {
495 if (slot
== concurrentios
) {
498 } while (slot
!= old_slot
);
501 lc
.cond
.wait(locker
);
505 completion_wait(slot
);
507 r
= completion_ret(slot
);
// Latency bookkeeping for the finished op: running total, max, min, the mean
// over finished ops and the delta * (sample - updated mean) accumulator that
// is later turned into the latency standard deviation.
512 data
.cur_latency
= mono_clock::now() - start_times
[slot
];
513 total_latency
+= data
.cur_latency
.count();
514 if( data
.cur_latency
.count() > data
.max_latency
)
515 data
.max_latency
= data
.cur_latency
.count();
516 if (data
.cur_latency
.count() < data
.min_latency
)
517 data
.min_latency
= data
.cur_latency
.count();
519 double delta
= data
.cur_latency
.count() - data
.avg_latency
;
520 data
.avg_latency
= total_latency
/ data
.finished
;
521 data
.latency_diff_sum
+= delta
* (data
.cur_latency
.count() - data
.avg_latency
);
524 release_completion(slot
);
// Stop conditions: secondsToRun is 0 or the stop time has been reached, or
// (next condition, partly visible) the op count implied by max_objects.
526 if (!secondsToRun
|| mono_clock::now() >= stopTime
) {
531 if (data
.op_size
&& max_objects
&&
533 (int)((data
.object_size
* max_objects
+ data
.op_size
- 1) /
539 //write new stuff to backend
541 //create new contents and name on the heap, and fill them
// The slot buffer is reused: the tag is rewritten in place through c_str(),
// which is why the cached crc is invalidated afterwards.
542 newName
= generate_object_name_fast(data
.started
/ writes_per_object
);
543 newContents
= contents
[slot
].get();
544 snprintf(newContents
->c_str(), data
.op_size
, "I'm the %16dth op!", data
.started
);
545 // we wrote to buffer, going around internal crc cache, so invalidate it now.
546 newContents
->invalidate_crc();
548 start_times
[slot
] = mono_clock::now();
549 r
= create_completion(slot
, _aio_cb
, &lc
);
552 r
= aio_write(newName
, slot
, *newContents
, data
.op_size
,
553 data
.op_size
* (data
.started
% writes_per_object
));
557 name
[slot
] = newName
;
564 timePassed
= mono_clock::now() - data
.start_time
;
569 pthread_join(print_thread
, NULL
);
// Overall bandwidth: finished ops * op_size over the run time (divisor on a
// line not visible here), converted to MB/sec.
572 bandwidth
= ((double)data
.finished
)*((double)data
.op_size
) /
574 bandwidth
= bandwidth
/(1024*1024); // we want it in MB/sec
576 double bandwidth_stddev
;
578 double latency_stddev
;
// Standard deviations: sqrt(accumulated diff sum / (sample count - 1)), only
// computed when more than one sample exists.
579 if (data
.idata
.bandwidth_cycles
> 1) {
580 bandwidth_stddev
= std::sqrt(data
.idata
.bandwidth_diff_sum
/ (data
.idata
.bandwidth_cycles
- 1));
582 bandwidth_stddev
= 0;
584 if (data
.idata
.iops_cycles
> 1) {
585 iops_stddev
= std::sqrt(data
.idata
.iops_diff_sum
/ (data
.idata
.iops_cycles
- 1));
589 if (data
.finished
> 1) {
590 latency_stddev
= std::sqrt(data
.latency_diff_sum
/ (data
.finished
- 1));
// Human-readable summary on cout.
596 out(cout
) << "Total time run: " << timePassed
.count() << std::endl
597 << "Total writes made: " << data
.finished
<< std::endl
598 << "Write size: " << data
.op_size
<< std::endl
599 << "Object size: " << data
.object_size
<< std::endl
600 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth
<< std::endl
601 << "Stddev Bandwidth: " << bandwidth_stddev
<< std::endl
602 << "Max bandwidth (MB/sec): " << data
.idata
.max_bandwidth
<< std::endl
603 << "Min bandwidth (MB/sec): " << data
.idata
.min_bandwidth
<< std::endl
604 << "Average IOPS: " << (int)(data
.finished
/timePassed
.count()) << std::endl
605 << "Stddev IOPS: " << iops_stddev
<< std::endl
606 << "Max IOPS: " << data
.idata
.max_iops
<< std::endl
607 << "Min IOPS: " << data
.idata
.min_iops
<< std::endl
608 << "Average Latency(s): " << data
.avg_latency
<< std::endl
609 << "Stddev Latency(s): " << latency_stddev
<< std::endl
610 << "Max latency(s): " << data
.max_latency
<< std::endl
611 << "Min latency(s): " << data
.min_latency
<< std::endl
;
// Same summary through the formatter.
613 formatter
->dump_format("total_time_run", "%f", timePassed
.count());
614 formatter
->dump_format("total_writes_made", "%d", data
.finished
);
615 formatter
->dump_format("write_size", "%d", data
.op_size
);
616 formatter
->dump_format("object_size", "%d", data
.object_size
);
617 formatter
->dump_format("bandwidth", "%f", bandwidth
);
618 formatter
->dump_format("stddev_bandwidth", "%f", bandwidth_stddev
);
619 formatter
->dump_format("max_bandwidth", "%f", data
.idata
.max_bandwidth
);
620 formatter
->dump_format("min_bandwidth", "%f", data
.idata
.min_bandwidth
);
621 formatter
->dump_format("average_iops", "%d", (int)(data
.finished
/timePassed
.count()));
// NOTE(review): iops_stddev is assigned from std::sqrt() above but is dumped
// with "%d" here; seq_read_bench and rand_read_bench dump the same key with
// "%f". Looks like a format mismatch -- confirm and align.
622 formatter
->dump_format("stddev_iops", "%d", iops_stddev
);
623 formatter
->dump_format("max_iops", "%d", data
.idata
.max_iops
);
624 formatter
->dump_format("min_iops", "%d", data
.idata
.min_iops
);
625 formatter
->dump_format("average_latency", "%f", data
.avg_latency
);
626 formatter
->dump_format("stddev_latency", "%f", latency_stddev
);
627 formatter
->dump_format("max_latency", "%f", data
.max_latency
);
628 formatter
->dump_format("min_latency", "%f", data
.min_latency
);
630 //write object size/number data for read benchmarks
// Encoding order: object_size, finished op count, pid (prev_pid when
// non-zero, else getpid()), op_size.
631 encode(data
.object_size
, b_write
);
632 encode(data
.finished
, b_write
);
633 encode(prev_pid
? prev_pid
: getpid(), b_write
);
634 encode(data
.op_size
, b_write
);
636 // persist meta-data for further cleanup or read
// NOTE(review): four values are encoded above but the length passed is
// sizeof(int)*3 -- confirm how sync_write uses this argument.
637 sync_write(run_name_meta
, b_write
, sizeof(int)*3);
// Second join of the status thread (presumably on the error path; the label
// is not visible here).
647 pthread_join(print_thread
, NULL
);
// Sequential read benchmark: keeps concurrentios asynchronous reads of
// data.op_size bytes in flight over the objects written by a previous run
// (named with pid), compares each buffer with the expected tagged pattern
// and prints / dumps summary statistics.
// The visible return yields -EIO when errors is greater than 0 and 0
// otherwise.
// NOTE(review): many lines (declarations, error branches, loop headers,
// counter updates, the no_verify handling, labels) are not visible in this
// excerpt.
651 int ObjBencher::seq_read_bench(
652 int seconds_to_run
, int num_ops
, int num_objects
,
653 int concurrentios
, int pid
, bool no_verify
) {
657 if (concurrentios
<= 0)
660 std::vector
<string
> name(concurrentios
);
// NOTE(review): contents and index are arrays sized by a runtime value
// (variable-length arrays), a compiler extension rather than standard C++.
662 unique_ptr
<bufferlist
> contents
[concurrentios
];
663 int index
[concurrentios
];
665 double total_latency
= 0;
667 std::vector
<mono_time
> start_times(concurrentios
);
668 mono_clock::duration time_to_run
= std::chrono::seconds(seconds_to_run
);
669 std::chrono::duration
<double> timePassed
;
670 sanitize_object_contents(&data
, data
.op_size
); //clean it up once; subsequent
671 //changes will be safe because string length should remain the same
673 unsigned reads_per_object
= 1;
675 reads_per_object
= data
.object_size
/ data
.op_size
;
677 r
= completions_init(concurrentios
);
681 //set up initial reads
682 for (int i
= 0; i
< concurrentios
; ++i
) {
683 name
[i
] = generate_object_name_fast(i
/ reads_per_object
, pid
);
684 contents
[i
] = std::make_unique
<bufferlist
>();
687 std::unique_lock locker
{lock
};
689 data
.start_time
= mono_clock::now();
// Status thread running status_printer; joined after the main loop.
692 pthread_t print_thread
;
693 pthread_create(&print_thread
, NULL
, status_printer
, (void *)this);
694 ceph_pthread_setname(print_thread
, "seq_read_stat");
696 mono_time finish_time
= data
.start_time
+ time_to_run
;
697 //start initial reads
// NOTE(review): the return value of create_completion() is not checked here,
// unlike in write_bench where it is assigned to r.
698 for (int i
= 0; i
< concurrentios
; ++i
) {
700 start_times
[i
] = mono_clock::now();
701 create_completion(i
, _aio_cb
, (void *)&lc
);
702 r
= aio_read(name
[i
], i
, contents
[i
].get(), data
.op_size
,
703 data
.op_size
* (i
% reads_per_object
));
705 cerr
<< "r = " << r
<< std::endl
;
714 //keep on adding new reads as old ones complete
716 bufferlist
*cur_contents
;
// Main loop: runs while some started read has not finished; slots are
// scanned for a completed op and lc.cond is waited on when none is found
// (surrounding control flow not fully visible).
719 while (data
.finished
< data
.started
) {
725 if (completion_is_done(slot
)) {
730 if (slot
== concurrentios
) {
733 } while (slot
!= old_slot
);
737 lc
.cond
.wait(locker
);
740 // calculate latency here, so memcmp doesn't inflate it
741 data
.cur_latency
= mono_clock::now() - start_times
[slot
];
743 cur_contents
= contents
[slot
].get();
744 int current_index
= index
[slot
];
746 // invalidate internal crc cache
747 cur_contents
->invalidate_crc();
// Verification: rebuild the expected tag for this op index in
// data.object_contents and compare length and bytes with what was read.
750 snprintf(data
.object_contents
, data
.op_size
, "I'm the %16dth op!", current_index
);
751 if ( (cur_contents
->length() != data
.op_size
) ||
752 (memcmp(data
.object_contents
, cur_contents
->c_str(), data
.op_size
) != 0) ) {
753 cerr
<< name
[slot
] << " is not correct!" << std::endl
;
// A replacement read is started only when seconds_to_run is non-zero, the
// finish time has not been reached and fewer than num_ops reads were started.
758 bool start_new_read
= (seconds_to_run
&& mono_clock::now() < finish_time
) &&
759 num_ops
> data
.started
;
760 if (start_new_read
) {
761 newName
= generate_object_name_fast(data
.started
/ reads_per_object
, pid
);
762 index
[slot
] = data
.started
;
766 completion_wait(slot
);
768 r
= completion_ret(slot
);
770 cerr
<< "read got " << r
<< std::endl
;
// Latency bookkeeping for the finished read: running total, max, min, mean.
774 total_latency
+= data
.cur_latency
.count();
775 if (data
.cur_latency
.count() > data
.max_latency
)
776 data
.max_latency
= data
.cur_latency
.count();
777 if (data
.cur_latency
.count() < data
.min_latency
)
778 data
.min_latency
= data
.cur_latency
.count();
780 data
.avg_latency
= total_latency
/ data
.finished
;
783 release_completion(slot
);
788 //start new read and check data if requested
789 start_times
[slot
] = mono_clock::now();
790 create_completion(slot
, _aio_cb
, (void *)&lc
);
791 r
= aio_read(newName
, slot
, contents
[slot
].get(), data
.op_size
,
792 data
.op_size
* (data
.started
% reads_per_object
));
800 name
[slot
] = newName
;
803 timePassed
= mono_clock::now() - data
.start_time
;
808 pthread_join(print_thread
, NULL
);
// Overall bandwidth in MB/sec: finished ops * op_size / elapsed seconds.
811 bandwidth
= ((double)data
.finished
)*((double)data
.op_size
)/timePassed
.count();
812 bandwidth
= bandwidth
/(1024*1024); // we want it in MB/sec
815 if (data
.idata
.iops_cycles
> 1) {
816 iops_stddev
= std::sqrt(data
.idata
.iops_diff_sum
/ (data
.idata
.iops_cycles
- 1));
// Human-readable summary on cout, then the same values via the formatter.
822 out(cout
) << "Total time run: " << timePassed
.count() << std::endl
823 << "Total reads made: " << data
.finished
<< std::endl
824 << "Read size: " << data
.op_size
<< std::endl
825 << "Object size: " << data
.object_size
<< std::endl
826 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth
<< std::endl
827 << "Average IOPS: " << (int)(data
.finished
/timePassed
.count()) << std::endl
828 << "Stddev IOPS: " << iops_stddev
<< std::endl
829 << "Max IOPS: " << data
.idata
.max_iops
<< std::endl
830 << "Min IOPS: " << data
.idata
.min_iops
<< std::endl
831 << "Average Latency(s): " << data
.avg_latency
<< std::endl
832 << "Max latency(s): " << data
.max_latency
<< std::endl
833 << "Min latency(s): " << data
.min_latency
<< std::endl
;
835 formatter
->dump_format("total_time_run", "%f", timePassed
.count());
836 formatter
->dump_format("total_reads_made", "%d", data
.finished
);
837 formatter
->dump_format("read_size", "%d", data
.op_size
);
838 formatter
->dump_format("object_size", "%d", data
.object_size
);
839 formatter
->dump_format("bandwidth", "%f", bandwidth
);
840 formatter
->dump_format("average_iops", "%d", (int)(data
.finished
/timePassed
.count()));
841 formatter
->dump_format("stddev_iops", "%f", iops_stddev
);
842 formatter
->dump_format("max_iops", "%d", data
.idata
.max_iops
);
843 formatter
->dump_format("min_iops", "%d", data
.idata
.min_iops
);
844 formatter
->dump_format("average_latency", "%f", data
.avg_latency
);
845 formatter
->dump_format("max_latency", "%f", data
.max_latency
);
846 formatter
->dump_format("min_latency", "%f", data
.min_latency
);
// Any verification error turns the result into -EIO.
851 return (errors
> 0 ? -EIO
: 0);
// Second join of the status thread (presumably on the error path; the label
// is not visible here).
857 pthread_join(print_thread
, NULL
);
// Random read benchmark: like seq_read_bench, but each replacement read
// targets the op index rand() % num_ops, and the loop stops issuing reads
// once seconds_to_run is 0 or the finish time has been reached.
// The visible return yields -EIO when errors is greater than 0 and 0
// otherwise.
// NOTE(review): many lines (declarations, error branches, loop headers,
// counter updates, the no_verify handling, labels) are not visible in this
// excerpt.
861 int ObjBencher::rand_read_bench(
862 int seconds_to_run
, int num_ops
, int num_objects
,
863 int concurrentios
, int pid
, bool no_verify
) {
867 if (concurrentios
<= 0)
870 std::vector
<string
> name(concurrentios
);
// NOTE(review): contents and index are arrays sized by a runtime value
// (variable-length arrays), a compiler extension rather than standard C++.
872 unique_ptr
<bufferlist
> contents
[concurrentios
];
873 int index
[concurrentios
];
876 double total_latency
= 0;
877 std::vector
<mono_time
> start_times(concurrentios
);
878 mono_clock::duration time_to_run
= std::chrono::seconds(seconds_to_run
);
879 std::chrono::duration
<double> timePassed
;
880 sanitize_object_contents(&data
, data
.op_size
); //clean it up once; subsequent
881 //changes will be safe because string length should remain the same
883 unsigned reads_per_object
= 1;
885 reads_per_object
= data
.object_size
/ data
.op_size
;
889 r
= completions_init(concurrentios
);
893 //set up initial reads
894 for (int i
= 0; i
< concurrentios
; ++i
) {
895 name
[i
] = generate_object_name_fast(i
/ reads_per_object
, pid
);
896 contents
[i
] = std::make_unique
<bufferlist
>();
899 unique_lock locker
{lock
};
901 data
.start_time
= mono_clock::now();
// Status thread running status_printer; joined after the main loop.
904 pthread_t print_thread
;
905 pthread_create(&print_thread
, NULL
, status_printer
, (void *)this);
906 ceph_pthread_setname(print_thread
, "rand_read_stat");
908 mono_time finish_time
= data
.start_time
+ time_to_run
;
909 //start initial reads
// NOTE(review): the return value of create_completion() is not checked here.
910 for (int i
= 0; i
< concurrentios
; ++i
) {
912 start_times
[i
] = mono_clock::now();
913 create_completion(i
, _aio_cb
, (void *)&lc
);
914 r
= aio_read(name
[i
], i
, contents
[i
].get(), data
.op_size
,
915 data
.op_size
* (i
% reads_per_object
));
917 cerr
<< "r = " << r
<< std::endl
;
926 //keep on adding new reads as old ones complete
928 bufferlist
*cur_contents
;
// Main loop: runs while some started read has not finished; slots are
// scanned for a completed op and lc.cond is waited on when none is found
// (surrounding control flow not fully visible).
932 while (data
.finished
< data
.started
) {
938 if (completion_is_done(slot
)) {
943 if (slot
== concurrentios
) {
946 } while (slot
!= old_slot
);
950 lc
.cond
.wait(locker
);
953 // calculate latency here, so memcmp doesn't inflate it
954 data
.cur_latency
= mono_clock::now() - start_times
[slot
];
958 int current_index
= index
[slot
];
959 cur_contents
= contents
[slot
].get();
960 completion_wait(slot
);
962 r
= completion_ret(slot
);
964 cerr
<< "read got " << r
<< std::endl
;
// Latency bookkeeping for the finished read: running total, max, min, mean.
969 total_latency
+= data
.cur_latency
.count();
970 if (data
.cur_latency
.count() > data
.max_latency
)
971 data
.max_latency
= data
.cur_latency
.count();
972 if (data
.cur_latency
.count() < data
.min_latency
)
973 data
.min_latency
= data
.cur_latency
.count();
975 data
.avg_latency
= total_latency
/ data
.finished
;
// Verification: rebuild the expected tag for this op index in
// data.object_contents and compare length and bytes with what was read.
979 snprintf(data
.object_contents
, data
.op_size
, "I'm the %16dth op!", current_index
);
980 if ((cur_contents
->length() != data
.op_size
) ||
981 (memcmp(data
.object_contents
, cur_contents
->c_str(), data
.op_size
) != 0)) {
982 cerr
<< name
[slot
] << " is not correct!" << std::endl
;
988 release_completion(slot
);
// Stop issuing reads when seconds_to_run is 0 or the finish time has passed
// (the guarded statement is not visible here).
990 if (!seconds_to_run
|| mono_clock::now() >= finish_time
)
993 //start new read and check data if requested
// NOTE(review): rand() % num_ops divides by zero if num_ops is 0 -- confirm
// that callers never pass 0.
995 rand_id
= rand() % num_ops
;
996 newName
= generate_object_name_fast(rand_id
/ reads_per_object
, pid
);
997 index
[slot
] = rand_id
;
999 // invalidate internal crc cache
1000 cur_contents
->invalidate_crc();
1002 start_times
[slot
] = mono_clock::now();
1003 create_completion(slot
, _aio_cb
, (void *)&lc
);
1004 r
= aio_read(newName
, slot
, contents
[slot
].get(), data
.op_size
,
1005 data
.op_size
* (rand_id
% reads_per_object
));
1013 name
[slot
] = newName
;
1016 timePassed
= mono_clock::now() - data
.start_time
;
1021 pthread_join(print_thread
, NULL
);
// Overall bandwidth in MB/sec: finished ops * op_size / elapsed seconds.
1024 bandwidth
= ((double)data
.finished
)*((double)data
.op_size
)/timePassed
.count();
1025 bandwidth
= bandwidth
/(1024*1024); // we want it in MB/sec
1028 if (data
.idata
.iops_cycles
> 1) {
1029 iops_stddev
= std::sqrt(data
.idata
.iops_diff_sum
/ (data
.idata
.iops_cycles
- 1));
// Human-readable summary on cout, then the same values via the formatter.
1035 out(cout
) << "Total time run: " << timePassed
.count() << std::endl
1036 << "Total reads made: " << data
.finished
<< std::endl
1037 << "Read size: " << data
.op_size
<< std::endl
1038 << "Object size: " << data
.object_size
<< std::endl
1039 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth
<< std::endl
1040 << "Average IOPS: " << (int)(data
.finished
/timePassed
.count()) << std::endl
1041 << "Stddev IOPS: " << iops_stddev
<< std::endl
1042 << "Max IOPS: " << data
.idata
.max_iops
<< std::endl
1043 << "Min IOPS: " << data
.idata
.min_iops
<< std::endl
1044 << "Average Latency(s): " << data
.avg_latency
<< std::endl
1045 << "Max latency(s): " << data
.max_latency
<< std::endl
1046 << "Min latency(s): " << data
.min_latency
<< std::endl
;
1048 formatter
->dump_format("total_time_run", "%f", timePassed
.count());
1049 formatter
->dump_format("total_reads_made", "%d", data
.finished
);
1050 formatter
->dump_format("read_size", "%d", data
.op_size
);
1051 formatter
->dump_format("object_size", "%d", data
.object_size
);
1052 formatter
->dump_format("bandwidth", "%f", bandwidth
);
1053 formatter
->dump_format("average_iops", "%d", (int)(data
.finished
/timePassed
.count()));
1054 formatter
->dump_format("stddev_iops", "%f", iops_stddev
);
1055 formatter
->dump_format("max_iops", "%d", data
.idata
.max_iops
);
1056 formatter
->dump_format("min_iops", "%d", data
.idata
.min_iops
);
1057 formatter
->dump_format("average_latency", "%f", data
.avg_latency
);
1058 formatter
->dump_format("max_latency", "%f", data
.max_latency
);
1059 formatter
->dump_format("min_latency", "%f", data
.min_latency
);
// Any verification error turns the result into -EIO.
1063 return (errors
> 0 ? -EIO
: 0);
// Second join of the status thread (presumably on the error path; the label
// is not visible here).
1069 pthread_join(print_thread
, NULL
);
// Prefix-based cleanup entry point. Lists objects in batches of 20 via
// get_objects(), records the namespaces (the second member of each listed
// object) that hold the metadata object and those that hold objects starting
// with the prefix, then cleans every such namespace: through clean_up_slow()
// when no metadata object was found there, otherwise through the metadata
// read by fetch_bench_metadata() followed by clean_up(num_objects, ...) and
// removal of the metadata object.
// An empty orig_prefix falls back to generate_object_prefix_nopid(); an empty
// run_name falls back to BENCH_LASTRUN_METADATA.
// NOTE(review): loop headers, namespace switching, the declaration of prevPid
// and the return statements are not visible in this excerpt.
1073 int ObjBencher::clean_up(const std::string
& orig_prefix
, int concurrentios
, const std::string
& run_name
) {
1075 uint64_t op_size
, object_size
;
1076 int num_ops
, num_objects
;
1079 // default meta object if user does not specify one
1080 const std::string run_name_meta
= (run_name
.empty() ? BENCH_LASTRUN_METADATA
: run_name
);
1081 const std::string prefix
= (orig_prefix
.empty() ? generate_object_prefix_nopid() : orig_prefix
);
// Refuse prefixes that do not start with BENCH_PREFIX.
1083 if (prefix
.substr(0, BENCH_PREFIX
.length()) != BENCH_PREFIX
) {
1084 cerr
<< "Specified --prefix invalid, it must begin with \"" << BENCH_PREFIX
<< "\"" << std::endl
;
1088 std::list
<Object
> unfiltered_objects
;
1089 std::set
<std::string
> meta_namespaces
, all_namespaces
;
1091 // If caller set all_nspaces this will be searching
1092 // across multiple namespaces.
1094 bool objects_remain
= get_objects(&unfiltered_objects
, 20);
1095 if (!objects_remain
)
// Classify each listed object by its first member (compared with the
// metadata name and the prefix) and record its second member.
1098 std::list
<Object
>::const_iterator i
= unfiltered_objects
.begin();
1099 for ( ; i
!= unfiltered_objects
.end(); ++i
) {
1100 if (i
->first
== run_name_meta
) {
1101 meta_namespaces
.insert(i
->second
);
1103 if (i
->first
.substr(0, prefix
.length()) == prefix
) {
1104 all_namespaces
.insert(i
->second
);
1109 std::set
<std::string
>::const_iterator i
= all_namespaces
.begin();
1110 for ( ; i
!= all_namespaces
.end(); ++i
) {
1113 // if no metadata file found we should try to do a linear search on the prefix
1114 if (meta_namespaces
.find(*i
) == meta_namespaces
.end()) {
1115 int r
= clean_up_slow(prefix
, concurrentios
);
1117 cerr
<< "clean_up_slow error r= " << r
<< std::endl
;
1123 r
= fetch_bench_metadata(run_name_meta
, &op_size
, &object_size
, &num_ops
, &num_objects
, &prevPid
);
1128 r
= clean_up(num_objects
, prevPid
, concurrentios
);
1129 if (r
!= 0) return r
;
1131 r
= sync_remove(run_name_meta
);
1132 if (r
!= 0) return r
;
// Removes benchmark objects 0 .. num_objects-1 that were named with prevPid,
// keeping up to concurrentios asynchronous removes in flight and finally
// printing how many objects were removed.
// A completion result of -ENOENT is not reported as an error.
// NOTE(review): declarations (r, slot), counter updates, error branches and
// the return statements are not visible in this excerpt.
1138 int ObjBencher::clean_up(int num_objects
, int prevPid
, int concurrentios
) {
1139 lock_cond
lc(&lock
);
1141 if (concurrentios
<= 0)
1144 std::vector
<string
> name(concurrentios
);
1145 std::string newName
;
1149 unique_lock locker
{lock
};
1156 // don't start more completions than files
1157 if (num_objects
== 0) {
1159 } else if (num_objects
< concurrentios
) {
1160 concurrentios
= num_objects
;
1163 r
= completions_init(concurrentios
);
1167 //set up initial removes
1168 for (int i
= 0; i
< concurrentios
; ++i
) {
1169 name
[i
] = generate_object_name_fast(i
, prevPid
);
1172 //start initial removes
1173 for (int i
= 0; i
< concurrentios
; ++i
) {
1174 create_completion(i
, _aio_cb
, (void *)&lc
);
1175 r
= aio_remove(name
[i
], i
);
1176 if (r
< 0) { //naughty, doesn't clean up heap
1177 cerr
<< "r = " << r
<< std::endl
;
1186 //keep on adding new removes as old ones complete
// Main loop: runs while some started remove has not finished; slots are
// scanned for a completed op and lc.cond is waited on when none is found
// (surrounding control flow not fully visible).
1187 while (data
.finished
< data
.started
) {
1189 int old_slot
= slot
;
1193 if (completion_is_done(slot
)) {
1198 if (slot
== concurrentios
) {
1201 } while (slot
!= old_slot
);
1205 lc
.cond
.wait(locker
);
1208 completion_wait(slot
);
1210 r
= completion_ret(slot
);
1211 if (r
!= 0 && r
!= -ENOENT
) { // file does not exist
1212 cerr
<< "remove got " << r
<< std::endl
;
1219 release_completion(slot
);
// No replacement remove once every object index has been started (the
// guarded statement is not visible here).
1221 if (data
.started
>= num_objects
)
1224 //start new remove and check data if requested
1225 newName
= generate_object_name_fast(data
.started
, prevPid
);
1226 create_completion(slot
, _aio_cb
, (void *)&lc
);
1227 r
= aio_remove(newName
, slot
);
1235 name
[slot
] = newName
;
// Uses the plural "objects" unless exactly one object was removed.
1244 out(cout
) << "Removed " << data
.finished
<< " object" << (data
.finished
!= 1 ? "s" : "") << std::endl
;
/**
 * Return objects from the datastore which match a prefix.
 *
 * Clears the list and populates it with any objects which match the
 * prefix. The list is guaranteed to have at least one item when the
 * function returns true.
 *
 * @param prefix the prefix to match against
 * @param objects [out] return list of objects
 * @returns true if there are any objects in the store which match
 * the prefix, false if there are no more
 */
// Implementation notes: objects are pulled in batches through get_objects()
// and every entry whose `first` member starts with `prefix` is appended to
// *objects. The fetch loop repeats for as long as *objects is still empty.
//
// NOTE(review): the statements that produce the return value (and any that
// reset *objects) are not visible in this listing -- confirm against the
// complete source.
1267 bool ObjBencher::more_objects_matching_prefix(const std::string
& prefix
, std::list
<Object
>* objects
) {
// scratch list filled by get_objects() before filtering
1268 std::list
<Object
> unfiltered_objects
;
// keep fetching until at least one matching object has been collected
1272 while (objects
->empty()) {
// the second argument (20) is presumably the maximum batch size -- TODO confirm
1273 bool objects_remain
= get_objects(&unfiltered_objects
, 20);
// nothing left to fetch; the action taken here is not visible in this listing
1274 if (!objects_remain
)
1277 std::list
<Object
>::const_iterator i
= unfiltered_objects
.begin();
1278 for ( ; i
!= unfiltered_objects
.end(); ++i
) {
// keep only entries whose leading prefix.length() characters equal prefix
1279 if (i
->first
.substr(0, prefix
.length()) == prefix
) {
1280 objects
->push_back(*i
);
// Remove every object whose name starts with `prefix`, discovering the
// objects through more_objects_matching_prefix() rather than regenerating
// their names. A warning that the slow linear search is in use is printed
// first.
//
// Each Object is used as a pair: `.second` is passed to set_namespace() and
// `.first` is passed to aio_remove().
//
// @param prefix         name prefix selecting the objects to remove
// @param concurrentios  requested number of completion slots
//
// NOTE(review): the return statements, the data.* counter updates, the slot
// scan internals and the lock/unlock calls are not visible in this listing,
// so return values and error paths are deliberately not described here --
// confirm against the complete source.
1288 int ObjBencher::clean_up_slow(const std::string
& prefix
, int concurrentios
) {
// lc is handed to every completion below as the callback argument
1289 lock_cond
lc(&lock
);
// non-positive concurrency is special-cased (the action taken is not visible here)
1291 if (concurrentios
<= 0)
// name[slot] records the object whose remove currently occupies that slot
1294 std::vector
<Object
> name(concurrentios
);
// queue of matching objects still waiting to be removed
1298 std::list
<Object
> objects
;
// cleared once more_objects_matching_prefix() reports nothing further
1299 bool objects_remain
= true;
1301 std::unique_lock locker
{lock
};
1308 out(cout
) << "Warning: using slow linear search" << std::endl
;
// allocate one completion slot per concurrent remove
1310 r
= completions_init(concurrentios
);
1314 //set up initial removes
1315 for (int i
= 0; i
< concurrentios
; ++i
) {
// refill the queue when it has run dry
1316 if (objects
.empty()) {
1317 // if there are fewer objects than concurrent ios, don't generate extras
1318 bool objects_found
= more_objects_matching_prefix(prefix
, &objects
);
1319 if (!objects_found
) {
1321 objects_remain
= false;
// take the next queued object for slot i
1326 name
[i
] = objects
.front();
1327 objects
.pop_front();
1330 //start initial removes
1331 for (int i
= 0; i
< concurrentios
; ++i
) {
1332 create_completion(i
, _aio_cb
, (void *)&lc
);
// select the object's namespace before issuing its remove
1333 set_namespace(name
[i
].second
);
1334 r
= aio_remove(name
[i
].first
, i
);
1335 if (r
< 0) { //naughty, doesn't clean up heap
1336 cerr
<< "r = " << r
<< std::endl
;
1345 //keep on adding new removes as old ones complete
1346 while (objects_remain
) {
// remember where the scan began so one full pass over the slots can be detected
1348 int old_slot
= slot
;
1352 if (completion_is_done(slot
)) {
// reached the end of the slot range (presumably wraps to slot 0 -- TODO confirm)
1357 if (slot
== concurrentios
) {
1360 } while (slot
!= old_slot
);
// no finished slot found in a full pass: block on the condition variable
// (presumably signalled from _aio_cb -- TODO confirm)
1364 lc
.cond
.wait(locker
);
1368 // get more objects if necessary
1369 if (objects
.empty()) {
1370 objects_remain
= more_objects_matching_prefix(prefix
, &objects
);
1371 // quit if there are no more
1372 if (!objects_remain
) {
1377 // get the next object
1378 newName
= objects
.front();
1379 objects
.pop_front();
1381 completion_wait(slot
);
1383 r
= completion_ret(slot
);
// -ENOENT is tolerated; any other non-zero result is reported on cerr
1384 if (r
!= 0 && r
!= -ENOENT
) { // file does not exist
1385 cerr
<< "remove got " << r
<< std::endl
;
1392 release_completion(slot
);
1394 //start new remove and check data if requested
1395 create_completion(slot
, _aio_cb
, (void *)&lc
);
// select the object's namespace before issuing its remove
1396 set_namespace(newName
.second
);
1397 r
= aio_remove(newName
.first
, slot
);
1405 name
[slot
] = newName
;
1408 //wait for final removes to complete
1409 while (data
.finished
< data
.started
) {
// the slot to drain is derived from the finished count, modulo the slot count
1410 slot
= data
.finished
% concurrentios
;
1411 completion_wait(slot
);
1413 r
= completion_ret(slot
);
// -ENOENT is tolerated; any other non-zero result is reported on cerr
1414 if (r
!= 0 && r
!= -ENOENT
) { // file does not exist
1415 cerr
<< "remove got " << r
<< std::endl
;
1421 release_completion(slot
);
// summary line; "object" is pluralised unless exactly one was removed
1431 out(cout
) << "Removed " << data
.finished
<< " object" << (data
.finished
!= 1 ? "s" : "") << std::endl
;