]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * CEPH messenger engine | |
5 | * | |
6 | * FIO engine which uses ceph messenger as a transport. See corresponding | |
7 | * FIO client and server jobs for details. | |
8 | */ | |
9 | ||
10 | #include "global/global_init.h" | |
11 | #include "msg/Messenger.h" | |
12 | #include "messages/MOSDOp.h" | |
13 | #include "messages/MOSDOpReply.h" | |
14 | #include "common/perf_counters.h" | |
15 | #include "auth/DummyAuth.h" | |
16 | #include "ring_buffer.h" | |
17 | ||
18 | #include <fio.h> | |
19 | #include <flist.h> | |
20 | #include <optgroup.h> | |
21 | ||
22 | #define dout_context g_ceph_context | |
23 | #define dout_subsys ceph_subsys_ | |
24 | ||
20effc67 TL |
25 | using namespace std; |
26 | ||
9f95a23c TL |
/*
 * Transport selection.  CEPH_MSGR_TYPE_UNDEF means "not given on the
 * command line", in which case the 'ms_type' conf value is consulted
 * instead (see create_messenger()).
 */
enum ceph_msgr_type {
  CEPH_MSGR_TYPE_UNDEF,
  CEPH_MSGR_TYPE_POSIX,
  CEPH_MSGR_TYPE_DPDK,
  CEPH_MSGR_TYPE_RDMA,
};

/* Indexed by enum ceph_msgr_type above — keep the two in sync. */
const char *ceph_msgr_types[] = { "undef", "async+posix",
          "async+dpdk", "async+rdma" };
36 | ||
/* Engine options filled in by fio from the 'options' table below. */
struct ceph_msgr_options {
  struct thread_data *td__;   /* back pointer to the fio thread, set in setup */
  unsigned int is_receiver;   /* 1 = server role, 0 = client (sender) */
  unsigned int is_single;     /* share one messenger across all fio threads */
  unsigned int port;          /* TCP port of the receiver */
  const char *hostname;       /* address the receiver binds / sender connects to */
  const char *conffile;       /* optional ceph.conf path */
  enum ceph_msgr_type ms_type;
};
46 | ||
47 | class FioDispatcher; | |
48 | ||
/* Per-fio-thread engine state. */
struct ceph_msgr_data {
  ceph_msgr_data(struct ceph_msgr_options *o_, unsigned iodepth) :
    o(o_) {
    INIT_FLIST_HEAD(&io_inflight_list);
    INIT_FLIST_HEAD(&io_pending_list);
    /* Sized by iodepth: at most that many IOs can be completed at once */
    ring_buffer_init(&io_completed_q, iodepth);
    pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE);
  }

  struct ceph_msgr_options *o;  /* not owned */
  Messenger *msgr = NULL;
  FioDispatcher *disp = NULL;
  pthread_spinlock_t spin;             /* protects the two lists and counters below */
  struct ring_buffer io_completed_q;   /* completed IOs, reaped by getevents/event */
  struct flist_head io_inflight_list;  /* receiver: io_us queued, waiting for a request */
  struct flist_head io_pending_list;   /* receiver: replies parked, waiting for an io_u */
  unsigned int io_inflight_nr = 0;
  unsigned int io_pending_nr = 0;
};
68 | ||
/* Engine-private context hung off each io_u (io_u->engine_data). */
struct ceph_msgr_io {
  struct flist_head list;       /* link in data->io_inflight_list (receiver side) */
  struct ceph_msgr_data *data;
  struct io_u *io_u;
  MOSDOp *req_msg; /** Cached request, valid only for sender */
};

/* Receiver side: a reply that arrived before an io_u was available. */
struct ceph_msgr_reply_io {
  struct flist_head list;       /* link in data->io_pending_list */
  MOSDOpReply *rep;
};
80 | ||
/*
 * Decode a hex string produced by ptr_to_str() back into the original
 * pointer.  strtoull + uintptr_t keep the round-trip lossless even on
 * LLP64 targets where unsigned long is narrower than a pointer
 * (strtoul would silently truncate there).
 */
static void *str_to_ptr(const std::string &str)
{
  return (void *)(uintptr_t)strtoull(str.c_str(), NULL, 16);
}
85 | ||
/*
 * Encode a pointer as a lower-case hex string (no "0x" prefix), the
 * inverse of str_to_ptr().
 */
static std::string ptr_to_str(void *ptr)
{
  /* two hex digits per byte plus the terminating NUL */
  char buf[2 * sizeof(void *) + 1];

  snprintf(buf, sizeof(buf), "%llx", (unsigned long long)ptr);
  return buf;
}
93 | ||
/*
 * Used for refcounters print on the last context put, almost duplicates
 * global context refcounter, sigh.
 */
static std::atomic<int> ctx_ref(1);
/* Shared no-op authenticator handed to every messenger (client & server). */
static DummyAuthClientServer *g_dummy_auth;
100 | ||
/*
 * Create the global CephContext on the first call; subsequent calls
 * just take an extra reference.  Paired with put_ceph_context().
 */
static void create_or_get_ceph_context(struct ceph_msgr_options *o)
{
  if (g_ceph_context) {
    g_ceph_context->get();
    ctx_ref++;
    return;
  }

  boost::intrusive_ptr<CephContext> cct;
  vector<const char*> args;

  if (o->conffile)
    args = { "--conf", o->conffile };

  cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
        CODE_ENVIRONMENT_UTILITY,
        CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
  /* Will use g_ceph_context instead */
  cct.detach();

  common_init_finish(g_ceph_context);
  g_ceph_context->_conf.apply_changes(NULL);
  g_dummy_auth = new DummyAuthClientServer(g_ceph_context);
  g_dummy_auth->auth_registry.refresh_config();
}
126 | ||
/*
 * Drop one reference taken by create_or_get_ceph_context().  On the
 * last put, dump all perf counters as pretty JSON before releasing the
 * global context.
 */
static void put_ceph_context(void)
{
  if (--ctx_ref == 0) {
    ostringstream ostr;
    Formatter* f;

    f = Formatter::create("json-pretty");
    g_ceph_context->get_perfcounters_collection()->dump_formatted(f, false);
    ostr << ">>>>>>>>>>>>> PERFCOUNTERS BEGIN <<<<<<<<<<<<" << std::endl;
    f->flush(ostr);
    ostr << ">>>>>>>>>>>>> PERFCOUNTERS END <<<<<<<<<<<<" << std::endl;

    delete f;
    delete g_dummy_auth;
    dout(0) << ostr.str() << dendl;
  }

  g_ceph_context->put();
}
146 | ||
/*
 * Sender path: a reply came back; recover the ceph_msgr_io pointer
 * smuggled through the object name and mark the IO completed.
 */
static void ceph_msgr_sender_on_reply(const object_t &oid)
{
  struct ceph_msgr_data *data;
  struct ceph_msgr_io *io;

  /*
   * Here we abuse object and use it as a raw pointer. Since this is
   * only for benchmarks and testing we do not care about anything
   * but performance. So no need to use global structure in order
   * to search for reply, just send a pointer and get it back.
   */

  io = (decltype(io))str_to_ptr(oid.name);
  data = io->data;
  ring_buffer_enqueue(&data->io_completed_q, (void *)io);
}
163 | ||
164 | ||
/*
 * Completion hook attached to each reply sent by the receiver: once
 * the reply has actually been put on the wire, the corresponding io_u
 * is completed by queueing it on io_completed_q.
 */
class ReplyCompletion : public Message::CompletionHook {
  struct ceph_msgr_io *m_io;

public:
  ReplyCompletion(MOSDOpReply *rep, struct ceph_msgr_io *io) :
    Message::CompletionHook(rep),
    m_io(io) {
  }
  /* Called by the messenger after the message is sent. */
  void finish(int err) override {
    struct ceph_msgr_data *data = m_io->data;

    ring_buffer_enqueue(&data->io_completed_q, (void *)m_io);
  }
};
179 | ||
/*
 * Receiver path: handle an incoming MOSDOp.  If an io_u is already
 * queued (io_inflight_list) send the reply immediately — the io_u
 * completes via ReplyCompletion when the reply hits the wire.
 * Otherwise park the reply on io_pending_list until
 * ceph_msgr_receiver_queue() supplies an io_u.
 */
static void ceph_msgr_receiver_on_request(struct ceph_msgr_data *data,
            MOSDOp *req)
{
  MOSDOpReply *rep;

  rep = new MOSDOpReply(req, 0, 0, 0, false);
  rep->set_connection(req->get_connection());

  pthread_spin_lock(&data->spin);
  if (data->io_inflight_nr) {
    struct ceph_msgr_io *io;

    data->io_inflight_nr--;
    io = flist_first_entry(&data->io_inflight_list,
               struct ceph_msgr_io, list);
    flist_del(&io->list);
    /* Drop the lock before touching the messenger */
    pthread_spin_unlock(&data->spin);

    rep->set_completion_hook(new ReplyCompletion(rep, io));
    rep->get_connection()->send_message(rep);
  } else {
    struct ceph_msgr_reply_io *rep_io;

    /* NOTE(review): malloc result is not checked — OOM would crash here */
    rep_io = (decltype(rep_io))malloc(sizeof(*rep_io));
    rep_io->rep = rep;

    data->io_pending_nr++;
    flist_add_tail(&rep_io->list, &data->io_pending_list);
    pthread_spin_unlock(&data->spin);
  }
}
211 | ||
/*
 * Messenger dispatcher shared by both roles: fast-dispatches OSD ops
 * on the receiver and OSD op replies on the sender.
 */
class FioDispatcher : public Dispatcher {
  struct ceph_msgr_data *m_data;

public:
  FioDispatcher(struct ceph_msgr_data *data):
    Dispatcher(g_ceph_context),
    m_data(data) {
  }
  bool ms_can_fast_dispatch_any() const override {
    return true;
  }
  /* Only the message type matching our role goes through fast dispatch. */
  bool ms_can_fast_dispatch(const Message *m) const override {
    switch (m->get_type()) {
    case CEPH_MSG_OSD_OP:
      return m_data->o->is_receiver;
    case CEPH_MSG_OSD_OPREPLY:
      return !m_data->o->is_receiver;
    default:
      return false;
    }
  }
  void ms_handle_fast_connect(Connection *con) override {
  }
  void ms_handle_fast_accept(Connection *con) override {
  }
  bool ms_dispatch(Message *m) override {
    return true;
  }
  void ms_fast_dispatch(Message *m) override {
    if (m_data->o->is_receiver) {
      MOSDOp *req;

      /*
       * Server side, handle request.
       */

      req = static_cast<MOSDOp*>(m);
      req->finish_decode();

      ceph_msgr_receiver_on_request(m_data, req);
    } else {
      MOSDOpReply *rep;

      /*
       * Client side, get reply, extract objid and mark
       * IO as completed.
       */

      rep = static_cast<MOSDOpReply*>(m);
      ceph_msgr_sender_on_reply(rep->get_oid());
    }
    /* Drop the messenger's reference on the message */
    m->put();
  }
  bool ms_handle_reset(Connection *con) override {
    return true;
  }
  void ms_handle_remote_reset(Connection *con) override {
  }
  bool ms_handle_refused(Connection *con) override {
    return false;
  }
  /* Authentication is a no-op (DummyAuth); always succeed. */
  int ms_handle_authentication(Connection *con) override {
    return 1;
  }
};
277 | ||
/*
 * Build an entity address from the 'hostname' and 'port' options.
 * NOTE(review): the return value of parse() is ignored — a bad
 * hostname silently yields a blank address; consider reporting it.
 */
static entity_addr_t hostname_to_addr(struct ceph_msgr_options *o)
{
  entity_addr_t addr;

  addr.parse(o->hostname);
  addr.set_port(o->port);
  addr.set_nonce(0);

  return addr;
}
288 | ||
/*
 * Create, configure and start a messenger for the current role.
 * Transport comes from the 'ms_type' engine option, falling back to
 * the ceph conf value when the option is left at "undef".
 */
static Messenger *create_messenger(struct ceph_msgr_options *o)
{
  entity_name_t ename = o->is_receiver ?
    entity_name_t::OSD(0) : entity_name_t::CLIENT(0);
  std::string lname = o->is_receiver ?
    "receiver" : "sender";

  std::string ms_type = o->ms_type != CEPH_MSGR_TYPE_UNDEF ?
    ceph_msgr_types[o->ms_type] :
    g_ceph_context->_conf.get_val<std::string>("ms_type");

  /* o->td__>pid doesn't set value, so use getpid() instead*/
  auto nonce = o->is_receiver ? 0 : (getpid() + o->td__->thread_number);
  Messenger *msgr = Messenger::create(g_ceph_context, ms_type.c_str(),
              ename, lname, nonce);
  if (o->is_receiver) {
    msgr->set_default_policy(Messenger::Policy::stateless_server(0));
    msgr->bind(hostname_to_addr(o));
  } else {
    msgr->set_default_policy(Messenger::Policy::lossless_client(0));
  }
  msgr->set_auth_client(g_dummy_auth);
  msgr->set_auth_server(g_dummy_auth);
  msgr->set_require_authorizer(false);
  msgr->start();

  return msgr;
}
317 | ||
/* State for the 'single_instance' mode: one messenger shared by all threads. */
static Messenger *single_msgr;
static std::atomic<int> single_msgr_ref;
static vector<FioDispatcher *> single_msgr_disps; /* freed on last put_messenger() */
321 | ||
/*
 * Attach a dispatcher to either a fresh per-thread messenger or the
 * shared single instance, depending on the 'single_instance' option.
 * NOTE(review): the check-then-create of single_msgr is unsynchronized;
 * presumably fio serializes setup calls — confirm before relying on it.
 */
static void init_messenger(struct ceph_msgr_data *data)
{
  struct ceph_msgr_options *o = data->o;
  FioDispatcher *disp;
  Messenger *msgr;

  disp = new FioDispatcher(data);
  if (o->is_single) {
    /*
     * Single messenger instance for the whole FIO
     */

    if (!single_msgr) {
      msgr = create_messenger(o);
      single_msgr = msgr;
    } else {
      msgr = single_msgr;
    }
    single_msgr_disps.push_back(disp);
    single_msgr_ref++;
  } else {
    /*
     * Messenger instance per FIO thread
     */
    msgr = create_messenger(o);
  }
  msgr->add_dispatcher_head(disp);

  data->disp = disp;
  data->msgr = msgr;
}
353 | ||
354 | static void free_messenger(struct ceph_msgr_data *data) | |
355 | { | |
356 | data->msgr->shutdown(); | |
357 | data->msgr->wait(); | |
358 | delete data->msgr; | |
359 | } | |
360 | ||
361 | static void put_messenger(struct ceph_msgr_data *data) | |
362 | { | |
363 | struct ceph_msgr_options *o = data->o; | |
364 | ||
365 | if (o->is_single) { | |
366 | if (--single_msgr_ref == 0) { | |
367 | free_messenger(data); | |
368 | /* | |
369 | * In case of a single messenger instance we have to | |
370 | * free dispatchers after actual messenger destruction. | |
371 | */ | |
372 | for (auto disp : single_msgr_disps) | |
373 | delete disp; | |
374 | single_msgr = NULL; | |
375 | } | |
376 | } else { | |
377 | free_messenger(data); | |
378 | delete data->disp; | |
379 | } | |
380 | data->disp = NULL; | |
381 | data->msgr = NULL; | |
382 | } | |
383 | ||
/*
 * fio 'setup' hook: grab (or create) the global ceph context and build
 * the per-thread engine state exactly once.  Returns 0 on success.
 */
static int fio_ceph_msgr_setup(struct thread_data *td)
{
  struct ceph_msgr_options *o = (decltype(o))td->eo;
  o->td__ = td;
  ceph_msgr_data *data;

  /* We have to manage global resources so we use threads */
  td->o.use_thread = 1;

  create_or_get_ceph_context(o);

  if (!td->io_ops_data) {
    data = new ceph_msgr_data(o, td->o.iodepth);
    init_messenger(data);
    td->io_ops_data = (void *)data;
  }

  return 0;
}
403 | ||
404 | static void fio_ceph_msgr_cleanup(struct thread_data *td) | |
405 | { | |
406 | struct ceph_msgr_data *data; | |
407 | unsigned nr; | |
408 | ||
409 | data = (decltype(data))td->io_ops_data; | |
410 | put_messenger(data); | |
411 | ||
412 | nr = ring_buffer_used_size(&data->io_completed_q); | |
413 | if (nr) | |
414 | fprintf(stderr, "fio: io_completed_nr==%d, but should be zero\n", | |
415 | nr); | |
416 | if (data->io_inflight_nr) | |
417 | fprintf(stderr, "fio: io_inflight_nr==%d, but should be zero\n", | |
418 | data->io_inflight_nr); | |
419 | if (data->io_pending_nr) | |
420 | fprintf(stderr, "fio: io_pending_nr==%d, but should be zero\n", | |
421 | data->io_pending_nr); | |
422 | if (!flist_empty(&data->io_inflight_list)) | |
423 | fprintf(stderr, "fio: io_inflight_list is not empty\n"); | |
424 | if (!flist_empty(&data->io_pending_list)) | |
425 | fprintf(stderr, "fio: io_pending_list is not empty\n"); | |
426 | ||
427 | ring_buffer_deinit(&data->io_completed_q); | |
428 | delete data; | |
429 | put_ceph_context(); | |
430 | } | |
431 | ||
432 | static int fio_ceph_msgr_io_u_init(struct thread_data *td, struct io_u *io_u) | |
433 | { | |
434 | struct ceph_msgr_options *o = (decltype(o))td->eo; | |
435 | struct ceph_msgr_io *io; | |
436 | MOSDOp *req_msg = NULL; | |
437 | ||
438 | io = (decltype(io))malloc(sizeof(*io)); | |
439 | io->io_u = io_u; | |
440 | io->data = (decltype(io->data))td->io_ops_data; | |
441 | ||
442 | if (!o->is_receiver) { | |
443 | object_t oid(ptr_to_str(io)); | |
444 | pg_t pgid; | |
445 | object_locator_t oloc; | |
446 | hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), | |
447 | pgid.pool(), oloc.nspace); | |
448 | spg_t spgid(pgid); | |
449 | entity_inst_t dest(entity_name_t::OSD(0), hostname_to_addr(o)); | |
450 | ||
451 | Messenger *msgr = io->data->msgr; | |
452 | ConnectionRef con = msgr->connect_to(dest.name.type(), | |
453 | entity_addrvec_t(dest.addr)); | |
454 | ||
455 | req_msg = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0); | |
456 | req_msg->set_connection(con); | |
457 | } | |
458 | ||
459 | io->req_msg = req_msg; | |
460 | io_u->engine_data = (void *)io; | |
461 | ||
462 | return 0; | |
463 | } | |
464 | ||
465 | static void fio_ceph_msgr_io_u_free(struct thread_data *td, struct io_u *io_u) | |
466 | { | |
467 | struct ceph_msgr_io *io; | |
468 | ||
469 | io = (decltype(io))io_u->engine_data; | |
470 | if (io) { | |
471 | io_u->engine_data = NULL; | |
472 | if (io->req_msg) | |
473 | io->req_msg->put(); | |
474 | free(io); | |
475 | } | |
476 | } | |
477 | ||
/*
 * Sender queue path: wrap the io_u buffer in a bufferlist (zero copy)
 * and ship the cached MOSDOp.  Completion happens when the reply is
 * fast-dispatched back (ceph_msgr_sender_on_reply).
 */
static enum fio_q_status ceph_msgr_sender_queue(struct thread_data *td,
            struct io_u *io_u)
{
  struct ceph_msgr_data *data;
  struct ceph_msgr_io *io;

  bufferlist buflist = bufferlist::static_from_mem(
    (char *)io_u->buf, io_u->buflen);

  io = (decltype(io))io_u->engine_data;
  data = (decltype(data))td->io_ops_data;

  /* No handy method to clear ops before reusage? Ok */
  io->req_msg->ops.clear();

  /* Here we do not care about direction, always send as write */
  io->req_msg->write(0, io_u->buflen, buflist);
  /* Keep message alive */
  io->req_msg->get();
  io->req_msg->get_connection()->send_message(io->req_msg);

  return FIO_Q_QUEUED;
}
501 | ||
/*
 * fio 'getevents' hook: report how many completions are available,
 * busy-polling (no disk IO involved, replies arrive quickly) until at
 * least 'min' are ready or fio asks us to terminate.  Note: min() here
 * is fio's macro, not std::min.
 */
static int fio_ceph_msgr_getevents(struct thread_data *td, unsigned int min,
           unsigned int max, const struct timespec *ts)
{
  struct ceph_msgr_data *data;
  unsigned int nr;

  data = (decltype(data))td->io_ops_data;

  /*
   * Check io_u.c : if min == 0 -> ts is valid and equal to zero,
   * if min != 0 -> ts is NULL.
   */
  assert(!min ^ !ts);

  nr = ring_buffer_used_size(&data->io_completed_q);
  if (nr >= min)
    /* We got something */
    return min(nr, max);

  /* Here we are only if min != 0 and ts == NULL */
  assert(min && !ts);

  while ((nr = ring_buffer_used_size(&data->io_completed_q)) < min &&
         !td->terminate) {
    /* Poll, no disk IO, so we expect response immediately. */
    usleep(10);
  }

  return min(nr, max);
}
532 | ||
533 | static struct io_u *fio_ceph_msgr_event(struct thread_data *td, int event) | |
534 | { | |
535 | struct ceph_msgr_data *data; | |
536 | struct ceph_msgr_io *io; | |
537 | ||
538 | data = (decltype(data))td->io_ops_data; | |
539 | io = (decltype(io))ring_buffer_dequeue(&data->io_completed_q); | |
540 | ||
541 | return io->io_u; | |
542 | } | |
543 | ||
/*
 * Receiver queue path, the mirror of ceph_msgr_receiver_on_request():
 * if a reply is already parked (request arrived first), send it now and
 * let this io_u complete when the reply hits the wire; otherwise queue
 * the io_u to wait for the next incoming request.
 */
static enum fio_q_status ceph_msgr_receiver_queue(struct thread_data *td,
              struct io_u *io_u)
{
  struct ceph_msgr_data *data;
  struct ceph_msgr_io *io;

  io = (decltype(io))io_u->engine_data;
  data = io->data;
  pthread_spin_lock(&data->spin);
  if (data->io_pending_nr) {
    struct ceph_msgr_reply_io *rep_io;
    MOSDOpReply *rep;

    data->io_pending_nr--;
    rep_io = flist_first_entry(&data->io_pending_list,
             struct ceph_msgr_reply_io,
             list);
    flist_del(&rep_io->list);
    rep = rep_io->rep;
    /* Drop the lock before messenger calls; rep_io was malloc'ed */
    pthread_spin_unlock(&data->spin);
    free(rep_io);

    rep->set_completion_hook(new ReplyCompletion(rep, io));
    rep->get_connection()->send_message(rep);
  } else {
    data->io_inflight_nr++;
    flist_add_tail(&io->list, &data->io_inflight_list);
    pthread_spin_unlock(&data->spin);
  }

  return FIO_Q_QUEUED;
}
576 | ||
577 | static enum fio_q_status fio_ceph_msgr_queue(struct thread_data *td, | |
578 | struct io_u *io_u) | |
579 | { | |
580 | struct ceph_msgr_options *o = (decltype(o))td->eo; | |
581 | ||
582 | if (o->is_receiver) | |
583 | return ceph_msgr_receiver_queue(td, io_u); | |
584 | else | |
585 | return ceph_msgr_sender_queue(td, io_u); | |
586 | } | |
587 | ||
/* No real files are involved (FIO_DISKLESSIO): open/close are no-ops. */
static int fio_ceph_msgr_open_file(struct thread_data *td, struct fio_file *f)
{
  return 0;
}

static int fio_ceph_msgr_close_file(struct thread_data *, struct fio_file *)
{
  return 0;
}
597 | ||
598 | template <class Func> | |
599 | fio_option make_option(Func&& func) | |
600 | { | |
601 | auto o = fio_option{}; | |
602 | o.category = FIO_OPT_C_ENGINE; | |
603 | func(std::ref(o)); | |
604 | return o; | |
605 | } | |
606 | ||
/*
 * Engine option table; offsets target struct ceph_msgr_options.  Must
 * be terminated by an empty entry (fio stops at the NULL name).
 */
static std::vector<fio_option> options {
  make_option([] (fio_option& o) {
    o.name = "receiver";
    o.lname = "CEPH messenger is receiver";
    o.type = FIO_OPT_BOOL;
    o.off1 = offsetof(struct ceph_msgr_options, is_receiver);
    o.help = "CEPH messenger is sender or receiver";
    o.def = "0";
  }),
  make_option([] (fio_option& o) {
    o.name = "single_instance";
    o.lname = "Single instance of CEPH messenger ";
    o.type = FIO_OPT_BOOL;
    o.off1 = offsetof(struct ceph_msgr_options, is_single);
    o.help = "CEPH messenger is a created once for all threads";
    o.def = "0";
  }),
  make_option([] (fio_option& o) {
    o.name = "hostname";
    o.lname = "CEPH messenger hostname";
    o.type = FIO_OPT_STR_STORE;
    o.off1 = offsetof(struct ceph_msgr_options, hostname);
    o.help = "Hostname for CEPH messenger engine";
  }),
  make_option([] (fio_option& o) {
    o.name = "port";
    o.lname = "CEPH messenger engine port";
    o.type = FIO_OPT_INT;
    o.off1 = offsetof(struct ceph_msgr_options, port);
    o.maxval = 65535;
    o.minval = 1;
    o.help = "Port to use for CEPH messenger";
  }),
  make_option([] (fio_option& o) {
    o.name = "ms_type";
    o.lname = "CEPH messenger transport type: async+posix, async+dpdk, async+rdma";
    o.type = FIO_OPT_STR;
    o.off1 = offsetof(struct ceph_msgr_options, ms_type);
    o.help = "Transport type for CEPH messenger, see 'ms async transport type' corresponding CEPH documentation page";
    o.def = "undef";

    /* posval ordinals mirror enum ceph_msgr_type */
    o.posval[0].ival = "undef";
    o.posval[0].oval = CEPH_MSGR_TYPE_UNDEF;

    o.posval[1].ival = "async+posix";
    o.posval[1].oval = CEPH_MSGR_TYPE_POSIX;
    o.posval[1].help = "POSIX API";

    o.posval[2].ival = "async+dpdk";
    o.posval[2].oval = CEPH_MSGR_TYPE_DPDK;
    o.posval[2].help = "DPDK";

    o.posval[3].ival = "async+rdma";
    o.posval[3].oval = CEPH_MSGR_TYPE_RDMA;
    o.posval[3].help = "RDMA";
  }),
  make_option([] (fio_option& o) {
    o.name = "ceph_conf_file";
    o.lname = "CEPH configuration file";
    o.type = FIO_OPT_STR_STORE;
    o.off1 = offsetof(struct ceph_msgr_options, conffile);
    o.help = "Path to CEPH configuration file";
  }),
  {} /* Last NULL */
};
672 | ||
static struct ioengine_ops ioengine;

extern "C" {

/*
 * fio external-engine entry point: fio dlopen()s this plugin and calls
 * get_ioengine() to obtain the ops table.
 */
void get_ioengine(struct ioengine_ops** ioengine_ptr)
{
  /*
   * Main ioengine structure
   */
  ioengine.name = "ceph-msgr";
  ioengine.version = FIO_IOOPS_VERSION;
  /* No backing files; unidirectional pipe-like IO */
  ioengine.flags = FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO;
  ioengine.setup = fio_ceph_msgr_setup;
  ioengine.queue = fio_ceph_msgr_queue;
  ioengine.getevents = fio_ceph_msgr_getevents;
  ioengine.event = fio_ceph_msgr_event;
  ioengine.cleanup = fio_ceph_msgr_cleanup;
  ioengine.open_file = fio_ceph_msgr_open_file;
  ioengine.close_file = fio_ceph_msgr_close_file;
  ioengine.io_u_init = fio_ceph_msgr_io_u_init;
  ioengine.io_u_free = fio_ceph_msgr_io_u_free;
  ioengine.option_struct_size = sizeof(struct ceph_msgr_options);
  ioengine.options = options.data();

  *ioengine_ptr = &ioengine;
}
} // extern "C"