]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/fio/fio_ceph_objectstore.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / test / fio / fio_ceph_objectstore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph ObjectStore engine
5 *
6 * IO engine using Ceph's ObjectStore class to test low-level performance of
7 * Ceph OSDs.
8 *
9 */
10
11#include <memory>
12#include <system_error>
13#include <vector>
14
15#include "os/ObjectStore.h"
16#include "global/global_init.h"
17#include "common/errno.h"
18#include "include/intarith.h"
19#include "include/stringify.h"
11fdf7f2 20#include "include/random.h"
224ce89b 21#include "common/perf_counters.h"
7c673cae
FG
22
23#include <fio.h>
24#include <optgroup.h>
25
11fdf7f2
TL
26#include "include/ceph_assert.h" // fio.h clobbers our assert.h
27#include <algorithm>
7c673cae
FG
28
29#define dout_context g_ceph_context
30#define dout_subsys ceph_subsys_
31
32namespace {
33
/// fio configuration options read from the job file
struct Options {
  thread_data* td;
  char* conf;   // path to the ceph configuration file (required)
  // each *_low/*_high pair is a [low, high] length range; on every write the
  // engine picks a random length in the range for the corresponding
  // attribute/omap entry (see fio_ceph_os_queue). a high of 0 disables the
  // feature; Engine's constructor normalizes high = max(low, high).
  unsigned long long
    oi_attr_len_low,
    oi_attr_len_high,
    snapset_attr_len_low,
    snapset_attr_len_high,
    pglog_omap_len_low,
    pglog_omap_len_high,
    pglog_dup_omap_len_low,
    pglog_dup_omap_len_high,
    _fastinfo_omap_len_low,
    _fastinfo_omap_len_high;
  unsigned simulate_pglog;      // boolean: emulate PG log appends/trims on writes
  unsigned single_pool_mode;    // boolean: all jobs share the Engine's collections
  unsigned preallocate_files;   // boolean: touch+truncate objects on job init
};
53
54template <class Func> // void Func(fio_option&)
55fio_option make_option(Func&& func)
56{
57 // zero-initialize and set common defaults
58 auto o = fio_option{};
59 o.category = FIO_OPT_C_ENGINE;
60 o.group = FIO_OPT_G_RBD;
61 func(std::ref(o));
62 return o;
63}
64
65static std::vector<fio_option> ceph_options{
66 make_option([] (fio_option& o) {
67 o.name = "conf";
68 o.lname = "ceph configuration file";
69 o.type = FIO_OPT_STR_STORE;
70 o.help = "Path to a ceph configuration file";
71 o.off1 = offsetof(Options, conf);
72 }),
11fdf7f2
TL
73 make_option([] (fio_option& o) {
74 o.name = "oi_attr_len";
75 o.lname = "OI Attr length";
76 o.type = FIO_OPT_STR_VAL;
77 o.help = "Set OI(aka '_') attribute to specified length";
78 o.off1 = offsetof(Options, oi_attr_len_low);
79 o.off2 = offsetof(Options, oi_attr_len_high);
80 o.def = 0;
81 o.minval = 0;
82 }),
83 make_option([] (fio_option& o) {
84 o.name = "snapset_attr_len";
85 o.lname = "Attr 'snapset' length";
86 o.type = FIO_OPT_STR_VAL;
87 o.help = "Set 'snapset' attribute to specified length";
88 o.off1 = offsetof(Options, snapset_attr_len_low);
89 o.off2 = offsetof(Options, snapset_attr_len_high);
90 o.def = 0;
91 o.minval = 0;
92 }),
93 make_option([] (fio_option& o) {
94 o.name = "_fastinfo_omap_len";
95 o.lname = "'_fastinfo' omap entry length";
96 o.type = FIO_OPT_STR_VAL;
97 o.help = "Set '_fastinfo' OMAP attribute to specified length";
98 o.off1 = offsetof(Options, _fastinfo_omap_len_low);
99 o.off2 = offsetof(Options, _fastinfo_omap_len_high);
100 o.def = 0;
101 o.minval = 0;
102 }),
103 make_option([] (fio_option& o) {
104 o.name = "pglog_simulation";
105 o.lname = "pglog behavior simulation";
106 o.type = FIO_OPT_BOOL;
107 o.help = "Enables PG Log simulation behavior";
108 o.off1 = offsetof(Options, simulate_pglog);
109 o.def = "0";
110 }),
111 make_option([] (fio_option& o) {
112 o.name = "pglog_omap_len";
113 o.lname = "pglog omap entry length";
114 o.type = FIO_OPT_STR_VAL;
115 o.help = "Set pglog omap entry to specified length";
116 o.off1 = offsetof(Options, pglog_omap_len_low);
117 o.off2 = offsetof(Options, pglog_omap_len_high);
118 o.def = 0;
119 o.minval = 0;
120 }),
121 make_option([] (fio_option& o) {
122 o.name = "pglog_dup_omap_len";
123 o.lname = "uplicate pglog omap entry length";
124 o.type = FIO_OPT_STR_VAL;
125 o.help = "Set duplicate pglog omap entry to specified length";
126 o.off1 = offsetof(Options, pglog_dup_omap_len_low);
127 o.off2 = offsetof(Options, pglog_dup_omap_len_high);
128 o.def = 0;
129 o.minval = 0;
130 }),
131 make_option([] (fio_option& o) {
132 o.name = "single_pool_mode";
133 o.lname = "single(shared among jobs) pool mode";
134 o.type = FIO_OPT_BOOL;
135 o.help = "Enables the mode when all jobs run against the same pool";
136 o.off1 = offsetof(Options, single_pool_mode);
137 o.def = "0";
138 }),
139 make_option([] (fio_option& o) {
140 o.name = "preallocate_files";
141 o.lname = "preallocate files on init";
142 o.type = FIO_OPT_BOOL;
143 o.help = "Enables/disables file preallocation (touch and resize) on init";
144 o.off1 = offsetof(Options, preallocate_files);
145 o.def = "1";
146 }),
7c673cae
FG
147 {} // fio expects a 'null'-terminated list
148};
149
150
11fdf7f2
TL
/// a single PG collection: the PG id, its coll_t/handle in the ObjectStore,
/// and per-collection version counters used by the pglog simulation
struct Collection {
  spg_t pg;
  coll_t cid;
  ObjectStore::CollectionHandle ch;
  // Can't use mutex directly in vectors hence dynamic allocation

  std::unique_ptr<std::mutex> lock;
  // pglog head/tail watermarks, guarded by 'lock'; updated by
  // fio_ceph_os_queue when pglog_simulation is enabled
  uint64_t pglog_ver_head = 1;
  uint64_t pglog_ver_tail = 1;
  uint64_t pglog_dup_ver_tail = 1;

  // use big pool ids to avoid clashing with existing collections
  static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;

  Collection(const spg_t& pg, ObjectStore::CollectionHandle _ch)
    : pg(pg), cid(pg), ch(_ch),
      lock(new std::mutex) {
  }
};
170
171int destroy_collections(
172 std::unique_ptr<ObjectStore>& os,
173 std::vector<Collection>& collections)
174{
175 ObjectStore::Transaction t;
176 bool failed = false;
177 // remove our collections
178 for (auto& coll : collections) {
179 ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
180 t.remove(coll.cid, pgmeta_oid);
181 t.remove_collection(coll.cid);
182 int r = os->queue_transaction(coll.ch, std::move(t));
183 if (r && !failed) {
184 derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl;
185 failed = true;
186 }
187 }
188 return 0;
189}
190
/// create (or reopen) 'count' PG collections in pool 'pool', appending a
/// Collection entry for each to 'collections'. also writes an OSD superblock
/// into the meta collection so external tools can recognize the store.
/// @return 0 on success, negative errno on failure (collections created so
///         far are destroyed before returning an error)
int init_collections(std::unique_ptr<ObjectStore>& os,
                     uint64_t pool,
                     std::vector<Collection>& collections,
                     uint64_t count)
{
  ceph_assert(count > 0);
  collections.reserve(count);

  // number of bits needed to address 'count' PGs, used as the collection's
  // split bits
  const int split_bits = cbits(count - 1);

  {
    // propagate Superblock object to ensure proper functioning of tools that
    // need it. E.g. ceph-objectstore-tool
    coll_t cid(coll_t::meta());
    bool exists = os->collection_exists(cid);
    if (!exists) {
      auto ch = os->create_new_collection(cid);

      OSDSuperblock superblock;
      bufferlist bl;
      encode(superblock, bl);

      ObjectStore::Transaction t;
      t.create_collection(cid, split_bits);
      t.write(cid, OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl);
      int r = os->queue_transaction(ch, std::move(t));

      if (r < 0) {
        derr << "Failure to write OSD superblock: " << cpp_strerror(-r) << dendl;
        return r;
      }
    }
  }

  for (uint32_t i = 0; i < count; i++) {
    auto pg = spg_t{pg_t{i, pool}};
    coll_t cid(pg);

    // reuse a collection left over from a previous run, otherwise create it
    bool exists = os->collection_exists(cid);
    auto ch = exists ?
      os->open_collection(cid) :
      os->create_new_collection(cid) ;

    collections.emplace_back(pg, ch);

    ObjectStore::Transaction t;
    auto& coll = collections.back();
    if (!exists) {
      t.create_collection(coll.cid, split_bits);
      // each PG gets a pgmeta object to hold its (simulated) pglog omaps
      ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
      t.touch(coll.cid, pgmeta_oid);
      int r = os->queue_transaction(coll.ch, std::move(t));
      if (r) {
        derr << "Engine init failed with " << cpp_strerror(-r) << dendl;
        // roll back everything created so far before reporting the error
        destroy_collections(os, collections);
        return r;
      }
    }
  }
  return 0;
}
252
7c673cae
FG
/// global engine state shared between all jobs within the process. this
/// includes g_ceph_context and the ObjectStore instance
struct Engine {
  /// the initial g_ceph_context reference to be dropped on destruction
  boost::intrusive_ptr<CephContext> cct;
  std::unique_ptr<ObjectStore> os;

  std::vector<Collection> collections; //< shared collections to spread objects over

  // protects ref_count; jobs from different threads ref/deref concurrently
  std::mutex lock;
  int ref_count;
  const bool unlink; //< unlink objects on destruction

  explicit Engine(thread_data* td);
  ~Engine();

  static Engine* get_instance(thread_data* td) {
    // note: creates an Engine with the options associated with the first job
    static Engine engine(td);
    return &engine;
  }

  void ref() {
    std::lock_guard<std::mutex> l(lock);
    ++ref_count;
  }
  // drop a job's reference; the last deref dumps perf counters / statistics,
  // optionally destroys the shared collections, and unmounts the store
  void deref() {
    std::lock_guard<std::mutex> l(lock);
    --ref_count;
    if (!ref_count) {
      ostringstream ostr;
      Formatter* f = Formatter::create("json-pretty", "json-pretty", "json-pretty");
      cct->get_perfcounters_collection()->dump_formatted(f, false);
      ostr << "FIO plugin ";
      f->flush(ostr);
      if (g_conf()->rocksdb_perf) {
        os->get_db_statistics(f);
        ostr << "FIO get_db_statistics ";
        f->flush(ostr);
      }
      ostr << "Mempools: ";
      f->open_object_section("mempools");
      mempool::dump(f);
      f->close_section();
      f->flush(ostr);

      ostr << "Generate db histogram: ";
      os->generate_db_histogram(f);
      f->flush(ostr);
      delete f;

      if (unlink) {
        destroy_collections(os, collections);
      }
      os->umount();
      dout(0) << ostr.str() << dendl;
    }
  }
};
312
11fdf7f2
TL
/// initialize the ceph runtime from the job's conf file, create/mkfs/mount
/// the ObjectStore, normalize the option ranges, and (in single_pool_mode)
/// create the shared collections. throws on any failure.
Engine::Engine(thread_data* td)
  : ref_count(0),
    unlink(td->o.unlink)
{
  // add the ceph command line arguments
  auto o = static_cast<Options*>(td->eo);
  if (!o->conf) {
    throw std::runtime_error("missing conf option for ceph configuration file");
  }
  std::vector<const char*> args{
    "-i", "0", // identify as osd.0 for osd_data and osd_journal
    "--conf", o->conf, // use the requested conf file
  };
  if (td->o.directory) { // allow conf files to use ${fio_dir} for data
    args.emplace_back("--fio_dir");
    args.emplace_back(td->o.directory);
  }

  // claim the g_ceph_context reference and release it on destruction
  cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD,
                    CODE_ENVIRONMENT_UTILITY,
                    CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
  common_init_finish(g_ceph_context);

  // create the ObjectStore
  os.reset(ObjectStore::create(g_ceph_context,
                               g_conf().get_val<std::string>("osd objectstore"),
                               g_conf().get_val<std::string>("osd data"),
                               g_conf().get_val<std::string>("osd journal")));
  if (!os)
    throw std::runtime_error("bad objectstore type " + g_conf()->osd_objectstore);

  // pick the shard count like the OSD does: explicit setting wins, otherwise
  // choose the hdd/ssd default based on the store's media type
  unsigned num_shards;
  if(g_conf()->osd_op_num_shards)
    num_shards = g_conf()->osd_op_num_shards;
  else if(os->is_rotational())
    num_shards = g_conf()->osd_op_num_shards_hdd;
  else
    num_shards = g_conf()->osd_op_num_shards_ssd;
  os->set_cache_shards(num_shards);

  //normalize options: make sure each range's high end is >= its low end
  o->oi_attr_len_high = max(o->oi_attr_len_low, o->oi_attr_len_high);
  o->snapset_attr_len_high = max(o->snapset_attr_len_low,
                                 o->snapset_attr_len_high);
  o->pglog_omap_len_high = max(o->pglog_omap_len_low,
                               o->pglog_omap_len_high);
  o->pglog_dup_omap_len_high = max(o->pglog_dup_omap_len_low,
                                   o->pglog_dup_omap_len_high);
  o->_fastinfo_omap_len_high = max(o->_fastinfo_omap_len_low,
                                   o->_fastinfo_omap_len_high);

  int r = os->mkfs();
  if (r < 0)
    throw std::system_error(-r, std::system_category(), "mkfs failed");

  r = os->mount();
  if (r < 0)
    throw std::system_error(-r, std::system_category(), "mount failed");

  // create shared collections up to osd_pool_default_pg_num
  if (o->single_pool_mode) {
    uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num");
    if (count > td->o.nr_files)
      count = td->o.nr_files;
    init_collections(os, Collection::MIN_POOL_ID, collections, count);
  }
}
381
382Engine::~Engine()
383{
11fdf7f2 384 ceph_assert(!ref_count);
7c673cae
FG
385}
386
7c673cae
FG
/// binds a fio file name to an object id within one of our collections
struct Object {
  ghobject_t oid;
  Collection& coll;

  Object(const char* name, Collection& coll)
    : oid(hobject_t(name, "", CEPH_NOSNAP, coll.pg.ps(), coll.pg.pool(), "")),
      coll(coll) {}
};
395
11fdf7f2
TL
396/// treat each fio job either like a separate pool with its own collections and objects
397/// or just a client using its own objects from the shared pool
7c673cae
FG
struct Job {
  Engine* engine; //< shared ptr to the global Engine
  std::vector<Collection> collections; //< job's private collections to spread objects over
  std::vector<Object> objects; //< associate an object with each fio_file
  std::vector<io_u*> events; //< completions for fio_ceph_os_event()
  const bool unlink; //< unlink objects on destruction

  bufferptr one_for_all_data; //< preallocated buffer long enough
                              //< to use for various operations

  Job(Engine* engine, const thread_data* td);
  ~Job();
};
411
/// set up a job: take a reference on the Engine, size the scratch buffer to
/// the largest configured attr/omap range, pick (or create) the collections
/// to use, and create one object per fio file (optionally preallocated).
/// throws std::system_error if preallocation fails.
Job::Job(Engine* engine, const thread_data* td)
  : engine(engine),
    events(td->o.iodepth),
    unlink(td->o.unlink)
{
  engine->ref();
  auto o = static_cast<Options*>(td->eo);
  // a single buffer sized for the biggest of all configured ranges is reused
  // for every attr/omap fill (see one_for_all_data uses in fio_ceph_os_queue)
  unsigned long long max_data = max(o->oi_attr_len_high,
                                  o->snapset_attr_len_high);
  max_data = max(max_data, o->pglog_omap_len_high);
  max_data = max(max_data, o->pglog_dup_omap_len_high);
  max_data = max(max_data, o->_fastinfo_omap_len_high);
  one_for_all_data = buffer::create(max_data);

  std::vector<Collection>* colls;
  // create private collections up to osd_pool_default_pg_num
  if (!o->single_pool_mode) {
    uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num");
    if (count > td->o.nr_files)
      count = td->o.nr_files;
    // use the fio thread_number for our unique pool id
    const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number + 1;
    init_collections(engine->os, pool, collections, count);
    colls = &collections;
  } else {
    colls = &engine->collections;
  }
  const uint64_t file_size = td->o.size / max(1u, td->o.nr_files);
  ObjectStore::Transaction t;

  // create an object for each file in the job
  objects.reserve(td->o.nr_files);
  for (uint32_t i = 0; i < td->o.nr_files; i++) {
    auto f = td->files[i];
    f->real_file_size = file_size;
    f->engine_pos = i;

    // associate each object with a collection in a round-robin fashion.
    auto& coll = (*colls)[i % colls->size()];

    objects.emplace_back(f->file_name, coll);
    if (o->preallocate_files) {
      auto& oid = objects.back().oid;
      t.touch(coll.cid, oid);
      t.truncate(coll.cid, oid, file_size);
      int r = engine->os->queue_transaction(coll.ch, std::move(t));
      if (r) {
        // drop our Engine reference before throwing so the refcount balances
        engine->deref();
        throw std::system_error(r, std::system_category(), "job init");
      }
    }
  }
}
465
/// tear down a job: optionally remove its objects and private collections,
/// then drop the Engine reference (which may trigger final engine cleanup)
Job::~Job()
{
  if (unlink) {
    ObjectStore::Transaction t;
    bool failed = false;
    // remove our objects
    for (auto& obj : objects) {
      t.remove(obj.coll.cid, obj.oid);
      int r = engine->os->queue_transaction(obj.coll.ch, std::move(t));
      if (r && !failed) {
        // best-effort cleanup: log the first error but keep going
        derr << "job cleanup failed with " << cpp_strerror(-r) << dendl;
        failed = true;
      }
    }
    destroy_collections(engine->os, collections);
  }
  engine->deref();
}
484
7c673cae
FG
485int fio_ceph_os_setup(thread_data* td)
486{
487 // if there are multiple jobs, they must run in the same process against a
488 // single instance of the ObjectStore. explicitly disable fio's default
489 // job-per-process configuration
490 td->o.use_thread = 1;
491
492 try {
493 // get or create the global Engine instance
494 auto engine = Engine::get_instance(td);
495 // create a Job for this thread
496 td->io_ops_data = new Job(engine, td);
497 } catch (std::exception& e) {
498 std::cerr << "setup failed with " << e.what() << std::endl;
499 return -1;
500 }
501 return 0;
502}
503
504void fio_ceph_os_cleanup(thread_data* td)
505{
506 auto job = static_cast<Job*>(td->io_ops_data);
507 td->io_ops_data = nullptr;
508 delete job;
509}
510
511
512io_u* fio_ceph_os_event(thread_data* td, int event)
513{
514 // return the requested event from fio_ceph_os_getevents()
515 auto job = static_cast<Job*>(td->io_ops_data);
516 return job->events[event];
517}
518
/// fio 'getevents' hook: poll the inflight io_u's until at least 'min' have
/// completed, recording completions in job->events for fio_ceph_os_event().
/// completion is signalled by UnitComplete setting u->engine_data non-null.
int fio_ceph_os_getevents(thread_data* td, unsigned int min,
                          unsigned int max, const timespec* t)
{
  auto job = static_cast<Job*>(td->io_ops_data);
  unsigned int events = 0;
  io_u* u = NULL;
  unsigned int i = 0;

  // loop through inflight ios until we find 'min' completions
  do {
    io_u_qiter(&td->io_u_all, u, i) {
      if (!(u->flags & IO_U_F_FLIGHT))
        continue;

      if (u->engine_data) {
        // clear the flag so this completion is only reported once
        u->engine_data = nullptr;
        job->events[events] = u;
        events++;
      }
    }
    if (events >= min)
      break;
    usleep(100);  // brief backoff before polling again
  } while (1);

  return events;
}
546
547/// completion context for ObjectStore::queue_transaction()
548class UnitComplete : public Context {
549 io_u* u;
550 public:
11fdf7f2 551 explicit UnitComplete(io_u* u) : u(u) {}
7c673cae
FG
552 void finish(int r) {
553 // mark the pointer to indicate completion for fio_ceph_os_getevents()
554 u->engine_data = reinterpret_cast<void*>(1ull);
555 }
556};
557
11fdf7f2 558enum fio_q_status fio_ceph_os_queue(thread_data* td, io_u* u)
7c673cae
FG
559{
560 fio_ro_check(td, u);
561
11fdf7f2
TL
562
563
564 auto o = static_cast<const Options*>(td->eo);
7c673cae
FG
565 auto job = static_cast<Job*>(td->io_ops_data);
566 auto& object = job->objects[u->file->engine_pos];
567 auto& coll = object.coll;
568 auto& os = job->engine->os;
569
570 if (u->ddir == DDIR_WRITE) {
571 // provide a hint if we're likely to read this data back
572 const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0;
573
574 bufferlist bl;
575 bl.push_back(buffer::copy(reinterpret_cast<char*>(u->xfer_buf),
576 u->xfer_buflen ) );
577
11fdf7f2
TL
578 map<string,bufferptr> attrset;
579 map<string, bufferlist> omaps;
580 // enqueue a write transaction on the collection's handle
7c673cae 581 ObjectStore::Transaction t;
11fdf7f2
TL
582 char ver_key[64];
583
584 // fill attrs if any
585 if (o->oi_attr_len_high) {
586 ceph_assert(o->oi_attr_len_high >= o->oi_attr_len_low);
587 // fill with the garbage as we do not care of the actual content...
588 job->one_for_all_data.set_length(
589 ceph::util::generate_random_number(
590 o->oi_attr_len_low, o->oi_attr_len_high));
591 attrset["_"] = job->one_for_all_data;
592 }
593 if (o->snapset_attr_len_high) {
594 ceph_assert(o->snapset_attr_len_high >= o->snapset_attr_len_low);
595 job->one_for_all_data.set_length(
596 ceph::util::generate_random_number
597 (o->snapset_attr_len_low, o->snapset_attr_len_high));
598 attrset["snapset"] = job->one_for_all_data;
599
600 }
601 if (o->_fastinfo_omap_len_high) {
602 ceph_assert(o->_fastinfo_omap_len_high >= o->_fastinfo_omap_len_low);
603 // fill with the garbage as we do not care of the actual content...
604 job->one_for_all_data.set_length(
605 ceph::util::generate_random_number(
606 o->_fastinfo_omap_len_low, o->_fastinfo_omap_len_high));
607 omaps["_fastinfo"].append(job->one_for_all_data);
608 }
609
610 uint64_t pglog_trim_head = 0, pglog_trim_tail = 0;
611 uint64_t pglog_dup_trim_head = 0, pglog_dup_trim_tail = 0;
612 if (o->simulate_pglog) {
613
614 uint64_t pglog_ver_cnt = 0;
615 {
616 std::lock_guard<std::mutex> l(*coll.lock);
617 pglog_ver_cnt = coll.pglog_ver_head++;
618 if (o->pglog_omap_len_high &&
619 pglog_ver_cnt >=
620 coll.pglog_ver_tail +
621 g_conf()->osd_min_pg_log_entries + g_conf()->osd_pg_log_trim_min) {
622 pglog_trim_tail = coll.pglog_ver_tail;
623 coll.pglog_ver_tail = pglog_trim_head =
624 pglog_trim_tail + g_conf()->osd_pg_log_trim_min;
625
626 if (o->pglog_dup_omap_len_high &&
627 pglog_ver_cnt >=
628 coll.pglog_dup_ver_tail + g_conf()->osd_pg_log_dups_tracked +
629 g_conf()->osd_pg_log_trim_min) {
630 pglog_dup_trim_tail = coll.pglog_dup_ver_tail;
631 coll.pglog_dup_ver_tail = pglog_dup_trim_head =
632 pglog_dup_trim_tail + g_conf()->osd_pg_log_trim_min;
633 }
634 }
635 }
636
637 if (o->pglog_omap_len_high) {
638 ceph_assert(o->pglog_omap_len_high >= o->pglog_omap_len_low);
639 snprintf(ver_key, sizeof(ver_key),
640 "0000000011.%020llu", (unsigned long long)pglog_ver_cnt);
641 // fill with the garbage as we do not care of the actual content...
642 job->one_for_all_data.set_length(
643 ceph::util::generate_random_number(
644 o->pglog_omap_len_low, o->pglog_omap_len_high));
645 omaps[ver_key].append(job->one_for_all_data);
646 }
647 if (o->pglog_dup_omap_len_high) {
648 //insert dup
649 ceph_assert(o->pglog_dup_omap_len_high >= o->pglog_dup_omap_len_low);
650 for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) {
651 snprintf(ver_key, sizeof(ver_key),
652 "dup_0000000011.%020llu", (unsigned long long)i);
653 // fill with the garbage as we do not care of the actual content...
654 job->one_for_all_data.set_length(
655 ceph::util::generate_random_number(
656 o->pglog_dup_omap_len_low, o->pglog_dup_omap_len_high));
657 omaps[ver_key].append(job->one_for_all_data);
658 }
659 }
660 }
661
662 if (attrset.size()) {
663 t.setattrs(coll.cid, object.oid, attrset);
664 }
7c673cae 665 t.write(coll.cid, object.oid, u->offset, u->xfer_buflen, bl, flags);
11fdf7f2
TL
666
667 set<string> rmkeys;
668 for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) {
669 snprintf(ver_key, sizeof(ver_key),
670 "0000000011.%020llu", (unsigned long long)i);
671 rmkeys.emplace(ver_key);
672 }
673 for( auto i = pglog_dup_trim_tail; i < pglog_dup_trim_head; ++i) {
674 snprintf(ver_key, sizeof(ver_key),
675 "dup_0000000011.%020llu", (unsigned long long)i);
676 rmkeys.emplace(ver_key);
677 }
678
679 if (rmkeys.size()) {
680 ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
681 t.omap_rmkeys(coll.cid, pgmeta_oid, rmkeys);
682 }
683
684 if (omaps.size()) {
685 ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
686 t.omap_setkeys(coll.cid, pgmeta_oid, omaps);
687 }
688 t.register_on_commit(new UnitComplete(u));
689 os->queue_transaction(coll.ch,
690 std::move(t));
7c673cae
FG
691 return FIO_Q_QUEUED;
692 }
693
694 if (u->ddir == DDIR_READ) {
695 // ObjectStore reads are synchronous, so make the call and return COMPLETED
696 bufferlist bl;
11fdf7f2 697 int r = os->read(coll.ch, object.oid, u->offset, u->xfer_buflen, bl);
7c673cae
FG
698 if (r < 0) {
699 u->error = r;
700 td_verror(td, u->error, "xfer");
701 } else {
702 bl.copy(0, bl.length(), static_cast<char*>(u->xfer_buf));
703 u->resid = u->xfer_buflen - r;
704 }
705 return FIO_Q_COMPLETED;
706 }
707
708 derr << "WARNING: Only DDIR_READ and DDIR_WRITE are supported!" << dendl;
709 u->error = -EINVAL;
710 td_verror(td, u->error, "xfer");
711 return FIO_Q_COMPLETED;
712}
713
714int fio_ceph_os_commit(thread_data* td)
715{
716 // commit() allows the engine to batch up queued requests to be submitted all
717 // at once. it would be natural for queue() to collect transactions in a list,
718 // and use commit() to pass them all to ObjectStore::queue_transactions(). but
719 // because we spread objects over multiple collections, we a) need to use a
720 // different sequencer for each collection, and b) are less likely to see a
721 // benefit from batching requests within a collection
722 return 0;
723}
724
725// open/close are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to
726// prevent fio from creating the files
727int fio_ceph_os_open(thread_data* td, fio_file* f) { return 0; }
728int fio_ceph_os_close(thread_data* td, fio_file* f) { return 0; }
729
730int fio_ceph_os_io_u_init(thread_data* td, io_u* u)
731{
732 // no data is allocated, we just use the pointer as a boolean 'completed' flag
733 u->engine_data = nullptr;
734 return 0;
735}
736
737void fio_ceph_os_io_u_free(thread_data* td, io_u* u)
738{
739 u->engine_data = nullptr;
740}
741
742
// ioengine_ops for get_ioengine(): wires the fio callbacks above into the
// engine table fio dispatches through
struct ceph_ioengine : public ioengine_ops {
  ceph_ioengine() : ioengine_ops({}) {
    name = "ceph-os";
    version = FIO_IOOPS_VERSION;
    // FIO_DISKLESSIO: tell fio not to create/manage real files for us
    flags = FIO_DISKLESSIO;
    setup = fio_ceph_os_setup;
    queue = fio_ceph_os_queue;
    commit = fio_ceph_os_commit;
    getevents = fio_ceph_os_getevents;
    event = fio_ceph_os_event;
    cleanup = fio_ceph_os_cleanup;
    open_file = fio_ceph_os_open;
    close_file = fio_ceph_os_close;
    io_u_init = fio_ceph_os_io_u_init;
    io_u_free = fio_ceph_os_io_u_free;
    options = ceph_options.data();
    option_struct_size = sizeof(struct Options);
  }
};
763
764} // anonymous namespace
765
extern "C" {
// the exported fio engine interface
void get_ioengine(struct ioengine_ops** ioengine_ptr) {
  // function-local static: constructed on first call, lives for the process
  static ceph_ioengine ioengine;
  *ioengine_ptr = &ioengine;
}
} // extern "C"