]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * CEPH messenger engine | |
5 | * | |
6 | * FIO engine which uses ceph messenger as a transport. See corresponding | |
7 | * FIO client and server jobs for details. | |
8 | */ | |
9 | ||
10 | #include "global/global_init.h" | |
11 | #include "msg/Messenger.h" | |
12 | #include "messages/MOSDOp.h" | |
13 | #include "messages/MOSDOpReply.h" | |
14 | #include "common/perf_counters.h" | |
15 | #include "auth/DummyAuth.h" | |
16 | #include "ring_buffer.h" | |
17 | ||
18 | #include <fio.h> | |
19 | #include <flist.h> | |
20 | #include <optgroup.h> | |
21 | ||
22 | #define dout_context g_ceph_context | |
23 | #define dout_subsys ceph_subsys_ | |
24 | ||
20effc67 TL |
25 | using namespace std; |
26 | ||
9f95a23c TL |
27 | enum ceph_msgr_type { |
28 | CEPH_MSGR_TYPE_UNDEF, | |
29 | CEPH_MSGR_TYPE_POSIX, | |
30 | CEPH_MSGR_TYPE_DPDK, | |
31 | CEPH_MSGR_TYPE_RDMA, | |
32 | }; | |
33 | ||
34 | const char *ceph_msgr_types[] = { "undef", "async+posix", | |
35 | "async+dpdk", "async+rdma" }; | |
36 | ||
37 | struct ceph_msgr_options { | |
38 | struct thread_data *td__; | |
39 | unsigned int is_receiver; | |
40 | unsigned int is_single; | |
41 | unsigned int port; | |
42 | const char *hostname; | |
43 | const char *conffile; | |
44 | enum ceph_msgr_type ms_type; | |
45 | }; | |
46 | ||
47 | class FioDispatcher; | |
48 | ||
49 | struct ceph_msgr_data { | |
50 | ceph_msgr_data(struct ceph_msgr_options *o_, unsigned iodepth) : | |
51 | o(o_) { | |
52 | INIT_FLIST_HEAD(&io_inflight_list); | |
53 | INIT_FLIST_HEAD(&io_pending_list); | |
54 | ring_buffer_init(&io_completed_q, iodepth); | |
55 | pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE); | |
56 | } | |
57 | ||
58 | struct ceph_msgr_options *o; | |
59 | Messenger *msgr = NULL; | |
60 | FioDispatcher *disp = NULL; | |
61 | pthread_spinlock_t spin; | |
62 | struct ring_buffer io_completed_q; | |
63 | struct flist_head io_inflight_list; | |
64 | struct flist_head io_pending_list; | |
65 | unsigned int io_inflight_nr = 0; | |
66 | unsigned int io_pending_nr = 0; | |
67 | }; | |
68 | ||
69 | struct ceph_msgr_io { | |
70 | struct flist_head list; | |
71 | struct ceph_msgr_data *data; | |
72 | struct io_u *io_u; | |
73 | MOSDOp *req_msg; /** Cached request, valid only for sender */ | |
74 | }; | |
75 | ||
76 | struct ceph_msgr_reply_io { | |
77 | struct flist_head list; | |
78 | MOSDOpReply *rep; | |
79 | }; | |
80 | ||
81 | static void *str_to_ptr(const std::string &str) | |
82 | { | |
1e59de90 TL |
83 | // str is assumed to be a valid ptr string |
84 | return reinterpret_cast<void*>(ceph::parse<uintptr_t>(str, 16).value()); | |
9f95a23c TL |
85 | } |
86 | ||
87 | static std::string ptr_to_str(void *ptr) | |
88 | { | |
89 | char buf[32]; | |
90 | ||
91 | snprintf(buf, sizeof(buf), "%llx", (unsigned long long)ptr); | |
92 | return std::string(buf); | |
93 | } | |
94 | ||
95 | /* | |
96 | * Used for refcounters print on the last context put, almost duplicates | |
97 | * global context refcounter, sigh. | |
98 | */ | |
99 | static std::atomic<int> ctx_ref(1); | |
100 | static DummyAuthClientServer *g_dummy_auth; | |
101 | ||
102 | static void create_or_get_ceph_context(struct ceph_msgr_options *o) | |
103 | { | |
104 | if (g_ceph_context) { | |
105 | g_ceph_context->get(); | |
106 | ctx_ref++; | |
107 | return; | |
108 | } | |
109 | ||
110 | boost::intrusive_ptr<CephContext> cct; | |
111 | vector<const char*> args; | |
112 | ||
113 | if (o->conffile) | |
114 | args = { "--conf", o->conffile }; | |
115 | ||
116 | cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, | |
117 | CODE_ENVIRONMENT_UTILITY, | |
118 | CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); | |
119 | /* Will use g_ceph_context instead */ | |
120 | cct.detach(); | |
121 | ||
122 | common_init_finish(g_ceph_context); | |
123 | g_ceph_context->_conf.apply_changes(NULL); | |
124 | g_dummy_auth = new DummyAuthClientServer(g_ceph_context); | |
125 | g_dummy_auth->auth_registry.refresh_config(); | |
126 | } | |
127 | ||
128 | static void put_ceph_context(void) | |
129 | { | |
130 | if (--ctx_ref == 0) { | |
131 | ostringstream ostr; | |
132 | Formatter* f; | |
133 | ||
134 | f = Formatter::create("json-pretty"); | |
1e59de90 | 135 | g_ceph_context->get_perfcounters_collection()->dump_formatted(f, false, false); |
9f95a23c TL |
136 | ostr << ">>>>>>>>>>>>> PERFCOUNTERS BEGIN <<<<<<<<<<<<" << std::endl; |
137 | f->flush(ostr); | |
138 | ostr << ">>>>>>>>>>>>> PERFCOUNTERS END <<<<<<<<<<<<" << std::endl; | |
139 | ||
140 | delete f; | |
141 | delete g_dummy_auth; | |
142 | dout(0) << ostr.str() << dendl; | |
143 | } | |
144 | ||
145 | g_ceph_context->put(); | |
146 | } | |
147 | ||
148 | static void ceph_msgr_sender_on_reply(const object_t &oid) | |
149 | { | |
150 | struct ceph_msgr_data *data; | |
151 | struct ceph_msgr_io *io; | |
152 | ||
153 | /* | |
154 | * Here we abuse object and use it as a raw pointer. Since this is | |
155 | * only for benchmarks and testing we do not care about anything | |
156 | * but performance. So no need to use global structure in order | |
157 | * to search for reply, just send a pointer and get it back. | |
158 | */ | |
159 | ||
160 | io = (decltype(io))str_to_ptr(oid.name); | |
161 | data = io->data; | |
162 | ring_buffer_enqueue(&data->io_completed_q, (void *)io); | |
163 | } | |
164 | ||
165 | ||
166 | class ReplyCompletion : public Message::CompletionHook { | |
167 | struct ceph_msgr_io *m_io; | |
168 | ||
169 | public: | |
170 | ReplyCompletion(MOSDOpReply *rep, struct ceph_msgr_io *io) : | |
171 | Message::CompletionHook(rep), | |
172 | m_io(io) { | |
173 | } | |
174 | void finish(int err) override { | |
175 | struct ceph_msgr_data *data = m_io->data; | |
176 | ||
177 | ring_buffer_enqueue(&data->io_completed_q, (void *)m_io); | |
178 | } | |
179 | }; | |
180 | ||
181 | static void ceph_msgr_receiver_on_request(struct ceph_msgr_data *data, | |
182 | MOSDOp *req) | |
183 | { | |
184 | MOSDOpReply *rep; | |
185 | ||
186 | rep = new MOSDOpReply(req, 0, 0, 0, false); | |
187 | rep->set_connection(req->get_connection()); | |
188 | ||
189 | pthread_spin_lock(&data->spin); | |
190 | if (data->io_inflight_nr) { | |
191 | struct ceph_msgr_io *io; | |
192 | ||
193 | data->io_inflight_nr--; | |
194 | io = flist_first_entry(&data->io_inflight_list, | |
195 | struct ceph_msgr_io, list); | |
196 | flist_del(&io->list); | |
197 | pthread_spin_unlock(&data->spin); | |
198 | ||
199 | rep->set_completion_hook(new ReplyCompletion(rep, io)); | |
200 | rep->get_connection()->send_message(rep); | |
201 | } else { | |
202 | struct ceph_msgr_reply_io *rep_io; | |
203 | ||
204 | rep_io = (decltype(rep_io))malloc(sizeof(*rep_io)); | |
205 | rep_io->rep = rep; | |
206 | ||
207 | data->io_pending_nr++; | |
208 | flist_add_tail(&rep_io->list, &data->io_pending_list); | |
209 | pthread_spin_unlock(&data->spin); | |
210 | } | |
211 | } | |
212 | ||
213 | class FioDispatcher : public Dispatcher { | |
214 | struct ceph_msgr_data *m_data; | |
215 | ||
216 | public: | |
217 | FioDispatcher(struct ceph_msgr_data *data): | |
218 | Dispatcher(g_ceph_context), | |
219 | m_data(data) { | |
220 | } | |
221 | bool ms_can_fast_dispatch_any() const override { | |
222 | return true; | |
223 | } | |
224 | bool ms_can_fast_dispatch(const Message *m) const override { | |
225 | switch (m->get_type()) { | |
226 | case CEPH_MSG_OSD_OP: | |
227 | return m_data->o->is_receiver; | |
228 | case CEPH_MSG_OSD_OPREPLY: | |
229 | return !m_data->o->is_receiver; | |
230 | default: | |
231 | return false; | |
232 | } | |
233 | } | |
234 | void ms_handle_fast_connect(Connection *con) override { | |
235 | } | |
236 | void ms_handle_fast_accept(Connection *con) override { | |
237 | } | |
238 | bool ms_dispatch(Message *m) override { | |
239 | return true; | |
240 | } | |
241 | void ms_fast_dispatch(Message *m) override { | |
242 | if (m_data->o->is_receiver) { | |
243 | MOSDOp *req; | |
244 | ||
245 | /* | |
246 | * Server side, handle request. | |
247 | */ | |
248 | ||
249 | req = static_cast<MOSDOp*>(m); | |
250 | req->finish_decode(); | |
251 | ||
252 | ceph_msgr_receiver_on_request(m_data, req); | |
253 | } else { | |
254 | MOSDOpReply *rep; | |
255 | ||
256 | /* | |
257 | * Client side, get reply, extract objid and mark | |
258 | * IO as completed. | |
259 | */ | |
260 | ||
261 | rep = static_cast<MOSDOpReply*>(m); | |
262 | ceph_msgr_sender_on_reply(rep->get_oid()); | |
263 | } | |
264 | m->put(); | |
265 | } | |
266 | bool ms_handle_reset(Connection *con) override { | |
267 | return true; | |
268 | } | |
269 | void ms_handle_remote_reset(Connection *con) override { | |
270 | } | |
271 | bool ms_handle_refused(Connection *con) override { | |
272 | return false; | |
273 | } | |
aee94f69 | 274 | int ms_handle_fast_authentication(Connection *con) override { |
9f95a23c TL |
275 | return 1; |
276 | } | |
277 | }; | |
278 | ||
279 | static entity_addr_t hostname_to_addr(struct ceph_msgr_options *o) | |
280 | { | |
281 | entity_addr_t addr; | |
282 | ||
283 | addr.parse(o->hostname); | |
284 | addr.set_port(o->port); | |
285 | addr.set_nonce(0); | |
286 | ||
287 | return addr; | |
288 | } | |
289 | ||
290 | static Messenger *create_messenger(struct ceph_msgr_options *o) | |
291 | { | |
292 | entity_name_t ename = o->is_receiver ? | |
293 | entity_name_t::OSD(0) : entity_name_t::CLIENT(0); | |
294 | std::string lname = o->is_receiver ? | |
295 | "receiver" : "sender"; | |
296 | ||
9f95a23c TL |
297 | std::string ms_type = o->ms_type != CEPH_MSGR_TYPE_UNDEF ? |
298 | ceph_msgr_types[o->ms_type] : | |
299 | g_ceph_context->_conf.get_val<std::string>("ms_type"); | |
300 | ||
301 | /* o->td__>pid doesn't set value, so use getpid() instead*/ | |
302 | auto nonce = o->is_receiver ? 0 : (getpid() + o->td__->thread_number); | |
303 | Messenger *msgr = Messenger::create(g_ceph_context, ms_type.c_str(), | |
f67539c2 | 304 | ename, lname, nonce); |
9f95a23c TL |
305 | if (o->is_receiver) { |
306 | msgr->set_default_policy(Messenger::Policy::stateless_server(0)); | |
307 | msgr->bind(hostname_to_addr(o)); | |
308 | } else { | |
309 | msgr->set_default_policy(Messenger::Policy::lossless_client(0)); | |
310 | } | |
311 | msgr->set_auth_client(g_dummy_auth); | |
312 | msgr->set_auth_server(g_dummy_auth); | |
313 | msgr->set_require_authorizer(false); | |
314 | msgr->start(); | |
315 | ||
316 | return msgr; | |
317 | } | |
318 | ||
319 | static Messenger *single_msgr; | |
320 | static std::atomic<int> single_msgr_ref; | |
321 | static vector<FioDispatcher *> single_msgr_disps; | |
322 | ||
323 | static void init_messenger(struct ceph_msgr_data *data) | |
324 | { | |
325 | struct ceph_msgr_options *o = data->o; | |
326 | FioDispatcher *disp; | |
327 | Messenger *msgr; | |
328 | ||
329 | disp = new FioDispatcher(data); | |
330 | if (o->is_single) { | |
331 | /* | |
332 | * Single messenger instance for the whole FIO | |
333 | */ | |
334 | ||
335 | if (!single_msgr) { | |
336 | msgr = create_messenger(o); | |
337 | single_msgr = msgr; | |
338 | } else { | |
339 | msgr = single_msgr; | |
340 | } | |
341 | single_msgr_disps.push_back(disp); | |
342 | single_msgr_ref++; | |
343 | } else { | |
344 | /* | |
345 | * Messenger instance per FIO thread | |
346 | */ | |
347 | msgr = create_messenger(o); | |
348 | } | |
349 | msgr->add_dispatcher_head(disp); | |
350 | ||
351 | data->disp = disp; | |
352 | data->msgr = msgr; | |
353 | } | |
354 | ||
355 | static void free_messenger(struct ceph_msgr_data *data) | |
356 | { | |
357 | data->msgr->shutdown(); | |
358 | data->msgr->wait(); | |
359 | delete data->msgr; | |
360 | } | |
361 | ||
362 | static void put_messenger(struct ceph_msgr_data *data) | |
363 | { | |
364 | struct ceph_msgr_options *o = data->o; | |
365 | ||
366 | if (o->is_single) { | |
367 | if (--single_msgr_ref == 0) { | |
368 | free_messenger(data); | |
369 | /* | |
370 | * In case of a single messenger instance we have to | |
371 | * free dispatchers after actual messenger destruction. | |
372 | */ | |
373 | for (auto disp : single_msgr_disps) | |
374 | delete disp; | |
375 | single_msgr = NULL; | |
376 | } | |
377 | } else { | |
378 | free_messenger(data); | |
379 | delete data->disp; | |
380 | } | |
381 | data->disp = NULL; | |
382 | data->msgr = NULL; | |
383 | } | |
384 | ||
385 | static int fio_ceph_msgr_setup(struct thread_data *td) | |
386 | { | |
387 | struct ceph_msgr_options *o = (decltype(o))td->eo; | |
388 | o->td__ = td; | |
389 | ceph_msgr_data *data; | |
390 | ||
391 | /* We have to manage global resources so we use threads */ | |
392 | td->o.use_thread = 1; | |
393 | ||
394 | create_or_get_ceph_context(o); | |
395 | ||
396 | if (!td->io_ops_data) { | |
397 | data = new ceph_msgr_data(o, td->o.iodepth); | |
398 | init_messenger(data); | |
399 | td->io_ops_data = (void *)data; | |
400 | } | |
401 | ||
402 | return 0; | |
403 | } | |
404 | ||
405 | static void fio_ceph_msgr_cleanup(struct thread_data *td) | |
406 | { | |
407 | struct ceph_msgr_data *data; | |
408 | unsigned nr; | |
409 | ||
410 | data = (decltype(data))td->io_ops_data; | |
411 | put_messenger(data); | |
412 | ||
413 | nr = ring_buffer_used_size(&data->io_completed_q); | |
414 | if (nr) | |
415 | fprintf(stderr, "fio: io_completed_nr==%d, but should be zero\n", | |
416 | nr); | |
417 | if (data->io_inflight_nr) | |
418 | fprintf(stderr, "fio: io_inflight_nr==%d, but should be zero\n", | |
419 | data->io_inflight_nr); | |
420 | if (data->io_pending_nr) | |
421 | fprintf(stderr, "fio: io_pending_nr==%d, but should be zero\n", | |
422 | data->io_pending_nr); | |
423 | if (!flist_empty(&data->io_inflight_list)) | |
424 | fprintf(stderr, "fio: io_inflight_list is not empty\n"); | |
425 | if (!flist_empty(&data->io_pending_list)) | |
426 | fprintf(stderr, "fio: io_pending_list is not empty\n"); | |
427 | ||
428 | ring_buffer_deinit(&data->io_completed_q); | |
429 | delete data; | |
430 | put_ceph_context(); | |
431 | } | |
432 | ||
433 | static int fio_ceph_msgr_io_u_init(struct thread_data *td, struct io_u *io_u) | |
434 | { | |
435 | struct ceph_msgr_options *o = (decltype(o))td->eo; | |
436 | struct ceph_msgr_io *io; | |
437 | MOSDOp *req_msg = NULL; | |
438 | ||
439 | io = (decltype(io))malloc(sizeof(*io)); | |
440 | io->io_u = io_u; | |
441 | io->data = (decltype(io->data))td->io_ops_data; | |
442 | ||
443 | if (!o->is_receiver) { | |
444 | object_t oid(ptr_to_str(io)); | |
445 | pg_t pgid; | |
446 | object_locator_t oloc; | |
447 | hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), | |
448 | pgid.pool(), oloc.nspace); | |
449 | spg_t spgid(pgid); | |
450 | entity_inst_t dest(entity_name_t::OSD(0), hostname_to_addr(o)); | |
451 | ||
452 | Messenger *msgr = io->data->msgr; | |
453 | ConnectionRef con = msgr->connect_to(dest.name.type(), | |
454 | entity_addrvec_t(dest.addr)); | |
455 | ||
456 | req_msg = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0); | |
457 | req_msg->set_connection(con); | |
458 | } | |
459 | ||
460 | io->req_msg = req_msg; | |
461 | io_u->engine_data = (void *)io; | |
462 | ||
463 | return 0; | |
464 | } | |
465 | ||
466 | static void fio_ceph_msgr_io_u_free(struct thread_data *td, struct io_u *io_u) | |
467 | { | |
468 | struct ceph_msgr_io *io; | |
469 | ||
470 | io = (decltype(io))io_u->engine_data; | |
471 | if (io) { | |
472 | io_u->engine_data = NULL; | |
473 | if (io->req_msg) | |
474 | io->req_msg->put(); | |
475 | free(io); | |
476 | } | |
477 | } | |
478 | ||
479 | static enum fio_q_status ceph_msgr_sender_queue(struct thread_data *td, | |
480 | struct io_u *io_u) | |
481 | { | |
482 | struct ceph_msgr_data *data; | |
483 | struct ceph_msgr_io *io; | |
484 | ||
485 | bufferlist buflist = bufferlist::static_from_mem( | |
486 | (char *)io_u->buf, io_u->buflen); | |
487 | ||
488 | io = (decltype(io))io_u->engine_data; | |
489 | data = (decltype(data))td->io_ops_data; | |
490 | ||
491 | /* No handy method to clear ops before reusage? Ok */ | |
492 | io->req_msg->ops.clear(); | |
493 | ||
494 | /* Here we do not care about direction, always send as write */ | |
495 | io->req_msg->write(0, io_u->buflen, buflist); | |
496 | /* Keep message alive */ | |
497 | io->req_msg->get(); | |
498 | io->req_msg->get_connection()->send_message(io->req_msg); | |
499 | ||
500 | return FIO_Q_QUEUED; | |
501 | } | |
502 | ||
503 | static int fio_ceph_msgr_getevents(struct thread_data *td, unsigned int min, | |
504 | unsigned int max, const struct timespec *ts) | |
505 | { | |
506 | struct ceph_msgr_data *data; | |
507 | unsigned int nr; | |
508 | ||
509 | data = (decltype(data))td->io_ops_data; | |
510 | ||
511 | /* | |
512 | * Check io_u.c : if min == 0 -> ts is valid and equal to zero, | |
513 | * if min != 0 -> ts is NULL. | |
514 | */ | |
515 | assert(!min ^ !ts); | |
516 | ||
517 | nr = ring_buffer_used_size(&data->io_completed_q); | |
518 | if (nr >= min) | |
519 | /* We got something */ | |
520 | return min(nr, max); | |
521 | ||
522 | /* Here we are only if min != 0 and ts == NULL */ | |
523 | assert(min && !ts); | |
524 | ||
525 | while ((nr = ring_buffer_used_size(&data->io_completed_q)) < min && | |
526 | !td->terminate) { | |
527 | /* Poll, no disk IO, so we expect response immediately. */ | |
528 | usleep(10); | |
529 | } | |
530 | ||
531 | return min(nr, max); | |
532 | } | |
533 | ||
534 | static struct io_u *fio_ceph_msgr_event(struct thread_data *td, int event) | |
535 | { | |
536 | struct ceph_msgr_data *data; | |
537 | struct ceph_msgr_io *io; | |
538 | ||
539 | data = (decltype(data))td->io_ops_data; | |
540 | io = (decltype(io))ring_buffer_dequeue(&data->io_completed_q); | |
541 | ||
542 | return io->io_u; | |
543 | } | |
544 | ||
545 | static enum fio_q_status ceph_msgr_receiver_queue(struct thread_data *td, | |
546 | struct io_u *io_u) | |
547 | { | |
548 | struct ceph_msgr_data *data; | |
549 | struct ceph_msgr_io *io; | |
550 | ||
551 | io = (decltype(io))io_u->engine_data; | |
552 | data = io->data; | |
553 | pthread_spin_lock(&data->spin); | |
554 | if (data->io_pending_nr) { | |
555 | struct ceph_msgr_reply_io *rep_io; | |
556 | MOSDOpReply *rep; | |
557 | ||
558 | data->io_pending_nr--; | |
559 | rep_io = flist_first_entry(&data->io_pending_list, | |
560 | struct ceph_msgr_reply_io, | |
561 | list); | |
562 | flist_del(&rep_io->list); | |
563 | rep = rep_io->rep; | |
564 | pthread_spin_unlock(&data->spin); | |
565 | free(rep_io); | |
566 | ||
567 | rep->set_completion_hook(new ReplyCompletion(rep, io)); | |
568 | rep->get_connection()->send_message(rep); | |
569 | } else { | |
570 | data->io_inflight_nr++; | |
571 | flist_add_tail(&io->list, &data->io_inflight_list); | |
572 | pthread_spin_unlock(&data->spin); | |
573 | } | |
574 | ||
575 | return FIO_Q_QUEUED; | |
576 | } | |
577 | ||
578 | static enum fio_q_status fio_ceph_msgr_queue(struct thread_data *td, | |
579 | struct io_u *io_u) | |
580 | { | |
581 | struct ceph_msgr_options *o = (decltype(o))td->eo; | |
582 | ||
583 | if (o->is_receiver) | |
584 | return ceph_msgr_receiver_queue(td, io_u); | |
585 | else | |
586 | return ceph_msgr_sender_queue(td, io_u); | |
587 | } | |
588 | ||
589 | static int fio_ceph_msgr_open_file(struct thread_data *td, struct fio_file *f) | |
590 | { | |
591 | return 0; | |
592 | } | |
593 | ||
594 | static int fio_ceph_msgr_close_file(struct thread_data *, struct fio_file *) | |
595 | { | |
596 | return 0; | |
597 | } | |
598 | ||
599 | template <class Func> | |
600 | fio_option make_option(Func&& func) | |
601 | { | |
602 | auto o = fio_option{}; | |
603 | o.category = FIO_OPT_C_ENGINE; | |
604 | func(std::ref(o)); | |
605 | return o; | |
606 | } | |
607 | ||
608 | static std::vector<fio_option> options { | |
609 | make_option([] (fio_option& o) { | |
610 | o.name = "receiver"; | |
611 | o.lname = "CEPH messenger is receiver"; | |
612 | o.type = FIO_OPT_BOOL; | |
613 | o.off1 = offsetof(struct ceph_msgr_options, is_receiver); | |
614 | o.help = "CEPH messenger is sender or receiver"; | |
615 | o.def = "0"; | |
616 | }), | |
617 | make_option([] (fio_option& o) { | |
618 | o.name = "single_instance"; | |
619 | o.lname = "Single instance of CEPH messenger "; | |
620 | o.type = FIO_OPT_BOOL; | |
621 | o.off1 = offsetof(struct ceph_msgr_options, is_single); | |
622 | o.help = "CEPH messenger is a created once for all threads"; | |
623 | o.def = "0"; | |
624 | }), | |
625 | make_option([] (fio_option& o) { | |
626 | o.name = "hostname"; | |
627 | o.lname = "CEPH messenger hostname"; | |
628 | o.type = FIO_OPT_STR_STORE; | |
629 | o.off1 = offsetof(struct ceph_msgr_options, hostname); | |
630 | o.help = "Hostname for CEPH messenger engine"; | |
631 | }), | |
632 | make_option([] (fio_option& o) { | |
633 | o.name = "port"; | |
634 | o.lname = "CEPH messenger engine port"; | |
635 | o.type = FIO_OPT_INT; | |
636 | o.off1 = offsetof(struct ceph_msgr_options, port); | |
637 | o.maxval = 65535; | |
638 | o.minval = 1; | |
639 | o.help = "Port to use for CEPH messenger"; | |
640 | }), | |
641 | make_option([] (fio_option& o) { | |
642 | o.name = "ms_type"; | |
643 | o.lname = "CEPH messenger transport type: async+posix, async+dpdk, async+rdma"; | |
644 | o.type = FIO_OPT_STR; | |
645 | o.off1 = offsetof(struct ceph_msgr_options, ms_type); | |
646 | o.help = "Transport type for CEPH messenger, see 'ms async transport type' corresponding CEPH documentation page"; | |
647 | o.def = "undef"; | |
648 | ||
649 | o.posval[0].ival = "undef"; | |
650 | o.posval[0].oval = CEPH_MSGR_TYPE_UNDEF; | |
651 | ||
652 | o.posval[1].ival = "async+posix"; | |
653 | o.posval[1].oval = CEPH_MSGR_TYPE_POSIX; | |
654 | o.posval[1].help = "POSIX API"; | |
655 | ||
656 | o.posval[2].ival = "async+dpdk"; | |
657 | o.posval[2].oval = CEPH_MSGR_TYPE_DPDK; | |
658 | o.posval[2].help = "DPDK"; | |
659 | ||
660 | o.posval[3].ival = "async+rdma"; | |
661 | o.posval[3].oval = CEPH_MSGR_TYPE_RDMA; | |
662 | o.posval[3].help = "RDMA"; | |
663 | }), | |
664 | make_option([] (fio_option& o) { | |
665 | o.name = "ceph_conf_file"; | |
666 | o.lname = "CEPH configuration file"; | |
667 | o.type = FIO_OPT_STR_STORE; | |
668 | o.off1 = offsetof(struct ceph_msgr_options, conffile); | |
669 | o.help = "Path to CEPH configuration file"; | |
670 | }), | |
671 | {} /* Last NULL */ | |
672 | }; | |
673 | ||
674 | static struct ioengine_ops ioengine; | |
675 | ||
676 | extern "C" { | |
677 | ||
678 | void get_ioengine(struct ioengine_ops** ioengine_ptr) | |
679 | { | |
680 | /* | |
681 | * Main ioengine structure | |
682 | */ | |
683 | ioengine.name = "ceph-msgr"; | |
684 | ioengine.version = FIO_IOOPS_VERSION; | |
685 | ioengine.flags = FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO; | |
686 | ioengine.setup = fio_ceph_msgr_setup; | |
687 | ioengine.queue = fio_ceph_msgr_queue; | |
688 | ioengine.getevents = fio_ceph_msgr_getevents; | |
689 | ioengine.event = fio_ceph_msgr_event; | |
690 | ioengine.cleanup = fio_ceph_msgr_cleanup; | |
691 | ioengine.open_file = fio_ceph_msgr_open_file; | |
692 | ioengine.close_file = fio_ceph_msgr_close_file; | |
693 | ioengine.io_u_init = fio_ceph_msgr_io_u_init; | |
694 | ioengine.io_u_free = fio_ceph_msgr_io_u_free; | |
695 | ioengine.option_struct_size = sizeof(struct ceph_msgr_options); | |
696 | ioengine.options = options.data(); | |
697 | ||
698 | *ioengine_ptr = &ioengine; | |
699 | } | |
700 | } // extern "C" |