]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/fio/fio_ceph_objectstore.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / fio / fio_ceph_objectstore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph ObjectStore engine
5 *
6 * IO engine using Ceph's ObjectStore class to test low-level performance of
7 * Ceph OSDs.
8 *
9 */
10
11#include <memory>
12#include <system_error>
13#include <vector>
9f95a23c 14#include <fstream>
7c673cae
FG
15
16#include "os/ObjectStore.h"
17#include "global/global_init.h"
18#include "common/errno.h"
19#include "include/intarith.h"
20#include "include/stringify.h"
11fdf7f2 21#include "include/random.h"
9f95a23c 22#include "include/str_list.h"
224ce89b 23#include "common/perf_counters.h"
9f95a23c 24#include "common/TracepointProvider.h"
7c673cae
FG
25
26#include <fio.h>
27#include <optgroup.h>
28
11fdf7f2
TL
29#include "include/ceph_assert.h" // fio.h clobbers our assert.h
30#include <algorithm>
7c673cae
FG
31
32#define dout_context g_ceph_context
33#define dout_subsys ceph_subsys_
34
20effc67
TL
35using namespace std;
36
7c673cae
FG
37namespace {
38
/// fio configuration options read from the job file
///
/// Field offsets are wired to fio's option parser via offsetof() in
/// ceph_options below; the *_low/*_high pairs receive FIO_OPT_STR_VAL
/// ranges (off1 = low bound, off2 = high bound).
struct Options {
  thread_data* td;                    // back-pointer filled in by fio
  char* conf;                         // path to the ceph.conf to load
  char* perf_output_file;             // optional json perf dump target
  char* throttle_values;              // csv list for bluestore_throttle_bytes
  char* deferred_throttle_values;     // csv list for bluestore_throttle_deferred_bytes
  unsigned long long
    cycle_throttle_period,            // seconds between throttle changes (0 = never)
    oi_attr_len_low,                  // '_' xattr size range
    oi_attr_len_high,
    snapset_attr_len_low,             // 'snapset' xattr size range
    snapset_attr_len_high,
    pglog_omap_len_low,               // simulated pglog omap entry size range
    pglog_omap_len_high,
    pglog_dup_omap_len_low,           // simulated pglog dup entry size range
    pglog_dup_omap_len_high,
    _fastinfo_omap_len_low,           // '_fastinfo' omap entry size range
    _fastinfo_omap_len_high;
  unsigned simulate_pglog;            // bool: emulate pglog append/trim per write
  unsigned single_pool_mode;          // bool: all jobs share one pool/collections
  unsigned preallocate_files;         // bool: touch+truncate objects on init
  unsigned check_files;               // bool: stat-verify objects on init
};
63
64template <class Func> // void Func(fio_option&)
65fio_option make_option(Func&& func)
66{
67 // zero-initialize and set common defaults
68 auto o = fio_option{};
69 o.category = FIO_OPT_C_ENGINE;
70 o.group = FIO_OPT_G_RBD;
71 func(std::ref(o));
72 return o;
73}
74
75static std::vector<fio_option> ceph_options{
76 make_option([] (fio_option& o) {
77 o.name = "conf";
78 o.lname = "ceph configuration file";
79 o.type = FIO_OPT_STR_STORE;
80 o.help = "Path to a ceph configuration file";
81 o.off1 = offsetof(Options, conf);
82 }),
9f95a23c
TL
83 make_option([] (fio_option& o) {
84 o.name = "perf_output_file";
85 o.lname = "perf output target";
86 o.type = FIO_OPT_STR_STORE;
87 o.help = "Path to which to write json formatted perf output";
88 o.off1 = offsetof(Options, perf_output_file);
89 o.def = 0;
90 }),
11fdf7f2
TL
91 make_option([] (fio_option& o) {
92 o.name = "oi_attr_len";
93 o.lname = "OI Attr length";
94 o.type = FIO_OPT_STR_VAL;
95 o.help = "Set OI(aka '_') attribute to specified length";
96 o.off1 = offsetof(Options, oi_attr_len_low);
97 o.off2 = offsetof(Options, oi_attr_len_high);
98 o.def = 0;
99 o.minval = 0;
100 }),
101 make_option([] (fio_option& o) {
102 o.name = "snapset_attr_len";
103 o.lname = "Attr 'snapset' length";
104 o.type = FIO_OPT_STR_VAL;
105 o.help = "Set 'snapset' attribute to specified length";
106 o.off1 = offsetof(Options, snapset_attr_len_low);
107 o.off2 = offsetof(Options, snapset_attr_len_high);
108 o.def = 0;
109 o.minval = 0;
110 }),
111 make_option([] (fio_option& o) {
112 o.name = "_fastinfo_omap_len";
113 o.lname = "'_fastinfo' omap entry length";
114 o.type = FIO_OPT_STR_VAL;
115 o.help = "Set '_fastinfo' OMAP attribute to specified length";
116 o.off1 = offsetof(Options, _fastinfo_omap_len_low);
117 o.off2 = offsetof(Options, _fastinfo_omap_len_high);
118 o.def = 0;
119 o.minval = 0;
120 }),
121 make_option([] (fio_option& o) {
122 o.name = "pglog_simulation";
123 o.lname = "pglog behavior simulation";
124 o.type = FIO_OPT_BOOL;
125 o.help = "Enables PG Log simulation behavior";
126 o.off1 = offsetof(Options, simulate_pglog);
127 o.def = "0";
128 }),
129 make_option([] (fio_option& o) {
130 o.name = "pglog_omap_len";
131 o.lname = "pglog omap entry length";
132 o.type = FIO_OPT_STR_VAL;
133 o.help = "Set pglog omap entry to specified length";
134 o.off1 = offsetof(Options, pglog_omap_len_low);
135 o.off2 = offsetof(Options, pglog_omap_len_high);
136 o.def = 0;
137 o.minval = 0;
138 }),
139 make_option([] (fio_option& o) {
140 o.name = "pglog_dup_omap_len";
141 o.lname = "uplicate pglog omap entry length";
142 o.type = FIO_OPT_STR_VAL;
143 o.help = "Set duplicate pglog omap entry to specified length";
144 o.off1 = offsetof(Options, pglog_dup_omap_len_low);
145 o.off2 = offsetof(Options, pglog_dup_omap_len_high);
146 o.def = 0;
147 o.minval = 0;
148 }),
149 make_option([] (fio_option& o) {
150 o.name = "single_pool_mode";
151 o.lname = "single(shared among jobs) pool mode";
152 o.type = FIO_OPT_BOOL;
153 o.help = "Enables the mode when all jobs run against the same pool";
154 o.off1 = offsetof(Options, single_pool_mode);
155 o.def = "0";
156 }),
157 make_option([] (fio_option& o) {
158 o.name = "preallocate_files";
159 o.lname = "preallocate files on init";
160 o.type = FIO_OPT_BOOL;
161 o.help = "Enables/disables file preallocation (touch and resize) on init";
162 o.off1 = offsetof(Options, preallocate_files);
163 o.def = "1";
164 }),
9f95a23c
TL
165 make_option([] (fio_option& o) {
166 o.name = "check_files";
167 o.lname = "ensure files exist and are correct on init";
168 o.type = FIO_OPT_BOOL;
169 o.help = "Enables/disables checking of files on init";
170 o.off1 = offsetof(Options, check_files);
171 o.def = "0";
172 }),
173 make_option([] (fio_option& o) {
174 o.name = "bluestore_throttle";
175 o.lname = "set bluestore throttle";
176 o.type = FIO_OPT_STR_STORE;
177 o.help = "comma delimited list of throttle values",
178 o.off1 = offsetof(Options, throttle_values);
179 o.def = 0;
180 }),
181 make_option([] (fio_option& o) {
182 o.name = "bluestore_deferred_throttle";
183 o.lname = "set bluestore deferred throttle";
184 o.type = FIO_OPT_STR_STORE;
185 o.help = "comma delimited list of throttle values",
186 o.off1 = offsetof(Options, deferred_throttle_values);
187 o.def = 0;
188 }),
189 make_option([] (fio_option& o) {
190 o.name = "vary_bluestore_throttle_period";
191 o.lname = "period between different throttle values";
192 o.type = FIO_OPT_STR_VAL;
193 o.help = "set to non-zero value to periodically cycle through throttle options";
194 o.off1 = offsetof(Options, cycle_throttle_period);
195 o.def = "0";
196 o.minval = 0;
197 }),
7c673cae
FG
198 {} // fio expects a 'null'-terminated list
199};
200
201
11fdf7f2
TL
/// A PG-like collection that objects are spread over (round-robin).
/// The pglog_* counters back the pglog-simulation write mode and are
/// guarded by 'lock'.
struct Collection {
  spg_t pg;
  coll_t cid;
  ObjectStore::CollectionHandle ch;
  // Can't use mutex directly in vectors hence dynamic allocation

  std::unique_ptr<std::mutex> lock;   // guards the pglog counters below
  uint64_t pglog_ver_head = 1;        // next pglog version to assign
  uint64_t pglog_ver_tail = 1;        // oldest untrimmed pglog entry
  uint64_t pglog_dup_ver_tail = 1;    // oldest untrimmed dup entry

  // use big pool ids to avoid clashing with existing collections
  static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;

  Collection(const spg_t& pg, ObjectStore::CollectionHandle _ch)
    : pg(pg), cid(pg), ch(_ch),
      lock(new std::mutex) {
  }
};
221
222int destroy_collections(
223 std::unique_ptr<ObjectStore>& os,
224 std::vector<Collection>& collections)
225{
226 ObjectStore::Transaction t;
227 bool failed = false;
228 // remove our collections
229 for (auto& coll : collections) {
230 ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
231 t.remove(coll.cid, pgmeta_oid);
232 t.remove_collection(coll.cid);
233 int r = os->queue_transaction(coll.ch, std::move(t));
234 if (r && !failed) {
235 derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl;
236 failed = true;
237 }
238 }
239 return 0;
240}
241
/// Create (or reopen) 'count' PG collections in pool 'pool', appending a
/// Collection entry for each to 'collections'. Also writes an OSD superblock
/// into the meta collection the first time through, so external tools that
/// expect one (e.g. ceph-objectstore-tool) keep working.
/// Returns 0 on success; on failure tears down what was created so far and
/// returns the error.
int init_collections(std::unique_ptr<ObjectStore>& os,
                     uint64_t pool,
                     std::vector<Collection>& collections,
                     uint64_t count)
{
  ceph_assert(count > 0);
  collections.reserve(count);

  // enough split bits to make 'count' pg ids distinct
  const int split_bits = cbits(count - 1);

  {
    // propagate Superblock object to ensure proper functioning of tools that
    // need it. E.g. ceph-objectstore-tool
    coll_t cid(coll_t::meta());
    bool exists = os->collection_exists(cid);
    if (!exists) {
      auto ch = os->create_new_collection(cid);

      // default-constructed superblock content is sufficient here
      OSDSuperblock superblock;
      bufferlist bl;
      encode(superblock, bl);

      ObjectStore::Transaction t;
      t.create_collection(cid, split_bits);
      t.write(cid, OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl);
      int r = os->queue_transaction(ch, std::move(t));

      if (r < 0) {
        derr << "Failure to write OSD superblock: " << cpp_strerror(-r) << dendl;
        return r;
      }
    }
  }

  for (uint32_t i = 0; i < count; i++) {
    auto pg = spg_t{pg_t{i, pool}};
    coll_t cid(pg);

    // reuse a collection if a previous run already created it
    bool exists = os->collection_exists(cid);
    auto ch = exists ?
      os->open_collection(cid) :
      os->create_new_collection(cid) ;

    collections.emplace_back(pg, ch);

    ObjectStore::Transaction t;
    auto& coll = collections.back();
    if (!exists) {
      // brand-new collection: create it and give it an empty pgmeta object
      t.create_collection(coll.cid, split_bits);
      ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
      t.touch(coll.cid, pgmeta_oid);
      int r = os->queue_transaction(coll.ch, std::move(t));
      if (r) {
        derr << "Engine init failed with " << cpp_strerror(-r) << dendl;
        // roll back everything created so far before bailing out
        destroy_collections(os, collections);
        return r;
      }
    }
  }
  return 0;
}
303
7c673cae
FG
304/// global engine state shared between all jobs within the process. this
305/// includes g_ceph_context and the ObjectStore instance
306struct Engine {
307 /// the initial g_ceph_context reference to be dropped on destruction
308 boost::intrusive_ptr<CephContext> cct;
309 std::unique_ptr<ObjectStore> os;
310
11fdf7f2
TL
311 std::vector<Collection> collections; //< shared collections to spread objects over
312
7c673cae
FG
313 std::mutex lock;
314 int ref_count;
11fdf7f2 315 const bool unlink; //< unlink objects on destruction
7c673cae 316
9f95a23c
TL
317 // file to which to output formatted perf information
318 const std::optional<std::string> perf_output_file;
319
11fdf7f2 320 explicit Engine(thread_data* td);
7c673cae
FG
321 ~Engine();
322
323 static Engine* get_instance(thread_data* td) {
324 // note: creates an Engine with the options associated with the first job
325 static Engine engine(td);
326 return &engine;
327 }
328
329 void ref() {
330 std::lock_guard<std::mutex> l(lock);
331 ++ref_count;
332 }
333 void deref() {
334 std::lock_guard<std::mutex> l(lock);
335 --ref_count;
336 if (!ref_count) {
337 ostringstream ostr;
9f95a23c
TL
338 Formatter* f = Formatter::create(
339 "json-pretty", "json-pretty", "json-pretty");
340 f->open_object_section("perf_output");
224ce89b 341 cct->get_perfcounters_collection()->dump_formatted(f, false);
11fdf7f2 342 if (g_conf()->rocksdb_perf) {
9f95a23c 343 f->open_object_section("rocksdb_perf");
31f18b77 344 os->get_db_statistics(f);
9f95a23c 345 f->close_section();
31f18b77 346 }
11fdf7f2 347 mempool::dump(f);
9f95a23c
TL
348 {
349 f->open_object_section("db_histogram");
350 os->generate_db_histogram(f);
351 f->close_section();
352 }
11fdf7f2 353 f->close_section();
11fdf7f2 354
11fdf7f2 355 f->flush(ostr);
7c673cae 356 delete f;
11fdf7f2
TL
357
358 if (unlink) {
359 destroy_collections(os, collections);
360 }
7c673cae 361 os->umount();
9f95a23c
TL
362 dout(0) << "FIO plugin perf dump:" << dendl;
363 dout(0) << ostr.str() << dendl;
364 if (perf_output_file) {
365 try {
366 std::ofstream foutput(*perf_output_file);
367 foutput << ostr.str() << std::endl;
368 } catch (std::exception &e) {
369 std::cerr << "Unable to write formatted output to "
370 << *perf_output_file
371 << ", exception: " << e.what()
372 << std::endl;
373 }
374 }
7c673cae
FG
375 }
376 }
377};
378
9f95a23c
TL
// LTTng tracepoint provider for bluestore, loaded when 'bluestore_tracing'
// is enabled in the configuration.
TracepointProvider::Traits bluestore_tracepoint_traits("libbluestore_tp.so",
						       "bluestore_tracing");
381
11fdf7f2
TL
/// Bring up a full Ceph context + ObjectStore from the first job's options:
/// parse the conf file, init g_ceph_context, create/mkfs/mount the store,
/// size its cache shards, normalize the attr/omap length ranges, and (in
/// single_pool_mode) create the shared collections.
/// Throws std::runtime_error / std::system_error on any failure.
Engine::Engine(thread_data* td)
  : ref_count(0),
    unlink(td->o.unlink),
    perf_output_file(
      static_cast<Options*>(td->eo)->perf_output_file ?
      std::make_optional(static_cast<Options*>(td->eo)->perf_output_file) :
      std::nullopt)
{
  // add the ceph command line arguments
  auto o = static_cast<Options*>(td->eo);
  if (!o->conf) {
    throw std::runtime_error("missing conf option for ceph configuration file");
  }
  std::vector<const char*> args{
    "-i", "0", // identify as osd.0 for osd_data and osd_journal
    "--conf", o->conf, // use the requested conf file
  };
  if (td->o.directory) { // allow conf files to use ${fio_dir} for data
    args.emplace_back("--fio_dir");
    args.emplace_back(td->o.directory);
  }

  // claim the g_ceph_context reference and release it on destruction
  cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD,
		    CODE_ENVIRONMENT_UTILITY,
		    CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
  common_init_finish(g_ceph_context);

  TracepointProvider::initialize<bluestore_tracepoint_traits>(g_ceph_context);

  // create the ObjectStore
  os = ObjectStore::create(g_ceph_context,
			   g_conf().get_val<std::string>("osd objectstore"),
			   g_conf().get_val<std::string>("osd data"),
			   g_conf().get_val<std::string>("osd journal"));
  if (!os)
    throw std::runtime_error("bad objectstore type " + g_conf()->osd_objectstore);

  // pick the shard count the same way the OSD does: explicit setting wins,
  // otherwise choose by media type
  unsigned num_shards;
  if(g_conf()->osd_op_num_shards)
    num_shards = g_conf()->osd_op_num_shards;
  else if(os->is_rotational())
    num_shards = g_conf()->osd_op_num_shards_hdd;
  else
    num_shards = g_conf()->osd_op_num_shards_ssd;
  os->set_cache_shards(num_shards);

  //normalize options: make sure each high bound is >= its low bound
  o->oi_attr_len_high = max(o->oi_attr_len_low, o->oi_attr_len_high);
  o->snapset_attr_len_high = max(o->snapset_attr_len_low,
				 o->snapset_attr_len_high);
  o->pglog_omap_len_high = max(o->pglog_omap_len_low,
			       o->pglog_omap_len_high);
  o->pglog_dup_omap_len_high = max(o->pglog_dup_omap_len_low,
				   o->pglog_dup_omap_len_high);
  o->_fastinfo_omap_len_high = max(o->_fastinfo_omap_len_low,
				   o->_fastinfo_omap_len_high);

  int r = os->mkfs();
  if (r < 0)
    throw std::system_error(-r, std::system_category(), "mkfs failed");

  r = os->mount();
  if (r < 0)
    throw std::system_error(-r, std::system_category(), "mount failed");

  // create shared collections up to osd_pool_default_pg_num
  if (o->single_pool_mode) {
    uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num");
    if (count > td->o.nr_files)
      count = td->o.nr_files;
    init_collections(os, Collection::MIN_POOL_ID, collections, count);
  }
}
456
// All Jobs must have deref()'d before the singleton Engine is destroyed;
// deref() performs the actual teardown (perf dump, unlink, umount).
Engine::~Engine()
{
  ceph_assert(!ref_count);
}
461
7c673cae
FG
/// Binds one fio file to an object id inside one of the Collections.
struct Object {
  ghobject_t oid;
  Collection& coll;   // non-owning; Collections outlive the Objects

  // oid is derived from the fio file name, placed in the collection's pg
  Object(const char* name, Collection& coll)
    : oid(hobject_t(name, "", CEPH_NOSNAP, coll.pg.ps(), coll.pg.pool(), "")),
      coll(coll) {}
};
470
11fdf7f2
TL
/// treat each fio job either like a separate pool with its own collections and objects
/// or just a client using its own objects from the shared pool
struct Job {
  Engine* engine; //< shared ptr to the global Engine
  const unsigned subjob_number; //< subjob num
  std::vector<Collection> collections; //< job's private collections to spread objects over
  std::vector<Object> objects; //< associate an object with each fio_file
  std::vector<io_u*> events; //< completions for fio_ceph_os_event()
  const bool unlink; //< unlink objects on destruction

  bufferptr one_for_all_data; //< preallocated buffer long enough
                              //< to use for various operations

  // throttle-cycling state, used by check_throttle() (subjob 0 only)
  std::mutex throttle_lock;
  const vector<unsigned> throttle_values;          //< bluestore_throttle_bytes candidates
  const vector<unsigned> deferred_throttle_values; //< bluestore_throttle_deferred_bytes candidates
  std::chrono::duration<double> cycle_throttle_period; //< how often to switch
  mono_clock::time_point last = ceph::mono_clock::zero(); //< last switch time
  unsigned index = 0; //< position in the value-combination cycle

  /// Parse a comma (or quote) delimited list of unsigned values.
  /// Returns an empty vector for a null input.
  /// NOTE(review): std::stoul throws on non-numeric tokens — malformed job
  /// options will surface as an uncaught exception; confirm acceptable.
  static vector<unsigned> parse_throttle_str(const char *p) {
    vector<unsigned> ret;
    if (p == nullptr) {
      return ret;
    }
    ceph::for_each_substr(p, ",\"", [&ret] (auto &&s) mutable {
      if (s.size() > 0) {
	ret.push_back(std::stoul(std::string(s)));
      }
    });
    return ret;
  }
  void check_throttle();

  Job(Engine* engine, const thread_data* td);
  ~Job();
};
507
/// Per-thread Job setup: register with the shared Engine, size the scratch
/// buffer to the largest configured attr/omap payload, create (or borrow)
/// collections, and create one object per fio file — optionally
/// preallocating and/or stat-verifying each. Throws std::system_error on
/// failure (after dropping the Engine reference).
Job::Job(Engine* engine, const thread_data* td)
  : engine(engine),
    subjob_number(td->subjob_number),
    events(td->o.iodepth),
    unlink(td->o.unlink),
    throttle_values(
      parse_throttle_str(static_cast<Options*>(td->eo)->throttle_values)),
    deferred_throttle_values(
      parse_throttle_str(static_cast<Options*>(td->eo)->deferred_throttle_values)),
    cycle_throttle_period(
      static_cast<Options*>(td->eo)->cycle_throttle_period)
{
  engine->ref();
  auto o = static_cast<Options*>(td->eo);
  // one shared buffer sized for the biggest payload we might generate
  unsigned long long max_data = max(o->oi_attr_len_high,
				  o->snapset_attr_len_high);
  max_data = max(max_data, o->pglog_omap_len_high);
  max_data = max(max_data, o->pglog_dup_omap_len_high);
  max_data = max(max_data, o->_fastinfo_omap_len_high);
  one_for_all_data = buffer::create(max_data);

  std::vector<Collection>* colls;
  // create private collections up to osd_pool_default_pg_num
  if (!o->single_pool_mode) {
    uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num");
    if (count > td->o.nr_files)
      count = td->o.nr_files;
    // use the fio thread_number for our unique pool id
    const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number + 1;
    init_collections(engine->os, pool, collections, count);
    colls = &collections;
  } else {
    colls = &engine->collections;
  }
  const uint64_t file_size = td->o.size / max(1u, td->o.nr_files);
  ObjectStore::Transaction t;

  // create an object for each file in the job
  objects.reserve(td->o.nr_files);
  unsigned checked_or_preallocated = 0;
  for (uint32_t i = 0; i < td->o.nr_files; i++) {
    auto f = td->files[i];
    f->real_file_size = file_size;
    f->engine_pos = i;

    // associate each object with a collection in a round-robin fashion.
    auto& coll = (*colls)[i % colls->size()];

    objects.emplace_back(f->file_name, coll);
    if (o->preallocate_files) {
      // touch + truncate so the object exists at full size before I/O starts;
      // the move leaves 't' empty for the next iteration
      auto& oid = objects.back().oid;
      t.touch(coll.cid, oid);
      t.truncate(coll.cid, oid, file_size);
      int r = engine->os->queue_transaction(coll.ch, std::move(t));
      if (r) {
	engine->deref();
	throw std::system_error(r, std::system_category(), "job init");
      }
    }
    if (o->check_files) {
      // verify the object exists and already has the expected size
      auto& oid = objects.back().oid;
      struct stat st;
      int r = engine->os->stat(coll.ch, oid, &st);
      if (r || ((unsigned)st.st_size) != file_size) {
	derr << "Problem checking " << oid << ", r=" << r
	     << ", st.st_size=" << st.st_size
	     << ", file_size=" << file_size
	     << ", nr_files=" << td->o.nr_files << dendl;
	engine->deref();
	throw std::system_error(
	  r, std::system_category(), "job init -- cannot check file");
      }
    }
    if (o->check_files || o->preallocate_files) {
      ++checked_or_preallocated;
    }
  }
  if (o->check_files) {
    derr << "fio_ceph_objectstore checked " << checked_or_preallocated
	 << " files"<< dendl;
  }
  if (o->preallocate_files ){
    derr << "fio_ceph_objectstore preallocated " << checked_or_preallocated
	 << " files"<< dendl;
  }
}
594
/// Tear down the Job: if 'unlink' is set, remove every object we created
/// plus our private collections, then drop our Engine reference (which may
/// trigger the Engine's own teardown if we were the last Job).
Job::~Job()
{
  if (unlink) {
    ObjectStore::Transaction t;
    bool failed = false;   // only log the first cleanup error
    // remove our objects
    for (auto& obj : objects) {
      t.remove(obj.coll.cid, obj.oid);
      // the move empties 't' so it can be reused next iteration
      int r = engine->os->queue_transaction(obj.coll.ch, std::move(t));
      if (r && !failed) {
	derr << "job cleanup failed with " << cpp_strerror(-r) << dendl;
	failed = true;
      }
    }
    // no-op in single_pool_mode: 'collections' is empty then and the
    // Engine owns the shared ones
    destroy_collections(engine->os, collections);
  }
  engine->deref();
}
613
9f95a23c
TL
/// Periodically cycle the bluestore throttle settings. Only subjob 0 ever
/// applies changes (the config is process-wide). On the first call, and then
/// whenever cycle_throttle_period elapses, the next combination from the
/// cartesian product throttle_values x deferred_throttle_values is applied.
void Job::check_throttle()
{
  if (subjob_number != 0)
    return;

  std::lock_guard<std::mutex> l(throttle_lock);
  if (throttle_values.empty() && deferred_throttle_values.empty())
    return;

  // is_zero(last) means "never applied yet" — always apply on the first call
  if (ceph::mono_clock::is_zero(last) ||
      ((cycle_throttle_period != cycle_throttle_period.zero()) &&
       (ceph::mono_clock::now() - last) > cycle_throttle_period)) {
    // treat an empty list as one (absent) entry so the index math below works
    unsigned tvals = throttle_values.size() ? throttle_values.size() : 1;
    unsigned dtvals = deferred_throttle_values.size() ? deferred_throttle_values.size() : 1;
    if (!throttle_values.empty()) {
      std::string val = std::to_string(throttle_values[index % tvals]);
      std::cerr << "Setting bluestore_throttle_bytes to " << val << std::endl;
      int r = engine->cct->_conf.set_val(
	"bluestore_throttle_bytes",
	val,
	nullptr);
      ceph_assert(r == 0);
    }
    if (!deferred_throttle_values.empty()) {
      // index / tvals advances the deferred value once per full tvals cycle
      std::string val = std::to_string(deferred_throttle_values[(index / tvals) % dtvals]);
      std::cerr << "Setting bluestore_deferred_throttle_bytes to " << val << std::endl;
      int r = engine->cct->_conf.set_val(
	"bluestore_throttle_deferred_bytes",
	val,
	nullptr);
      ceph_assert(r == 0);
    }
    engine->cct->_conf.apply_changes(nullptr);
    index++;
    index %= tvals * dtvals;
    last = ceph::mono_clock::now();
  }
}
652
7c673cae
FG
653int fio_ceph_os_setup(thread_data* td)
654{
655 // if there are multiple jobs, they must run in the same process against a
656 // single instance of the ObjectStore. explicitly disable fio's default
657 // job-per-process configuration
658 td->o.use_thread = 1;
659
660 try {
661 // get or create the global Engine instance
662 auto engine = Engine::get_instance(td);
663 // create a Job for this thread
664 td->io_ops_data = new Job(engine, td);
665 } catch (std::exception& e) {
666 std::cerr << "setup failed with " << e.what() << std::endl;
667 return -1;
668 }
669 return 0;
670}
671
672void fio_ceph_os_cleanup(thread_data* td)
673{
674 auto job = static_cast<Job*>(td->io_ops_data);
675 td->io_ops_data = nullptr;
676 delete job;
677}
678
679
680io_u* fio_ceph_os_event(thread_data* td, int event)
681{
682 // return the requested event from fio_ceph_os_getevents()
683 auto job = static_cast<Job*>(td->io_ops_data);
684 return job->events[event];
685}
686
687int fio_ceph_os_getevents(thread_data* td, unsigned int min,
688 unsigned int max, const timespec* t)
689{
690 auto job = static_cast<Job*>(td->io_ops_data);
691 unsigned int events = 0;
11fdf7f2
TL
692 io_u* u = NULL;
693 unsigned int i = 0;
7c673cae
FG
694
695 // loop through inflight ios until we find 'min' completions
696 do {
697 io_u_qiter(&td->io_u_all, u, i) {
698 if (!(u->flags & IO_U_F_FLIGHT))
699 continue;
700
701 if (u->engine_data) {
702 u->engine_data = nullptr;
703 job->events[events] = u;
704 events++;
705 }
706 }
707 if (events >= min)
708 break;
709 usleep(100);
710 } while (1);
711
712 return events;
713}
714
715/// completion context for ObjectStore::queue_transaction()
716class UnitComplete : public Context {
717 io_u* u;
718 public:
11fdf7f2 719 explicit UnitComplete(io_u* u) : u(u) {}
7c673cae
FG
720 void finish(int r) {
721 // mark the pointer to indicate completion for fio_ceph_os_getevents()
722 u->engine_data = reinterpret_cast<void*>(1ull);
723 }
724};
725
/// fio 'queue' hook: submit one io_u. Writes build an ObjectStore
/// transaction (data write, optional xattrs, optional simulated pglog omap
/// traffic) and return FIO_Q_QUEUED; completion arrives via UnitComplete.
/// Reads are synchronous and return FIO_Q_COMPLETED immediately. Any other
/// direction is rejected with EINVAL.
enum fio_q_status fio_ceph_os_queue(thread_data* td, io_u* u)
{
  fio_ro_check(td, u);

  auto o = static_cast<const Options*>(td->eo);
  auto job = static_cast<Job*>(td->io_ops_data);
  // each fio_file was bound 1:1 to an Object in Job::Job()
  auto& object = job->objects[u->file->engine_pos];
  auto& coll = object.coll;
  auto& os = job->engine->os;

  // possibly cycle the bluestore throttle settings (subjob 0 only)
  job->check_throttle();

  if (u->ddir == DDIR_WRITE) {
    // provide a hint if we're likely to read this data back
    const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0;

    bufferlist bl;
    bl.push_back(buffer::copy(reinterpret_cast<char*>(u->xfer_buf),
                              u->xfer_buflen ) );

    map<string,bufferptr,less<>> attrset;
    map<string, bufferlist> omaps;
    // enqueue a write transaction on the collection's handle
    ObjectStore::Transaction t;
    char ver_key[64];

    // fill attrs if any
    if (o->oi_attr_len_high) {
      ceph_assert(o->oi_attr_len_high >= o->oi_attr_len_low);
      // fill with the garbage as we do not care of the actual content...
      job->one_for_all_data.set_length(
        ceph::util::generate_random_number(
          o->oi_attr_len_low, o->oi_attr_len_high));
      attrset["_"] = job->one_for_all_data;
    }
    if (o->snapset_attr_len_high) {
      ceph_assert(o->snapset_attr_len_high >= o->snapset_attr_len_low);
      job->one_for_all_data.set_length(
        ceph::util::generate_random_number
        (o->snapset_attr_len_low, o->snapset_attr_len_high));
      attrset["snapset"] = job->one_for_all_data;

    }
    if (o->_fastinfo_omap_len_high) {
      ceph_assert(o->_fastinfo_omap_len_high >= o->_fastinfo_omap_len_low);
      // fill with the garbage as we do not care of the actual content...
      job->one_for_all_data.set_length(
        ceph::util::generate_random_number(
          o->_fastinfo_omap_len_low, o->_fastinfo_omap_len_high));
      omaps["_fastinfo"].append(job->one_for_all_data);
    }

    uint64_t pglog_trim_head = 0, pglog_trim_tail = 0;
    uint64_t pglog_dup_trim_head = 0, pglog_dup_trim_tail = 0;
    if (o->simulate_pglog) {

      uint64_t pglog_ver_cnt = 0;
      {
        // bump this collection's pglog version and decide (under the lock)
        // whether this write also trims old log / dup entries
        std::lock_guard<std::mutex> l(*coll.lock);
        pglog_ver_cnt = coll.pglog_ver_head++;
        if (o->pglog_omap_len_high &&
            pglog_ver_cnt >=
              coll.pglog_ver_tail +
              g_conf()->osd_min_pg_log_entries + g_conf()->osd_pg_log_trim_min) {
          pglog_trim_tail = coll.pglog_ver_tail;
          coll.pglog_ver_tail = pglog_trim_head =
            pglog_trim_tail + g_conf()->osd_pg_log_trim_min;

          if (o->pglog_dup_omap_len_high &&
              pglog_ver_cnt >=
                coll.pglog_dup_ver_tail + g_conf()->osd_pg_log_dups_tracked +
                g_conf()->osd_pg_log_trim_min) {
            pglog_dup_trim_tail = coll.pglog_dup_ver_tail;
            coll.pglog_dup_ver_tail = pglog_dup_trim_head =
              pglog_dup_trim_tail + g_conf()->osd_pg_log_trim_min;
          }
        }
      }

      if (o->pglog_omap_len_high) {
        ceph_assert(o->pglog_omap_len_high >= o->pglog_omap_len_low);
        // new pglog entry keyed by this write's version
        snprintf(ver_key, sizeof(ver_key),
                 "0000000011.%020llu", (unsigned long long)pglog_ver_cnt);
        // fill with the garbage as we do not care of the actual content...
        job->one_for_all_data.set_length(
          ceph::util::generate_random_number(
            o->pglog_omap_len_low, o->pglog_omap_len_high));
        omaps[ver_key].append(job->one_for_all_data);
      }
      if (o->pglog_dup_omap_len_high) {
        //insert dup entries for every pglog entry being trimmed below
        ceph_assert(o->pglog_dup_omap_len_high >= o->pglog_dup_omap_len_low);
        for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) {
          snprintf(ver_key, sizeof(ver_key),
                   "dup_0000000011.%020llu", (unsigned long long)i);
          // fill with the garbage as we do not care of the actual content...
          job->one_for_all_data.set_length(
            ceph::util::generate_random_number(
              o->pglog_dup_omap_len_low, o->pglog_dup_omap_len_high));
          omaps[ver_key].append(job->one_for_all_data);
        }
      }
    }

    if (!attrset.empty()) {
      t.setattrs(coll.cid, object.oid, attrset);
    }
    t.write(coll.cid, object.oid, u->offset, u->xfer_buflen, bl, flags);

    // trimmed pglog (and dup) keys are removed from the pgmeta object
    set<string> rmkeys;
    for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) {
      snprintf(ver_key, sizeof(ver_key),
               "0000000011.%020llu", (unsigned long long)i);
      rmkeys.emplace(ver_key);
    }
    for( auto i = pglog_dup_trim_tail; i < pglog_dup_trim_head; ++i) {
      snprintf(ver_key, sizeof(ver_key),
               "dup_0000000011.%020llu", (unsigned long long)i);
      rmkeys.emplace(ver_key);
    }

    if (rmkeys.size()) {
      ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
      t.omap_rmkeys(coll.cid, pgmeta_oid, rmkeys);
    }

    if (omaps.size()) {
      ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
      t.omap_setkeys(coll.cid, pgmeta_oid, omaps);
    }
    // completion is signalled via UnitComplete once the transaction commits
    t.register_on_commit(new UnitComplete(u));
    os->queue_transaction(coll.ch,
                          std::move(t));
    return FIO_Q_QUEUED;
  }

  if (u->ddir == DDIR_READ) {
    // ObjectStore reads are synchronous, so make the call and return COMPLETED
    bufferlist bl;
    int r = os->read(coll.ch, object.oid, u->offset, u->xfer_buflen, bl);
    if (r < 0) {
      u->error = r;
      td_verror(td, u->error, "xfer");
    } else {
      bl.begin().copy(bl.length(), static_cast<char*>(u->xfer_buf));
      u->resid = u->xfer_buflen - r;
    }
    return FIO_Q_COMPLETED;
  }

  derr << "WARNING: Only DDIR_READ and DDIR_WRITE are supported!" << dendl;
  u->error = -EINVAL;
  td_verror(td, u->error, "xfer");
  return FIO_Q_COMPLETED;
}
883
/// fio 'commit' hook: intentionally a no-op — queue() submits each
/// transaction directly.
int fio_ceph_os_commit(thread_data* td)
{
  // commit() allows the engine to batch up queued requests to be submitted all
  // at once. it would be natural for queue() to collect transactions in a list,
  // and use commit() to pass them all to ObjectStore::queue_transactions(). but
  // because we spread objects over multiple collections, we a) need to use a
  // different sequencer for each collection, and b) are less likely to see a
  // benefit from batching requests within a collection
  return 0;
}
894
// open/close are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to
// prevent fio from creating the files
int fio_ceph_os_open(thread_data* td, fio_file* f) { return 0; }
int fio_ceph_os_close(thread_data* td, fio_file* f) { return 0; }
899
/// per-io_u init: start in the "not completed" state.
int fio_ceph_os_io_u_init(thread_data* td, io_u* u)
{
  // no data is allocated, we just use the pointer as a boolean 'completed' flag
  u->engine_data = nullptr;
  return 0;
}
906
/// per-io_u teardown: nothing was allocated, just clear the completion flag.
void fio_ceph_os_io_u_free(thread_data* td, io_u* u)
{
  u->engine_data = nullptr;
}
911
912
// ioengine_ops for get_ioengine(): wires every fio hook above into the
// engine vtable. FIO_DISKLESSIO keeps fio from touching the filesystem.
struct ceph_ioengine : public ioengine_ops {
  ceph_ioengine() : ioengine_ops({}) {
    name        = "ceph-os";
    version     = FIO_IOOPS_VERSION;
    flags       = FIO_DISKLESSIO;
    setup       = fio_ceph_os_setup;
    queue       = fio_ceph_os_queue;
    commit      = fio_ceph_os_commit;
    getevents   = fio_ceph_os_getevents;
    event       = fio_ceph_os_event;
    cleanup     = fio_ceph_os_cleanup;
    open_file   = fio_ceph_os_open;
    close_file  = fio_ceph_os_close;
    io_u_init   = fio_ceph_os_io_u_init;
    io_u_free   = fio_ceph_os_io_u_free;
    options     = ceph_options.data();
    option_struct_size = sizeof(struct Options);
  }
};
933
934} // anonymous namespace
935
extern "C" {
// the exported fio engine interface; fio dlopen()s the plugin and calls
// this to obtain the (static, process-lifetime) ioengine_ops table
void get_ioengine(struct ioengine_ops** ioengine_ptr) {
  static ceph_ioengine ioengine;
  *ioengine_ptr = &ioengine;
}
} // extern "C"