1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * CEPH messenger engine
 *
 * FIO engine which uses ceph messenger as a transport. See corresponding
 * FIO client and server jobs for details.
 */
10 #include "global/global_init.h"
11 #include "msg/Messenger.h"
12 #include "messages/MOSDOp.h"
13 #include "messages/MOSDOpReply.h"
14 #include "common/perf_counters.h"
15 #include "auth/DummyAuth.h"
16 #include "ring_buffer.h"
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_
34 const char *ceph_msgr_types
[] = { "undef", "async+posix",
35 "async+dpdk", "async+rdma" };
37 struct ceph_msgr_options
{
38 struct thread_data
*td__
;
39 unsigned int is_receiver
;
40 unsigned int is_single
;
44 enum ceph_msgr_type ms_type
;
49 struct ceph_msgr_data
{
50 ceph_msgr_data(struct ceph_msgr_options
*o_
, unsigned iodepth
) :
52 INIT_FLIST_HEAD(&io_inflight_list
);
53 INIT_FLIST_HEAD(&io_pending_list
);
54 ring_buffer_init(&io_completed_q
, iodepth
);
55 pthread_spin_init(&spin
, PTHREAD_PROCESS_PRIVATE
);
58 struct ceph_msgr_options
*o
;
59 Messenger
*msgr
= NULL
;
60 FioDispatcher
*disp
= NULL
;
61 pthread_spinlock_t spin
;
62 struct ring_buffer io_completed_q
;
63 struct flist_head io_inflight_list
;
64 struct flist_head io_pending_list
;
65 unsigned int io_inflight_nr
= 0;
66 unsigned int io_pending_nr
= 0;
70 struct flist_head list
;
71 struct ceph_msgr_data
*data
;
73 MOSDOp
*req_msg
; /** Cached request, valid only for sender */
76 struct ceph_msgr_reply_io
{
77 struct flist_head list
;
/*
 * Decode a hex string produced by ptr_to_str() back into the raw
 * pointer it encodes.
 */
static void *str_to_ptr(const std::string &str)
{
  unsigned long val = strtoul(str.c_str(), nullptr, 16);

  return reinterpret_cast<void *>(val);
}
/*
 * Encode a raw pointer as a lowercase hex string (no "0x" prefix),
 * round-trippable through str_to_ptr().
 */
static std::string ptr_to_str(void *ptr)
{
  char hexbuf[2 * sizeof(void *) + 1];

  snprintf(hexbuf, sizeof(hexbuf), "%llx", (unsigned long long)ptr);
  return std::string(hexbuf);
}
95 * Used for refcounters print on the last context put, almost duplicates
96 * global context refcounter, sigh.
98 static std::atomic
<int> ctx_ref(1);
99 static DummyAuthClientServer
*g_dummy_auth
;
101 static void create_or_get_ceph_context(struct ceph_msgr_options
*o
)
103 if (g_ceph_context
) {
104 g_ceph_context
->get();
109 boost::intrusive_ptr
<CephContext
> cct
;
110 vector
<const char*> args
;
113 args
= { "--conf", o
->conffile
};
115 cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
116 CODE_ENVIRONMENT_UTILITY
,
117 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
118 /* Will use g_ceph_context instead */
121 common_init_finish(g_ceph_context
);
122 g_ceph_context
->_conf
.apply_changes(NULL
);
123 g_dummy_auth
= new DummyAuthClientServer(g_ceph_context
);
124 g_dummy_auth
->auth_registry
.refresh_config();
127 static void put_ceph_context(void)
129 if (--ctx_ref
== 0) {
133 f
= Formatter::create("json-pretty");
134 g_ceph_context
->get_perfcounters_collection()->dump_formatted(f
, false);
135 ostr
<< ">>>>>>>>>>>>> PERFCOUNTERS BEGIN <<<<<<<<<<<<" << std::endl
;
137 ostr
<< ">>>>>>>>>>>>> PERFCOUNTERS END <<<<<<<<<<<<" << std::endl
;
141 dout(0) << ostr
.str() << dendl
;
144 g_ceph_context
->put();
147 static void ceph_msgr_sender_on_reply(const object_t
&oid
)
149 struct ceph_msgr_data
*data
;
150 struct ceph_msgr_io
*io
;
153 * Here we abuse object and use it as a raw pointer. Since this is
154 * only for benchmarks and testing we do not care about anything
155 * but performance. So no need to use global structure in order
156 * to search for reply, just send a pointer and get it back.
159 io
= (decltype(io
))str_to_ptr(oid
.name
);
161 ring_buffer_enqueue(&data
->io_completed_q
, (void *)io
);
165 class ReplyCompletion
: public Message::CompletionHook
{
166 struct ceph_msgr_io
*m_io
;
169 ReplyCompletion(MOSDOpReply
*rep
, struct ceph_msgr_io
*io
) :
170 Message::CompletionHook(rep
),
173 void finish(int err
) override
{
174 struct ceph_msgr_data
*data
= m_io
->data
;
176 ring_buffer_enqueue(&data
->io_completed_q
, (void *)m_io
);
180 static void ceph_msgr_receiver_on_request(struct ceph_msgr_data
*data
,
185 rep
= new MOSDOpReply(req
, 0, 0, 0, false);
186 rep
->set_connection(req
->get_connection());
188 pthread_spin_lock(&data
->spin
);
189 if (data
->io_inflight_nr
) {
190 struct ceph_msgr_io
*io
;
192 data
->io_inflight_nr
--;
193 io
= flist_first_entry(&data
->io_inflight_list
,
194 struct ceph_msgr_io
, list
);
195 flist_del(&io
->list
);
196 pthread_spin_unlock(&data
->spin
);
198 rep
->set_completion_hook(new ReplyCompletion(rep
, io
));
199 rep
->get_connection()->send_message(rep
);
201 struct ceph_msgr_reply_io
*rep_io
;
203 rep_io
= (decltype(rep_io
))malloc(sizeof(*rep_io
));
206 data
->io_pending_nr
++;
207 flist_add_tail(&rep_io
->list
, &data
->io_pending_list
);
208 pthread_spin_unlock(&data
->spin
);
212 class FioDispatcher
: public Dispatcher
{
213 struct ceph_msgr_data
*m_data
;
216 FioDispatcher(struct ceph_msgr_data
*data
):
217 Dispatcher(g_ceph_context
),
220 bool ms_can_fast_dispatch_any() const override
{
223 bool ms_can_fast_dispatch(const Message
*m
) const override
{
224 switch (m
->get_type()) {
225 case CEPH_MSG_OSD_OP
:
226 return m_data
->o
->is_receiver
;
227 case CEPH_MSG_OSD_OPREPLY
:
228 return !m_data
->o
->is_receiver
;
233 void ms_handle_fast_connect(Connection
*con
) override
{
235 void ms_handle_fast_accept(Connection
*con
) override
{
237 bool ms_dispatch(Message
*m
) override
{
240 void ms_fast_dispatch(Message
*m
) override
{
241 if (m_data
->o
->is_receiver
) {
245 * Server side, handle request.
248 req
= static_cast<MOSDOp
*>(m
);
249 req
->finish_decode();
251 ceph_msgr_receiver_on_request(m_data
, req
);
256 * Client side, get reply, extract objid and mark
260 rep
= static_cast<MOSDOpReply
*>(m
);
261 ceph_msgr_sender_on_reply(rep
->get_oid());
265 bool ms_handle_reset(Connection
*con
) override
{
268 void ms_handle_remote_reset(Connection
*con
) override
{
270 bool ms_handle_refused(Connection
*con
) override
{
273 int ms_handle_authentication(Connection
*con
) override
{
278 static entity_addr_t
hostname_to_addr(struct ceph_msgr_options
*o
)
282 addr
.parse(o
->hostname
);
283 addr
.set_port(o
->port
);
289 static Messenger
*create_messenger(struct ceph_msgr_options
*o
)
291 entity_name_t ename
= o
->is_receiver
?
292 entity_name_t::OSD(0) : entity_name_t::CLIENT(0);
293 std::string lname
= o
->is_receiver
?
294 "receiver" : "sender";
296 std::string ms_type
= o
->ms_type
!= CEPH_MSGR_TYPE_UNDEF
?
297 ceph_msgr_types
[o
->ms_type
] :
298 g_ceph_context
->_conf
.get_val
<std::string
>("ms_type");
300 /* o->td__>pid doesn't set value, so use getpid() instead*/
301 auto nonce
= o
->is_receiver
? 0 : (getpid() + o
->td__
->thread_number
);
302 Messenger
*msgr
= Messenger::create(g_ceph_context
, ms_type
.c_str(),
303 ename
, lname
, nonce
);
304 if (o
->is_receiver
) {
305 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
306 msgr
->bind(hostname_to_addr(o
));
308 msgr
->set_default_policy(Messenger::Policy::lossless_client(0));
310 msgr
->set_auth_client(g_dummy_auth
);
311 msgr
->set_auth_server(g_dummy_auth
);
312 msgr
->set_require_authorizer(false);
318 static Messenger
*single_msgr
;
319 static std::atomic
<int> single_msgr_ref
;
320 static vector
<FioDispatcher
*> single_msgr_disps
;
322 static void init_messenger(struct ceph_msgr_data
*data
)
324 struct ceph_msgr_options
*o
= data
->o
;
328 disp
= new FioDispatcher(data
);
331 * Single messenger instance for the whole FIO
335 msgr
= create_messenger(o
);
340 single_msgr_disps
.push_back(disp
);
344 * Messenger instance per FIO thread
346 msgr
= create_messenger(o
);
348 msgr
->add_dispatcher_head(disp
);
354 static void free_messenger(struct ceph_msgr_data
*data
)
356 data
->msgr
->shutdown();
361 static void put_messenger(struct ceph_msgr_data
*data
)
363 struct ceph_msgr_options
*o
= data
->o
;
366 if (--single_msgr_ref
== 0) {
367 free_messenger(data
);
369 * In case of a single messenger instance we have to
370 * free dispatchers after actual messenger destruction.
372 for (auto disp
: single_msgr_disps
)
377 free_messenger(data
);
384 static int fio_ceph_msgr_setup(struct thread_data
*td
)
386 struct ceph_msgr_options
*o
= (decltype(o
))td
->eo
;
388 ceph_msgr_data
*data
;
390 /* We have to manage global resources so we use threads */
391 td
->o
.use_thread
= 1;
393 create_or_get_ceph_context(o
);
395 if (!td
->io_ops_data
) {
396 data
= new ceph_msgr_data(o
, td
->o
.iodepth
);
397 init_messenger(data
);
398 td
->io_ops_data
= (void *)data
;
404 static void fio_ceph_msgr_cleanup(struct thread_data
*td
)
406 struct ceph_msgr_data
*data
;
409 data
= (decltype(data
))td
->io_ops_data
;
412 nr
= ring_buffer_used_size(&data
->io_completed_q
);
414 fprintf(stderr
, "fio: io_completed_nr==%d, but should be zero\n",
416 if (data
->io_inflight_nr
)
417 fprintf(stderr
, "fio: io_inflight_nr==%d, but should be zero\n",
418 data
->io_inflight_nr
);
419 if (data
->io_pending_nr
)
420 fprintf(stderr
, "fio: io_pending_nr==%d, but should be zero\n",
421 data
->io_pending_nr
);
422 if (!flist_empty(&data
->io_inflight_list
))
423 fprintf(stderr
, "fio: io_inflight_list is not empty\n");
424 if (!flist_empty(&data
->io_pending_list
))
425 fprintf(stderr
, "fio: io_pending_list is not empty\n");
427 ring_buffer_deinit(&data
->io_completed_q
);
432 static int fio_ceph_msgr_io_u_init(struct thread_data
*td
, struct io_u
*io_u
)
434 struct ceph_msgr_options
*o
= (decltype(o
))td
->eo
;
435 struct ceph_msgr_io
*io
;
436 MOSDOp
*req_msg
= NULL
;
438 io
= (decltype(io
))malloc(sizeof(*io
));
440 io
->data
= (decltype(io
->data
))td
->io_ops_data
;
442 if (!o
->is_receiver
) {
443 object_t
oid(ptr_to_str(io
));
445 object_locator_t oloc
;
446 hobject_t
hobj(oid
, oloc
.key
, CEPH_NOSNAP
, pgid
.ps(),
447 pgid
.pool(), oloc
.nspace
);
449 entity_inst_t
dest(entity_name_t::OSD(0), hostname_to_addr(o
));
451 Messenger
*msgr
= io
->data
->msgr
;
452 ConnectionRef con
= msgr
->connect_to(dest
.name
.type(),
453 entity_addrvec_t(dest
.addr
));
455 req_msg
= new MOSDOp(0, 0, hobj
, spgid
, 0, 0, 0);
456 req_msg
->set_connection(con
);
459 io
->req_msg
= req_msg
;
460 io_u
->engine_data
= (void *)io
;
465 static void fio_ceph_msgr_io_u_free(struct thread_data
*td
, struct io_u
*io_u
)
467 struct ceph_msgr_io
*io
;
469 io
= (decltype(io
))io_u
->engine_data
;
471 io_u
->engine_data
= NULL
;
478 static enum fio_q_status
ceph_msgr_sender_queue(struct thread_data
*td
,
481 struct ceph_msgr_data
*data
;
482 struct ceph_msgr_io
*io
;
484 bufferlist buflist
= bufferlist::static_from_mem(
485 (char *)io_u
->buf
, io_u
->buflen
);
487 io
= (decltype(io
))io_u
->engine_data
;
488 data
= (decltype(data
))td
->io_ops_data
;
490 /* No handy method to clear ops before reusage? Ok */
491 io
->req_msg
->ops
.clear();
493 /* Here we do not care about direction, always send as write */
494 io
->req_msg
->write(0, io_u
->buflen
, buflist
);
495 /* Keep message alive */
497 io
->req_msg
->get_connection()->send_message(io
->req_msg
);
502 static int fio_ceph_msgr_getevents(struct thread_data
*td
, unsigned int min
,
503 unsigned int max
, const struct timespec
*ts
)
505 struct ceph_msgr_data
*data
;
508 data
= (decltype(data
))td
->io_ops_data
;
511 * Check io_u.c : if min == 0 -> ts is valid and equal to zero,
512 * if min != 0 -> ts is NULL.
516 nr
= ring_buffer_used_size(&data
->io_completed_q
);
518 /* We got something */
521 /* Here we are only if min != 0 and ts == NULL */
524 while ((nr
= ring_buffer_used_size(&data
->io_completed_q
)) < min
&&
526 /* Poll, no disk IO, so we expect response immediately. */
533 static struct io_u
*fio_ceph_msgr_event(struct thread_data
*td
, int event
)
535 struct ceph_msgr_data
*data
;
536 struct ceph_msgr_io
*io
;
538 data
= (decltype(data
))td
->io_ops_data
;
539 io
= (decltype(io
))ring_buffer_dequeue(&data
->io_completed_q
);
544 static enum fio_q_status
ceph_msgr_receiver_queue(struct thread_data
*td
,
547 struct ceph_msgr_data
*data
;
548 struct ceph_msgr_io
*io
;
550 io
= (decltype(io
))io_u
->engine_data
;
552 pthread_spin_lock(&data
->spin
);
553 if (data
->io_pending_nr
) {
554 struct ceph_msgr_reply_io
*rep_io
;
557 data
->io_pending_nr
--;
558 rep_io
= flist_first_entry(&data
->io_pending_list
,
559 struct ceph_msgr_reply_io
,
561 flist_del(&rep_io
->list
);
563 pthread_spin_unlock(&data
->spin
);
566 rep
->set_completion_hook(new ReplyCompletion(rep
, io
));
567 rep
->get_connection()->send_message(rep
);
569 data
->io_inflight_nr
++;
570 flist_add_tail(&io
->list
, &data
->io_inflight_list
);
571 pthread_spin_unlock(&data
->spin
);
577 static enum fio_q_status
fio_ceph_msgr_queue(struct thread_data
*td
,
580 struct ceph_msgr_options
*o
= (decltype(o
))td
->eo
;
583 return ceph_msgr_receiver_queue(td
, io_u
);
585 return ceph_msgr_sender_queue(td
, io_u
);
/* No real files involved: accept any open request. */
static int fio_ceph_msgr_open_file(struct thread_data *td, struct fio_file *f)
{
  return 0;
}
/* No real files involved: nothing to close. */
static int fio_ceph_msgr_close_file(struct thread_data *, struct fio_file *)
{
  return 0;
}
598 template <class Func
>
599 fio_option
make_option(Func
&& func
)
601 auto o
= fio_option
{};
602 o
.category
= FIO_OPT_C_ENGINE
;
607 static std::vector
<fio_option
> options
{
608 make_option([] (fio_option
& o
) {
610 o
.lname
= "CEPH messenger is receiver";
611 o
.type
= FIO_OPT_BOOL
;
612 o
.off1
= offsetof(struct ceph_msgr_options
, is_receiver
);
613 o
.help
= "CEPH messenger is sender or receiver";
616 make_option([] (fio_option
& o
) {
617 o
.name
= "single_instance";
618 o
.lname
= "Single instance of CEPH messenger ";
619 o
.type
= FIO_OPT_BOOL
;
620 o
.off1
= offsetof(struct ceph_msgr_options
, is_single
);
621 o
.help
= "CEPH messenger is a created once for all threads";
624 make_option([] (fio_option
& o
) {
626 o
.lname
= "CEPH messenger hostname";
627 o
.type
= FIO_OPT_STR_STORE
;
628 o
.off1
= offsetof(struct ceph_msgr_options
, hostname
);
629 o
.help
= "Hostname for CEPH messenger engine";
631 make_option([] (fio_option
& o
) {
633 o
.lname
= "CEPH messenger engine port";
634 o
.type
= FIO_OPT_INT
;
635 o
.off1
= offsetof(struct ceph_msgr_options
, port
);
638 o
.help
= "Port to use for CEPH messenger";
640 make_option([] (fio_option
& o
) {
642 o
.lname
= "CEPH messenger transport type: async+posix, async+dpdk, async+rdma";
643 o
.type
= FIO_OPT_STR
;
644 o
.off1
= offsetof(struct ceph_msgr_options
, ms_type
);
645 o
.help
= "Transport type for CEPH messenger, see 'ms async transport type' corresponding CEPH documentation page";
648 o
.posval
[0].ival
= "undef";
649 o
.posval
[0].oval
= CEPH_MSGR_TYPE_UNDEF
;
651 o
.posval
[1].ival
= "async+posix";
652 o
.posval
[1].oval
= CEPH_MSGR_TYPE_POSIX
;
653 o
.posval
[1].help
= "POSIX API";
655 o
.posval
[2].ival
= "async+dpdk";
656 o
.posval
[2].oval
= CEPH_MSGR_TYPE_DPDK
;
657 o
.posval
[2].help
= "DPDK";
659 o
.posval
[3].ival
= "async+rdma";
660 o
.posval
[3].oval
= CEPH_MSGR_TYPE_RDMA
;
661 o
.posval
[3].help
= "RDMA";
663 make_option([] (fio_option
& o
) {
664 o
.name
= "ceph_conf_file";
665 o
.lname
= "CEPH configuration file";
666 o
.type
= FIO_OPT_STR_STORE
;
667 o
.off1
= offsetof(struct ceph_msgr_options
, conffile
);
668 o
.help
= "Path to CEPH configuration file";
673 static struct ioengine_ops ioengine
;
677 void get_ioengine(struct ioengine_ops
** ioengine_ptr
)
680 * Main ioengine structure
682 ioengine
.name
= "ceph-msgr";
683 ioengine
.version
= FIO_IOOPS_VERSION
;
684 ioengine
.flags
= FIO_DISKLESSIO
| FIO_UNIDIR
| FIO_PIPEIO
;
685 ioengine
.setup
= fio_ceph_msgr_setup
;
686 ioengine
.queue
= fio_ceph_msgr_queue
;
687 ioengine
.getevents
= fio_ceph_msgr_getevents
;
688 ioengine
.event
= fio_ceph_msgr_event
;
689 ioengine
.cleanup
= fio_ceph_msgr_cleanup
;
690 ioengine
.open_file
= fio_ceph_msgr_open_file
;
691 ioengine
.close_file
= fio_ceph_msgr_close_file
;
692 ioengine
.io_u_init
= fio_ceph_msgr_io_u_init
;
693 ioengine
.io_u_free
= fio_ceph_msgr_io_u_free
;
694 ioengine
.option_struct_size
= sizeof(struct ceph_msgr_options
);
695 ioengine
.options
= options
.data();
697 *ioengine_ptr
= &ioengine
;