// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Portions Copyright (C) 2013 CohortFS, LLC
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <arpa/inet.h>
#include <boost/lexical_cast.hpp>
#include <set>
#include <stdlib.h>
#include <memory>

#include "XioMsg.h"
#include "XioMessenger.h"
#include "common/address_helper.h"
#include "common/code_environment.h"
#include "messages/MNop.h"

#define dout_subsys ceph_subsys_xio
#undef dout_prefix
#define dout_prefix *_dout << "xio."

Mutex mtx("XioMessenger Package Lock");
std::atomic<bool> initialized = { false };

std::atomic<unsigned> XioMessenger::nInstances = { 0 };

struct xio_mempool *xio_msgr_noreg_mpool;

static struct xio_session_ops xio_msgr_ops;

/* Accelio API callouts */

namespace xio_log
{
typedef pair<const char*, int> level_pair;
static const level_pair LEVELS[] = {
  make_pair("fatal", 0),
  make_pair("error", 0),
  make_pair("warn", 1),
  make_pair("info", 1),
  make_pair("debug", 2),
  make_pair("trace", 20)
};

static CephContext *context;

int get_level()
{
  int level = 0;
  // iterate over the number of entries in LEVELS, not its size in bytes
  for (size_t i = 0; i < sizeof(LEVELS) / sizeof(LEVELS[0]); i++) {
    if (!ldlog_p1(context, dout_subsys, LEVELS[i].second))
      break;
    level++;
  }
  return level;
}

void log_dout(const char *file, unsigned line,
              const char *function, unsigned level,
              const char *fmt, ...)
{
  char buffer[2048];
  va_list args;
  va_start(args, fmt);
  int n = vsnprintf(buffer, sizeof(buffer), fmt, args);
  va_end(args);

  if (n > 0) {
    const char *short_file = strrchr(file, '/');
    short_file = (short_file == NULL) ? file : short_file + 1;

    const level_pair &lvl = LEVELS[level];
    ldout(context, lvl.second) << '[' << lvl.first << "] "
      << short_file << ':' << line << ' '
      << function << " - " << buffer << dendl;
  }
}
}

static int on_session_event(struct xio_session *session,
                            struct xio_session_event_data *event_data,
                            void *cb_user_context)
{
  XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
  CephContext *cct = msgr->cct;

  ldout(cct,4) << "session event: " << xio_session_event_str(event_data->event)
    << ". reason: " << xio_strerror(event_data->reason) << dendl;

  return msgr->session_event(session, event_data, cb_user_context);
}

static int on_new_session(struct xio_session *session,
                          struct xio_new_session_req *req,
                          void *cb_user_context)
{
  XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
  CephContext *cct = msgr->cct;

  ldout(cct,4) << "new session " << session
    << " user_context " << cb_user_context << dendl;

  return (msgr->new_session(session, req, cb_user_context));
}

static int on_msg(struct xio_session *session,
                  struct xio_msg *req,
                  int more_in_batch,
                  void *cb_user_context)
{
  XioConnection* xcon __attribute__((unused)) =
    static_cast<XioConnection*>(cb_user_context);
  CephContext *cct = xcon->get_messenger()->cct;

  ldout(cct,25) << "on_msg session " << session << " xcon " << xcon << dendl;

  if (unlikely(XioPool::trace_mempool)) {
    static uint32_t nreqs;
    if (unlikely((++nreqs % 65536) == 0)) {
      xp_stats.dump(__func__, nreqs);
    }
  }

  return xcon->on_msg(session, req, more_in_batch,
                      cb_user_context);
}

static int on_ow_msg_send_complete(struct xio_session *session,
                                   struct xio_msg *msg,
                                   void *conn_user_context)
{
  XioConnection *xcon =
    static_cast<XioConnection*>(conn_user_context);
  CephContext *cct = xcon->get_messenger()->cct;

  ldout(cct,25) << "msg delivered session: " << session
    << " msg: " << msg << " conn_user_context "
    << conn_user_context << dendl;

  return xcon->on_ow_msg_send_complete(session, msg, conn_user_context);
}

static int on_msg_error(struct xio_session *session,
                        enum xio_status error,
                        enum xio_msg_direction dir,
                        struct xio_msg *msg,
                        void *conn_user_context)
{
  /* XIO promises to flush back undelivered messages */
  XioConnection *xcon =
    static_cast<XioConnection*>(conn_user_context);
  CephContext *cct = xcon->get_messenger()->cct;

  ldout(cct,4) << "msg error session: " << session
    << " error: " << xio_strerror(error) << " msg: " << msg
    << " conn_user_context " << conn_user_context << dendl;

  return xcon->on_msg_error(session, error, msg, conn_user_context);
}

static int on_cancel(struct xio_session *session,
                     struct xio_msg *msg,
                     enum xio_status result,
                     void *conn_user_context)
{
  XioConnection* xcon __attribute__((unused)) =
    static_cast<XioConnection*>(conn_user_context);
  CephContext *cct = xcon->get_messenger()->cct;

  ldout(cct,25) << "on cancel: session: " << session << " msg: " << msg
    << " conn_user_context " << conn_user_context << dendl;

  return 0;
}

static int on_cancel_request(struct xio_session *session,
                             struct xio_msg *msg,
                             void *conn_user_context)
{
  XioConnection* xcon __attribute__((unused)) =
    static_cast<XioConnection*>(conn_user_context);
  CephContext *cct = xcon->get_messenger()->cct;

  ldout(cct,25) << "on cancel request: session: " << session << " msg: " << msg
    << " conn_user_context " << conn_user_context << dendl;

  return 0;
}

/* free functions */
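/*
 * Build an Accelio URI of the form "<transport>://<host>[:<port>]" from an
 * entity_addr_t; for example, a 10.0.0.1:6800 address with transport "rdma"
 * yields "rdma://10.0.0.1:6800".  Unknown transport strings fall back to
 * "rdma".
 */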
static string xio_uri_from_entity(const string &type,
                                  const entity_addr_t& addr, bool want_port)
{
  const char *host = NULL;
  char addr_buf[129];
  string xio_uri;

  switch(addr.get_family()) {
  case AF_INET:
    host = inet_ntop(AF_INET, &addr.in4_addr().sin_addr, addr_buf,
                     INET_ADDRSTRLEN);
    break;
  case AF_INET6:
    host = inet_ntop(AF_INET6, &addr.in6_addr().sin6_addr, addr_buf,
                     INET6_ADDRSTRLEN);
    break;
  default:
    abort();
    break;
  };

  if (type == "rdma" || type == "tcp")
    xio_uri = type + "://";
  else
    xio_uri = "rdma://";

  /* The following can only succeed if the host is rdma-capable */
  xio_uri += host;
  if (want_port) {
    xio_uri += ":";
    xio_uri += boost::lexical_cast<std::string>(addr.get_port());
  }

  return xio_uri;
} /* xio_uri_from_entity */

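/*
 * One-time, process-wide Accelio setup, guarded by 'initialized' and the
 * package mutex: hook Accelio logging into Ceph's dout, disable huge-page
 * allocations, request fork-safe RDMA initialization for daemons, bound the
 * in/out iovec lengths, enable flow control, size the inline data and header
 * thresholds, configure the registered and unregistered mempools, and fill in
 * the xio_session_ops callback table.
 */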
void XioInit::package_init(CephContext *cct) {
  if (! initialized) {

    mtx.Lock();
    if (! initialized) {

      xio_init();

      // claim a reference to the first context we see
      xio_log::context = cct->get();

      int xopt;
      xopt = xio_log::get_level();
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_LEVEL,
                  &xopt, sizeof(xopt));
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_FN,
                  (const void*)xio_log::log_dout, sizeof(xio_log_fn));

      xopt = 1;
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_DISABLE_HUGETBL,
                  &xopt, sizeof(xopt));

      if (g_code_env == CODE_ENVIRONMENT_DAEMON) {
        xopt = 1;
        xio_set_opt(NULL, XIO_OPTLEVEL_RDMA, XIO_OPTNAME_ENABLE_FORK_INIT,
                    &xopt, sizeof(xopt));
      }

      xopt = XIO_MSGR_IOVLEN;
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_IN_IOVLEN,
                  &xopt, sizeof(xopt));
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
                  &xopt, sizeof(xopt));

      /* enable flow-control */
      xopt = 1;
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL,
                  &xopt, sizeof(xopt));

      /* and set threshold for buffer callouts */
      xopt = max(cct->_conf->xio_max_send_inline, 512);
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_DATA,
                  &xopt, sizeof(xopt));

      xopt = XioMsgHdr::get_max_encoded_length();
      ldout(cct,2) << "setting accelio max header size " << xopt << dendl;
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_HEADER,
                  &xopt, sizeof(xopt));

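      /*
       * Registered-memory pool configuration: six slabs from 1 KB to 1 MB.
       * Per the Accelio xio_mempool_slab_config layout, each row is assumed
       * to be {block size, initial blocks, grow step (here xio_queue_depth
       * for the small slabs), max blocks}.
       */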
      size_t queue_depth = cct->_conf->xio_queue_depth;
      struct xio_mempool_config mempool_config = {
        6,
        {
          {1024,    0, queue_depth, 262144},
          {4096,    0, queue_depth, 262144},
          {16384,   0, queue_depth, 262144},
          {65536,   0, 128,         65536},
          {262144,  0, 32,          16384},
          {1048576, 0, 8,           8192}
        }
      };
      xio_set_opt(NULL,
                  XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_CONFIG_MEMPOOL,
                  &mempool_config, sizeof(mempool_config));

      /* and an unregistered one */
 #define XMSG_MEMPOOL_QUANTUM 4096

      xio_msgr_noreg_mpool =
        xio_mempool_create(-1 /* nodeid */,
                           XIO_MEMPOOL_FLAG_REGULAR_PAGES_ALLOC);

      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 64,
                                  cct->_conf->xio_mp_min,
                                  cct->_conf->xio_mp_max_64,
                                  XMSG_MEMPOOL_QUANTUM, 0);
      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 256,
                                  cct->_conf->xio_mp_min,
                                  cct->_conf->xio_mp_max_256,
                                  XMSG_MEMPOOL_QUANTUM, 0);
      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 1024,
                                  cct->_conf->xio_mp_min,
                                  cct->_conf->xio_mp_max_1k,
                                  XMSG_MEMPOOL_QUANTUM, 0);
      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, getpagesize(),
                                  cct->_conf->xio_mp_min,
                                  cct->_conf->xio_mp_max_page,
                                  XMSG_MEMPOOL_QUANTUM, 0);

      /* initialize ops singleton */
      xio_msgr_ops.on_session_event = on_session_event;
      xio_msgr_ops.on_new_session = on_new_session;
      xio_msgr_ops.on_session_established = NULL;
      xio_msgr_ops.on_msg = on_msg;
      xio_msgr_ops.on_ow_msg_send_complete = on_ow_msg_send_complete;
      xio_msgr_ops.on_msg_error = on_msg_error;
      xio_msgr_ops.on_cancel = on_cancel;
      xio_msgr_ops.on_cancel_request = on_cancel_request;

      /* mark initialized */
      initialized = true;
    }
    mtx.Unlock();
  }
}

/* XioMessenger */
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) {
  return *_dout << "-- " << msgr->get_myaddr_legacy() << " ";
}

XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
                           string mname, uint64_t _nonce,
                           uint64_t cflags, DispatchStrategy *ds)
  : SimplePolicyMessenger(cct, name, mname, _nonce),
    XioInit(cct),
    portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)),
    dispatch_strategy(ds),
    loop_con(new XioLoopbackConnection(this)),
    special_handling(0),
    sh_mtx("XioMessenger session mutex"),
    sh_cond(),
    need_addr(true),
    did_bind(false),
    nonce(_nonce)
{

  if (cct->_conf->xio_trace_xcon)
    magic |= MSG_MAGIC_TRACE_XCON;

  XioPool::trace_mempool = (cct->_conf->xio_trace_mempool);
  XioPool::trace_msgcnt = (cct->_conf->xio_trace_msgcnt);

  dispatch_strategy->set_messenger(this);

  /* update class instance count */
  nInstances++;

  loop_con->set_features(CEPH_FEATURES_ALL);

  ldout(cct,2) << "Create msgr: " << this << " instance: "
    << nInstances << " type: " << name.type_str()
    << " subtype: " << mname << " nportals: " << get_nportals(cflags)
    << " nconns_per_portal: " << get_nconns_per_portal(cflags)
    << dendl;

} /* ctor */

int XioMessenger::pool_hint(uint32_t dsize) {
  if (dsize > 1024*1024)
    return 0;

  /* if dsize is already present, returns -EEXIST */
  return xio_mempool_add_slab(xio_msgr_noreg_mpool, dsize, 0,
                              cct->_conf->xio_mp_max_hint,
                              XMSG_MEMPOOL_QUANTUM, 0);
}

int XioMessenger::get_nconns_per_portal(uint64_t cflags)
{
  const int XIO_DEFAULT_NUM_CONNS_PER_PORTAL = 8;
  int nconns = XIO_DEFAULT_NUM_CONNS_PER_PORTAL;

  if (cflags & Messenger::HAS_MANY_CONNECTIONS)
    nconns = max(cct->_conf->xio_max_conns_per_portal, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
  else if (cflags & Messenger::HEARTBEAT)
    nconns = max(cct->_conf->osd_heartbeat_min_peers * 4, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);

  return nconns;
}

int XioMessenger::get_nportals(uint64_t cflags)
{
  int nportals = 1;

  if (cflags & Messenger::HAS_HEAVY_TRAFFIC)
    nportals = max(cct->_conf->xio_portal_threads, 1);

  return nportals;
}

void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
  // be careful here: multiple threads may block here, and readers of
  // my_inst.addr do NOT hold any lock.

  // this always goes from true -> false under the protection of the
  // mutex. if it is already false, we need not retake the mutex at
  // all.
  if (!need_addr)
    return;

  sh_mtx.Lock();
  if (need_addr) {
    entity_addr_t t = peer_addr_for_me;
    t.set_port(my_inst.addr.get_port());
    my_inst.addr.set_sockaddr(t.get_sockaddr());
    ldout(cct,2) << "learned my addr " << my_inst.addr << dendl;
    need_addr = false;
    // init_local_connection();
  }
  sh_mtx.Unlock();

}

int XioMessenger::new_session(struct xio_session *session,
                              struct xio_new_session_req *req,
                              void *cb_user_context)
{
  if (shutdown_called) {
    return xio_reject(
      session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */);
  }
  int code = portals.accept(session, req, cb_user_context);
  if (! code)
    nsessions++;
  return code;
} /* new_session */

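/*
 * Dispatch Accelio session events: learn our own address and run the connect
 * hooks when a client-side connection is established, create a passive
 * XioConnection for a new server-side connection, unregister and tear down
 * connections on error/closed/disconnected/refused and teardown events, and
 * destroy the session (signalling shutdown waiters) on session teardown.
 */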
int XioMessenger::session_event(struct xio_session *session,
                                struct xio_session_event_data *event_data,
                                void *cb_user_context)
{
  XioConnection *xcon;

  switch (event_data->event) {
  case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT:
  {
    struct xio_connection *conn = event_data->conn;
    struct xio_connection_attr xcona;
    entity_addr_t peer_addr_for_me, paddr;

    xcon = static_cast<XioConnection*>(event_data->conn_user_context);

    ldout(cct,2) << "connection established " << event_data->conn
      << " session " << session << " xcon " << xcon << dendl;

    (void) xio_query_connection(conn, &xcona,
                                XIO_CONNECTION_ATTR_LOCAL_ADDR|
                                XIO_CONNECTION_ATTR_PEER_ADDR);
    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
    paddr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
    //set_myaddr(peer_addr_for_me);
    learned_addr(peer_addr_for_me);
    ldout(cct,2) << "client: connected from " << peer_addr_for_me << " to " << paddr << dendl;

    /* notify hook */
    this->ms_deliver_handle_connect(xcon);
    this->ms_deliver_handle_fast_connect(xcon);
  }
  break;

  case XIO_SESSION_NEW_CONNECTION_EVENT:
  {
    struct xio_connection *conn = event_data->conn;
    struct xio_connection_attr xcona;
    entity_inst_t s_inst;
    entity_addr_t peer_addr_for_me;

    (void) xio_query_connection(conn, &xcona,
                                XIO_CONNECTION_ATTR_CTX|
                                XIO_CONNECTION_ATTR_PEER_ADDR|
                                XIO_CONNECTION_ATTR_LOCAL_ADDR);
    /* XXX assumes RDMA */
    s_inst.addr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);

    xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst);
    xcon->session = session;

    struct xio_context_attr xctxa;
    (void) xio_query_context(xcona.ctx, &xctxa, XIO_CONTEXT_ATTR_USER_CTX);

    xcon->conn = conn;
    xcon->portal = static_cast<XioPortal*>(xctxa.user_context);
    ceph_assert(xcon->portal);

    xcona.user_context = xcon;
    (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);

    xcon->connected = true;

    /* sentinel ref */
    xcon->get(); /* xcon->nref == 1 */
    conns_sp.lock();
    conns_list.push_back(*xcon);
    /* XXX we can't put xcon in conns_entity_map because we don't yet know
     * its peer address */
    conns_sp.unlock();

    /* XXXX pre-merge of session startup negotiation ONLY! */
    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);

    ldout(cct,2) << "New connection session " << session
      << " xcon " << xcon << " on msgr: " << this << " portal: " << xcon->portal << dendl;
    ldout(cct,2) << "Server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl;
  }
  break;
  case XIO_SESSION_CONNECTION_ERROR_EVENT:
  case XIO_SESSION_CONNECTION_CLOSED_EVENT: /* orderly discon */
  case XIO_SESSION_CONNECTION_DISCONNECTED_EVENT: /* unexpected discon */
  case XIO_SESSION_CONNECTION_REFUSED_EVENT:
    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
    ldout(cct,2) << xio_session_event_str(event_data->event)
      << " xcon " << xcon << " session " << session << dendl;
    if (likely(!!xcon)) {
      unregister_xcon(xcon);
      xcon->on_disconnect_event();
    }
    break;
  case XIO_SESSION_CONNECTION_TEARDOWN_EVENT:
    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
    ldout(cct,2) << xio_session_event_str(event_data->event)
      << " xcon " << xcon << " session " << session << dendl;
    /*
     * There are flows where Accelio sends a teardown event without going
     * through a disconnect event, so make sure the connection is cleaned up.
     */
    unregister_xcon(xcon);
    xcon->on_teardown_event();
    break;
  case XIO_SESSION_TEARDOWN_EVENT:
    ldout(cct,2) << xio_session_event_str(event_data->event)
      << " session " << session << dendl;
    if (unlikely(XioPool::trace_mempool)) {
      xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session));
    }
    xio_session_destroy(session);
    if (--nsessions == 0) {
      Mutex::Locker lck(sh_mtx);
      if (nsessions == 0)
        sh_cond.Signal();
    }
    break;
  default:
    break;
  };

  return 0;
}

enum bl_type
{
  BUFFER_PAYLOAD,
  BUFFER_MIDDLE,
  BUFFER_DATA
};

#define MAX_XIO_BUF_SIZE 1044480

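/*
 * Walk a bufferlist and count how many scatter-gather (iovec) entries are
 * needed to send it, advancing msg_off/req_off as each xio_msg fills up
 * (at most XIO_MSGR_IOVLEN entries or MAX_XIO_BUF_SIZE bytes per request).
 */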
static inline int
xio_count_buffers(const buffer::list& bl, int& req_size, int& msg_off, int& req_off)
{

  const std::list<buffer::ptr>& buffers = bl.buffers();
  list<bufferptr>::const_iterator pb;
  size_t size, off;
  int result;
  int first = 1;

  off = size = 0;
  result = 0;
  for (;;) {
    if (off >= size) {
      if (first) pb = buffers.begin(); else ++pb;
      if (pb == buffers.end()) {
        break;
      }
      off = 0;
      size = pb->length();
      first = 0;
    }
    size_t count = size - off;
    if (!count) continue;
    if (req_size + count > MAX_XIO_BUF_SIZE) {
      count = MAX_XIO_BUF_SIZE - req_size;
    }

    ++result;

    /* advance iov and perhaps request */

    off += count;
    req_size += count;
    ++msg_off;
    if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
      ++req_off;
      msg_off = 0;
      req_size = 0;
    }
  }

  return result;
}

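/*
 * Fill the scatter-gather list(s) of an XioMsg with pointers into the
 * bufferlist counted above: assign iov_base/iov_len for each fragment, attach
 * the registered memory region when the buffer has one, and roll over to the
 * next chained xio_msg when the per-request iovec or byte limits are reached.
 */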
static inline void
xio_place_buffers(const buffer::list& bl, XioMsg *xmsg, struct xio_msg*& req,
                  struct xio_iovec_ex*& msg_iov, int& req_size,
                  int ex_cnt, int& msg_off, int& req_off, bl_type type)
{

  const std::list<buffer::ptr>& buffers = bl.buffers();
  list<bufferptr>::const_iterator pb;
  struct xio_iovec_ex* iov;
  size_t size, off;
  const char *data = NULL;
  int first = 1;

  off = size = 0;
  for (;;) {
    if (off >= size) {
      if (first) pb = buffers.begin(); else ++pb;
      if (pb == buffers.end()) {
        break;
      }
      off = 0;
      size = pb->length();
      data = pb->c_str(); // is c_str() efficient?
      first = 0;
    }
    size_t count = size - off;
    if (!count) continue;
    if (req_size + count > MAX_XIO_BUF_SIZE) {
      count = MAX_XIO_BUF_SIZE - req_size;
    }

    /* assign buffer */
    iov = &msg_iov[msg_off];
    iov->iov_base = (void *) (&data[off]);
    iov->iov_len = count;

    switch (type) {
    case BUFFER_DATA:
      //break;
    default:
    {
      struct xio_reg_mem *mp = get_xio_mp(*pb);
      iov->mr = (mp) ? mp->mr : NULL;
    }
      break;
    }

    /* advance iov(s) */

    off += count;
    req_size += count;
    ++msg_off;

    /* next request if necessary */

    if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
      /* finish this request */
      req->out.pdata_iov.nents = msg_off;
      /* advance to next, and write in it if it's not the last one. */
      if (++req_off >= ex_cnt) {
        req = 0; /* poison. trap if we try to use it. */
        msg_iov = NULL;
      } else {
        req = &xmsg->req_arr[req_off].msg;
        msg_iov = req->out.pdata_iov.sglist;
      }
      msg_off = 0;
      req_size = 0;
    }
  }
}

int XioMessenger::bind(const entity_addr_t& addr)
{
  if (addr.is_blank_ip()) {
    lderr(cct) << "ERROR: need rdma ip for remote use! " << dendl;
    cout << "Error: xio bind failed. public/cluster ip not specified" << std::endl;
    return -1;
  }

  entity_addr_t shift_addr = addr;
  string base_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
                                        shift_addr, false /* want_port */);
  ldout(cct,4) << "XioMessenger " << this << " bind: xio_uri "
    << base_uri << ':' << shift_addr.get_port() << dendl;

  uint16_t port0;
  int r = portals.bind(&xio_msgr_ops, base_uri, shift_addr.get_port(), &port0);
  if (r == 0) {
    shift_addr.set_port(port0);
    shift_addr.nonce = nonce;
    set_myaddr(shift_addr);
    need_addr = false;
    did_bind = true;
  }
  return r;
} /* bind */

int XioMessenger::rebind(const set<int>& avoid_ports)
{
  ldout(cct,4) << "XioMessenger " << this << " rebind attempt" << dendl;
  return 0;
} /* rebind */

int XioMessenger::start()
{
  portals.start();
  dispatch_strategy->start();
  if (!did_bind) {
    my_inst.addr.nonce = nonce;
  }
  started = true;
  return 0;
}

void XioMessenger::wait()
{
  portals.join();
  dispatch_strategy->wait();
} /* wait */

int XioMessenger::_send_message(Message *m, const entity_inst_t& dest)
{
  ConnectionRef conn = get_connection(dest);
  if (conn)
    return _send_message(m, &(*conn));
  else
    return EINVAL;
} /* send_message(Message *, const entity_inst_t&) */

static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
                                         int ex_cnt)
{
  struct xio_reg_mem mp_mem;
  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioMsg), &mp_mem);
  if (!!e)
    return NULL;
  XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
  ceph_assert(!!xmsg);
  new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
  return xmsg;
}

XioCommand* pool_alloc_xio_command(XioConnection *xcon)
{
  struct xio_reg_mem mp_mem;
  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioCommand), &mp_mem);
  if (!!e)
    return NULL;
  XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr);
  ceph_assert(!!xcmd);
  new (xcmd) XioCommand(xcon, mp_mem);
  return xcmd;
}

int XioMessenger::_send_message(Message *m, Connection *con)
{
  if (con == loop_con.get() /* intrusive_ptr get() */) {
    m->set_connection(con);
    m->set_src(get_myinst().name);
    m->set_seq(loop_con->next_seq());
    ds_dispatch(m);
    return 0;
  }

  XioConnection *xcon = static_cast<XioConnection*>(con);

  /* If con is not in READY state, we have to enforce policy */
  if (xcon->cstate.session_state.read() != XioConnection::UP) {
    std::lock_guard<decltype(xcon->sp)> lg(xcon->sp);

    if (xcon->cstate.session_state.read() != XioConnection::UP) {
      xcon->outgoing.mqueue.push_back(*m);
      return 0;
    }
  }

  return _send_message_impl(m, xcon);
} /* send_message(Message* m, Connection *con) */

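/*
 * Core send path: encode the Message with the connection's features, count
 * the iovec entries needed for payload/middle/data, allocate an XioMsg frame
 * (with extra chained xio_msg requests when one is not enough), place the
 * buffers, point the xio header at the encoded XioMsgHdr, chain the requests
 * in order, and enqueue the result on the connection's portal.
 */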
int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
{
  int code = 0;

  Mutex::Locker l(xcon->lock);
  if (unlikely(XioPool::trace_mempool)) {
    static uint32_t nreqs;
    if (unlikely((++nreqs % 65536) == 0)) {
      xp_stats.dump(__func__, nreqs);
    }
  }

  m->set_seq(xcon->state.next_out_seq());
  m->set_magic(magic); // trace flags and special handling

  m->encode(xcon->get_features(), this->crcflags);

  buffer::list &payload = m->get_payload();
  buffer::list &middle = m->get_middle();
  buffer::list &data = m->get_data();

  int msg_off = 0;
  int req_off = 0;
  int req_size = 0;
  int nbuffers =
    xio_count_buffers(payload, req_size, msg_off, req_off) +
    xio_count_buffers(middle, req_size, msg_off, req_off) +
    xio_count_buffers(data, req_size, msg_off, req_off);

  int ex_cnt = req_off;
  if (msg_off == 0 && ex_cnt > 0) {
    // no buffers for last msg
    ldout(cct,10) << "msg_off 0, ex_cnt " << ex_cnt << " -> " << ex_cnt-1 << dendl;
    ex_cnt--;
  }

  /* get an XioMsg frame */
  XioMsg *xmsg = pool_alloc_xio_msg(m, xcon, ex_cnt);
  if (! xmsg) {
    /* could happen if Accelio has been shutdown */
    return ENOMEM;
  }

  ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg
    << " tag " << (int)xmsg->hdr.tag
    << " req_0 " << xmsg->get_xio_msg() << " msg type " << m->get_type()
    << " features: " << xcon->get_features()
    << " conn " << xcon->conn << " sess " << xcon->session << dendl;

  if (magic & (MSG_MAGIC_XIO)) {

    /* XXXX verify */
    switch (m->get_type()) {
    case 43:
    // case 15:
      ldout(cct,4) << __func__ << " stop 43 " << m->get_type() << " " << *m << dendl;
      buffer::list &payload = m->get_payload();
      ldout(cct,4) << __func__ << " payload dump:" << dendl;
      payload.hexdump(cout);
    }
  }

  struct xio_msg *req = xmsg->get_xio_msg();
  struct xio_iovec_ex *msg_iov = req->out.pdata_iov.sglist;

  if (magic & (MSG_MAGIC_XIO)) {
    ldout(cct,4) << "payload: " << payload.buffers().size() <<
      " middle: " << middle.buffers().size() <<
      " data: " << data.buffers().size() <<
      dendl;
  }

  if (unlikely(ex_cnt > 0)) {
    ldout(cct,4) << __func__ << " buffer cnt > XIO_MSGR_IOVLEN (" <<
      ((XIO_MSGR_IOVLEN-1) + nbuffers) << ")" << dendl;
  }

  /* do the invariant part */
  msg_off = 0;
  req_off = -1; /* most often, not used */
  req_size = 0;

  xio_place_buffers(payload, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
                    req_off, BUFFER_PAYLOAD);

  xio_place_buffers(middle, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
                    req_off, BUFFER_MIDDLE);

  xio_place_buffers(data, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
                    req_off, BUFFER_DATA);
  ldout(cct,10) << "ex_cnt " << ex_cnt << ", req_off " << req_off
    << ", msg_cnt " << xmsg->get_msg_count() << dendl;

  /* finalize request */
  if (msg_off)
    req->out.pdata_iov.nents = msg_off;

  /* fixup first msg */
  req = xmsg->get_xio_msg();

  const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers();
  ceph_assert(header.size() == 1); /* XXX */
  list<bufferptr>::const_iterator pb = header.begin();
  req->out.header.iov_base = (char*) pb->c_str();
  req->out.header.iov_len = pb->length();

  /* deliver via xio, preserve ordering */
  if (xmsg->get_msg_count() > 1) {
    struct xio_msg *head = xmsg->get_xio_msg();
    struct xio_msg *tail = head;
    for (req_off = 0; ((unsigned) req_off) < xmsg->get_msg_count()-1; ++req_off) {
      req = &xmsg->req_arr[req_off].msg;
      assert(!req->in.pdata_iov.nents);
      assert(req->out.pdata_iov.nents || !nbuffers);
      tail->next = req;
      tail = req;
    }
    tail->next = NULL;
  }
  xmsg->trace = m->trace;
  m->trace.event("xio portal enqueue for send");
  m->trace.keyval("xio message segments", xmsg->hdr.msg_cnt);
  xcon->portal->enqueue_for_send(xcon, xmsg);

  return code;
} /* send_message(Message *, Connection *) */

int XioMessenger::shutdown()
{
  shutdown_called = true;
  conns_sp.lock();
  XioConnection::ConnList::iterator iter;
  iter = conns_list.begin();
  for (iter = conns_list.begin(); iter != conns_list.end(); ++iter) {
    (void) iter->disconnect(); // XXX mark down?
  }
  conns_sp.unlock();
  while(nsessions > 0) {
    Mutex::Locker lck(sh_mtx);
    if (nsessions > 0)
      sh_cond.Wait(sh_mtx);
  }
  portals.shutdown();
  dispatch_strategy->shutdown();
  did_bind = false;
  started = false;
  return 0;
} /* shutdown */

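/*
 * Look up (or lazily create) a connection to 'dest': return the loopback
 * connection for ourselves, an existing entry in conns_entity_map when one is
 * present, and otherwise create a new client-side Accelio session and
 * connection, register it, and mark it up/ready.
 */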
ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
{
  if (shutdown_called)
    return NULL;

  const entity_inst_t& self_inst = get_myinst();
  if ((&dest == &self_inst) ||
      (dest == self_inst)) {
    return get_loopback_connection();
  }

  conns_sp.lock();
  XioConnection::EntitySet::iterator conn_iter =
    conns_entity_map.find(dest, XioConnection::EntityComp());
  if (conn_iter != conns_entity_map.end()) {
    ConnectionRef cref = &(*conn_iter);
    conns_sp.unlock();
    return cref;
  }
  else {
    conns_sp.unlock();
    string xio_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
                                         dest.addr, true /* want_port */);

    ldout(cct,4) << "XioMessenger " << this << " get_connection: xio_uri "
      << xio_uri << dendl;

    /* XXX client session creation parameters */
    struct xio_session_params params = {};
    params.type = XIO_SESSION_CLIENT;
    params.ses_ops = &xio_msgr_ops;
    params.user_context = this;
    params.uri = xio_uri.c_str();

    XioConnection *xcon = new XioConnection(this, XioConnection::ACTIVE,
                                            dest);

    xcon->session = xio_session_create(&params);
    if (! xcon->session) {
      delete xcon;
      return NULL;
    }

    /* this should cause callbacks with user context of conn, but
     * we can always set it explicitly */
    struct xio_connection_params xcp = {};
    xcp.session = xcon->session;
    xcp.ctx = xcon->portal->ctx;
    xcp.conn_user_context = xcon;

    xcon->conn = xio_connect(&xcp);
    if (!xcon->conn) {
      xio_session_destroy(xcon->session);
      delete xcon;
      return NULL;
    }

    nsessions++;
    xcon->connected = true;

    /* sentinel ref */
    xcon->get(); /* xcon->nref == 1 */
    conns_sp.lock();
    conns_list.push_back(*xcon);
    conns_entity_map.insert(*xcon);
    conns_sp.unlock();

    /* XXXX pre-merge of session startup negotiation ONLY! */
    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);

    ldout(cct,2) << "New connection xcon: " << xcon <<
      " up_ready on session " << xcon->session <<
      " on msgr: " << this << " portal: " << xcon->portal << dendl;

    return xcon->get(); /* nref +1 */
  }
} /* get_connection */

ConnectionRef XioMessenger::get_loopback_connection()
{
  return (loop_con.get());
} /* get_loopback_connection */

void XioMessenger::unregister_xcon(XioConnection *xcon)
{
  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);

  XioConnection::EntitySet::iterator conn_iter =
    conns_entity_map.find(xcon->peer, XioConnection::EntityComp());
  if (conn_iter != conns_entity_map.end()) {
    XioConnection *xcon2 = &(*conn_iter);
    if (xcon == xcon2) {
      conns_entity_map.erase(conn_iter);
    }
  }

  /* check if citer on conn_list */
  if (xcon->conns_hook.is_linked()) {
    /* now find xcon on conns_list and erase */
    XioConnection::ConnList::iterator citer =
      XioConnection::ConnList::s_iterator_to(*xcon);
    conns_list.erase(citer);
  }
}

void XioMessenger::mark_down(const entity_addr_t& addr)
{
  entity_inst_t inst(entity_name_t(), addr);
  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
  XioConnection::EntitySet::iterator conn_iter =
    conns_entity_map.find(inst, XioConnection::EntityComp());
  if (conn_iter != conns_entity_map.end()) {
    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
  }
} /* mark_down(const entity_addr_t& */

void XioMessenger::mark_down(Connection* con)
{
  XioConnection *xcon = static_cast<XioConnection*>(con);
  xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
} /* mark_down(Connection*) */

void XioMessenger::mark_down_all()
{
  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
  XioConnection::EntitySet::iterator conn_iter;
  // iterate to end(), not begin(), so every connection is actually marked down
  for (conn_iter = conns_entity_map.begin(); conn_iter !=
	 conns_entity_map.end(); ++conn_iter) {
    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
  }
} /* mark_down_all */

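/*
 * mark_down_on_empty: put the connection into BARRIER state so no new
 * messages are accepted, then send a NOP tagged XIO_NOP_TAG_MARKDOWN whose
 * completion hook performs the actual mark-down once the traffic already
 * queued ahead of it (and the NOP itself) has drained.
 */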
static inline XioMarkDownHook* pool_alloc_markdown_hook(
  XioConnection *xcon, Message *m)
{
  struct xio_reg_mem mp_mem;
  int e = xio_mempool_alloc(xio_msgr_noreg_mpool,
                            sizeof(XioMarkDownHook), &mp_mem);
  if (!!e)
    return NULL;
  XioMarkDownHook *hook = static_cast<XioMarkDownHook*>(mp_mem.addr);
  new (hook) XioMarkDownHook(xcon, m, mp_mem);
  return hook;
}

void XioMessenger::mark_down_on_empty(Connection* con)
{
  XioConnection *xcon = static_cast<XioConnection*>(con);
  MNop* m = new MNop();
  m->tag = XIO_NOP_TAG_MARKDOWN;
  m->set_completion_hook(pool_alloc_markdown_hook(xcon, m));
  // stall new messages
  xcon->cstate.session_state = XioConnection::session_states::BARRIER;
  (void) _send_message_impl(m, xcon);
}

void XioMessenger::mark_disposable(Connection *con)
{
  XioConnection *xcon = static_cast<XioConnection*>(con);
  xcon->_mark_disposable(XioConnection::CState::OP_FLAG_NONE);
}

void XioMessenger::try_insert(XioConnection *xcon)
{
  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
  /* already resident in conns_list */
  conns_entity_map.insert(*xcon);
}

XioMessenger::~XioMessenger()
{
  delete dispatch_strategy;
  nInstances--;
} /* dtor */