]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph ObjectStore engine | |
5 | * | |
6 | * IO engine using Ceph's ObjectStore class to test low-level performance of | |
7 | * Ceph OSDs. | |
8 | * | |
9 | */ | |
10 | ||
11 | #include <memory> | |
12 | #include <system_error> | |
13 | #include <vector> | |
9f95a23c | 14 | #include <fstream> |
7c673cae FG |
15 | |
16 | #include "os/ObjectStore.h" | |
17 | #include "global/global_init.h" | |
18 | #include "common/errno.h" | |
19 | #include "include/intarith.h" | |
20 | #include "include/stringify.h" | |
11fdf7f2 | 21 | #include "include/random.h" |
9f95a23c | 22 | #include "include/str_list.h" |
224ce89b | 23 | #include "common/perf_counters.h" |
9f95a23c | 24 | #include "common/TracepointProvider.h" |
7c673cae FG |
25 | |
26 | #include <fio.h> | |
27 | #include <optgroup.h> | |
28 | ||
11fdf7f2 TL |
29 | #include "include/ceph_assert.h" // fio.h clobbers our assert.h |
30 | #include <algorithm> | |
7c673cae FG |
31 | |
32 | #define dout_context g_ceph_context | |
33 | #define dout_subsys ceph_subsys_ | |
34 | ||
20effc67 TL |
35 | using namespace std; |
36 | ||
7c673cae FG |
37 | namespace { |
38 | ||
39 | /// fio configuration options read from the job file | |
40 | struct Options { | |
41 | thread_data* td; | |
42 | char* conf; | |
9f95a23c TL |
43 | char* perf_output_file; |
44 | char* throttle_values; | |
45 | char* deferred_throttle_values; | |
11fdf7f2 | 46 | unsigned long long |
9f95a23c | 47 | cycle_throttle_period, |
11fdf7f2 TL |
48 | oi_attr_len_low, |
49 | oi_attr_len_high, | |
50 | snapset_attr_len_low, | |
51 | snapset_attr_len_high, | |
52 | pglog_omap_len_low, | |
53 | pglog_omap_len_high, | |
54 | pglog_dup_omap_len_low, | |
55 | pglog_dup_omap_len_high, | |
56 | _fastinfo_omap_len_low, | |
57 | _fastinfo_omap_len_high; | |
58 | unsigned simulate_pglog; | |
59 | unsigned single_pool_mode; | |
60 | unsigned preallocate_files; | |
9f95a23c | 61 | unsigned check_files; |
7c673cae FG |
62 | }; |
63 | ||
64 | template <class Func> // void Func(fio_option&) | |
65 | fio_option make_option(Func&& func) | |
66 | { | |
67 | // zero-initialize and set common defaults | |
68 | auto o = fio_option{}; | |
69 | o.category = FIO_OPT_C_ENGINE; | |
70 | o.group = FIO_OPT_G_RBD; | |
71 | func(std::ref(o)); | |
72 | return o; | |
73 | } | |
74 | ||
75 | static std::vector<fio_option> ceph_options{ | |
76 | make_option([] (fio_option& o) { | |
77 | o.name = "conf"; | |
78 | o.lname = "ceph configuration file"; | |
79 | o.type = FIO_OPT_STR_STORE; | |
80 | o.help = "Path to a ceph configuration file"; | |
81 | o.off1 = offsetof(Options, conf); | |
82 | }), | |
9f95a23c TL |
83 | make_option([] (fio_option& o) { |
84 | o.name = "perf_output_file"; | |
85 | o.lname = "perf output target"; | |
86 | o.type = FIO_OPT_STR_STORE; | |
87 | o.help = "Path to which to write json formatted perf output"; | |
88 | o.off1 = offsetof(Options, perf_output_file); | |
89 | o.def = 0; | |
90 | }), | |
11fdf7f2 TL |
91 | make_option([] (fio_option& o) { |
92 | o.name = "oi_attr_len"; | |
93 | o.lname = "OI Attr length"; | |
94 | o.type = FIO_OPT_STR_VAL; | |
95 | o.help = "Set OI(aka '_') attribute to specified length"; | |
96 | o.off1 = offsetof(Options, oi_attr_len_low); | |
97 | o.off2 = offsetof(Options, oi_attr_len_high); | |
98 | o.def = 0; | |
99 | o.minval = 0; | |
100 | }), | |
101 | make_option([] (fio_option& o) { | |
102 | o.name = "snapset_attr_len"; | |
103 | o.lname = "Attr 'snapset' length"; | |
104 | o.type = FIO_OPT_STR_VAL; | |
105 | o.help = "Set 'snapset' attribute to specified length"; | |
106 | o.off1 = offsetof(Options, snapset_attr_len_low); | |
107 | o.off2 = offsetof(Options, snapset_attr_len_high); | |
108 | o.def = 0; | |
109 | o.minval = 0; | |
110 | }), | |
111 | make_option([] (fio_option& o) { | |
112 | o.name = "_fastinfo_omap_len"; | |
113 | o.lname = "'_fastinfo' omap entry length"; | |
114 | o.type = FIO_OPT_STR_VAL; | |
115 | o.help = "Set '_fastinfo' OMAP attribute to specified length"; | |
116 | o.off1 = offsetof(Options, _fastinfo_omap_len_low); | |
117 | o.off2 = offsetof(Options, _fastinfo_omap_len_high); | |
118 | o.def = 0; | |
119 | o.minval = 0; | |
120 | }), | |
121 | make_option([] (fio_option& o) { | |
122 | o.name = "pglog_simulation"; | |
123 | o.lname = "pglog behavior simulation"; | |
124 | o.type = FIO_OPT_BOOL; | |
125 | o.help = "Enables PG Log simulation behavior"; | |
126 | o.off1 = offsetof(Options, simulate_pglog); | |
127 | o.def = "0"; | |
128 | }), | |
129 | make_option([] (fio_option& o) { | |
130 | o.name = "pglog_omap_len"; | |
131 | o.lname = "pglog omap entry length"; | |
132 | o.type = FIO_OPT_STR_VAL; | |
133 | o.help = "Set pglog omap entry to specified length"; | |
134 | o.off1 = offsetof(Options, pglog_omap_len_low); | |
135 | o.off2 = offsetof(Options, pglog_omap_len_high); | |
136 | o.def = 0; | |
137 | o.minval = 0; | |
138 | }), | |
139 | make_option([] (fio_option& o) { | |
140 | o.name = "pglog_dup_omap_len"; | |
141 | o.lname = "uplicate pglog omap entry length"; | |
142 | o.type = FIO_OPT_STR_VAL; | |
143 | o.help = "Set duplicate pglog omap entry to specified length"; | |
144 | o.off1 = offsetof(Options, pglog_dup_omap_len_low); | |
145 | o.off2 = offsetof(Options, pglog_dup_omap_len_high); | |
146 | o.def = 0; | |
147 | o.minval = 0; | |
148 | }), | |
149 | make_option([] (fio_option& o) { | |
150 | o.name = "single_pool_mode"; | |
151 | o.lname = "single(shared among jobs) pool mode"; | |
152 | o.type = FIO_OPT_BOOL; | |
153 | o.help = "Enables the mode when all jobs run against the same pool"; | |
154 | o.off1 = offsetof(Options, single_pool_mode); | |
155 | o.def = "0"; | |
156 | }), | |
157 | make_option([] (fio_option& o) { | |
158 | o.name = "preallocate_files"; | |
159 | o.lname = "preallocate files on init"; | |
160 | o.type = FIO_OPT_BOOL; | |
161 | o.help = "Enables/disables file preallocation (touch and resize) on init"; | |
162 | o.off1 = offsetof(Options, preallocate_files); | |
163 | o.def = "1"; | |
164 | }), | |
9f95a23c TL |
165 | make_option([] (fio_option& o) { |
166 | o.name = "check_files"; | |
167 | o.lname = "ensure files exist and are correct on init"; | |
168 | o.type = FIO_OPT_BOOL; | |
169 | o.help = "Enables/disables checking of files on init"; | |
170 | o.off1 = offsetof(Options, check_files); | |
171 | o.def = "0"; | |
172 | }), | |
173 | make_option([] (fio_option& o) { | |
174 | o.name = "bluestore_throttle"; | |
175 | o.lname = "set bluestore throttle"; | |
176 | o.type = FIO_OPT_STR_STORE; | |
177 | o.help = "comma delimited list of throttle values", | |
178 | o.off1 = offsetof(Options, throttle_values); | |
179 | o.def = 0; | |
180 | }), | |
181 | make_option([] (fio_option& o) { | |
182 | o.name = "bluestore_deferred_throttle"; | |
183 | o.lname = "set bluestore deferred throttle"; | |
184 | o.type = FIO_OPT_STR_STORE; | |
185 | o.help = "comma delimited list of throttle values", | |
186 | o.off1 = offsetof(Options, deferred_throttle_values); | |
187 | o.def = 0; | |
188 | }), | |
189 | make_option([] (fio_option& o) { | |
190 | o.name = "vary_bluestore_throttle_period"; | |
191 | o.lname = "period between different throttle values"; | |
192 | o.type = FIO_OPT_STR_VAL; | |
193 | o.help = "set to non-zero value to periodically cycle through throttle options"; | |
194 | o.off1 = offsetof(Options, cycle_throttle_period); | |
195 | o.def = "0"; | |
196 | o.minval = 0; | |
197 | }), | |
7c673cae FG |
198 | {} // fio expects a 'null'-terminated list |
199 | }; | |
200 | ||
201 | ||
11fdf7f2 TL |
202 | struct Collection { |
203 | spg_t pg; | |
204 | coll_t cid; | |
205 | ObjectStore::CollectionHandle ch; | |
206 | // Can't use mutex directly in vectors hence dynamic allocation | |
207 | ||
208 | std::unique_ptr<std::mutex> lock; | |
209 | uint64_t pglog_ver_head = 1; | |
210 | uint64_t pglog_ver_tail = 1; | |
211 | uint64_t pglog_dup_ver_tail = 1; | |
212 | ||
213 | // use big pool ids to avoid clashing with existing collections | |
214 | static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff; | |
215 | ||
216 | Collection(const spg_t& pg, ObjectStore::CollectionHandle _ch) | |
217 | : pg(pg), cid(pg), ch(_ch), | |
218 | lock(new std::mutex) { | |
219 | } | |
220 | }; | |
221 | ||
222 | int destroy_collections( | |
223 | std::unique_ptr<ObjectStore>& os, | |
224 | std::vector<Collection>& collections) | |
225 | { | |
226 | ObjectStore::Transaction t; | |
227 | bool failed = false; | |
228 | // remove our collections | |
229 | for (auto& coll : collections) { | |
230 | ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); | |
231 | t.remove(coll.cid, pgmeta_oid); | |
232 | t.remove_collection(coll.cid); | |
233 | int r = os->queue_transaction(coll.ch, std::move(t)); | |
234 | if (r && !failed) { | |
235 | derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl; | |
236 | failed = true; | |
237 | } | |
238 | } | |
239 | return 0; | |
240 | } | |
241 | ||
242 | int init_collections(std::unique_ptr<ObjectStore>& os, | |
243 | uint64_t pool, | |
244 | std::vector<Collection>& collections, | |
245 | uint64_t count) | |
246 | { | |
247 | ceph_assert(count > 0); | |
248 | collections.reserve(count); | |
249 | ||
250 | const int split_bits = cbits(count - 1); | |
251 | ||
252 | { | |
253 | // propagate Superblock object to ensure proper functioning of tools that | |
254 | // need it. E.g. ceph-objectstore-tool | |
255 | coll_t cid(coll_t::meta()); | |
256 | bool exists = os->collection_exists(cid); | |
257 | if (!exists) { | |
258 | auto ch = os->create_new_collection(cid); | |
259 | ||
260 | OSDSuperblock superblock; | |
261 | bufferlist bl; | |
262 | encode(superblock, bl); | |
263 | ||
264 | ObjectStore::Transaction t; | |
265 | t.create_collection(cid, split_bits); | |
266 | t.write(cid, OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl); | |
267 | int r = os->queue_transaction(ch, std::move(t)); | |
268 | ||
269 | if (r < 0) { | |
270 | derr << "Failure to write OSD superblock: " << cpp_strerror(-r) << dendl; | |
271 | return r; | |
272 | } | |
273 | } | |
274 | } | |
275 | ||
276 | for (uint32_t i = 0; i < count; i++) { | |
277 | auto pg = spg_t{pg_t{i, pool}}; | |
278 | coll_t cid(pg); | |
279 | ||
280 | bool exists = os->collection_exists(cid); | |
281 | auto ch = exists ? | |
282 | os->open_collection(cid) : | |
283 | os->create_new_collection(cid) ; | |
284 | ||
285 | collections.emplace_back(pg, ch); | |
286 | ||
287 | ObjectStore::Transaction t; | |
288 | auto& coll = collections.back(); | |
289 | if (!exists) { | |
290 | t.create_collection(coll.cid, split_bits); | |
291 | ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); | |
292 | t.touch(coll.cid, pgmeta_oid); | |
293 | int r = os->queue_transaction(coll.ch, std::move(t)); | |
294 | if (r) { | |
295 | derr << "Engine init failed with " << cpp_strerror(-r) << dendl; | |
296 | destroy_collections(os, collections); | |
297 | return r; | |
298 | } | |
299 | } | |
300 | } | |
301 | return 0; | |
302 | } | |
303 | ||
7c673cae FG |
304 | /// global engine state shared between all jobs within the process. this |
305 | /// includes g_ceph_context and the ObjectStore instance | |
306 | struct Engine { | |
307 | /// the initial g_ceph_context reference to be dropped on destruction | |
308 | boost::intrusive_ptr<CephContext> cct; | |
309 | std::unique_ptr<ObjectStore> os; | |
310 | ||
11fdf7f2 TL |
311 | std::vector<Collection> collections; //< shared collections to spread objects over |
312 | ||
7c673cae FG |
313 | std::mutex lock; |
314 | int ref_count; | |
11fdf7f2 | 315 | const bool unlink; //< unlink objects on destruction |
7c673cae | 316 | |
9f95a23c TL |
317 | // file to which to output formatted perf information |
318 | const std::optional<std::string> perf_output_file; | |
319 | ||
11fdf7f2 | 320 | explicit Engine(thread_data* td); |
7c673cae FG |
321 | ~Engine(); |
322 | ||
323 | static Engine* get_instance(thread_data* td) { | |
324 | // note: creates an Engine with the options associated with the first job | |
325 | static Engine engine(td); | |
326 | return &engine; | |
327 | } | |
328 | ||
329 | void ref() { | |
330 | std::lock_guard<std::mutex> l(lock); | |
331 | ++ref_count; | |
332 | } | |
333 | void deref() { | |
334 | std::lock_guard<std::mutex> l(lock); | |
335 | --ref_count; | |
336 | if (!ref_count) { | |
337 | ostringstream ostr; | |
9f95a23c TL |
338 | Formatter* f = Formatter::create( |
339 | "json-pretty", "json-pretty", "json-pretty"); | |
340 | f->open_object_section("perf_output"); | |
1e59de90 | 341 | cct->get_perfcounters_collection()->dump_formatted(f, false, false); |
11fdf7f2 | 342 | if (g_conf()->rocksdb_perf) { |
9f95a23c | 343 | f->open_object_section("rocksdb_perf"); |
31f18b77 | 344 | os->get_db_statistics(f); |
9f95a23c | 345 | f->close_section(); |
31f18b77 | 346 | } |
11fdf7f2 | 347 | mempool::dump(f); |
9f95a23c TL |
348 | { |
349 | f->open_object_section("db_histogram"); | |
350 | os->generate_db_histogram(f); | |
351 | f->close_section(); | |
352 | } | |
11fdf7f2 | 353 | f->close_section(); |
11fdf7f2 | 354 | |
11fdf7f2 | 355 | f->flush(ostr); |
7c673cae | 356 | delete f; |
11fdf7f2 TL |
357 | |
358 | if (unlink) { | |
359 | destroy_collections(os, collections); | |
360 | } | |
7c673cae | 361 | os->umount(); |
9f95a23c TL |
362 | dout(0) << "FIO plugin perf dump:" << dendl; |
363 | dout(0) << ostr.str() << dendl; | |
364 | if (perf_output_file) { | |
365 | try { | |
366 | std::ofstream foutput(*perf_output_file); | |
367 | foutput << ostr.str() << std::endl; | |
368 | } catch (std::exception &e) { | |
369 | std::cerr << "Unable to write formatted output to " | |
370 | << *perf_output_file | |
371 | << ", exception: " << e.what() | |
372 | << std::endl; | |
373 | } | |
374 | } | |
7c673cae FG |
375 | } |
376 | } | |
377 | }; | |
378 | ||
9f95a23c TL |
379 | TracepointProvider::Traits bluestore_tracepoint_traits("libbluestore_tp.so", |
380 | "bluestore_tracing"); | |
381 | ||
11fdf7f2 TL |
382 | Engine::Engine(thread_data* td) |
383 | : ref_count(0), | |
9f95a23c TL |
384 | unlink(td->o.unlink), |
385 | perf_output_file( | |
386 | static_cast<Options*>(td->eo)->perf_output_file ? | |
387 | std::make_optional(static_cast<Options*>(td->eo)->perf_output_file) : | |
388 | std::nullopt) | |
7c673cae FG |
389 | { |
390 | // add the ceph command line arguments | |
11fdf7f2 | 391 | auto o = static_cast<Options*>(td->eo); |
7c673cae FG |
392 | if (!o->conf) { |
393 | throw std::runtime_error("missing conf option for ceph configuration file"); | |
394 | } | |
395 | std::vector<const char*> args{ | |
396 | "-i", "0", // identify as osd.0 for osd_data and osd_journal | |
397 | "--conf", o->conf, // use the requested conf file | |
398 | }; | |
399 | if (td->o.directory) { // allow conf files to use ${fio_dir} for data | |
400 | args.emplace_back("--fio_dir"); | |
401 | args.emplace_back(td->o.directory); | |
402 | } | |
403 | ||
404 | // claim the g_ceph_context reference and release it on destruction | |
405 | cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD, | |
11fdf7f2 TL |
406 | CODE_ENVIRONMENT_UTILITY, |
407 | CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); | |
7c673cae FG |
408 | common_init_finish(g_ceph_context); |
409 | ||
9f95a23c TL |
410 | TracepointProvider::initialize<bluestore_tracepoint_traits>(g_ceph_context); |
411 | ||
7c673cae | 412 | // create the ObjectStore |
20effc67 TL |
413 | os = ObjectStore::create(g_ceph_context, |
414 | g_conf().get_val<std::string>("osd objectstore"), | |
415 | g_conf().get_val<std::string>("osd data"), | |
416 | g_conf().get_val<std::string>("osd journal")); | |
7c673cae | 417 | if (!os) |
11fdf7f2 | 418 | throw std::runtime_error("bad objectstore type " + g_conf()->osd_objectstore); |
7c673cae | 419 | |
31f18b77 | 420 | unsigned num_shards; |
11fdf7f2 TL |
421 | if(g_conf()->osd_op_num_shards) |
422 | num_shards = g_conf()->osd_op_num_shards; | |
31f18b77 | 423 | else if(os->is_rotational()) |
11fdf7f2 | 424 | num_shards = g_conf()->osd_op_num_shards_hdd; |
31f18b77 | 425 | else |
11fdf7f2 | 426 | num_shards = g_conf()->osd_op_num_shards_ssd; |
31f18b77 | 427 | os->set_cache_shards(num_shards); |
7c673cae | 428 | |
11fdf7f2 TL |
429 | //normalize options |
430 | o->oi_attr_len_high = max(o->oi_attr_len_low, o->oi_attr_len_high); | |
431 | o->snapset_attr_len_high = max(o->snapset_attr_len_low, | |
432 | o->snapset_attr_len_high); | |
433 | o->pglog_omap_len_high = max(o->pglog_omap_len_low, | |
434 | o->pglog_omap_len_high); | |
435 | o->pglog_dup_omap_len_high = max(o->pglog_dup_omap_len_low, | |
436 | o->pglog_dup_omap_len_high); | |
437 | o->_fastinfo_omap_len_high = max(o->_fastinfo_omap_len_low, | |
438 | o->_fastinfo_omap_len_high); | |
439 | ||
7c673cae FG |
440 | int r = os->mkfs(); |
441 | if (r < 0) | |
442 | throw std::system_error(-r, std::system_category(), "mkfs failed"); | |
443 | ||
444 | r = os->mount(); | |
445 | if (r < 0) | |
446 | throw std::system_error(-r, std::system_category(), "mount failed"); | |
11fdf7f2 TL |
447 | |
448 | // create shared collections up to osd_pool_default_pg_num | |
449 | if (o->single_pool_mode) { | |
450 | uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num"); | |
451 | if (count > td->o.nr_files) | |
452 | count = td->o.nr_files; | |
453 | init_collections(os, Collection::MIN_POOL_ID, collections, count); | |
454 | } | |
7c673cae FG |
455 | } |
456 | ||
457 | Engine::~Engine() | |
458 | { | |
11fdf7f2 | 459 | ceph_assert(!ref_count); |
7c673cae FG |
460 | } |
461 | ||
7c673cae FG |
462 | struct Object { |
463 | ghobject_t oid; | |
464 | Collection& coll; | |
465 | ||
466 | Object(const char* name, Collection& coll) | |
467 | : oid(hobject_t(name, "", CEPH_NOSNAP, coll.pg.ps(), coll.pg.pool(), "")), | |
468 | coll(coll) {} | |
469 | }; | |
470 | ||
11fdf7f2 TL |
471 | /// treat each fio job either like a separate pool with its own collections and objects |
472 | /// or just a client using its own objects from the shared pool | |
7c673cae FG |
473 | struct Job { |
474 | Engine* engine; //< shared ptr to the global Engine | |
9f95a23c | 475 | const unsigned subjob_number; //< subjob num |
11fdf7f2 | 476 | std::vector<Collection> collections; //< job's private collections to spread objects over |
7c673cae FG |
477 | std::vector<Object> objects; //< associate an object with each fio_file |
478 | std::vector<io_u*> events; //< completions for fio_ceph_os_event() | |
479 | const bool unlink; //< unlink objects on destruction | |
480 | ||
11fdf7f2 TL |
481 | bufferptr one_for_all_data; //< preallocated buffer long enough |
482 | //< to use for vairious operations | |
9f95a23c TL |
483 | std::mutex throttle_lock; |
484 | const vector<unsigned> throttle_values; | |
485 | const vector<unsigned> deferred_throttle_values; | |
486 | std::chrono::duration<double> cycle_throttle_period; | |
487 | mono_clock::time_point last = ceph::mono_clock::zero(); | |
488 | unsigned index = 0; | |
489 | ||
490 | static vector<unsigned> parse_throttle_str(const char *p) { | |
491 | vector<unsigned> ret; | |
492 | if (p == nullptr) { | |
493 | return ret; | |
494 | } | |
495 | ceph::for_each_substr(p, ",\"", [&ret] (auto &&s) mutable { | |
496 | if (s.size() > 0) { | |
497 | ret.push_back(std::stoul(std::string(s))); | |
498 | } | |
499 | }); | |
500 | return ret; | |
501 | } | |
502 | void check_throttle(); | |
11fdf7f2 | 503 | |
7c673cae FG |
504 | Job(Engine* engine, const thread_data* td); |
505 | ~Job(); | |
506 | }; | |
507 | ||
508 | Job::Job(Engine* engine, const thread_data* td) | |
509 | : engine(engine), | |
9f95a23c | 510 | subjob_number(td->subjob_number), |
7c673cae | 511 | events(td->o.iodepth), |
9f95a23c TL |
512 | unlink(td->o.unlink), |
513 | throttle_values( | |
514 | parse_throttle_str(static_cast<Options*>(td->eo)->throttle_values)), | |
515 | deferred_throttle_values( | |
516 | parse_throttle_str(static_cast<Options*>(td->eo)->deferred_throttle_values)), | |
517 | cycle_throttle_period( | |
518 | static_cast<Options*>(td->eo)->cycle_throttle_period) | |
7c673cae FG |
519 | { |
520 | engine->ref(); | |
11fdf7f2 TL |
521 | auto o = static_cast<Options*>(td->eo); |
522 | unsigned long long max_data = max(o->oi_attr_len_high, | |
523 | o->snapset_attr_len_high); | |
524 | max_data = max(max_data, o->pglog_omap_len_high); | |
525 | max_data = max(max_data, o->pglog_dup_omap_len_high); | |
526 | max_data = max(max_data, o->_fastinfo_omap_len_high); | |
527 | one_for_all_data = buffer::create(max_data); | |
528 | ||
529 | std::vector<Collection>* colls; | |
530 | // create private collections up to osd_pool_default_pg_num | |
531 | if (!o->single_pool_mode) { | |
532 | uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num"); | |
533 | if (count > td->o.nr_files) | |
534 | count = td->o.nr_files; | |
535 | // use the fio thread_number for our unique pool id | |
536 | const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number + 1; | |
537 | init_collections(engine->os, pool, collections, count); | |
538 | colls = &collections; | |
539 | } else { | |
540 | colls = &engine->collections; | |
7c673cae | 541 | } |
7c673cae | 542 | const uint64_t file_size = td->o.size / max(1u, td->o.nr_files); |
11fdf7f2 | 543 | ObjectStore::Transaction t; |
7c673cae FG |
544 | |
545 | // create an object for each file in the job | |
11fdf7f2 | 546 | objects.reserve(td->o.nr_files); |
9f95a23c | 547 | unsigned checked_or_preallocated = 0; |
7c673cae FG |
548 | for (uint32_t i = 0; i < td->o.nr_files; i++) { |
549 | auto f = td->files[i]; | |
550 | f->real_file_size = file_size; | |
551 | f->engine_pos = i; | |
552 | ||
11fdf7f2 TL |
553 | // associate each object with a collection in a round-robin fashion. |
554 | auto& coll = (*colls)[i % colls->size()]; | |
7c673cae FG |
555 | |
556 | objects.emplace_back(f->file_name, coll); | |
11fdf7f2 TL |
557 | if (o->preallocate_files) { |
558 | auto& oid = objects.back().oid; | |
559 | t.touch(coll.cid, oid); | |
560 | t.truncate(coll.cid, oid, file_size); | |
561 | int r = engine->os->queue_transaction(coll.ch, std::move(t)); | |
562 | if (r) { | |
563 | engine->deref(); | |
564 | throw std::system_error(r, std::system_category(), "job init"); | |
565 | } | |
566 | } | |
9f95a23c TL |
567 | if (o->check_files) { |
568 | auto& oid = objects.back().oid; | |
569 | struct stat st; | |
570 | int r = engine->os->stat(coll.ch, oid, &st); | |
571 | if (r || ((unsigned)st.st_size) != file_size) { | |
572 | derr << "Problem checking " << oid << ", r=" << r | |
573 | << ", st.st_size=" << st.st_size | |
574 | << ", file_size=" << file_size | |
575 | << ", nr_files=" << td->o.nr_files << dendl; | |
576 | engine->deref(); | |
577 | throw std::system_error( | |
578 | r, std::system_category(), "job init -- cannot check file"); | |
579 | } | |
580 | } | |
581 | if (o->check_files || o->preallocate_files) { | |
582 | ++checked_or_preallocated; | |
583 | } | |
584 | } | |
585 | if (o->check_files) { | |
586 | derr << "fio_ceph_objectstore checked " << checked_or_preallocated | |
587 | << " files"<< dendl; | |
588 | } | |
589 | if (o->preallocate_files ){ | |
590 | derr << "fio_ceph_objectstore preallocated " << checked_or_preallocated | |
591 | << " files"<< dendl; | |
7c673cae FG |
592 | } |
593 | } | |
594 | ||
595 | Job::~Job() | |
596 | { | |
597 | if (unlink) { | |
598 | ObjectStore::Transaction t; | |
11fdf7f2 | 599 | bool failed = false; |
7c673cae FG |
600 | // remove our objects |
601 | for (auto& obj : objects) { | |
602 | t.remove(obj.coll.cid, obj.oid); | |
11fdf7f2 TL |
603 | int r = engine->os->queue_transaction(obj.coll.ch, std::move(t)); |
604 | if (r && !failed) { | |
605 | derr << "job cleanup failed with " << cpp_strerror(-r) << dendl; | |
606 | failed = true; | |
607 | } | |
7c673cae | 608 | } |
11fdf7f2 | 609 | destroy_collections(engine->os, collections); |
7c673cae FG |
610 | } |
611 | engine->deref(); | |
612 | } | |
613 | ||
9f95a23c TL |
614 | void Job::check_throttle() |
615 | { | |
616 | if (subjob_number != 0) | |
617 | return; | |
618 | ||
619 | std::lock_guard<std::mutex> l(throttle_lock); | |
620 | if (throttle_values.empty() && deferred_throttle_values.empty()) | |
621 | return; | |
622 | ||
623 | if (ceph::mono_clock::is_zero(last) || | |
624 | ((cycle_throttle_period != cycle_throttle_period.zero()) && | |
625 | (ceph::mono_clock::now() - last) > cycle_throttle_period)) { | |
626 | unsigned tvals = throttle_values.size() ? throttle_values.size() : 1; | |
627 | unsigned dtvals = deferred_throttle_values.size() ? deferred_throttle_values.size() : 1; | |
628 | if (!throttle_values.empty()) { | |
629 | std::string val = std::to_string(throttle_values[index % tvals]); | |
630 | std::cerr << "Setting bluestore_throttle_bytes to " << val << std::endl; | |
631 | int r = engine->cct->_conf.set_val( | |
632 | "bluestore_throttle_bytes", | |
633 | val, | |
634 | nullptr); | |
635 | ceph_assert(r == 0); | |
636 | } | |
637 | if (!deferred_throttle_values.empty()) { | |
638 | std::string val = std::to_string(deferred_throttle_values[(index / tvals) % dtvals]); | |
639 | std::cerr << "Setting bluestore_deferred_throttle_bytes to " << val << std::endl; | |
640 | int r = engine->cct->_conf.set_val( | |
641 | "bluestore_throttle_deferred_bytes", | |
642 | val, | |
643 | nullptr); | |
644 | ceph_assert(r == 0); | |
645 | } | |
646 | engine->cct->_conf.apply_changes(nullptr); | |
647 | index++; | |
648 | index %= tvals * dtvals; | |
649 | last = ceph::mono_clock::now(); | |
650 | } | |
651 | } | |
652 | ||
7c673cae FG |
653 | int fio_ceph_os_setup(thread_data* td) |
654 | { | |
655 | // if there are multiple jobs, they must run in the same process against a | |
656 | // single instance of the ObjectStore. explicitly disable fio's default | |
657 | // job-per-process configuration | |
658 | td->o.use_thread = 1; | |
659 | ||
660 | try { | |
661 | // get or create the global Engine instance | |
662 | auto engine = Engine::get_instance(td); | |
663 | // create a Job for this thread | |
664 | td->io_ops_data = new Job(engine, td); | |
665 | } catch (std::exception& e) { | |
666 | std::cerr << "setup failed with " << e.what() << std::endl; | |
667 | return -1; | |
668 | } | |
669 | return 0; | |
670 | } | |
671 | ||
672 | void fio_ceph_os_cleanup(thread_data* td) | |
673 | { | |
674 | auto job = static_cast<Job*>(td->io_ops_data); | |
675 | td->io_ops_data = nullptr; | |
676 | delete job; | |
677 | } | |
678 | ||
679 | ||
680 | io_u* fio_ceph_os_event(thread_data* td, int event) | |
681 | { | |
682 | // return the requested event from fio_ceph_os_getevents() | |
683 | auto job = static_cast<Job*>(td->io_ops_data); | |
684 | return job->events[event]; | |
685 | } | |
686 | ||
687 | int fio_ceph_os_getevents(thread_data* td, unsigned int min, | |
688 | unsigned int max, const timespec* t) | |
689 | { | |
690 | auto job = static_cast<Job*>(td->io_ops_data); | |
691 | unsigned int events = 0; | |
11fdf7f2 TL |
692 | io_u* u = NULL; |
693 | unsigned int i = 0; | |
7c673cae FG |
694 | |
695 | // loop through inflight ios until we find 'min' completions | |
696 | do { | |
697 | io_u_qiter(&td->io_u_all, u, i) { | |
698 | if (!(u->flags & IO_U_F_FLIGHT)) | |
699 | continue; | |
700 | ||
701 | if (u->engine_data) { | |
702 | u->engine_data = nullptr; | |
703 | job->events[events] = u; | |
704 | events++; | |
705 | } | |
706 | } | |
707 | if (events >= min) | |
708 | break; | |
709 | usleep(100); | |
710 | } while (1); | |
711 | ||
712 | return events; | |
713 | } | |
714 | ||
715 | /// completion context for ObjectStore::queue_transaction() | |
716 | class UnitComplete : public Context { | |
717 | io_u* u; | |
718 | public: | |
11fdf7f2 | 719 | explicit UnitComplete(io_u* u) : u(u) {} |
7c673cae FG |
720 | void finish(int r) { |
721 | // mark the pointer to indicate completion for fio_ceph_os_getevents() | |
722 | u->engine_data = reinterpret_cast<void*>(1ull); | |
723 | } | |
724 | }; | |
725 | ||
11fdf7f2 | 726 | enum fio_q_status fio_ceph_os_queue(thread_data* td, io_u* u) |
7c673cae FG |
727 | { |
728 | fio_ro_check(td, u); | |
729 | ||
11fdf7f2 TL |
730 | |
731 | ||
732 | auto o = static_cast<const Options*>(td->eo); | |
7c673cae FG |
733 | auto job = static_cast<Job*>(td->io_ops_data); |
734 | auto& object = job->objects[u->file->engine_pos]; | |
735 | auto& coll = object.coll; | |
736 | auto& os = job->engine->os; | |
737 | ||
9f95a23c TL |
738 | job->check_throttle(); |
739 | ||
7c673cae FG |
740 | if (u->ddir == DDIR_WRITE) { |
741 | // provide a hint if we're likely to read this data back | |
742 | const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0; | |
743 | ||
744 | bufferlist bl; | |
745 | bl.push_back(buffer::copy(reinterpret_cast<char*>(u->xfer_buf), | |
746 | u->xfer_buflen ) ); | |
747 | ||
20effc67 | 748 | map<string,bufferptr,less<>> attrset; |
11fdf7f2 TL |
749 | map<string, bufferlist> omaps; |
750 | // enqueue a write transaction on the collection's handle | |
7c673cae | 751 | ObjectStore::Transaction t; |
11fdf7f2 TL |
752 | char ver_key[64]; |
753 | ||
754 | // fill attrs if any | |
755 | if (o->oi_attr_len_high) { | |
756 | ceph_assert(o->oi_attr_len_high >= o->oi_attr_len_low); | |
757 | // fill with the garbage as we do not care of the actual content... | |
758 | job->one_for_all_data.set_length( | |
759 | ceph::util::generate_random_number( | |
760 | o->oi_attr_len_low, o->oi_attr_len_high)); | |
761 | attrset["_"] = job->one_for_all_data; | |
762 | } | |
763 | if (o->snapset_attr_len_high) { | |
764 | ceph_assert(o->snapset_attr_len_high >= o->snapset_attr_len_low); | |
765 | job->one_for_all_data.set_length( | |
766 | ceph::util::generate_random_number | |
767 | (o->snapset_attr_len_low, o->snapset_attr_len_high)); | |
768 | attrset["snapset"] = job->one_for_all_data; | |
769 | ||
770 | } | |
771 | if (o->_fastinfo_omap_len_high) { | |
772 | ceph_assert(o->_fastinfo_omap_len_high >= o->_fastinfo_omap_len_low); | |
773 | // fill with the garbage as we do not care of the actual content... | |
774 | job->one_for_all_data.set_length( | |
775 | ceph::util::generate_random_number( | |
776 | o->_fastinfo_omap_len_low, o->_fastinfo_omap_len_high)); | |
777 | omaps["_fastinfo"].append(job->one_for_all_data); | |
778 | } | |
779 | ||
780 | uint64_t pglog_trim_head = 0, pglog_trim_tail = 0; | |
781 | uint64_t pglog_dup_trim_head = 0, pglog_dup_trim_tail = 0; | |
782 | if (o->simulate_pglog) { | |
783 | ||
784 | uint64_t pglog_ver_cnt = 0; | |
785 | { | |
786 | std::lock_guard<std::mutex> l(*coll.lock); | |
787 | pglog_ver_cnt = coll.pglog_ver_head++; | |
788 | if (o->pglog_omap_len_high && | |
789 | pglog_ver_cnt >= | |
790 | coll.pglog_ver_tail + | |
791 | g_conf()->osd_min_pg_log_entries + g_conf()->osd_pg_log_trim_min) { | |
792 | pglog_trim_tail = coll.pglog_ver_tail; | |
793 | coll.pglog_ver_tail = pglog_trim_head = | |
794 | pglog_trim_tail + g_conf()->osd_pg_log_trim_min; | |
795 | ||
796 | if (o->pglog_dup_omap_len_high && | |
797 | pglog_ver_cnt >= | |
798 | coll.pglog_dup_ver_tail + g_conf()->osd_pg_log_dups_tracked + | |
799 | g_conf()->osd_pg_log_trim_min) { | |
800 | pglog_dup_trim_tail = coll.pglog_dup_ver_tail; | |
801 | coll.pglog_dup_ver_tail = pglog_dup_trim_head = | |
802 | pglog_dup_trim_tail + g_conf()->osd_pg_log_trim_min; | |
803 | } | |
804 | } | |
805 | } | |
806 | ||
807 | if (o->pglog_omap_len_high) { | |
808 | ceph_assert(o->pglog_omap_len_high >= o->pglog_omap_len_low); | |
809 | snprintf(ver_key, sizeof(ver_key), | |
810 | "0000000011.%020llu", (unsigned long long)pglog_ver_cnt); | |
811 | // fill with the garbage as we do not care of the actual content... | |
812 | job->one_for_all_data.set_length( | |
813 | ceph::util::generate_random_number( | |
814 | o->pglog_omap_len_low, o->pglog_omap_len_high)); | |
815 | omaps[ver_key].append(job->one_for_all_data); | |
816 | } | |
817 | if (o->pglog_dup_omap_len_high) { | |
818 | //insert dup | |
819 | ceph_assert(o->pglog_dup_omap_len_high >= o->pglog_dup_omap_len_low); | |
820 | for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) { | |
821 | snprintf(ver_key, sizeof(ver_key), | |
822 | "dup_0000000011.%020llu", (unsigned long long)i); | |
823 | // fill with the garbage as we do not care of the actual content... | |
824 | job->one_for_all_data.set_length( | |
825 | ceph::util::generate_random_number( | |
826 | o->pglog_dup_omap_len_low, o->pglog_dup_omap_len_high)); | |
827 | omaps[ver_key].append(job->one_for_all_data); | |
828 | } | |
829 | } | |
830 | } | |
831 | ||
20effc67 | 832 | if (!attrset.empty()) { |
11fdf7f2 TL |
833 | t.setattrs(coll.cid, object.oid, attrset); |
834 | } | |
7c673cae | 835 | t.write(coll.cid, object.oid, u->offset, u->xfer_buflen, bl, flags); |
11fdf7f2 TL |
836 | |
837 | set<string> rmkeys; | |
838 | for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) { | |
839 | snprintf(ver_key, sizeof(ver_key), | |
840 | "0000000011.%020llu", (unsigned long long)i); | |
841 | rmkeys.emplace(ver_key); | |
842 | } | |
843 | for( auto i = pglog_dup_trim_tail; i < pglog_dup_trim_head; ++i) { | |
844 | snprintf(ver_key, sizeof(ver_key), | |
845 | "dup_0000000011.%020llu", (unsigned long long)i); | |
846 | rmkeys.emplace(ver_key); | |
847 | } | |
848 | ||
849 | if (rmkeys.size()) { | |
850 | ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); | |
851 | t.omap_rmkeys(coll.cid, pgmeta_oid, rmkeys); | |
852 | } | |
853 | ||
854 | if (omaps.size()) { | |
855 | ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); | |
856 | t.omap_setkeys(coll.cid, pgmeta_oid, omaps); | |
857 | } | |
858 | t.register_on_commit(new UnitComplete(u)); | |
859 | os->queue_transaction(coll.ch, | |
860 | std::move(t)); | |
7c673cae FG |
861 | return FIO_Q_QUEUED; |
862 | } | |
863 | ||
864 | if (u->ddir == DDIR_READ) { | |
865 | // ObjectStore reads are synchronous, so make the call and return COMPLETED | |
866 | bufferlist bl; | |
11fdf7f2 | 867 | int r = os->read(coll.ch, object.oid, u->offset, u->xfer_buflen, bl); |
7c673cae FG |
868 | if (r < 0) { |
869 | u->error = r; | |
870 | td_verror(td, u->error, "xfer"); | |
871 | } else { | |
9f95a23c | 872 | bl.begin().copy(bl.length(), static_cast<char*>(u->xfer_buf)); |
7c673cae FG |
873 | u->resid = u->xfer_buflen - r; |
874 | } | |
875 | return FIO_Q_COMPLETED; | |
876 | } | |
877 | ||
878 | derr << "WARNING: Only DDIR_READ and DDIR_WRITE are supported!" << dendl; | |
879 | u->error = -EINVAL; | |
880 | td_verror(td, u->error, "xfer"); | |
881 | return FIO_Q_COMPLETED; | |
882 | } | |
883 | ||
884 | int fio_ceph_os_commit(thread_data* td) | |
885 | { | |
886 | // commit() allows the engine to batch up queued requests to be submitted all | |
887 | // at once. it would be natural for queue() to collect transactions in a list, | |
888 | // and use commit() to pass them all to ObjectStore::queue_transactions(). but | |
889 | // because we spread objects over multiple collections, we a) need to use a | |
890 | // different sequencer for each collection, and b) are less likely to see a | |
891 | // benefit from batching requests within a collection | |
892 | return 0; | |
893 | } | |
894 | ||
895 | // open/close are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to | |
896 | // prevent fio from creating the files | |
897 | int fio_ceph_os_open(thread_data* td, fio_file* f) { return 0; } | |
898 | int fio_ceph_os_close(thread_data* td, fio_file* f) { return 0; } | |
899 | ||
900 | int fio_ceph_os_io_u_init(thread_data* td, io_u* u) | |
901 | { | |
902 | // no data is allocated, we just use the pointer as a boolean 'completed' flag | |
903 | u->engine_data = nullptr; | |
904 | return 0; | |
905 | } | |
906 | ||
907 | void fio_ceph_os_io_u_free(thread_data* td, io_u* u) | |
908 | { | |
909 | u->engine_data = nullptr; | |
910 | } | |
911 | ||
912 | ||
913 | // ioengine_ops for get_ioengine() | |
914 | struct ceph_ioengine : public ioengine_ops { | |
915 | ceph_ioengine() : ioengine_ops({}) { | |
916 | name = "ceph-os"; | |
917 | version = FIO_IOOPS_VERSION; | |
918 | flags = FIO_DISKLESSIO; | |
919 | setup = fio_ceph_os_setup; | |
920 | queue = fio_ceph_os_queue; | |
921 | commit = fio_ceph_os_commit; | |
922 | getevents = fio_ceph_os_getevents; | |
923 | event = fio_ceph_os_event; | |
924 | cleanup = fio_ceph_os_cleanup; | |
925 | open_file = fio_ceph_os_open; | |
926 | close_file = fio_ceph_os_close; | |
927 | io_u_init = fio_ceph_os_io_u_init; | |
928 | io_u_free = fio_ceph_os_io_u_free; | |
929 | options = ceph_options.data(); | |
930 | option_struct_size = sizeof(struct Options); | |
931 | } | |
932 | }; | |
933 | ||
934 | } // anonymous namespace | |
935 | ||
936 | extern "C" { | |
937 | // the exported fio engine interface | |
938 | void get_ioengine(struct ioengine_ops** ioengine_ptr) { | |
939 | static ceph_ioengine ioengine; | |
940 | *ioengine_ptr = &ioengine; | |
941 | } | |
942 | } // extern "C" |