]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <chrono> | |
5 | #include <cassert> | |
6 | #include <condition_variable> | |
7 | #include <memory> | |
8 | #include <mutex> | |
9 | #include <thread> | |
10 | ||
11 | #include "os/ObjectStore.h" | |
12 | ||
13 | #include "global/global_init.h" | |
14 | ||
15 | #include "common/strtol.h" | |
16 | #include "common/ceph_argparse.h" | |
17 | ||
18 | #define dout_context g_ceph_context | |
19 | #define dout_subsys ceph_subsys_filestore | |
20 | ||
21 | static void usage() | |
22 | { | |
23 | derr << "usage: ceph_objectstore_bench [flags]\n" | |
24 | " --size\n" | |
25 | " total size in bytes\n" | |
26 | " --block-size\n" | |
27 | " block size in bytes for each write\n" | |
28 | " --repeats\n" | |
29 | " number of times to repeat the write cycle\n" | |
30 | " --threads\n" | |
31 | " number of threads to carry out this workload\n" | |
32 | " --multi-object\n" | |
33 | " have each thread write to a separate object\n" << dendl; | |
34 | generic_server_usage(); | |
35 | } | |
36 | ||
// Thin wrapper around a byte count. It converts implicitly to and from
// size_t, and can be filled in from a string via parse().
struct byte_units {
  size_t v;  // the wrapped amount, in bytes

  // Implicit on purpose: lets a plain integer be used where a byte_units is
  // expected.
  // cppcheck-suppress noExplicitConstructor
  byte_units(size_t amount) : v(amount) {}

  // Overwrites v with the value parsed from val; returns true when *err is
  // left empty. Defined out of line.
  bool parse(const std::string &val, std::string *err);

  // Read access as a raw byte count.
  operator size_t() const { return v; }
};
47 | ||
48 | bool byte_units::parse(const std::string &val, std::string *err) | |
49 | { | |
1adf2230 | 50 | v = strict_iecstrtoll(val.c_str(), err); |
7c673cae FG |
51 | return err->empty(); |
52 | } | |
53 | ||
54 | std::ostream& operator<<(std::ostream &out, const byte_units &amount) | |
55 | { | |
56 | static const char* units[] = { "B", "KB", "MB", "GB", "TB", "PB", "EB" }; | |
57 | static const int max_units = sizeof(units)/sizeof(*units); | |
58 | ||
59 | int unit = 0; | |
60 | auto v = amount.v; | |
61 | while (v >= 1024 && unit < max_units) { | |
62 | // preserve significant bytes | |
63 | if (v < 1048576 && (v % 1024 != 0)) | |
64 | break; | |
65 | v >>= 10; | |
66 | unit++; | |
67 | } | |
68 | return out << v << ' ' << units[unit]; | |
69 | } | |
70 | ||
71 | struct Config { | |
72 | byte_units size; | |
73 | byte_units block_size; | |
74 | int repeats; | |
75 | int threads; | |
76 | bool multi_object; | |
77 | Config() | |
78 | : size(1048576), block_size(4096), | |
79 | repeats(1), threads(1), | |
80 | multi_object(false) {} | |
81 | }; | |
82 | ||
83 | class C_NotifyCond : public Context { | |
84 | std::mutex *mutex; | |
85 | std::condition_variable *cond; | |
86 | bool *done; | |
87 | public: | |
88 | C_NotifyCond(std::mutex *mutex, std::condition_variable *cond, bool *done) | |
89 | : mutex(mutex), cond(cond), done(done) {} | |
90 | void finish(int r) override { | |
91 | std::lock_guard<std::mutex> lock(*mutex); | |
92 | *done = true; | |
93 | cond->notify_one(); | |
94 | } | |
95 | }; | |
96 | ||
97 | void osbench_worker(ObjectStore *os, const Config &cfg, | |
98 | const coll_t cid, const ghobject_t oid, | |
99 | uint64_t starting_offset) | |
100 | { | |
101 | bufferlist data; | |
102 | data.append(buffer::create(cfg.block_size)); | |
103 | ||
104 | dout(0) << "Writing " << cfg.size | |
105 | << " in blocks of " << cfg.block_size << dendl; | |
106 | ||
107 | assert(starting_offset < cfg.size); | |
108 | assert(starting_offset % cfg.block_size == 0); | |
109 | ||
110 | ObjectStore::Sequencer sequencer("osbench"); | |
111 | ||
112 | for (int i = 0; i < cfg.repeats; ++i) { | |
113 | uint64_t offset = starting_offset; | |
114 | size_t len = cfg.size; | |
115 | ||
116 | vector<ObjectStore::Transaction> tls; | |
117 | ||
118 | std::cout << "Write cycle " << i << std::endl; | |
119 | while (len) { | |
120 | size_t count = len < cfg.block_size ? len : (size_t)cfg.block_size; | |
121 | ||
122 | auto t = new ObjectStore::Transaction; | |
123 | t->write(cid, oid, offset, count, data); | |
124 | tls.push_back(std::move(*t)); | |
125 | delete t; | |
126 | ||
127 | offset += count; | |
128 | if (offset > cfg.size) | |
129 | offset -= cfg.size; | |
130 | len -= count; | |
131 | } | |
132 | ||
133 | // set up the finisher | |
134 | std::mutex mutex; | |
135 | std::condition_variable cond; | |
136 | bool done = false; | |
137 | ||
138 | os->queue_transactions(&sequencer, tls, nullptr, | |
139 | new C_NotifyCond(&mutex, &cond, &done)); | |
140 | ||
141 | std::unique_lock<std::mutex> lock(mutex); | |
142 | cond.wait(lock, [&done](){ return done; }); | |
143 | lock.unlock(); | |
144 | ||
145 | ||
146 | } | |
147 | sequencer.flush(); | |
148 | } | |
149 | ||
150 | int main(int argc, const char *argv[]) | |
151 | { | |
152 | Config cfg; | |
153 | ||
154 | // command-line arguments | |
155 | vector<const char*> args; | |
156 | argv_to_vec(argc, argv, args); | |
157 | env_to_vec(args); | |
158 | ||
159 | auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD, | |
160 | CODE_ENVIRONMENT_UTILITY, 0); | |
161 | ||
162 | std::string val; | |
163 | vector<const char*>::iterator i = args.begin(); | |
164 | while (i != args.end()) { | |
165 | if (ceph_argparse_double_dash(args, i)) | |
166 | break; | |
167 | ||
168 | if (ceph_argparse_witharg(args, i, &val, "--size", (char*)nullptr)) { | |
169 | std::string err; | |
170 | if (!cfg.size.parse(val, &err)) { | |
171 | derr << "error parsing size: " << err << dendl; | |
172 | usage(); | |
173 | } | |
174 | } else if (ceph_argparse_witharg(args, i, &val, "--block-size", (char*)nullptr)) { | |
175 | std::string err; | |
176 | if (!cfg.block_size.parse(val, &err)) { | |
177 | derr << "error parsing block-size: " << err << dendl; | |
178 | usage(); | |
179 | } | |
180 | } else if (ceph_argparse_witharg(args, i, &val, "--repeats", (char*)nullptr)) { | |
181 | cfg.repeats = atoi(val.c_str()); | |
182 | } else if (ceph_argparse_witharg(args, i, &val, "--threads", (char*)nullptr)) { | |
183 | cfg.threads = atoi(val.c_str()); | |
184 | } else if (ceph_argparse_flag(args, i, "--multi-object", (char*)nullptr)) { | |
185 | cfg.multi_object = true; | |
186 | } else { | |
187 | derr << "Error: can't understand argument: " << *i << "\n" << dendl; | |
188 | usage(); | |
189 | } | |
190 | } | |
191 | ||
192 | common_init_finish(g_ceph_context); | |
193 | ||
194 | // create object store | |
195 | dout(0) << "objectstore " << g_conf->osd_objectstore << dendl; | |
196 | dout(0) << "data " << g_conf->osd_data << dendl; | |
197 | dout(0) << "journal " << g_conf->osd_journal << dendl; | |
198 | dout(0) << "size " << cfg.size << dendl; | |
199 | dout(0) << "block-size " << cfg.block_size << dendl; | |
200 | dout(0) << "repeats " << cfg.repeats << dendl; | |
201 | dout(0) << "threads " << cfg.threads << dendl; | |
202 | ||
203 | auto os = std::unique_ptr<ObjectStore>( | |
204 | ObjectStore::create(g_ceph_context, | |
205 | g_conf->osd_objectstore, | |
206 | g_conf->osd_data, | |
207 | g_conf->osd_journal)); | |
208 | ||
209 | //Checking data folder: create if needed or error if it's not empty | |
210 | DIR *dir = ::opendir(g_conf->osd_data.c_str()); | |
211 | if (!dir) { | |
212 | std::string cmd("mkdir -p "); | |
213 | cmd+=g_conf->osd_data; | |
214 | int r = ::system( cmd.c_str() ); | |
215 | if( r<0 ){ | |
216 | derr << "Failed to create data directory, ret = " << r << dendl; | |
217 | return 1; | |
218 | } | |
219 | } | |
220 | else { | |
221 | bool non_empty = readdir(dir) != NULL && readdir(dir) != NULL && readdir(dir) != NULL; | |
222 | if( non_empty ){ | |
223 | derr << "Data directory '"<<g_conf->osd_data<<"' isn't empty, please clean it first."<< dendl; | |
224 | return 1; | |
225 | } | |
226 | } | |
227 | ::closedir(dir); | |
228 | ||
229 | //Create folders for journal if needed | |
230 | string journal_base = g_conf->osd_journal.substr(0, g_conf->osd_journal.rfind('/')); | |
231 | struct stat sb; | |
232 | if (stat(journal_base.c_str(), &sb) != 0 ){ | |
233 | std::string cmd("mkdir -p "); | |
234 | cmd+=journal_base; | |
235 | int r = ::system( cmd.c_str() ); | |
236 | if( r<0 ){ | |
237 | derr << "Failed to create journal directory, ret = " << r << dendl; | |
238 | return 1; | |
239 | } | |
240 | } | |
241 | ||
242 | if (!os) { | |
243 | derr << "bad objectstore type " << g_conf->osd_objectstore << dendl; | |
244 | return 1; | |
245 | } | |
246 | if (os->mkfs() < 0) { | |
247 | derr << "mkfs failed" << dendl; | |
248 | return 1; | |
249 | } | |
250 | if (os->mount() < 0) { | |
251 | derr << "mount failed" << dendl; | |
252 | return 1; | |
253 | } | |
254 | ||
255 | dout(10) << "created objectstore " << os.get() << dendl; | |
256 | ||
257 | // create a collection | |
258 | spg_t pg; | |
259 | const coll_t cid(pg); | |
260 | { | |
261 | ObjectStore::Sequencer osr(__func__); | |
262 | ObjectStore::Transaction t; | |
263 | t.create_collection(cid, 0); | |
264 | os->apply_transaction(&osr, std::move(t)); | |
265 | } | |
266 | ||
267 | // create the objects | |
268 | std::vector<ghobject_t> oids; | |
269 | if (cfg.multi_object) { | |
270 | oids.reserve(cfg.threads); | |
271 | for (int i = 0; i < cfg.threads; i++) { | |
272 | std::stringstream oss; | |
273 | oss << "osbench-thread-" << i; | |
274 | oids.emplace_back(hobject_t(sobject_t(oss.str(), CEPH_NOSNAP))); | |
275 | ||
276 | ObjectStore::Sequencer osr(__func__); | |
277 | ObjectStore::Transaction t; | |
278 | t.touch(cid, oids[i]); | |
279 | int r = os->apply_transaction(&osr, std::move(t)); | |
280 | assert(r == 0); | |
281 | } | |
282 | } else { | |
283 | oids.emplace_back(hobject_t(sobject_t("osbench", CEPH_NOSNAP))); | |
284 | ||
285 | ObjectStore::Sequencer osr(__func__); | |
286 | ObjectStore::Transaction t; | |
287 | t.touch(cid, oids.back()); | |
288 | int r = os->apply_transaction(&osr, std::move(t)); | |
289 | assert(r == 0); | |
290 | } | |
291 | ||
292 | // run the worker threads | |
293 | std::vector<std::thread> workers; | |
294 | workers.reserve(cfg.threads); | |
295 | ||
296 | using namespace std::chrono; | |
297 | auto t1 = high_resolution_clock::now(); | |
298 | for (int i = 0; i < cfg.threads; i++) { | |
299 | const auto &oid = cfg.multi_object ? oids[i] : oids[0]; | |
300 | workers.emplace_back(osbench_worker, os.get(), std::ref(cfg), | |
301 | cid, oid, i * cfg.size / cfg.threads); | |
302 | } | |
303 | for (auto &worker : workers) | |
304 | worker.join(); | |
305 | auto t2 = high_resolution_clock::now(); | |
306 | workers.clear(); | |
307 | ||
308 | auto duration = duration_cast<microseconds>(t2 - t1); | |
309 | byte_units total = cfg.size * cfg.repeats * cfg.threads; | |
310 | byte_units rate = (1000000LL * total) / duration.count(); | |
311 | size_t iops = (1000000LL * total / cfg.block_size) / duration.count(); | |
312 | dout(0) << "Wrote " << total << " in " | |
313 | << duration.count() << "us, at a rate of " << rate << "/s and " | |
314 | << iops << " iops" << dendl; | |
315 | ||
316 | // remove the objects | |
317 | ObjectStore::Sequencer osr(__func__); | |
318 | ObjectStore::Transaction t; | |
319 | for (const auto &oid : oids) | |
320 | t.remove(cid, oid); | |
321 | os->apply_transaction(&osr,std::move(t)); | |
322 | ||
323 | os->umount(); | |
324 | return 0; | |
325 | } |