2 * Generate latency statistics for a configurable number of write
3 * operations of configurable size.
5 * Created on: May 21, 2012
6 * Author: Eleanor Cawthon
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "include/rados/librados.hpp"
15 #include "include/Context.h"
16 #include "common/ceph_context.h"
17 #include "common/ceph_mutex.h"
18 #include "common/Cond.h"
19 #include "include/utime.h"
20 #include "common/ceph_argparse.h"
21 #include "test/omap_bench.h"
30 using ceph::bufferlist
;
32 int OmapBench::setup(int argc
, const char** argv
) {
33 //parse key_value_store_bench args
34 vector
<const char*> args
;
35 argv_to_vec(argc
,argv
,args
);
36 for (unsigned i
= 0; i
< args
.size(); i
++) {
37 if(i
< args
.size() - 1) {
38 if (strcmp(args
[i
], "-t") == 0) {
39 threads
= atoi(args
[i
+1]);
40 } else if (strcmp(args
[i
], "-o") == 0) {
41 objects
= atoi(args
[i
+1]);
42 } else if (strcmp(args
[i
], "--entries") == 0) {
43 entries_per_omap
= atoi(args
[i
+1]);
44 } else if (strcmp(args
[i
], "--keysize") == 0) {
45 key_size
= atoi(args
[i
+1]);
46 } else if (strcmp(args
[i
], "--valsize") == 0) {
47 value_size
= atoi(args
[i
+1]);
48 } else if (strcmp(args
[i
], "--inc") == 0) {
49 increment
= atoi(args
[i
+1]);
50 } else if (strcmp(args
[i
], "--omaptype") == 0) {
51 if(strcmp("rand",args
[i
+1]) == 0) {
52 omap_generator
= OmapBench::generate_non_uniform_omap
;
54 else if (strcmp("uniform", args
[i
+1]) == 0) {
55 omap_generator
= OmapBench::generate_uniform_omap
;
57 } else if (strcmp(args
[i
], "--name") == 0) {
60 } else if (strcmp(args
[i
], "--help") == 0) {
61 cout
<< "\nUsage: ostorebench [options]\n"
62 << "Generate latency statistics for a configurable number of "
63 << "key value pair operations of\n"
64 << "configurable size.\n\n"
66 << " -t number of threads to use (default "<<threads
;
68 << " -o number of objects to write (default "<<objects
;
70 << " --entries number of entries per (default "
73 << " --keysize number of characters per key "
74 << "(default "<<key_size
;
76 << " --valsize number of characters per value "
77 << "(default "<<value_size
;
79 << " --inc specify the increment to use in the displayed "
80 << "histogram (default "<<increment
;
82 << " --omaptype specify how omaps should be generated - "
83 << "rand for random sizes between\n"
84 << " 0 and max size, uniform for all sizes"
85 << " to be specified size.\n"
86 << " (default uniform)\n";
87 cout
<< " --name the rados id to use (default "<< rados_id
92 int r
= rados
.init(rados_id
.c_str());
94 cout
<< "error during init" << std::endl
;
97 r
= rados
.conf_parse_argv(argc
, argv
);
99 cout
<< "error during parsing args" << std::endl
;
102 r
= rados
.conf_parse_env(NULL
);
104 cout
<< "error during parsing env" << std::endl
;
107 r
= rados
.conf_read_file(NULL
);
109 cout
<< "error during read file" << std::endl
;
114 cout
<< "error during connect" << std::endl
;
117 r
= rados
.ioctx_create(pool_name
.c_str(), io_ctx
);
119 cout
<< "error creating io ctx" << std::endl
;
127 Writer::Writer(OmapBench
*omap_bench
) : ob(omap_bench
) {
129 ob
->data_lock
.lock();
130 name
<< omap_bench
->prefix
<< ++(ob
->data
.started_ops
);
131 ob
->data_lock
.unlock();
134 void Writer::start_time() {
135 begin_time
= ceph_clock_now();
137 void Writer::stop_time() {
138 end_time
= ceph_clock_now();
140 double Writer::get_time() {
141 return (end_time
- begin_time
) * 1000;
143 string
Writer::get_oid() {
146 std::map
<std::string
, bufferlist
> & Writer::get_omap() {
150 //AioWriter functions
151 AioWriter::AioWriter(OmapBench
*ob
) : Writer(ob
) {
154 AioWriter::~AioWriter() {
155 if(aioc
) aioc
->release();
157 librados::AioCompletion
* AioWriter::get_aioc() {
160 void AioWriter::set_aioc(librados::callback_t complete
) {
161 aioc
= ob
->rados
.aio_create_completion(this, complete
);
166 void OmapBench::aio_is_complete(rados_completion_t c
, void *arg
) {
167 AioWriter
*aiow
= reinterpret_cast<AioWriter
*>(arg
);
169 ceph::mutex
* data_lock
= &aiow
->ob
->data_lock
;
170 ceph::mutex
* thread_is_free_lock
= &aiow
->ob
->thread_is_free_lock
;
171 ceph::condition_variable
* thread_is_free
= &aiow
->ob
->thread_is_free
;
172 int &busythreads_count
= aiow
->ob
->busythreads_count
;
173 o_bench_data
&data
= aiow
->ob
->data
;
174 int INCREMENT
= aiow
->ob
->increment
;
175 int err
= aiow
->get_aioc()->get_return_value();
177 cout
<< "error writing AioCompletion";
180 double time
= aiow
->get_time();
183 data
.avg_latency
= (data
.avg_latency
* data
.completed_ops
+ time
)
184 / (data
.completed_ops
+ 1);
185 data
.completed_ops
++;
186 if (time
< data
.min_latency
) {
187 data
.min_latency
= time
;
189 if (time
> data
.max_latency
) {
190 data
.max_latency
= time
;
192 data
.total_latency
+= time
;
193 ++(data
.freq_map
[time
/ INCREMENT
]);
194 if(data
.freq_map
[time
/INCREMENT
] > data
.mode
.second
) {
195 data
.mode
.first
= time
/INCREMENT
;
196 data
.mode
.second
= data
.freq_map
[time
/INCREMENT
];
200 thread_is_free_lock
->lock();
202 thread_is_free
->notify_all();
203 thread_is_free_lock
->unlock();
206 string
OmapBench::random_string(int len
) {
208 string alphanum
= "0123456789"
209 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
210 "abcdefghijklmnopqrstuvwxyz";
212 for (int i
= 0; i
< len
; ++i
) {
213 ret
.push_back(alphanum
[rand() % (alphanum
.size() - 1)]);
219 int OmapBench::run() {
220 return (((OmapBench
*)this)->*OmapBench::test
)(omap_generator
);
223 int OmapBench::print_written_omap() {
224 for (int i
= 1; i
<= objects
; i
++) {
226 librados::ObjectReadOperation key_read
;
227 set
<string
> out_keys
;
228 map
<string
, bufferlist
> out_vals
;
229 std::stringstream objstrm
;
232 cout
<< "\nPrinting omap for "<<objstrm
.str() << std::endl
;
233 // FIXME: we ignore pmore here. this shouldn't happen for benchmark
234 // keys, though, unless the OSD limit is *really* low.
235 key_read
.omap_get_keys2("", LONG_MAX
, &out_keys
, nullptr, &err
);
236 io_ctx
.operate(objstrm
.str(), &key_read
, NULL
);
238 cout
<< "error " << err
;
239 cout
<< " getting omap key set " << std::endl
;
243 librados::ObjectReadOperation val_read
;
244 val_read
.omap_get_vals_by_keys(out_keys
, &out_vals
, &err
);
246 cout
<< "error " << err
;
247 cout
<< " getting omap value set " << std::endl
;
250 io_ctx
.operate(objstrm
.str(), &val_read
, NULL
);
252 for (set
<string
>::iterator iter
= out_keys
.begin();
253 iter
!= out_keys
.end(); ++iter
) {
254 cout
<< *iter
<< "\t" << (out_vals
)[*iter
] << std::endl
;
260 void OmapBench::print_results() {
261 cout
<< "========================================================";
262 cout
<< "\nNumber of kvmaps written:\t" << objects
;
263 cout
<< "\nNumber of ops at once:\t" << threads
;
264 cout
<< "\nEntries per kvmap:\t\t" << entries_per_omap
;
265 cout
<< "\nCharacters per key:\t" << key_size
;
266 cout
<< "\nCharacters per val:\t" << value_size
;
269 cout
<< "Average latency:\t" << data
.avg_latency
;
270 cout
<< "ms\nMinimum latency:\t" << data
.min_latency
;
271 cout
<< "ms\nMaximum latency:\t" << data
.max_latency
;
272 cout
<< "ms\nMode latency:\t\t"<<"between "<<data
.mode
.first
* increment
;
273 cout
<< " and " <<data
.mode
.first
* increment
+ increment
;
274 cout
<< "ms\nTotal latency:\t\t" << data
.total_latency
;
275 cout
<< "ms"<<std::endl
;
277 cout
<< "Histogram:" << std::endl
;
278 for(int i
= floor(data
.min_latency
/ increment
); i
<
279 ceil(data
.max_latency
/ increment
); i
++) {
280 cout
<< ">= "<< i
* increment
;
283 if (i
== 0) spaces
= 4;
284 else spaces
= 3 - floor(log10(i
));
285 for (int j
= 0; j
< spaces
; j
++) {
289 for(int j
= 0; j
< ((data
.freq_map
)[i
])*45/(data
.mode
.second
); j
++) {
294 cout
<< "\n========================================================"
298 int OmapBench::write_omap_asynchronously(AioWriter
*aiow
,
299 const std::map
<std::string
,bufferlist
> &omap
) {
300 librados::ObjectWriteOperation owo
;
305 int err
= io_ctx
.aio_operate(aiow
->get_oid(), aiow
->get_aioc(), &owo
);
307 cout
<< "writing omap failed with code "<<err
;
315 int OmapBench::generate_uniform_omap(const int omap_entries
, const int key_size
,
316 const int value_size
, std::map
<std::string
,bufferlist
> * out_omap
) {
320 for (int i
= 0; i
< omap_entries
; i
++) {
322 omap_val
.append(random_string(value_size
));
323 string key
= random_string(key_size
);
324 (*out_omap
)[key
]= omap_val
;
329 int OmapBench::generate_non_uniform_omap(const int omap_entries
,
330 const int key_size
, const int value_size
,
331 std::map
<std::string
,bufferlist
> * out_omap
) {
334 int num_entries
= rand() % omap_entries
+ 1;
335 int key_len
= rand() % key_size
+1;
336 int val_len
= rand() % value_size
+1;
339 for (int i
= 0; i
< num_entries
; i
++) {
341 omap_val
.append(random_string(val_len
));
342 string key
= random_string(key_len
);
343 (*out_omap
)[key
] = omap_val
;
348 int OmapBench::generate_small_non_random_omap(const int omap_entries
,
349 const int key_size
, const int value_size
,
350 std::map
<std::string
,bufferlist
> * out_omap
) {
355 for (int i
= 0; i
< omap_entries
; i
++) {
357 omap_val
.append("Value ");
360 (*out_omap
)[key
.str()]= omap_val
;
366 int OmapBench::test_write_objects_in_parallel(omap_generator_t omap_gen
) {
367 AioWriter
*this_aio_writer
;
369 std::unique_lock l
{thread_is_free_lock
};
370 for (int i
= 0; i
< objects
; i
++) {
371 ceph_assert(busythreads_count
<= threads
);
372 //wait for a writer to be free
373 if (busythreads_count
== threads
) {
374 thread_is_free
.wait(l
);
375 ceph_assert(busythreads_count
< threads
);
379 this_aio_writer
= new AioWriter(this);
380 this_aio_writer
->set_aioc(comp
);
384 int err
= omap_gen(entries_per_omap
, key_size
, value_size
,
385 & this_aio_writer
->get_omap());
389 err
= OmapBench::write_omap_asynchronously(this_aio_writer
,
390 (this_aio_writer
->get_omap()));
397 thread_is_free
.wait(l
, [this] { return busythreads_count
<= 0;});
402 * runs the specified test with the specified parameters and generates
403 * a histogram of latencies
405 int main(int argc
, const char** argv
) {
407 int err
= ob
.setup(argc
, argv
);
409 cout
<< "error during setup: "<<err
;
415 cout
<< "writing objects failed with code " << err
;
422 //uncomment to show omaps
423 /*err = ob.return print_written_omap();
425 cout << "printing omaps failed with code " << err;