1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <condition_variable>
11 #include "os/ObjectStore.h"
13 #include "global/global_init.h"
15 #include "common/strtol.h"
16 #include "common/ceph_argparse.h"
18 #define dout_context g_ceph_context
19 #define dout_subsys ceph_subsys_filestore
25 cout
<< "usage: ceph_objectstore_bench [flags]\n"
27 " total size in bytes\n"
29 " block size in bytes for each write\n"
31 " number of times to repeat the write cycle\n"
33 " number of threads to carry out this workload\n"
35 " have each thread write to a separate object\n" << std::endl
;
36 generic_server_usage();
39 // helper class for bytes with units
42 // cppcheck-suppress noExplicitConstructor
43 byte_units(size_t v
) : v(v
) {}
45 bool parse(const std::string
&val
, std::string
*err
);
47 operator size_t() const { return v
; }
50 bool byte_units::parse(const std::string
&val
, std::string
*err
)
52 v
= strict_iecstrtoll(val
, err
);
56 std::ostream
& operator<<(std::ostream
&out
, const byte_units
&amount
)
58 static const char* units
[] = { "B", "KB", "MB", "GB", "TB", "PB", "EB" };
59 static const int max_units
= sizeof(units
)/sizeof(*units
);
63 while (v
>= 1024 && unit
< max_units
) {
64 // preserve significant bytes
65 if (v
< 1048576 && (v
% 1024 != 0))
70 return out
<< v
<< ' ' << units
[unit
];
75 byte_units block_size
;
80 : size(1048576), block_size(4096),
81 repeats(1), threads(1),
82 multi_object(false) {}
85 class C_NotifyCond
: public Context
{
87 std::condition_variable
*cond
;
90 C_NotifyCond(std::mutex
*mutex
, std::condition_variable
*cond
, bool *done
)
91 : mutex(mutex
), cond(cond
), done(done
) {}
92 void finish(int r
) override
{
93 std::lock_guard
<std::mutex
> lock(*mutex
);
99 void osbench_worker(ObjectStore
*os
, const Config
&cfg
,
100 const coll_t cid
, const ghobject_t oid
,
101 uint64_t starting_offset
)
104 data
.append(buffer::create(cfg
.block_size
));
106 dout(0) << "Writing " << cfg
.size
107 << " in blocks of " << cfg
.block_size
<< dendl
;
109 ceph_assert(starting_offset
< cfg
.size
);
110 ceph_assert(starting_offset
% cfg
.block_size
== 0);
112 ObjectStore::CollectionHandle ch
= os
->open_collection(cid
);
115 for (int i
= 0; i
< cfg
.repeats
; ++i
) {
116 uint64_t offset
= starting_offset
;
117 size_t len
= cfg
.size
;
119 vector
<ObjectStore::Transaction
> tls
;
121 std::cout
<< "Write cycle " << i
<< std::endl
;
123 size_t count
= len
< cfg
.block_size
? len
: (size_t)cfg
.block_size
;
125 auto t
= new ObjectStore::Transaction
;
126 t
->write(cid
, oid
, offset
, count
, data
);
127 tls
.push_back(std::move(*t
));
131 if (offset
> cfg
.size
)
136 // set up the finisher
138 std::condition_variable cond
;
141 tls
.back().register_on_commit(new C_NotifyCond(&mutex
, &cond
, &done
));
142 os
->queue_transactions(ch
, tls
);
144 std::unique_lock
<std::mutex
> lock(mutex
);
145 cond
.wait(lock
, [&done
](){ return done
; });
150 int main(int argc
, const char *argv
[])
152 // command-line arguments
153 auto args
= argv_to_vec(argc
, argv
);
156 cerr
<< argv
[0] << ": -h or --help for usage" << std::endl
;
159 if (ceph_argparse_need_usage(args
)) {
164 auto cct
= global_init(nullptr, args
, CEPH_ENTITY_TYPE_OSD
,
165 CODE_ENVIRONMENT_UTILITY
,
166 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
170 vector
<const char*>::iterator i
= args
.begin();
171 while (i
!= args
.end()) {
172 if (ceph_argparse_double_dash(args
, i
))
175 if (ceph_argparse_witharg(args
, i
, &val
, "--size", (char*)nullptr)) {
177 if (!cfg
.size
.parse(val
, &err
)) {
178 derr
<< "error parsing size: " << err
<< dendl
;
181 } else if (ceph_argparse_witharg(args
, i
, &val
, "--block-size", (char*)nullptr)) {
183 if (!cfg
.block_size
.parse(val
, &err
)) {
184 derr
<< "error parsing block-size: " << err
<< dendl
;
187 } else if (ceph_argparse_witharg(args
, i
, &val
, "--repeats", (char*)nullptr)) {
188 cfg
.repeats
= atoi(val
.c_str());
189 } else if (ceph_argparse_witharg(args
, i
, &val
, "--threads", (char*)nullptr)) {
190 cfg
.threads
= atoi(val
.c_str());
191 } else if (ceph_argparse_flag(args
, i
, "--multi-object", (char*)nullptr)) {
192 cfg
.multi_object
= true;
194 derr
<< "Error: can't understand argument: " << *i
<< "\n" << dendl
;
199 common_init_finish(g_ceph_context
);
201 // create object store
202 dout(0) << "objectstore " << g_conf()->osd_objectstore
<< dendl
;
203 dout(0) << "data " << g_conf()->osd_data
<< dendl
;
204 dout(0) << "journal " << g_conf()->osd_journal
<< dendl
;
205 dout(0) << "size " << cfg
.size
<< dendl
;
206 dout(0) << "block-size " << cfg
.block_size
<< dendl
;
207 dout(0) << "repeats " << cfg
.repeats
<< dendl
;
208 dout(0) << "threads " << cfg
.threads
<< dendl
;
211 ObjectStore::create(g_ceph_context
,
212 g_conf()->osd_objectstore
,
214 g_conf()->osd_journal
);
216 //Checking data folder: create if needed or error if it's not empty
217 DIR *dir
= ::opendir(g_conf()->osd_data
.c_str());
219 std::string
cmd("mkdir -p ");
220 cmd
+=g_conf()->osd_data
;
221 int r
= ::system( cmd
.c_str() );
223 derr
<< "Failed to create data directory, ret = " << r
<< dendl
;
228 bool non_empty
= readdir(dir
) != NULL
&& readdir(dir
) != NULL
&& readdir(dir
) != NULL
;
230 derr
<< "Data directory '"<<g_conf()->osd_data
<<"' isn't empty, please clean it first."<< dendl
;
236 //Create folders for journal if needed
237 string journal_base
= g_conf()->osd_journal
.substr(0, g_conf()->osd_journal
.rfind('/'));
239 if (stat(journal_base
.c_str(), &sb
) != 0 ){
240 std::string
cmd("mkdir -p ");
242 int r
= ::system( cmd
.c_str() );
244 derr
<< "Failed to create journal directory, ret = " << r
<< dendl
;
250 derr
<< "bad objectstore type " << g_conf()->osd_objectstore
<< dendl
;
253 if (os
->mkfs() < 0) {
254 derr
<< "mkfs failed" << dendl
;
257 if (os
->mount() < 0) {
258 derr
<< "mount failed" << dendl
;
262 dout(10) << "created objectstore " << os
.get() << dendl
;
264 // create a collection
266 const coll_t
cid(pg
);
267 ObjectStore::CollectionHandle ch
= os
->create_new_collection(cid
);
269 ObjectStore::Transaction t
;
270 t
.create_collection(cid
, 0);
271 os
->queue_transaction(ch
, std::move(t
));
274 // create the objects
275 std::vector
<ghobject_t
> oids
;
276 if (cfg
.multi_object
) {
277 oids
.reserve(cfg
.threads
);
278 for (int i
= 0; i
< cfg
.threads
; i
++) {
279 std::stringstream oss
;
280 oss
<< "osbench-thread-" << i
;
281 oids
.emplace_back(hobject_t(sobject_t(oss
.str(), CEPH_NOSNAP
)));
283 ObjectStore::Transaction t
;
284 t
.touch(cid
, oids
[i
]);
285 int r
= os
->queue_transaction(ch
, std::move(t
));
289 oids
.emplace_back(hobject_t(sobject_t("osbench", CEPH_NOSNAP
)));
291 ObjectStore::Transaction t
;
292 t
.touch(cid
, oids
.back());
293 int r
= os
->queue_transaction(ch
, std::move(t
));
297 // run the worker threads
298 std::vector
<std::thread
> workers
;
299 workers
.reserve(cfg
.threads
);
301 using namespace std::chrono
;
302 auto t1
= high_resolution_clock::now();
303 for (int i
= 0; i
< cfg
.threads
; i
++) {
304 const auto &oid
= cfg
.multi_object
? oids
[i
] : oids
[0];
305 workers
.emplace_back(osbench_worker
, os
.get(), std::ref(cfg
),
306 cid
, oid
, i
* cfg
.size
/ cfg
.threads
);
308 for (auto &worker
: workers
)
310 auto t2
= high_resolution_clock::now();
313 auto duration
= duration_cast
<microseconds
>(t2
- t1
);
314 byte_units total
= cfg
.size
* cfg
.repeats
* cfg
.threads
;
315 byte_units rate
= (1000000LL * total
) / duration
.count();
316 size_t iops
= (1000000LL * total
/ cfg
.block_size
) / duration
.count();
317 dout(0) << "Wrote " << total
<< " in "
318 << duration
.count() << "us, at a rate of " << rate
<< "/s and "
319 << iops
<< " iops" << dendl
;
321 // remove the objects
322 ObjectStore::Transaction t
;
323 for (const auto &oid
: oids
)
325 os
->queue_transaction(ch
, std::move(t
));