// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph ObjectStore engine
 *
 * IO engine using Ceph's ObjectStore class to test low-level performance of
 * Ceph OSDs.
 *
 */
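
/*
 * A sketch of a job file for this engine (paths and the job-local options are
 * illustrative; 'conf' is the engine option defined below, and the fio
 * 'directory' option is forwarded to the conf file as ${fio_dir}):
 *
 *   [global]
 *   ioengine=external:libfio_ceph_objectstore.so
 *   conf=/path/to/ceph.conf
 *   directory=/mnt/fio-objectstore
 *   rw=randwrite
 *   iodepth=16
 *   size=256m
 *
 *   [objectstore]
 *   nr_files=64
 *   bs=64k
 */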

#include <memory>
#include <system_error>
#include <vector>

#include "os/ObjectStore.h"
#include "global/global_init.h"
#include "common/errno.h"
#include "include/intarith.h"
#include "include/stringify.h"

#include <fio.h>
#include <optgroup.h>

#include "include/assert.h" // fio.h clobbers our assert.h

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_

namespace {

/// fio configuration options read from the job file
struct Options {
  thread_data* td;
  char* conf;
};

template <class Func> // void Func(fio_option&)
fio_option make_option(Func&& func)
{
  // zero-initialize and set common defaults
  auto o = fio_option{};
  o.category = FIO_OPT_C_ENGINE;
  o.group = FIO_OPT_G_RBD;
  func(std::ref(o));
  return o;
}

static std::vector<fio_option> ceph_options{
  make_option([] (fio_option& o) {
    o.name = "conf";
    o.lname = "ceph configuration file";
    o.type = FIO_OPT_STR_STORE;
    o.help = "Path to a ceph configuration file";
    o.off1 = offsetof(Options, conf);
  }),
  {} // fio expects a 'null'-terminated list
};


/// global engine state shared between all jobs within the process. this
/// includes g_ceph_context and the ObjectStore instance
struct Engine {
  /// the initial g_ceph_context reference to be dropped on destruction
  boost::intrusive_ptr<CephContext> cct;
  std::unique_ptr<ObjectStore> os;

  std::mutex lock;
  int ref_count;

  Engine(const thread_data* td);
  ~Engine();

  static Engine* get_instance(thread_data* td) {
    // note: creates an Engine with the options associated with the first job
    static Engine engine(td);
    return &engine;
  }

  void ref() {
    std::lock_guard<std::mutex> l(lock);
    ++ref_count;
  }
  void deref() {
    std::lock_guard<std::mutex> l(lock);
    --ref_count;
    if (!ref_count) {
      ostringstream ostr;
      Formatter* f = Formatter::create("json-pretty", "json-pretty", "json-pretty");
      os->dump_perf_counters(f);
      f->flush(ostr);
      delete f;
      os->umount();
      dout(0) << "FIO plugin " << ostr.str() << dendl;
    }
  }
};
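
// note: the singleton Engine is created by the first job to reach setup() and
// destroyed at process exit; once the last job drops its reference in deref(),
// the perf counters are dumped and the ObjectStore unmounted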

Engine::Engine(const thread_data* td) : ref_count(0)
{
  // add the ceph command line arguments
  auto o = static_cast<const Options*>(td->eo);
  if (!o->conf) {
    throw std::runtime_error("missing conf option for ceph configuration file");
  }
  std::vector<const char*> args{
    "-i", "0", // identify as osd.0 for osd_data and osd_journal
    "--conf", o->conf, // use the requested conf file
  };
  if (td->o.directory) { // allow conf files to use ${fio_dir} for data
    args.emplace_back("--fio_dir");
    args.emplace_back(td->o.directory);
  }

  // claim the g_ceph_context reference and release it on destruction
  cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD,
                    CODE_ENVIRONMENT_UTILITY, 0);
  common_init_finish(g_ceph_context);

  // create the ObjectStore
  os.reset(ObjectStore::create(g_ceph_context,
                               g_conf->osd_objectstore,
                               g_conf->osd_data,
                               g_conf->osd_journal));
  if (!os)
    throw std::runtime_error("bad objectstore type " + g_conf->osd_objectstore);

  os->set_cache_shards(g_conf->osd_op_num_shards);

  int r = os->mkfs();
  if (r < 0)
    throw std::system_error(-r, std::system_category(), "mkfs failed");

  r = os->mount();
  if (r < 0)
    throw std::system_error(-r, std::system_category(), "mount failed");
}

Engine::~Engine()
{
  assert(!ref_count);
}


struct Collection {
  spg_t pg;
  coll_t cid;
  ObjectStore::Sequencer sequencer;

  // use big pool ids to avoid clashing with existing collections
  static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;

  Collection(const spg_t& pg)
    : pg(pg), cid(pg), sequencer(stringify(pg)) {}
};

struct Object {
  ghobject_t oid;
  Collection& coll;

  Object(const char* name, Collection& coll)
    : oid(hobject_t(name, "", CEPH_NOSNAP, coll.pg.ps(), coll.pg.pool(), "")),
      coll(coll) {}
};

/// treat each fio job like a separate pool with its own collections and objects
struct Job {
  Engine* engine; ///< pointer to the global Engine shared by all jobs
  std::vector<Collection> collections; ///< spread objects over collections
  std::vector<Object> objects; ///< associate an object with each fio_file
  std::vector<io_u*> events; ///< completions for fio_ceph_os_event()
  const bool unlink; ///< unlink objects on destruction

  Job(Engine* engine, const thread_data* td);
  ~Job();
};

Job::Job(Engine* engine, const thread_data* td)
  : engine(engine),
    events(td->o.iodepth),
    unlink(td->o.unlink)
{
  engine->ref();
  // use the fio thread_number for our unique pool id
  const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number;

  // create a collection for each object, up to osd_pool_default_pg_num
  uint32_t count = g_conf->osd_pool_default_pg_num;
  if (count > td->o.nr_files)
    count = td->o.nr_files;

  assert(count > 0);
  collections.reserve(count);

  const int split_bits = cbits(count - 1);

  ObjectStore::Transaction t;
  for (uint32_t i = 0; i < count; i++) {
    auto pg = spg_t{pg_t{i, pool}};
    collections.emplace_back(pg);

    auto& cid = collections.back().cid;
    if (!engine->os->collection_exists(cid))
      t.create_collection(cid, split_bits);
  }

  const uint64_t file_size = td->o.size / max(1u, td->o.nr_files);

  // create an object for each file in the job
  for (uint32_t i = 0; i < td->o.nr_files; i++) {
    auto f = td->files[i];
    f->real_file_size = file_size;
    f->engine_pos = i;

    // associate each object with a collection in a round-robin fashion
    auto& coll = collections[i % collections.size()];

    objects.emplace_back(f->file_name, coll);
    auto& oid = objects.back().oid;

    t.touch(coll.cid, oid);
    t.truncate(coll.cid, oid, file_size);
  }

  // apply the entire transaction synchronously
  ObjectStore::Sequencer sequencer("job init");
  int r = engine->os->apply_transaction(&sequencer, std::move(t));
  if (r < 0) {
    engine->deref();
    // negate the errno for std::system_error, which expects a positive value
    throw std::system_error(-r, std::system_category(), "job init");
  }
}

Job::~Job()
{
  if (unlink) {
    ObjectStore::Transaction t;
    // remove our objects
    for (auto& obj : objects) {
      t.remove(obj.coll.cid, obj.oid);
    }
    // remove our collections
    for (auto& coll : collections) {
      t.remove_collection(coll.cid);
    }
    ObjectStore::Sequencer sequencer("job cleanup");
    int r = engine->os->apply_transaction(&sequencer, std::move(t));
    if (r)
      derr << "job cleanup failed with " << cpp_strerror(-r) << dendl;
  }
  engine->deref();
}


int fio_ceph_os_setup(thread_data* td)
{
  // if there are multiple jobs, they must run in the same process against a
  // single instance of the ObjectStore. explicitly disable fio's default
  // job-per-process configuration
  td->o.use_thread = 1;

  try {
    // get or create the global Engine instance
    auto engine = Engine::get_instance(td);
    // create a Job for this thread
    td->io_ops_data = new Job(engine, td);
  } catch (std::exception& e) {
    std::cerr << "setup failed with " << e.what() << std::endl;
    return -1;
  }
  return 0;
}

void fio_ceph_os_cleanup(thread_data* td)
{
  auto job = static_cast<Job*>(td->io_ops_data);
  td->io_ops_data = nullptr;
  delete job;
}


io_u* fio_ceph_os_event(thread_data* td, int event)
{
  // return the requested event from fio_ceph_os_getevents()
  auto job = static_cast<Job*>(td->io_ops_data);
  return job->events[event];
}

int fio_ceph_os_getevents(thread_data* td, unsigned int min,
                          unsigned int max, const timespec* t)
{
  auto job = static_cast<Job*>(td->io_ops_data);
  unsigned int events = 0;
  io_u* u;
  unsigned int i;

  // loop through inflight ios until we find 'min' completions
  do {
    io_u_qiter(&td->io_u_all, u, i) {
      if (!(u->flags & IO_U_F_FLIGHT))
        continue;

      if (u->engine_data) {
        u->engine_data = nullptr;
        job->events[events] = u;
        events++;
      }
    }
    if (events >= min)
      break;
    usleep(100);
  } while (1);

  return events;
}

/// completion context for ObjectStore::queue_transaction()
class UnitComplete : public Context {
  io_u* u;
 public:
  explicit UnitComplete(io_u* u) : u(u) {}
  void finish(int r) override {
    // mark the pointer to indicate completion for fio_ceph_os_getevents()
    u->engine_data = reinterpret_cast<void*>(1ull);
  }
};
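
// note: queue() below registers a UnitComplete as the transaction's on-commit
// context. the ObjectStore invokes it asynchronously (typically from one of
// its internal finisher threads), and fio_ceph_os_getevents() above polls each
// io_u's engine_data flag to harvest these completions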

int fio_ceph_os_queue(thread_data* td, io_u* u)
{
  fio_ro_check(td, u);

  auto job = static_cast<Job*>(td->io_ops_data);
  auto& object = job->objects[u->file->engine_pos];
  auto& coll = object.coll;
  auto& os = job->engine->os;

  if (u->ddir == DDIR_WRITE) {
    // provide a hint if we're likely to read this data back
    const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0;

    bufferlist bl;
    bl.push_back(buffer::copy(reinterpret_cast<char*>(u->xfer_buf),
                              u->xfer_buflen));

    // enqueue a write transaction on the collection's sequencer
    ObjectStore::Transaction t;
    t.write(coll.cid, object.oid, u->offset, u->xfer_buflen, bl, flags);
    os->queue_transaction(&coll.sequencer,
                          std::move(t),
                          nullptr,
                          new UnitComplete(u));
    return FIO_Q_QUEUED;
  }

  if (u->ddir == DDIR_READ) {
    // ObjectStore reads are synchronous, so make the call and return COMPLETED
    bufferlist bl;
    int r = os->read(coll.cid, object.oid, u->offset, u->xfer_buflen, bl);
    if (r < 0) {
      u->error = -r; // io_u carries a positive errno
      td_verror(td, u->error, "xfer");
    } else {
      bl.copy(0, bl.length(), static_cast<char*>(u->xfer_buf));
      u->resid = u->xfer_buflen - r;
    }
    return FIO_Q_COMPLETED;
  }

  derr << "WARNING: Only DDIR_READ and DDIR_WRITE are supported!" << dendl;
  u->error = EINVAL; // io_u carries a positive errno
  td_verror(td, u->error, "xfer");
  return FIO_Q_COMPLETED;
}

int fio_ceph_os_commit(thread_data* td)
{
  // commit() allows the engine to batch up queued requests to be submitted all
  // at once. it would be natural for queue() to collect transactions in a list,
  // and use commit() to pass them all to ObjectStore::queue_transactions(). but
  // because we spread objects over multiple collections, we a) need to use a
  // different sequencer for each collection, and b) are less likely to see a
  // benefit from batching requests within a collection
  return 0;
}

// open/close are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to
// prevent fio from creating the files
int fio_ceph_os_open(thread_data* td, fio_file* f) { return 0; }
int fio_ceph_os_close(thread_data* td, fio_file* f) { return 0; }

int fio_ceph_os_io_u_init(thread_data* td, io_u* u)
{
  // no data is allocated, we just use the pointer as a boolean 'completed' flag
  u->engine_data = nullptr;
  return 0;
}

void fio_ceph_os_io_u_free(thread_data* td, io_u* u)
{
  u->engine_data = nullptr;
}


// ioengine_ops for get_ioengine()
struct ceph_ioengine : public ioengine_ops {
  ceph_ioengine() : ioengine_ops({}) {
    name = "ceph-os";
    version = FIO_IOOPS_VERSION;
    flags = FIO_DISKLESSIO;
    setup = fio_ceph_os_setup;
    queue = fio_ceph_os_queue;
    commit = fio_ceph_os_commit;
    getevents = fio_ceph_os_getevents;
    event = fio_ceph_os_event;
    cleanup = fio_ceph_os_cleanup;
    open_file = fio_ceph_os_open;
    close_file = fio_ceph_os_close;
    io_u_init = fio_ceph_os_io_u_init;
    io_u_free = fio_ceph_os_io_u_free;
    options = ceph_options.data();
    option_struct_size = sizeof(struct Options);
  }
};

} // anonymous namespace

extern "C" {
// the exported fio engine interface
void get_ioengine(struct ioengine_ops** ioengine_ptr) {
  static ceph_ioengine ioengine;
  *ioengine_ptr = &ioengine;
}
} // extern "C"