1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2012 New Dream Network
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
23 #include "os/ObjectStore.h"
24 #include "common/ceph_argparse.h"
25 #include "global/global_init.h"
26 #include "common/debug.h"
27 #include <boost/scoped_ptr.hpp>
28 #include <boost/lexical_cast.hpp>
29 #include "workload_generator.h"
30 #include "include/assert.h"
32 #include "TestObjectStoreState.h"
34 #define dout_context g_ceph_context
36 static const char *our_name
= NULL
;
39 boost::scoped_ptr
<WorkloadGenerator
> wrkldgen
;
41 #define dout_subsys ceph_subsys_
44 WorkloadGenerator::WorkloadGenerator(vector
<const char*> args
)
45 : TestObjectStoreState(NULL
),
46 m_max_in_flight(def_max_in_flight
),
48 m_destroy_coll_every_nr_runs(def_destroy_coll_every_nr_runs
),
49 m_num_colls(def_num_colls
),
50 m_write_data_bytes(0), m_write_xattr_obj_bytes(0),
51 m_write_xattr_coll_bytes(0), m_write_pglog_bytes(0),
52 m_suppress_write_data(false), m_suppress_write_xattr_obj(false),
53 m_suppress_write_xattr_coll(false), m_suppress_write_log(false),
55 m_stats_finished_txs(0),
56 m_stats_lock("WorldloadGenerator::m_stats_lock"),
58 m_stats_total_written(0),
66 dout(0) << "data = " << g_conf
->osd_data
<< dendl
;
67 dout(0) << "journal = " << g_conf
->osd_journal
<< dendl
;
68 dout(0) << "journal size = " << g_conf
->osd_journal_size
<< dendl
;
70 err
= ::mkdir(g_conf
->osd_data
.c_str(), 0755);
71 ceph_assert(err
== 0 || (err
< 0 && errno
== EEXIST
));
72 ObjectStore
*store_ptr
= ObjectStore::create(g_ceph_context
,
73 g_conf
->osd_objectstore
,
76 m_store
.reset(store_ptr
);
77 err
= m_store
->mkfs();
78 ceph_assert(err
== 0);
79 err
= m_store
->mount();
80 ceph_assert(err
== 0);
82 set_max_in_flight(m_max_in_flight
);
83 set_num_objs_per_coll(def_num_obj_per_coll
);
87 dout(0) << "#colls = " << m_num_colls
<< dendl
;
88 dout(0) << "#objs per coll = " << m_num_objs_per_coll
<< dendl
;
89 dout(0) << "#txs per destr = " << m_destroy_coll_every_nr_runs
<< dendl
;
93 size_t WorkloadGenerator::_parse_size_or_die(std::string
& val
)
99 if (val
.empty()) // this should never happen, but catch it anyway.
103 for (i
= 0; i
< val
.length(); i
++) {
104 if (!isdigit(val
[i
])) {
105 if (isalpha(val
[i
])) {
106 val
[i
] = tolower(val
[i
]);
109 case 'k': multiplier
= 10; break;
110 case 'm': multiplier
= 20; break;
111 case 'g': multiplier
= 30; break;
123 s
= strtoll(val
.c_str(), NULL
, 10) * (1 << multiplier
);
131 void WorkloadGenerator::_suppress_ops_or_die(std::string
& val
)
133 for (size_t i
= 0; i
< val
.length(); i
++) {
135 case 'c': m_suppress_write_xattr_coll
= true; break;
136 case 'o': m_suppress_write_xattr_obj
= true; break;
137 case 'l': m_suppress_write_log
= true; break;
138 case 'd': m_suppress_write_data
= true; break;
146 void WorkloadGenerator::init_args(vector
<const char*> args
)
148 for (std::vector
<const char*>::iterator i
= args
.begin(); i
!= args
.end();) {
151 if (ceph_argparse_double_dash(args
, i
)) {
153 } else if (ceph_argparse_witharg(args
, i
, &val
,
154 "--test-num-colls", (char*) NULL
)) {
155 m_num_colls
= strtoll(val
.c_str(), NULL
, 10);
156 } else if (ceph_argparse_witharg(args
, i
, &val
,
157 "--test-objs-per-coll", (char*) NULL
)) {
158 m_num_objs_per_coll
= strtoll(val
.c_str(), NULL
, 10);
159 } else if (ceph_argparse_witharg(args
, i
, &val
,
160 "--test-destroy-coll-per-N-trans", (char*) NULL
)) {
161 m_destroy_coll_every_nr_runs
= strtoll(val
.c_str(), NULL
, 10);
162 } else if (ceph_argparse_witharg(args
, i
, &val
,
163 "--test-num-ops", (char*) NULL
)) {
164 m_num_ops
= strtoll(val
.c_str(), NULL
, 10);
165 } else if (ceph_argparse_witharg(args
, i
, &val
,
166 "--test-max-in-flight", (char*) NULL
)) {
167 m_max_in_flight
= strtoll(val
.c_str(), NULL
, 10);
168 } else if (ceph_argparse_witharg(args
, i
, &val
,
169 "--test-write-data-size", (char*) NULL
)) {
170 m_write_data_bytes
= _parse_size_or_die(val
);
171 } else if (ceph_argparse_witharg(args
, i
, &val
,
172 "--test-write-xattr-obj-size", (char*) NULL
)) {
173 m_write_xattr_obj_bytes
= _parse_size_or_die(val
);
174 } else if (ceph_argparse_witharg(args
, i
, &val
,
175 "--test-write-xattr-coll-size", (char*) NULL
)) {
176 m_write_xattr_coll_bytes
= _parse_size_or_die(val
);
177 } else if (ceph_argparse_witharg(args
, i
, &val
,
178 "--test-write-pglog-size", (char*) NULL
)) {
179 m_write_pglog_bytes
= _parse_size_or_die(val
);
180 } else if (ceph_argparse_witharg(args
, i
, &val
,
181 "--test-suppress-ops", (char*) NULL
)) {
182 _suppress_ops_or_die(val
);
183 } else if (ceph_argparse_witharg(args
, i
, &val
,
184 "--test-show-stats-period", (char*) NULL
)) {
185 m_stats_show_secs
= strtoll(val
.c_str(), NULL
, 10);
186 } else if (ceph_argparse_flag(args
, i
, "--test-show-stats", (char*) NULL
)) {
188 } else if (ceph_argparse_flag(args
, i
, "--help", (char*) NULL
)) {
195 int WorkloadGenerator::get_uniform_random_value(int min
, int max
)
197 boost::uniform_int
<> value(min
, max
);
201 TestObjectStoreState::coll_entry_t
*WorkloadGenerator::get_rnd_coll_entry(bool erase
= false)
203 int index
= get_uniform_random_value(0, m_collections_ids
.size()-1);
204 coll_entry_t
*entry
= get_coll_at(index
, erase
);
208 hobject_t
*WorkloadGenerator::get_rnd_obj(coll_entry_t
*entry
)
210 assert(entry
!= NULL
);
213 (get_uniform_random_value(0,100) < 50 || !entry
->m_objects
.size());
215 if (create
&& ((int) entry
->m_objects
.size() < m_num_objs_per_coll
)) {
216 return (entry
->touch_obj(entry
->m_next_object_id
++));
219 int idx
= get_uniform_random_value(0, entry
->m_objects
.size()-1);
220 return entry
->get_obj_at(idx
);
224 * We'll generate a random amount of bytes, ranging from a single byte up to
227 size_t WorkloadGenerator::get_random_byte_amount(size_t min
, size_t max
)
229 size_t diff
= max
- min
;
230 return (size_t) (min
+ (rand() % diff
));
233 void WorkloadGenerator::get_filled_byte_array(bufferlist
& bl
, size_t size
)
235 static const char alphanum
[] = "0123456789"
236 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
237 "abcdefghijklmnopqrstuvwxyz";
241 for (unsigned int i
= 0; i
< size
- 1; i
++) {
242 bp
[i
] = alphanum
[rand() % sizeof(alphanum
)];
251 void WorkloadGenerator::do_write_object(ObjectStore::Transaction
*t
,
252 coll_t coll
, hobject_t obj
,
255 if (m_suppress_write_data
) {
256 dout(5) << __func__
<< " suppressed" << dendl
;
260 size_t size
= m_write_data_bytes
;
262 size
= get_random_byte_amount(min_write_bytes
, max_write_bytes
);
265 get_filled_byte_array(bl
, size
);
267 dout(2) << __func__
<< " " << coll
<< "/" << obj
268 << " size " << bl
.length() << dendl
;
270 if (m_do_stats
&& (stat
!= NULL
))
271 stat
->written_data
+= bl
.length();
273 t
->write(coll
, ghobject_t(obj
), 0, bl
.length(), bl
);
276 void WorkloadGenerator::do_setattr_object(ObjectStore::Transaction
*t
,
277 coll_t coll
, hobject_t obj
,
280 if (m_suppress_write_xattr_obj
) {
281 dout(5) << __func__
<< " suppressed" << dendl
;
285 size_t size
= m_write_xattr_obj_bytes
;
287 size
= get_random_byte_amount(min_xattr_obj_bytes
, max_xattr_obj_bytes
);
290 get_filled_byte_array(bl
, size
);
292 dout(2) << __func__
<< " " << coll
<< "/" << obj
<< " size " << size
<< dendl
;
294 if (m_do_stats
&& (stat
!= NULL
))
295 stat
->written_data
+= bl
.length();
297 t
->setattr(coll
, ghobject_t(obj
), "objxattr", bl
);
300 void WorkloadGenerator::do_pgmeta_omap_set(ObjectStore::Transaction
*t
, spg_t pgid
,
301 coll_t coll
, C_StatState
*stat
)
303 if (m_suppress_write_xattr_coll
) {
304 dout(5) << __func__
<< " suppressed" << dendl
;
308 size_t size
= m_write_xattr_coll_bytes
;
310 size
= get_random_byte_amount(min_xattr_coll_bytes
, max_xattr_coll_bytes
);
313 get_filled_byte_array(bl
, size
);
314 dout(2) << __func__
<< " coll " << coll
<< " size " << size
<< dendl
;
316 if (m_do_stats
&& (stat
!= NULL
))
317 stat
->written_data
+= bl
.length();
319 ghobject_t
pgmeta(pgid
.make_pgmeta_oid());
320 map
<string
,bufferlist
> values
;
321 values
["_"].claim(bl
);
322 t
->omap_setkeys(coll
, pgmeta
, values
);
326 void WorkloadGenerator::do_append_log(ObjectStore::Transaction
*t
,
327 coll_entry_t
*entry
, C_StatState
*stat
)
329 if (m_suppress_write_log
) {
330 dout(5) << __func__
<< " suppressed" << dendl
;
334 size_t size
= (m_write_pglog_bytes
? m_write_pglog_bytes
: log_append_bytes
);
337 get_filled_byte_array(bl
, size
);
338 ghobject_t log_obj
= entry
->m_meta_obj
;
340 dout(2) << __func__
<< " coll " << entry
->m_coll
<< " "
341 << coll_t::meta() << " /" << log_obj
<< " (" << bl
.length() << ")" << dendl
;
343 if (m_do_stats
&& (stat
!= NULL
))
344 stat
->written_data
+= bl
.length();
346 uint64_t s
= pg_log_size
[entry
->m_coll
];
347 t
->write(coll_t::meta(), log_obj
, s
, bl
.length(), bl
);
348 pg_log_size
[entry
->m_coll
] += bl
.length();
351 void WorkloadGenerator::do_destroy_collection(ObjectStore::Transaction
*t
,
356 entry
->m_osr
.flush();
357 vector
<ghobject_t
> ls
;
358 m_store
->collection_list(entry
->m_coll
, ghobject_t(), ghobject_t::get_max(),
360 dout(2) << __func__
<< " coll " << entry
->m_coll
361 << " (" << ls
.size() << " objects)" << dendl
;
363 for (vector
<ghobject_t
>::iterator it
= ls
.begin(); it
< ls
.end(); ++it
) {
364 t
->remove(entry
->m_coll
, *it
);
367 t
->remove_collection(entry
->m_coll
);
368 t
->remove(coll_t::meta(), entry
->m_meta_obj
);
371 TestObjectStoreState::coll_entry_t
372 *WorkloadGenerator::do_create_collection(ObjectStore::Transaction
*t
,
375 coll_entry_t
*entry
= coll_create(m_next_coll_nr
++);
377 dout(0) << __func__
<< " failed to create coll id "
378 << m_next_coll_nr
<< dendl
;
381 m_collections
.insert(make_pair(entry
->m_id
, entry
));
383 dout(2) << __func__
<< " id " << entry
->m_id
<< " coll " << entry
->m_coll
<< dendl
;
384 t
->create_collection(entry
->m_coll
, 32);
385 dout(2) << __func__
<< " meta " << coll_t::meta() << "/" << entry
->m_meta_obj
<< dendl
;
386 t
->touch(coll_t::meta(), entry
->m_meta_obj
);
390 void WorkloadGenerator::do_stats()
392 utime_t now
= ceph_clock_now();
395 utime_t duration
= (now
- m_stats_begin
);
397 // when cast to double, a utime_t behaves properly
398 double throughput
= (m_stats_total_written
/ ((double) duration
));
399 double tx_throughput (m_stats_finished_txs
/ ((double) duration
));
402 << " written: " << m_stats_total_written
403 << " duration: " << duration
<< " sec"
404 << " bandwidth: " << prettybyte_t(throughput
) << "/s"
405 << " iops: " << tx_throughput
<< "/s"
408 m_stats_lock
.Unlock();
411 void WorkloadGenerator::run()
413 bool create_coll
= false;
416 utime_t
stats_interval(m_stats_show_secs
, 0);
417 utime_t now
= ceph_clock_now();
418 utime_t stats_time
= now
;
422 C_StatState
*stat_state
= NULL
;
424 if (m_num_ops
&& (ops_run
== m_num_ops
))
427 if (!create_coll
&& !m_collections
.size()) {
428 dout(0) << "We ran out of collections!" << dendl
;
433 << " m_finished_lock is-locked: " << m_finished_lock
.is_locked()
434 << " in-flight: " << m_in_flight
.load()
439 ObjectStore::Transaction
*t
= new ObjectStore::Transaction
;
441 bool destroy_collection
= false;
442 TestObjectStoreState::coll_entry_t
*entry
= NULL
;
446 utime_t now
= ceph_clock_now();
447 utime_t elapsed
= now
- stats_time
;
448 if (elapsed
>= stats_interval
) {
452 stat_state
= new C_StatState(this, now
);
458 entry
= do_create_collection(t
, stat_state
);
460 dout(0) << __func__
<< " something went terribly wrong creating coll" << dendl
;
464 c
= new C_OnReadable(this);
468 destroy_collection
= should_destroy_collection();
469 entry
= get_rnd_coll_entry(destroy_collection
);
470 assert(entry
!= NULL
);
472 if (destroy_collection
) {
473 do_destroy_collection(t
, entry
, stat_state
);
474 c
= new C_OnDestroyed(this, entry
);
478 hobject_t
*obj
= get_rnd_obj(entry
);
480 do_write_object(t
, entry
->m_coll
, *obj
, stat_state
);
481 do_setattr_object(t
, entry
->m_coll
, *obj
, stat_state
);
482 do_pgmeta_omap_set(t
, entry
->m_pgid
, entry
->m_coll
, stat_state
);
483 do_append_log(t
, entry
, stat_state
);
485 c
= new C_OnReadable(this);
492 c
= new C_StatWrapper(stat_state
, tmp
);
495 m_store
->queue_transaction(&(entry
->m_osr
), std::move(*t
), c
);
504 dout(2) << __func__
<< " waiting for "
505 << m_in_flight
.load() << " in-flight transactions" << dendl
;
511 dout(0) << __func__
<< " finishing" << dendl
;
516 cout
<< "usage: " << our_name
<< "[options]" << std::endl
;
521 -c FILE Read configuration from FILE\n\
522 --osd-objectstore TYPE Set OSD ObjectStore type\n\
523 --osd-data PATH Set OSD Data path\n\
524 --osd-journal PATH Set OSD Journal path\n\
525 --osd-journal-size VAL Set Journal size\n\
526 --help This message\n\
528 Test-specific Options:\n\
529 --test-num-colls VAL Set the number of collections\n\
530 --test-num-objs-per-coll VAL Set the number of objects per collection\n\
531 --test-destroy-coll-per-N-trans VAL Set how many transactions to run before\n\
532 destroying a collection.\n\
533 --test-num-ops VAL Run a certain number of operations\n\
534 (a VAL of 0 runs the test forever)\n\
535 --test-max-in-flight VAL Maximum number of in-flight transactions\n\
537 --test-suppress-ops OPS Suppress ops specified in OPS\n\
538 --test-write-data-size SIZE Specify SIZE for all data writes\n\
539 --test-write-xattr-obj-size SIZE Specify SIZE for all xattrs on objects\n\
540 --test-write-xattr-coll-size SIZE Specify SIZE for all xattrs on colls\n\
541 --test-write-pglog-size SIZE Specify SIZE for all pglog writes\n\
542 --test-show-stats Show stats as we go\n\
543 --test-show-stats-period SECS Show stats every SECS (default: 5)\n\
545 SIZE is a numeric value that can be assumed as being bytes, or may be any\n\
546 other unit if specified: B or b, K or k, M or m, G or g.\n\
547 e.g., 1G = 1024M = 1048576k = 1073741824\n\
549 OPS can be one or more of the following options:\n\
550 c writes on collection's xattrs\n\
551 o writes on object's xattr\n\
553 d data writes on objects\n\
558 int main(int argc
, const char *argv
[])
560 vector
<const char*> def_args
;
561 vector
<const char*> args
;
565 def_args
.push_back("--osd-journal-size");
566 def_args
.push_back("400");
567 // def_args.push_back("--osd-data");
568 // def_args.push_back("workload_gen_dir");
569 // def_args.push_back("--osd-journal");
570 // def_args.push_back("workload_gen_dir/journal");
571 argv_to_vec(argc
, argv
, args
);
573 auto cct
= global_init(&def_args
, args
,
574 CEPH_ENTITY_TYPE_CLIENT
, CODE_ENVIRONMENT_UTILITY
,
575 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
576 common_init_finish(g_ceph_context
);
577 g_ceph_context
->_conf
->apply_changes(NULL
);
579 WorkloadGenerator
*wrkldgen_ptr
= new WorkloadGenerator(args
);
580 wrkldgen
.reset(wrkldgen_ptr
);