// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * CEPH messenger engine
 *
 * FIO engine which uses the Ceph messenger as a transport. See the
 * corresponding FIO client and server jobs for details.
 */
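
/*
 * Illustrative only: a hypothetical pair of fio job files for this
 * engine. The option names ("receiver", "hostname", "port", "ms_type",
 * "ceph_conf_file", "single_instance") match those registered at the
 * bottom of this file; the shared-object path and the generic fio
 * settings are assumptions and depend on the build.
 *
 *   # server.fio
 *   [receiver]
 *   ioengine=external:./libfio_ceph_messenger.so
 *   receiver=1
 *   hostname=0.0.0.0
 *   port=5555
 *   rw=write
 *   bs=64k
 *
 *   # client.fio
 *   [sender]
 *   ioengine=external:./libfio_ceph_messenger.so
 *   receiver=0
 *   hostname=127.0.0.1
 *   port=5555
 *   rw=write
 *   bs=64k
 */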

#include "global/global_init.h"
#include "msg/Messenger.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "common/perf_counters.h"
#include "auth/DummyAuth.h"
#include "ring_buffer.h"

#include <fio.h>
#include <flist.h>
#include <optgroup.h>

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_

using namespace std;

enum ceph_msgr_type {
  CEPH_MSGR_TYPE_UNDEF,
  CEPH_MSGR_TYPE_POSIX,
  CEPH_MSGR_TYPE_DPDK,
  CEPH_MSGR_TYPE_RDMA,
};

const char *ceph_msgr_types[] = { "undef", "async+posix",
                                  "async+dpdk", "async+rdma" };

struct ceph_msgr_options {
  struct thread_data *td__;
  unsigned int is_receiver;
  unsigned int is_single;
  unsigned int port;
  const char *hostname;
  const char *conffile;
  enum ceph_msgr_type ms_type;
};

class FioDispatcher;

struct ceph_msgr_data {
  ceph_msgr_data(struct ceph_msgr_options *o_, unsigned iodepth) :
    o(o_) {
    INIT_FLIST_HEAD(&io_inflight_list);
    INIT_FLIST_HEAD(&io_pending_list);
    ring_buffer_init(&io_completed_q, iodepth);
    pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE);
  }

  struct ceph_msgr_options *o;
  Messenger *msgr = NULL;
  FioDispatcher *disp = NULL;
  pthread_spinlock_t spin;
  struct ring_buffer io_completed_q;
  struct flist_head io_inflight_list;
  struct flist_head io_pending_list;
  unsigned int io_inflight_nr = 0;
  unsigned int io_pending_nr = 0;
};

struct ceph_msgr_io {
  struct flist_head list;
  struct ceph_msgr_data *data;
  struct io_u *io_u;
  MOSDOp *req_msg; /** Cached request, valid only for sender */
};

struct ceph_msgr_reply_io {
  struct flist_head list;
  MOSDOpReply *rep;
};

static void *str_to_ptr(const std::string &str)
{
  return (void *)strtoul(str.c_str(), NULL, 16);
}

static std::string ptr_to_str(void *ptr)
{
  char buf[32];

  snprintf(buf, sizeof(buf), "%llx", (unsigned long long)ptr);
  return std::string(buf);
}
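
/*
 * These two helpers round-trip a pointer through its hex representation:
 *
 *   struct ceph_msgr_io *io = ...;
 *   assert(str_to_ptr(ptr_to_str(io)) == io);
 *
 * The string is carried in the object name of each MOSDOp (see
 * fio_ceph_msgr_io_u_init() below), so a reply can be matched back to
 * its io without any lookup structure.
 */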

/*
 * Used to print the perf counters on the last context put; almost
 * duplicates the global context refcounter, sigh.
 */
static std::atomic<int> ctx_ref(1);
static DummyAuthClientServer *g_dummy_auth;

static void create_or_get_ceph_context(struct ceph_msgr_options *o)
{
  if (g_ceph_context) {
    g_ceph_context->get();
    ctx_ref++;
    return;
  }

  boost::intrusive_ptr<CephContext> cct;
  vector<const char*> args;

  if (o->conffile)
    args = { "--conf", o->conffile };

  cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
                    CODE_ENVIRONMENT_UTILITY,
                    CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
  /* Will use g_ceph_context instead */
  cct.detach();

  common_init_finish(g_ceph_context);
  g_ceph_context->_conf.apply_changes(NULL);
  g_dummy_auth = new DummyAuthClientServer(g_ceph_context);
  g_dummy_auth->auth_registry.refresh_config();
}
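
/*
 * Every successful create_or_get_ceph_context() is expected to be
 * paired with one put_ceph_context() (see fio_ceph_msgr_setup() and
 * fio_ceph_msgr_cleanup() below); the last put dumps the perf counters
 * and drops the global context reference.
 */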

static void put_ceph_context(void)
{
  if (--ctx_ref == 0) {
    ostringstream ostr;
    Formatter* f;

    f = Formatter::create("json-pretty");
    g_ceph_context->get_perfcounters_collection()->dump_formatted(f, false);
    ostr << ">>>>>>>>>>>>> PERFCOUNTERS BEGIN <<<<<<<<<<<<" << std::endl;
    f->flush(ostr);
    ostr << ">>>>>>>>>>>>> PERFCOUNTERS END <<<<<<<<<<<<" << std::endl;

    delete f;
    delete g_dummy_auth;
    dout(0) << ostr.str() << dendl;
  }

  g_ceph_context->put();
}

static void ceph_msgr_sender_on_reply(const object_t &oid)
{
  struct ceph_msgr_data *data;
  struct ceph_msgr_io *io;

  /*
   * Here we abuse the object name and use it as a raw pointer. Since
   * this is only for benchmarks and testing we do not care about
   * anything but performance, so there is no need for a global lookup
   * structure to find the reply: just send a pointer and get it back.
   */

  io = (decltype(io))str_to_ptr(oid.name);
  data = io->data;
  ring_buffer_enqueue(&data->io_completed_q, (void *)io);
}


class ReplyCompletion : public Message::CompletionHook {
  struct ceph_msgr_io *m_io;

public:
  ReplyCompletion(MOSDOpReply *rep, struct ceph_msgr_io *io) :
    Message::CompletionHook(rep),
    m_io(io) {
  }
  void finish(int err) override {
    struct ceph_msgr_data *data = m_io->data;

    ring_buffer_enqueue(&data->io_completed_q, (void *)m_io);
  }
};

static void ceph_msgr_receiver_on_request(struct ceph_msgr_data *data,
                                          MOSDOp *req)
{
  MOSDOpReply *rep;

  rep = new MOSDOpReply(req, 0, 0, 0, false);
  rep->set_connection(req->get_connection());

  pthread_spin_lock(&data->spin);
  if (data->io_inflight_nr) {
    struct ceph_msgr_io *io;

    data->io_inflight_nr--;
    io = flist_first_entry(&data->io_inflight_list,
                           struct ceph_msgr_io, list);
    flist_del(&io->list);
    pthread_spin_unlock(&data->spin);

    rep->set_completion_hook(new ReplyCompletion(rep, io));
    rep->get_connection()->send_message(rep);
  } else {
    struct ceph_msgr_reply_io *rep_io;

    rep_io = (decltype(rep_io))malloc(sizeof(*rep_io));
    rep_io->rep = rep;

    data->io_pending_nr++;
    flist_add_tail(&rep_io->list, &data->io_pending_list);
    pthread_spin_unlock(&data->spin);
  }
}
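
/*
 * The receiver pairs two event streams: if a fio io_u was queued first
 * (io_inflight_list), the incoming request is answered right away and
 * the io_u completes once the reply has actually been sent (see
 * ReplyCompletion); if the request arrived first, the prepared reply is
 * parked on io_pending_list until ceph_msgr_receiver_queue() supplies
 * an io_u to pair it with.
 */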

class FioDispatcher : public Dispatcher {
  struct ceph_msgr_data *m_data;

public:
  FioDispatcher(struct ceph_msgr_data *data):
    Dispatcher(g_ceph_context),
    m_data(data) {
  }
  bool ms_can_fast_dispatch_any() const override {
    return true;
  }
  bool ms_can_fast_dispatch(const Message *m) const override {
    switch (m->get_type()) {
    case CEPH_MSG_OSD_OP:
      return m_data->o->is_receiver;
    case CEPH_MSG_OSD_OPREPLY:
      return !m_data->o->is_receiver;
    default:
      return false;
    }
  }
  void ms_handle_fast_connect(Connection *con) override {
  }
  void ms_handle_fast_accept(Connection *con) override {
  }
  bool ms_dispatch(Message *m) override {
    return true;
  }
  void ms_fast_dispatch(Message *m) override {
    if (m_data->o->is_receiver) {
      MOSDOp *req;

      /*
       * Server side, handle the request.
       */

      req = static_cast<MOSDOp*>(m);
      req->finish_decode();

      ceph_msgr_receiver_on_request(m_data, req);
    } else {
      MOSDOpReply *rep;

      /*
       * Client side: get the reply, extract the objid and mark
       * the IO as completed.
       */

      rep = static_cast<MOSDOpReply*>(m);
      ceph_msgr_sender_on_reply(rep->get_oid());
    }
    m->put();
  }
  bool ms_handle_reset(Connection *con) override {
    return true;
  }
  void ms_handle_remote_reset(Connection *con) override {
  }
  bool ms_handle_refused(Connection *con) override {
    return false;
  }
  int ms_handle_authentication(Connection *con) override {
    return 1;
  }
};

static entity_addr_t hostname_to_addr(struct ceph_msgr_options *o)
{
  entity_addr_t addr;

  addr.parse(o->hostname);
  addr.set_port(o->port);
  addr.set_nonce(0);

  return addr;
}

static Messenger *create_messenger(struct ceph_msgr_options *o)
{
  entity_name_t ename = o->is_receiver ?
    entity_name_t::OSD(0) : entity_name_t::CLIENT(0);
  std::string lname = o->is_receiver ?
    "receiver" : "sender";

  std::string ms_type = o->ms_type != CEPH_MSGR_TYPE_UNDEF ?
    ceph_msgr_types[o->ms_type] :
    g_ceph_context->_conf.get_val<std::string>("ms_type");

  /* o->td__->pid has no value set yet, so use getpid() instead */
  auto nonce = o->is_receiver ? 0 : (getpid() + o->td__->thread_number);
  Messenger *msgr = Messenger::create(g_ceph_context, ms_type.c_str(),
                                      ename, lname, nonce);
  if (o->is_receiver) {
    msgr->set_default_policy(Messenger::Policy::stateless_server(0));
    msgr->bind(hostname_to_addr(o));
  } else {
    msgr->set_default_policy(Messenger::Policy::lossless_client(0));
  }
  msgr->set_auth_client(g_dummy_auth);
  msgr->set_auth_server(g_dummy_auth);
  msgr->set_require_authorizer(false);
  msgr->start();

  return msgr;
}
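
/*
 * Note: the receiver poses as OSD(0) and binds to the configured
 * hostname:port, while each sender poses as CLIENT(0) with a per-thread
 * nonce so that concurrent client messengers stay distinct.
 */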

static Messenger *single_msgr;
static std::atomic<int> single_msgr_ref;
static vector<FioDispatcher *> single_msgr_disps;

static void init_messenger(struct ceph_msgr_data *data)
{
  struct ceph_msgr_options *o = data->o;
  FioDispatcher *disp;
  Messenger *msgr;

  disp = new FioDispatcher(data);
  if (o->is_single) {
    /*
     * Single messenger instance for the whole FIO run
     */

    if (!single_msgr) {
      msgr = create_messenger(o);
      single_msgr = msgr;
    } else {
      msgr = single_msgr;
    }
    single_msgr_disps.push_back(disp);
    single_msgr_ref++;
  } else {
    /*
     * Messenger instance per FIO thread
     */
    msgr = create_messenger(o);
  }
  msgr->add_dispatcher_head(disp);

  data->disp = disp;
  data->msgr = msgr;
}

static void free_messenger(struct ceph_msgr_data *data)
{
  data->msgr->shutdown();
  data->msgr->wait();
  delete data->msgr;
}

static void put_messenger(struct ceph_msgr_data *data)
{
  struct ceph_msgr_options *o = data->o;

  if (o->is_single) {
    if (--single_msgr_ref == 0) {
      free_messenger(data);
      /*
       * In case of a single messenger instance we have to free
       * the dispatchers after the actual messenger destruction.
       */
      for (auto disp : single_msgr_disps)
        delete disp;
      single_msgr = NULL;
    }
  } else {
    free_messenger(data);
    delete data->disp;
  }
  data->disp = NULL;
  data->msgr = NULL;
}

static int fio_ceph_msgr_setup(struct thread_data *td)
{
  struct ceph_msgr_options *o = (decltype(o))td->eo;
  o->td__ = td;
  ceph_msgr_data *data;

  /* We have to manage global resources, so we use threads */
  td->o.use_thread = 1;

  create_or_get_ceph_context(o);

  if (!td->io_ops_data) {
    data = new ceph_msgr_data(o, td->o.iodepth);
    init_messenger(data);
    td->io_ops_data = (void *)data;
  }

  return 0;
}

static void fio_ceph_msgr_cleanup(struct thread_data *td)
{
  struct ceph_msgr_data *data;
  unsigned nr;

  data = (decltype(data))td->io_ops_data;
  put_messenger(data);

  nr = ring_buffer_used_size(&data->io_completed_q);
  if (nr)
    fprintf(stderr, "fio: io_completed_nr==%d, but should be zero\n",
            nr);
  if (data->io_inflight_nr)
    fprintf(stderr, "fio: io_inflight_nr==%d, but should be zero\n",
            data->io_inflight_nr);
  if (data->io_pending_nr)
    fprintf(stderr, "fio: io_pending_nr==%d, but should be zero\n",
            data->io_pending_nr);
  if (!flist_empty(&data->io_inflight_list))
    fprintf(stderr, "fio: io_inflight_list is not empty\n");
  if (!flist_empty(&data->io_pending_list))
    fprintf(stderr, "fio: io_pending_list is not empty\n");

  ring_buffer_deinit(&data->io_completed_q);
  delete data;
  put_ceph_context();
}

static int fio_ceph_msgr_io_u_init(struct thread_data *td, struct io_u *io_u)
{
  struct ceph_msgr_options *o = (decltype(o))td->eo;
  struct ceph_msgr_io *io;
  MOSDOp *req_msg = NULL;

  io = (decltype(io))malloc(sizeof(*io));
  io->io_u = io_u;
  io->data = (decltype(io->data))td->io_ops_data;

  if (!o->is_receiver) {
    object_t oid(ptr_to_str(io));
    pg_t pgid;
    object_locator_t oloc;
    hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(),
                   pgid.pool(), oloc.nspace);
    spg_t spgid(pgid);
    entity_inst_t dest(entity_name_t::OSD(0), hostname_to_addr(o));

    Messenger *msgr = io->data->msgr;
    ConnectionRef con = msgr->connect_to(dest.name.type(),
                                         entity_addrvec_t(dest.addr));

    req_msg = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
    req_msg->set_connection(con);
  }

  io->req_msg = req_msg;
  io_u->engine_data = (void *)io;

  return 0;
}

static void fio_ceph_msgr_io_u_free(struct thread_data *td, struct io_u *io_u)
{
  struct ceph_msgr_io *io;

  io = (decltype(io))io_u->engine_data;
  if (io) {
    io_u->engine_data = NULL;
    if (io->req_msg)
      io->req_msg->put();
    free(io);
  }
}

static enum fio_q_status ceph_msgr_sender_queue(struct thread_data *td,
                                                struct io_u *io_u)
{
  struct ceph_msgr_data *data;
  struct ceph_msgr_io *io;

  bufferlist buflist = bufferlist::static_from_mem(
    (char *)io_u->buf, io_u->buflen);

  io = (decltype(io))io_u->engine_data;
  data = (decltype(data))td->io_ops_data;

  /* No handy method to clear ops before reuse? Ok. */
  io->req_msg->ops.clear();

  /* Here we do not care about the direction, always send as a write */
  io->req_msg->write(0, io_u->buflen, buflist);
  /* Keep the message alive */
  io->req_msg->get();
  io->req_msg->get_connection()->send_message(io->req_msg);

  return FIO_Q_QUEUED;
}
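
/*
 * Sender completion path: the reply comes back through
 * ms_fast_dispatch() -> ceph_msgr_sender_on_reply(), which decodes the
 * io pointer from the object name and enqueues it on io_completed_q;
 * fio then reaps it through the getevents()/event() hooks below.
 */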

static int fio_ceph_msgr_getevents(struct thread_data *td, unsigned int min,
                                   unsigned int max, const struct timespec *ts)
{
  struct ceph_msgr_data *data;
  unsigned int nr;

  data = (decltype(data))td->io_ops_data;

  /*
   * Check io_u.c: if min == 0, ts is valid and equal to zero;
   * if min != 0, ts is NULL.
   */
  assert(!min ^ !ts);

  nr = ring_buffer_used_size(&data->io_completed_q);
  if (nr >= min)
    /* We got something */
    return min(nr, max);

  /* We get here only if min != 0 and ts == NULL */
  assert(min && !ts);

  while ((nr = ring_buffer_used_size(&data->io_completed_q)) < min &&
         !td->terminate) {
    /* Poll; there is no disk IO, so we expect a response immediately. */
    usleep(10);
  }

  return min(nr, max);
}

static struct io_u *fio_ceph_msgr_event(struct thread_data *td, int event)
{
  struct ceph_msgr_data *data;
  struct ceph_msgr_io *io;

  data = (decltype(data))td->io_ops_data;
  io = (decltype(io))ring_buffer_dequeue(&data->io_completed_q);

  return io->io_u;
}

static enum fio_q_status ceph_msgr_receiver_queue(struct thread_data *td,
                                                  struct io_u *io_u)
{
  struct ceph_msgr_data *data;
  struct ceph_msgr_io *io;

  io = (decltype(io))io_u->engine_data;
  data = io->data;
  pthread_spin_lock(&data->spin);
  if (data->io_pending_nr) {
    struct ceph_msgr_reply_io *rep_io;
    MOSDOpReply *rep;

    data->io_pending_nr--;
    rep_io = flist_first_entry(&data->io_pending_list,
                               struct ceph_msgr_reply_io,
                               list);
    flist_del(&rep_io->list);
    rep = rep_io->rep;
    pthread_spin_unlock(&data->spin);
    free(rep_io);

    rep->set_completion_hook(new ReplyCompletion(rep, io));
    rep->get_connection()->send_message(rep);
  } else {
    data->io_inflight_nr++;
    flist_add_tail(&io->list, &data->io_inflight_list);
    pthread_spin_unlock(&data->spin);
  }

  return FIO_Q_QUEUED;
}

static enum fio_q_status fio_ceph_msgr_queue(struct thread_data *td,
                                             struct io_u *io_u)
{
  struct ceph_msgr_options *o = (decltype(o))td->eo;

  if (o->is_receiver)
    return ceph_msgr_receiver_queue(td, io_u);
  else
    return ceph_msgr_sender_queue(td, io_u);
}

static int fio_ceph_msgr_open_file(struct thread_data *td, struct fio_file *f)
{
  return 0;
}

static int fio_ceph_msgr_close_file(struct thread_data *, struct fio_file *)
{
  return 0;
}

template <class Func>
fio_option make_option(Func&& func)
{
  auto o = fio_option{};
  o.category = FIO_OPT_C_ENGINE;
  func(std::ref(o));
  return o;
}

static std::vector<fio_option> options {
  make_option([] (fio_option& o) {
    o.name = "receiver";
    o.lname = "CEPH messenger is receiver";
    o.type = FIO_OPT_BOOL;
    o.off1 = offsetof(struct ceph_msgr_options, is_receiver);
    o.help = "CEPH messenger is a sender or a receiver";
    o.def = "0";
  }),
  make_option([] (fio_option& o) {
    o.name = "single_instance";
    o.lname = "Single instance of CEPH messenger";
    o.type = FIO_OPT_BOOL;
    o.off1 = offsetof(struct ceph_msgr_options, is_single);
    o.help = "CEPH messenger is created once for all threads";
    o.def = "0";
  }),
  make_option([] (fio_option& o) {
    o.name = "hostname";
    o.lname = "CEPH messenger hostname";
    o.type = FIO_OPT_STR_STORE;
    o.off1 = offsetof(struct ceph_msgr_options, hostname);
    o.help = "Hostname for the CEPH messenger engine";
  }),
  make_option([] (fio_option& o) {
    o.name = "port";
    o.lname = "CEPH messenger engine port";
    o.type = FIO_OPT_INT;
    o.off1 = offsetof(struct ceph_msgr_options, port);
    o.maxval = 65535;
    o.minval = 1;
    o.help = "Port to use for the CEPH messenger";
  }),
  make_option([] (fio_option& o) {
    o.name = "ms_type";
    o.lname = "CEPH messenger transport type: async+posix, async+dpdk, async+rdma";
    o.type = FIO_OPT_STR;
    o.off1 = offsetof(struct ceph_msgr_options, ms_type);
    o.help = "Transport type for the CEPH messenger; see the 'ms async transport type' page of the CEPH documentation";
    o.def = "undef";

    o.posval[0].ival = "undef";
    o.posval[0].oval = CEPH_MSGR_TYPE_UNDEF;

    o.posval[1].ival = "async+posix";
    o.posval[1].oval = CEPH_MSGR_TYPE_POSIX;
    o.posval[1].help = "POSIX API";

    o.posval[2].ival = "async+dpdk";
    o.posval[2].oval = CEPH_MSGR_TYPE_DPDK;
    o.posval[2].help = "DPDK";

    o.posval[3].ival = "async+rdma";
    o.posval[3].oval = CEPH_MSGR_TYPE_RDMA;
    o.posval[3].help = "RDMA";
  }),
  make_option([] (fio_option& o) {
    o.name = "ceph_conf_file";
    o.lname = "CEPH configuration file";
    o.type = FIO_OPT_STR_STORE;
    o.off1 = offsetof(struct ceph_msgr_options, conffile);
    o.help = "Path to the CEPH configuration file";
  }),
  {} /* Last NULL */
};

static struct ioengine_ops ioengine;

extern "C" {

void get_ioengine(struct ioengine_ops** ioengine_ptr)
{
  /*
   * Main ioengine structure
   */
  ioengine.name = "ceph-msgr";
  ioengine.version = FIO_IOOPS_VERSION;
  ioengine.flags = FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO;
  ioengine.setup = fio_ceph_msgr_setup;
  ioengine.queue = fio_ceph_msgr_queue;
  ioengine.getevents = fio_ceph_msgr_getevents;
  ioengine.event = fio_ceph_msgr_event;
  ioengine.cleanup = fio_ceph_msgr_cleanup;
  ioengine.open_file = fio_ceph_msgr_open_file;
  ioengine.close_file = fio_ceph_msgr_close_file;
  ioengine.io_u_init = fio_ceph_msgr_io_u_init;
  ioengine.io_u_free = fio_ceph_msgr_io_u_free;
  ioengine.option_struct_size = sizeof(struct ceph_msgr_options);
  ioengine.options = options.data();

  *ioengine_ptr = &ioengine;
}
} // extern "C"