// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * CEPH messenger engine
 *
 * FIO engine which uses ceph messenger as a transport. See corresponding
 * FIO client and server jobs for details.
 */
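
/*
 * Purely illustrative invocation sketch (address, port and sizes are
 * made-up values; the engine options used here are defined at the
 * bottom of this file):
 *
 *   # receiver side, started first
 *   fio --name=recv --ioengine=ceph-msgr --receiver=1 \
 *       --hostname=127.0.0.1 --port=5555 --bs=4k --size=1g
 *
 *   # sender side
 *   fio --name=send --ioengine=ceph-msgr \
 *       --hostname=127.0.0.1 --port=5555 --rw=write --bs=4k --size=1g
 */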

#include "global/global_init.h"
#include "msg/Messenger.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "common/perf_counters.h"
#include "auth/DummyAuth.h"
#include "ring_buffer.h"

#include <fio.h>
#include <flist.h>
#include <optgroup.h>

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_

using namespace std;

enum ceph_msgr_type {
  CEPH_MSGR_TYPE_UNDEF,
  CEPH_MSGR_TYPE_POSIX,
  CEPH_MSGR_TYPE_DPDK,
  CEPH_MSGR_TYPE_RDMA,
};

const char *ceph_msgr_types[] = { "undef", "async+posix",
                                  "async+dpdk", "async+rdma" };

struct ceph_msgr_options {
  struct thread_data *td__;
  unsigned int is_receiver;
  unsigned int is_single;
  unsigned int port;
  const char *hostname;
  const char *conffile;
  enum ceph_msgr_type ms_type;
};

class FioDispatcher;

struct ceph_msgr_data {
  ceph_msgr_data(struct ceph_msgr_options *o_, unsigned iodepth) :
    o(o_) {
    INIT_FLIST_HEAD(&io_inflight_list);
    INIT_FLIST_HEAD(&io_pending_list);
    ring_buffer_init(&io_completed_q, iodepth);
    pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE);
  }

  struct ceph_msgr_options *o;
  Messenger *msgr = NULL;
  FioDispatcher *disp = NULL;
  pthread_spinlock_t spin;
  struct ring_buffer io_completed_q;
  struct flist_head io_inflight_list;
  struct flist_head io_pending_list;
  unsigned int io_inflight_nr = 0;
  unsigned int io_pending_nr = 0;
};

struct ceph_msgr_io {
  struct flist_head list;
  struct ceph_msgr_data *data;
  struct io_u *io_u;
  MOSDOp *req_msg; /** Cached request, valid only for sender */
};

struct ceph_msgr_reply_io {
  struct flist_head list;
  MOSDOpReply *rep;
};

static void *str_to_ptr(const std::string &str)
{
  // str is assumed to be a valid ptr string
  return reinterpret_cast<void*>(ceph::parse<uintptr_t>(str, 16).value());
}

static std::string ptr_to_str(void *ptr)
{
  char buf[32];

  snprintf(buf, sizeof(buf), "%llx", (unsigned long long)ptr);
  return std::string(buf);
}

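/*
 * Note: ptr_to_str()/str_to_ptr() must round-trip exactly: the sender
 * encodes a ceph_msgr_io pointer into the object name and decodes it
 * back from the reply (see ceph_msgr_sender_on_reply() below).
 */
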
/*
 * Used to print refcounters on the last context put; this almost
 * duplicates the global context refcounter, sigh.
 */
static std::atomic<int> ctx_ref(1);
static DummyAuthClientServer *g_dummy_auth;

static void create_or_get_ceph_context(struct ceph_msgr_options *o)
{
  if (g_ceph_context) {
    g_ceph_context->get();
    ctx_ref++;
    return;
  }

  boost::intrusive_ptr<CephContext> cct;
  vector<const char*> args;

  if (o->conffile)
    args = { "--conf", o->conffile };

  cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
                    CODE_ENVIRONMENT_UTILITY,
                    CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
  /* Will use g_ceph_context instead */
  cct.detach();

  common_init_finish(g_ceph_context);
  g_ceph_context->_conf.apply_changes(NULL);
  g_dummy_auth = new DummyAuthClientServer(g_ceph_context);
  g_dummy_auth->auth_registry.refresh_config();
}

static void put_ceph_context(void)
{
  if (--ctx_ref == 0) {
    ostringstream ostr;
    Formatter* f;

    f = Formatter::create("json-pretty");
    g_ceph_context->get_perfcounters_collection()->dump_formatted(f, false, false);
    ostr << ">>>>>>>>>>>>> PERFCOUNTERS BEGIN <<<<<<<<<<<<" << std::endl;
    f->flush(ostr);
    ostr << ">>>>>>>>>>>>> PERFCOUNTERS END <<<<<<<<<<<<" << std::endl;

    delete f;
    delete g_dummy_auth;
    dout(0) << ostr.str() << dendl;
  }

  g_ceph_context->put();
}

static void ceph_msgr_sender_on_reply(const object_t &oid)
{
  struct ceph_msgr_data *data;
  struct ceph_msgr_io *io;

  /*
   * Here we abuse the object name and use it as a raw pointer. Since
   * this is only for benchmarks and testing we do not care about
   * anything but performance, so there is no need for a global
   * structure to look up the reply: just send a pointer and get it
   * back.
   */

  io = (decltype(io))str_to_ptr(oid.name);
  data = io->data;
  ring_buffer_enqueue(&data->io_completed_q, (void *)io);
}

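/*
 * Completion hook attached to each reply on the receiver side: it
 * fires once the reply has been sent and puts the paired io_u on the
 * completed-IO ring buffer.
 */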
class ReplyCompletion : public Message::CompletionHook {
  struct ceph_msgr_io *m_io;

public:
  ReplyCompletion(MOSDOpReply *rep, struct ceph_msgr_io *io) :
    Message::CompletionHook(rep),
    m_io(io) {
  }
  void finish(int err) override {
    struct ceph_msgr_data *data = m_io->data;

    ring_buffer_enqueue(&data->io_completed_q, (void *)m_io);
  }
};

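/*
 * Pair an incoming request with a receiver-side io_u: if one is
 * already queued on io_inflight_list, reply immediately; otherwise
 * park the reply on io_pending_list until ceph_msgr_receiver_queue()
 * picks it up.
 */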
static void ceph_msgr_receiver_on_request(struct ceph_msgr_data *data,
                                          MOSDOp *req)
{
  MOSDOpReply *rep;

  rep = new MOSDOpReply(req, 0, 0, 0, false);
  rep->set_connection(req->get_connection());

  pthread_spin_lock(&data->spin);
  if (data->io_inflight_nr) {
    struct ceph_msgr_io *io;

    data->io_inflight_nr--;
    io = flist_first_entry(&data->io_inflight_list,
                           struct ceph_msgr_io, list);
    flist_del(&io->list);
    pthread_spin_unlock(&data->spin);

    rep->set_completion_hook(new ReplyCompletion(rep, io));
    rep->get_connection()->send_message(rep);
  } else {
    struct ceph_msgr_reply_io *rep_io;

    rep_io = (decltype(rep_io))malloc(sizeof(*rep_io));
    rep_io->rep = rep;

    data->io_pending_nr++;
    flist_add_tail(&rep_io->list, &data->io_pending_list);
    pthread_spin_unlock(&data->spin);
  }
}

class FioDispatcher : public Dispatcher {
  struct ceph_msgr_data *m_data;

public:
  FioDispatcher(struct ceph_msgr_data *data):
    Dispatcher(g_ceph_context),
    m_data(data) {
  }
  bool ms_can_fast_dispatch_any() const override {
    return true;
  }
  bool ms_can_fast_dispatch(const Message *m) const override {
    switch (m->get_type()) {
    case CEPH_MSG_OSD_OP:
      return m_data->o->is_receiver;
    case CEPH_MSG_OSD_OPREPLY:
      return !m_data->o->is_receiver;
    default:
      return false;
    }
  }
  void ms_handle_fast_connect(Connection *con) override {
  }
  void ms_handle_fast_accept(Connection *con) override {
  }
  bool ms_dispatch(Message *m) override {
    return true;
  }
  void ms_fast_dispatch(Message *m) override {
    if (m_data->o->is_receiver) {
      MOSDOp *req;

      /*
       * Server side, handle request.
       */

      req = static_cast<MOSDOp*>(m);
      req->finish_decode();

      ceph_msgr_receiver_on_request(m_data, req);
    } else {
      MOSDOpReply *rep;

      /*
       * Client side, get reply, extract objid and mark
       * IO as completed.
       */

      rep = static_cast<MOSDOpReply*>(m);
      ceph_msgr_sender_on_reply(rep->get_oid());
    }
    m->put();
  }
  bool ms_handle_reset(Connection *con) override {
    return true;
  }
  void ms_handle_remote_reset(Connection *con) override {
  }
  bool ms_handle_refused(Connection *con) override {
    return false;
  }
  int ms_handle_fast_authentication(Connection *con) override {
    return 1;
  }
};

static entity_addr_t hostname_to_addr(struct ceph_msgr_options *o)
{
  entity_addr_t addr;

  addr.parse(o->hostname);
  addr.set_port(o->port);
  addr.set_nonce(0);

  return addr;
}

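/*
 * The receiver binds to hostname:port and acts as a stateless server;
 * the sender connects with a lossless client policy. Each sender
 * messenger gets a unique nonce so that multiple FIO threads do not
 * collide on the same client address.
 */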
static Messenger *create_messenger(struct ceph_msgr_options *o)
{
  entity_name_t ename = o->is_receiver ?
    entity_name_t::OSD(0) : entity_name_t::CLIENT(0);
  std::string lname = o->is_receiver ?
    "receiver" : "sender";

  std::string ms_type = o->ms_type != CEPH_MSGR_TYPE_UNDEF ?
    ceph_msgr_types[o->ms_type] :
    g_ceph_context->_conf.get_val<std::string>("ms_type");

  /* o->td__->pid is not set at this point, so use getpid() instead */
  auto nonce = o->is_receiver ? 0 : (getpid() + o->td__->thread_number);
  Messenger *msgr = Messenger::create(g_ceph_context, ms_type.c_str(),
                                      ename, lname, nonce);
  if (o->is_receiver) {
    msgr->set_default_policy(Messenger::Policy::stateless_server(0));
    msgr->bind(hostname_to_addr(o));
  } else {
    msgr->set_default_policy(Messenger::Policy::lossless_client(0));
  }
  msgr->set_auth_client(g_dummy_auth);
  msgr->set_auth_server(g_dummy_auth);
  msgr->set_require_authorizer(false);
  msgr->start();

  return msgr;
}

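/*
 * State for single_instance=1 mode: one messenger shared by all FIO
 * threads, refcounted so that the last thread tears it down together
 * with all registered dispatchers.
 */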
static Messenger *single_msgr;
static std::atomic<int> single_msgr_ref;
static vector<FioDispatcher *> single_msgr_disps;

static void init_messenger(struct ceph_msgr_data *data)
{
  struct ceph_msgr_options *o = data->o;
  FioDispatcher *disp;
  Messenger *msgr;

  disp = new FioDispatcher(data);
  if (o->is_single) {
    /*
     * Single messenger instance for the whole FIO run
     */

    if (!single_msgr) {
      msgr = create_messenger(o);
      single_msgr = msgr;
    } else {
      msgr = single_msgr;
    }
    single_msgr_disps.push_back(disp);
    single_msgr_ref++;
  } else {
    /*
     * Messenger instance per FIO thread
     */
    msgr = create_messenger(o);
  }
  msgr->add_dispatcher_head(disp);

  data->disp = disp;
  data->msgr = msgr;
}

static void free_messenger(struct ceph_msgr_data *data)
{
  data->msgr->shutdown();
  data->msgr->wait();
  delete data->msgr;
}

static void put_messenger(struct ceph_msgr_data *data)
{
  struct ceph_msgr_options *o = data->o;

  if (o->is_single) {
    if (--single_msgr_ref == 0) {
      free_messenger(data);
      /*
       * In case of a single messenger instance we have to
       * free dispatchers after actual messenger destruction.
       */
      for (auto disp : single_msgr_disps)
        delete disp;
      single_msgr = NULL;
    }
  } else {
    free_messenger(data);
    delete data->disp;
  }
  data->disp = NULL;
  data->msgr = NULL;
}

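/*
 * FIO engine entry points. setup/cleanup manage the shared ceph
 * context and the messenger; queue/getevents/event implement the
 * asynchronous IO path on top of the completed-IO ring buffer.
 */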
static int fio_ceph_msgr_setup(struct thread_data *td)
{
  struct ceph_msgr_options *o = (decltype(o))td->eo;
  o->td__ = td;
  ceph_msgr_data *data;

  /* We have to manage global resources so we use threads */
  td->o.use_thread = 1;

  create_or_get_ceph_context(o);

  if (!td->io_ops_data) {
    data = new ceph_msgr_data(o, td->o.iodepth);
    init_messenger(data);
    td->io_ops_data = (void *)data;
  }

  return 0;
}

static void fio_ceph_msgr_cleanup(struct thread_data *td)
{
  struct ceph_msgr_data *data;
  unsigned nr;

  data = (decltype(data))td->io_ops_data;
  put_messenger(data);

  nr = ring_buffer_used_size(&data->io_completed_q);
  if (nr)
    fprintf(stderr, "fio: io_completed_nr==%d, but should be zero\n",
            nr);
  if (data->io_inflight_nr)
    fprintf(stderr, "fio: io_inflight_nr==%d, but should be zero\n",
            data->io_inflight_nr);
  if (data->io_pending_nr)
    fprintf(stderr, "fio: io_pending_nr==%d, but should be zero\n",
            data->io_pending_nr);
  if (!flist_empty(&data->io_inflight_list))
    fprintf(stderr, "fio: io_inflight_list is not empty\n");
  if (!flist_empty(&data->io_pending_list))
    fprintf(stderr, "fio: io_pending_list is not empty\n");

  ring_buffer_deinit(&data->io_completed_q);
  delete data;
  put_ceph_context();
}

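/*
 * On the sender side each io_u gets a pre-built MOSDOp whose object
 * name encodes the ceph_msgr_io pointer (see ptr_to_str() above), so
 * replies can be matched back without any lookup table.
 */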
static int fio_ceph_msgr_io_u_init(struct thread_data *td, struct io_u *io_u)
{
  struct ceph_msgr_options *o = (decltype(o))td->eo;
  struct ceph_msgr_io *io;
  MOSDOp *req_msg = NULL;

  io = (decltype(io))malloc(sizeof(*io));
  io->io_u = io_u;
  io->data = (decltype(io->data))td->io_ops_data;

  if (!o->is_receiver) {
    object_t oid(ptr_to_str(io));
    pg_t pgid;
    object_locator_t oloc;
    hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(),
                   pgid.pool(), oloc.nspace);
    spg_t spgid(pgid);
    entity_inst_t dest(entity_name_t::OSD(0), hostname_to_addr(o));

    Messenger *msgr = io->data->msgr;
    ConnectionRef con = msgr->connect_to(dest.name.type(),
                                         entity_addrvec_t(dest.addr));

    req_msg = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
    req_msg->set_connection(con);
  }

  io->req_msg = req_msg;
  io_u->engine_data = (void *)io;

  return 0;
}

static void fio_ceph_msgr_io_u_free(struct thread_data *td, struct io_u *io_u)
{
  struct ceph_msgr_io *io;

  io = (decltype(io))io_u->engine_data;
  if (io) {
    io_u->engine_data = NULL;
    if (io->req_msg)
      io->req_msg->put();
    free(io);
  }
}

static enum fio_q_status ceph_msgr_sender_queue(struct thread_data *td,
                                                struct io_u *io_u)
{
  struct ceph_msgr_data *data;
  struct ceph_msgr_io *io;

  bufferlist buflist = bufferlist::static_from_mem(
    (char *)io_u->buf, io_u->buflen);

  io = (decltype(io))io_u->engine_data;
  data = (decltype(data))td->io_ops_data;

  /* No handy method to clear ops before reuse? Ok */
  io->req_msg->ops.clear();

  /* Here we do not care about direction, always send as write */
  io->req_msg->write(0, io_u->buflen, buflist);
  /* Keep message alive */
  io->req_msg->get();
  io->req_msg->get_connection()->send_message(io->req_msg);

  return FIO_Q_QUEUED;
}

static int fio_ceph_msgr_getevents(struct thread_data *td, unsigned int min,
                                   unsigned int max, const struct timespec *ts)
{
  struct ceph_msgr_data *data;
  unsigned int nr;

  data = (decltype(data))td->io_ops_data;

  /*
   * Check io_u.c : if min == 0 -> ts is valid and equal to zero,
   * if min != 0 -> ts is NULL.
   */
  assert(!min ^ !ts);

  nr = ring_buffer_used_size(&data->io_completed_q);
  if (nr >= min)
    /* We got something */
    return min(nr, max);

  /* Here we are only if min != 0 and ts == NULL */
  assert(min && !ts);

  while ((nr = ring_buffer_used_size(&data->io_completed_q)) < min &&
         !td->terminate) {
    /* Poll, no disk IO, so we expect response immediately. */
    usleep(10);
  }

  return min(nr, max);
}

static struct io_u *fio_ceph_msgr_event(struct thread_data *td, int event)
{
  struct ceph_msgr_data *data;
  struct ceph_msgr_io *io;

  data = (decltype(data))td->io_ops_data;
  io = (decltype(io))ring_buffer_dequeue(&data->io_completed_q);

  return io->io_u;
}

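/*
 * Mirror image of ceph_msgr_receiver_on_request(): if a reply is
 * already parked on io_pending_list, send it now; otherwise queue
 * this io_u on io_inflight_list and wait for a request to arrive.
 */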
static enum fio_q_status ceph_msgr_receiver_queue(struct thread_data *td,
                                                  struct io_u *io_u)
{
  struct ceph_msgr_data *data;
  struct ceph_msgr_io *io;

  io = (decltype(io))io_u->engine_data;
  data = io->data;
  pthread_spin_lock(&data->spin);
  if (data->io_pending_nr) {
    struct ceph_msgr_reply_io *rep_io;
    MOSDOpReply *rep;

    data->io_pending_nr--;
    rep_io = flist_first_entry(&data->io_pending_list,
                               struct ceph_msgr_reply_io,
                               list);
    flist_del(&rep_io->list);
    rep = rep_io->rep;
    pthread_spin_unlock(&data->spin);
    free(rep_io);

    rep->set_completion_hook(new ReplyCompletion(rep, io));
    rep->get_connection()->send_message(rep);
  } else {
    data->io_inflight_nr++;
    flist_add_tail(&io->list, &data->io_inflight_list);
    pthread_spin_unlock(&data->spin);
  }

  return FIO_Q_QUEUED;
}

static enum fio_q_status fio_ceph_msgr_queue(struct thread_data *td,
                                             struct io_u *io_u)
{
  struct ceph_msgr_options *o = (decltype(o))td->eo;

  if (o->is_receiver)
    return ceph_msgr_receiver_queue(td, io_u);
  else
    return ceph_msgr_sender_queue(td, io_u);
}

static int fio_ceph_msgr_open_file(struct thread_data *td, struct fio_file *f)
{
  return 0;
}

static int fio_ceph_msgr_close_file(struct thread_data *, struct fio_file *)
{
  return 0;
}

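/*
 * Engine-specific option table. make_option() below fills in the
 * common FIO_OPT_C_ENGINE category so each entry only has to
 * describe itself.
 */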
template <class Func>
fio_option make_option(Func&& func)
{
  auto o = fio_option{};
  o.category = FIO_OPT_C_ENGINE;
  func(std::ref(o));
  return o;
}

static std::vector<fio_option> options {
  make_option([] (fio_option& o) {
    o.name = "receiver";
    o.lname = "CEPH messenger is receiver";
    o.type = FIO_OPT_BOOL;
    o.off1 = offsetof(struct ceph_msgr_options, is_receiver);
    o.help = "CEPH messenger is sender or receiver";
    o.def = "0";
  }),
  make_option([] (fio_option& o) {
    o.name = "single_instance";
    o.lname = "Single instance of CEPH messenger";
    o.type = FIO_OPT_BOOL;
    o.off1 = offsetof(struct ceph_msgr_options, is_single);
    o.help = "CEPH messenger is created once for all threads";
    o.def = "0";
  }),
  make_option([] (fio_option& o) {
    o.name = "hostname";
    o.lname = "CEPH messenger hostname";
    o.type = FIO_OPT_STR_STORE;
    o.off1 = offsetof(struct ceph_msgr_options, hostname);
    o.help = "Hostname for CEPH messenger engine";
  }),
  make_option([] (fio_option& o) {
    o.name = "port";
    o.lname = "CEPH messenger engine port";
    o.type = FIO_OPT_INT;
    o.off1 = offsetof(struct ceph_msgr_options, port);
    o.maxval = 65535;
    o.minval = 1;
    o.help = "Port to use for CEPH messenger";
  }),
  make_option([] (fio_option& o) {
    o.name = "ms_type";
    o.lname = "CEPH messenger transport type: async+posix, async+dpdk, async+rdma";
    o.type = FIO_OPT_STR;
    o.off1 = offsetof(struct ceph_msgr_options, ms_type);
    o.help = "Transport type for CEPH messenger, see the 'ms async transport type' page of the CEPH documentation";
    o.def = "undef";

    o.posval[0].ival = "undef";
    o.posval[0].oval = CEPH_MSGR_TYPE_UNDEF;

    o.posval[1].ival = "async+posix";
    o.posval[1].oval = CEPH_MSGR_TYPE_POSIX;
    o.posval[1].help = "POSIX API";

    o.posval[2].ival = "async+dpdk";
    o.posval[2].oval = CEPH_MSGR_TYPE_DPDK;
    o.posval[2].help = "DPDK";

    o.posval[3].ival = "async+rdma";
    o.posval[3].oval = CEPH_MSGR_TYPE_RDMA;
    o.posval[3].help = "RDMA";
  }),
  make_option([] (fio_option& o) {
    o.name = "ceph_conf_file";
    o.lname = "CEPH configuration file";
    o.type = FIO_OPT_STR_STORE;
    o.off1 = offsetof(struct ceph_msgr_options, conffile);
    o.help = "Path to CEPH configuration file";
  }),
  {} /* Last NULL */
};

static struct ioengine_ops ioengine;

extern "C" {

void get_ioengine(struct ioengine_ops** ioengine_ptr)
{
  /*
   * Main ioengine structure
   */
  ioengine.name = "ceph-msgr";
  ioengine.version = FIO_IOOPS_VERSION;
  ioengine.flags = FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO;
  ioengine.setup = fio_ceph_msgr_setup;
  ioengine.queue = fio_ceph_msgr_queue;
  ioengine.getevents = fio_ceph_msgr_getevents;
  ioengine.event = fio_ceph_msgr_event;
  ioengine.cleanup = fio_ceph_msgr_cleanup;
  ioengine.open_file = fio_ceph_msgr_open_file;
  ioengine.close_file = fio_ceph_msgr_close_file;
  ioengine.io_u_init = fio_ceph_msgr_io_u_init;
  ioengine.io_u_free = fio_ceph_msgr_io_u_free;
  ioengine.option_struct_size = sizeof(struct ceph_msgr_options);
  ioengine.options = options.data();

  *ioengine_ptr = &ioengine;
}
} // extern "C"