// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Portions Copyright (C) 2013 CohortFS, LLC
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include <arpa/inet.h>
#include <boost/lexical_cast.hpp>
#include <set>
#include <stdlib.h>
#include <memory>

#include "XioMsg.h"
#include "XioMessenger.h"
#include "common/address_helper.h"
#include "common/code_environment.h"
#include "messages/MNop.h"

#define dout_subsys ceph_subsys_xio
#undef dout_prefix
#define dout_prefix *_dout << "xio."

Mutex mtx("XioMessenger Package Lock");
std::atomic<bool> initialized = { false };

std::atomic<unsigned> XioMessenger::nInstances = { 0 };

struct xio_mempool *xio_msgr_noreg_mpool;

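/* Session-ops vtable handed to Accelio for every session (listening
 * portals and active connects alike); its slots are filled in once by
 * XioInit::package_init() below. */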
static struct xio_session_ops xio_msgr_ops;

/* Accelio API callouts */

namespace xio_log
{
typedef pair<const char*, int> level_pair;
static const level_pair LEVELS[] = {
  make_pair("fatal", 0),
  make_pair("error", 0),
  make_pair("warn", 1),
  make_pair("info", 1),
  make_pair("debug", 2),
  make_pair("trace", 20)
};

static CephContext *context;

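/* Translate the Ceph debug level configured for the xio subsystem into
 * a single Accelio verbosity: walk LEVELS in order and count how many
 * of the gather thresholds are currently enabled; the count is what
 * gets passed as XIO_OPTNAME_LOG_LEVEL. */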
int get_level()
{
  int level = 0;
  for (size_t i = 0; i < sizeof(LEVELS)/sizeof(LEVELS[0]); i++) {
    if (!ldlog_p1(context, dout_subsys, LEVELS[i].second))
      break;
    level++;
  }
  return level;
}

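/* Log-function hook registered with Accelio (XIO_OPTNAME_LOG_FN):
 * formats Accelio's printf-style message into a stack buffer, then
 * re-emits it through Ceph's dout machinery at the subsystem level
 * corresponding to the Accelio severity. */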
void log_dout(const char *file, unsigned line,
              const char *function, unsigned level,
              const char *fmt, ...)
{
  char buffer[2048];
  va_list args;
  va_start(args, fmt);
  int n = vsnprintf(buffer, sizeof(buffer), fmt, args);
  va_end(args);

  if (n > 0) {
    const char *short_file = strrchr(file, '/');
    short_file = (short_file == NULL) ? file : short_file + 1;

    const level_pair &lvl = LEVELS[level];
    ldout(context, lvl.second) << '[' << lvl.first << "] "
      << short_file << ':' << line << ' '
      << function << " - " << buffer << dendl;
  }
}
} // namespace xio_log

static int on_session_event(struct xio_session *session,
                            struct xio_session_event_data *event_data,
                            void *cb_user_context)
{
  XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
  CephContext *cct = msgr->cct;

  ldout(cct,4) << "session event: " << xio_session_event_str(event_data->event)
    << ". reason: " << xio_strerror(event_data->reason) << dendl;

  return msgr->session_event(session, event_data, cb_user_context);
}

static int on_new_session(struct xio_session *session,
                          struct xio_new_session_req *req,
                          void *cb_user_context)
{
  XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
  CephContext *cct = msgr->cct;

  ldout(cct,4) << "new session " << session
    << " user_context " << cb_user_context << dendl;

  return (msgr->new_session(session, req, cb_user_context));
}

static int on_msg(struct xio_session *session,
                  struct xio_msg *req,
                  int more_in_batch,
                  void *cb_user_context)
{
  XioConnection* xcon __attribute__((unused)) =
    static_cast<XioConnection*>(cb_user_context);
  CephContext *cct = xcon->get_messenger()->cct;

  ldout(cct,25) << "on_msg session " << session << " xcon " << xcon << dendl;

  if (unlikely(XioPool::trace_mempool)) {
    static uint32_t nreqs;
    if (unlikely((++nreqs % 65536) == 0)) {
      xp_stats.dump(__func__, nreqs);
    }
  }

  return xcon->on_msg(session, req, more_in_batch,
                      cb_user_context);
}

static int on_ow_msg_send_complete(struct xio_session *session,
                                   struct xio_msg *msg,
                                   void *conn_user_context)
{
  XioConnection *xcon =
    static_cast<XioConnection*>(conn_user_context);
  CephContext *cct = xcon->get_messenger()->cct;

  ldout(cct,25) << "msg delivered session: " << session
    << " msg: " << msg << " conn_user_context "
    << conn_user_context << dendl;

  return xcon->on_ow_msg_send_complete(session, msg, conn_user_context);
}

static int on_msg_error(struct xio_session *session,
                        enum xio_status error,
                        enum xio_msg_direction dir,
                        struct xio_msg *msg,
                        void *conn_user_context)
{
  /* XIO promises to flush back undelivered messages */
  XioConnection *xcon =
    static_cast<XioConnection*>(conn_user_context);
  CephContext *cct = xcon->get_messenger()->cct;

  ldout(cct,4) << "msg error session: " << session
    << " error: " << xio_strerror(error) << " msg: " << msg
    << " conn_user_context " << conn_user_context << dendl;

  return xcon->on_msg_error(session, error, msg, conn_user_context);
}

static int on_cancel(struct xio_session *session,
                     struct xio_msg *msg,
                     enum xio_status result,
                     void *conn_user_context)
{
  XioConnection* xcon __attribute__((unused)) =
    static_cast<XioConnection*>(conn_user_context);
  CephContext *cct = xcon->get_messenger()->cct;

  ldout(cct,25) << "on cancel: session: " << session << " msg: " << msg
    << " conn_user_context " << conn_user_context << dendl;

  return 0;
}

static int on_cancel_request(struct xio_session *session,
                             struct xio_msg *msg,
                             void *conn_user_context)
{
  XioConnection* xcon __attribute__((unused)) =
    static_cast<XioConnection*>(conn_user_context);
  CephContext *cct = xcon->get_messenger()->cct;

  ldout(cct,25) << "on cancel request: session: " << session << " msg: " << msg
    << " conn_user_context " << conn_user_context << dendl;

  return 0;
}

/* free functions */
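/* Build an Accelio URI ("<transport>://<host>[:<port>]") from a Ceph
 * entity address.  Illustrative values only: for an IPv4 address
 * 10.0.0.2:6800 and transport "rdma",
 *
 *   xio_uri_from_entity("rdma", addr, true)  -> "rdma://10.0.0.2:6800"
 *   xio_uri_from_entity("rdma", addr, false) -> "rdma://10.0.0.2"
 *
 * Unrecognized transport strings fall back to "rdma". */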
static string xio_uri_from_entity(const string &type,
                                  const entity_addr_t& addr, bool want_port)
{
  const char *host = NULL;
  char addr_buf[129];
  string xio_uri;

  switch(addr.get_family()) {
  case AF_INET:
    host = inet_ntop(AF_INET, &addr.in4_addr().sin_addr, addr_buf,
                     INET_ADDRSTRLEN);
    break;
  case AF_INET6:
    host = inet_ntop(AF_INET6, &addr.in6_addr().sin6_addr, addr_buf,
                     INET6_ADDRSTRLEN);
    break;
  default:
    abort();
    break;
  };

  if (type == "rdma" || type == "tcp")
    xio_uri = type + "://";
  else
    xio_uri = "rdma://";

  /* The following can only succeed if the host is rdma-capable */
  xio_uri += host;
  if (want_port) {
    xio_uri += ":";
    xio_uri += boost::lexical_cast<std::string>(addr.get_port());
  }

  return xio_uri;
} /* xio_uri_from_entity */

void XioInit::package_init(CephContext *cct) {
  if (! initialized) {

    mtx.Lock();
    if (! initialized) {

      xio_init();

      // claim a reference to the first context we see
      xio_log::context = cct->get();

      int xopt;
      xopt = xio_log::get_level();
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_LEVEL,
                  &xopt, sizeof(xopt));
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_FN,
                  (const void*)xio_log::log_dout, sizeof(xio_log_fn));

      xopt = 1;
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_DISABLE_HUGETBL,
                  &xopt, sizeof(xopt));

      if (g_code_env == CODE_ENVIRONMENT_DAEMON) {
        xopt = 1;
        xio_set_opt(NULL, XIO_OPTLEVEL_RDMA, XIO_OPTNAME_ENABLE_FORK_INIT,
                    &xopt, sizeof(xopt));
      }

      xopt = XIO_MSGR_IOVLEN;
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_IN_IOVLEN,
                  &xopt, sizeof(xopt));
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
                  &xopt, sizeof(xopt));

      /* enable flow-control */
      xopt = 1;
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL,
                  &xopt, sizeof(xopt));

      /* and set threshold for buffer callouts */
      xopt = max(cct->_conf->xio_max_send_inline, 512);
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_DATA,
                  &xopt, sizeof(xopt));

      xopt = XioMsgHdr::get_max_encoded_length();
      ldout(cct,2) << "setting accelio max header size " << xopt << dendl;
      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_HEADER,
                  &xopt, sizeof(xopt));

      size_t queue_depth = cct->_conf->xio_queue_depth;
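      /* Registered-memory slab table handed to Accelio.  Field order
       * follows struct xio_mempool_config: slab count, then per-slab
       * { block size, initial blocks, grow-by count (the configured
       * queue depth for the small slabs), max blocks }.  Exact member
       * names vary by Accelio version; see <libxio.h>. */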
      struct xio_mempool_config mempool_config = {
        6,
        {
          {1024,    0, queue_depth, 262144},
          {4096,    0, queue_depth, 262144},
          {16384,   0, queue_depth, 262144},
          {65536,   0, 128,         65536},
          {262144,  0, 32,          16384},
          {1048576, 0, 8,           8192}
        }
      };
      xio_set_opt(NULL,
                  XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_CONFIG_MEMPOOL,
                  &mempool_config, sizeof(mempool_config));

      /* and an unregistered one */
#define XMSG_MEMPOOL_QUANTUM 4096

      xio_msgr_noreg_mpool =
        xio_mempool_create(-1 /* nodeid */,
                           XIO_MEMPOOL_FLAG_REGULAR_PAGES_ALLOC);

      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 64,
                                  cct->_conf->xio_mp_min,
                                  cct->_conf->xio_mp_max_64,
                                  XMSG_MEMPOOL_QUANTUM, 0);
      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 256,
                                  cct->_conf->xio_mp_min,
                                  cct->_conf->xio_mp_max_256,
                                  XMSG_MEMPOOL_QUANTUM, 0);
      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 1024,
                                  cct->_conf->xio_mp_min,
                                  cct->_conf->xio_mp_max_1k,
                                  XMSG_MEMPOOL_QUANTUM, 0);
      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, getpagesize(),
                                  cct->_conf->xio_mp_min,
                                  cct->_conf->xio_mp_max_page,
                                  XMSG_MEMPOOL_QUANTUM, 0);

      /* initialize ops singleton */
      xio_msgr_ops.on_session_event = on_session_event;
      xio_msgr_ops.on_new_session = on_new_session;
      xio_msgr_ops.on_session_established = NULL;
      xio_msgr_ops.on_msg = on_msg;
      xio_msgr_ops.on_ow_msg_send_complete = on_ow_msg_send_complete;
      xio_msgr_ops.on_msg_error = on_msg_error;
      xio_msgr_ops.on_cancel = on_cancel;
      xio_msgr_ops.on_cancel_request = on_cancel_request;

      /* mark initialized */
      initialized = true;
    }
    mtx.Unlock();
  }
}

/* XioMessenger */
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) {
  return *_dout << "-- " << msgr->get_myaddr_legacy() << " ";
}

XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
                           string mname, uint64_t _nonce,
                           uint64_t cflags, DispatchStrategy *ds)
  : SimplePolicyMessenger(cct, name, mname, _nonce),
    XioInit(cct),
    portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)),
    dispatch_strategy(ds),
    loop_con(new XioLoopbackConnection(this)),
    special_handling(0),
    sh_mtx("XioMessenger session mutex"),
    sh_cond(),
    need_addr(true),
    did_bind(false),
    nonce(_nonce)
{

  if (cct->_conf->xio_trace_xcon)
    magic |= MSG_MAGIC_TRACE_XCON;

  XioPool::trace_mempool = (cct->_conf->xio_trace_mempool);
  XioPool::trace_msgcnt = (cct->_conf->xio_trace_msgcnt);

  dispatch_strategy->set_messenger(this);

  /* update class instance count */
  nInstances++;

  loop_con->set_features(CEPH_FEATURES_ALL);

  ldout(cct,2) << "Create msgr: " << this << " instance: "
    << nInstances << " type: " << name.type_str()
    << " subtype: " << mname << " nportals: " << get_nportals(cflags)
    << " nconns_per_portal: " << get_nconns_per_portal(cflags)
    << dendl;

} /* ctor */

int XioMessenger::pool_hint(uint32_t dsize) {
  if (dsize > 1024*1024)
    return 0;

  /* if dsize is already present, returns -EEXIST */
  return xio_mempool_add_slab(xio_msgr_noreg_mpool, dsize, 0,
                              cct->_conf->xio_mp_max_hint,
                              XMSG_MEMPOOL_QUANTUM, 0);
}

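/* Sizing heuristics: a messenger flagged as having many connections
 * (or serving heartbeats) gets more connections per portal, and one
 * flagged for heavy traffic gets more portal (event-loop) threads. */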
int XioMessenger::get_nconns_per_portal(uint64_t cflags)
{
  const int XIO_DEFAULT_NUM_CONNS_PER_PORTAL = 8;
  int nconns = XIO_DEFAULT_NUM_CONNS_PER_PORTAL;

  if (cflags & Messenger::HAS_MANY_CONNECTIONS)
    nconns = max(cct->_conf->xio_max_conns_per_portal,
                 XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
  else if (cflags & Messenger::HEARTBEAT)
    nconns = max(cct->_conf->osd_heartbeat_min_peers * 4,
                 XIO_DEFAULT_NUM_CONNS_PER_PORTAL);

  return nconns;
}

int XioMessenger::get_nportals(uint64_t cflags)
{
  int nportals = 1;

  if (cflags & Messenger::HAS_HEAVY_TRAFFIC)
    nportals = max(cct->_conf->xio_portal_threads, 1);

  return nportals;
}

void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
  // be careful here: multiple threads may block here, and readers of
  // my_inst.addr do NOT hold any lock.

  // this always goes from true -> false under the protection of the
  // mutex.  if it is already false, we need not retake the mutex at
  // all.
  if (!need_addr)
    return;

  sh_mtx.Lock();
  if (need_addr) {
    entity_addr_t t = peer_addr_for_me;
    t.set_port(my_inst.addr.get_port());
    my_inst.addr.set_sockaddr(t.get_sockaddr());
    ldout(cct,2) << "learned my addr " << my_inst.addr << dendl;
    need_addr = false;
    // init_local_connection();
  }
  sh_mtx.Unlock();
}

int XioMessenger::new_session(struct xio_session *session,
                              struct xio_new_session_req *req,
                              void *cb_user_context)
{
  if (shutdown_called) {
    return xio_reject(
      session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */);
  }
  int code = portals.accept(session, req, cb_user_context);
  if (! code)
    nsessions++;
  return code;
} /* new_session */

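/* Central Accelio session-event dispatcher.  ESTABLISHED learns the
 * local address and fires the connect hooks; NEW_CONNECTION creates the
 * passive-side XioConnection; the error/closed/disconnected/refused
 * group unregisters and disconnects; SESSION_TEARDOWN destroys the
 * session and signals shutdown() once the session count reaches zero. */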
int XioMessenger::session_event(struct xio_session *session,
                                struct xio_session_event_data *event_data,
                                void *cb_user_context)
{
  XioConnection *xcon;

  switch (event_data->event) {
  case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT:
  {
    struct xio_connection *conn = event_data->conn;
    struct xio_connection_attr xcona;
    entity_addr_t peer_addr_for_me, paddr;

    xcon = static_cast<XioConnection*>(event_data->conn_user_context);

    ldout(cct,2) << "connection established " << event_data->conn
      << " session " << session << " xcon " << xcon << dendl;

    (void) xio_query_connection(conn, &xcona,
                                XIO_CONNECTION_ATTR_LOCAL_ADDR|
                                XIO_CONNECTION_ATTR_PEER_ADDR);
    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
    paddr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
    //set_myaddr(peer_addr_for_me);
    learned_addr(peer_addr_for_me);
    ldout(cct,2) << "client: connected from " << peer_addr_for_me
      << " to " << paddr << dendl;

    /* notify hook */
    this->ms_deliver_handle_connect(xcon);
    this->ms_deliver_handle_fast_connect(xcon);
  }
  break;

  case XIO_SESSION_NEW_CONNECTION_EVENT:
  {
    struct xio_connection *conn = event_data->conn;
    struct xio_connection_attr xcona;
    entity_inst_t s_inst;
    entity_addr_t peer_addr_for_me;

    (void) xio_query_connection(conn, &xcona,
                                XIO_CONNECTION_ATTR_CTX|
                                XIO_CONNECTION_ATTR_PEER_ADDR|
                                XIO_CONNECTION_ATTR_LOCAL_ADDR);
    /* XXX assumes RDMA */
    s_inst.addr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);

    xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst);
    xcon->session = session;

    struct xio_context_attr xctxa;
    (void) xio_query_context(xcona.ctx, &xctxa, XIO_CONTEXT_ATTR_USER_CTX);

    xcon->conn = conn;
    xcon->portal = static_cast<XioPortal*>(xctxa.user_context);
    ceph_assert(xcon->portal);

    xcona.user_context = xcon;
    (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);

    xcon->connected = true;

    /* sentinel ref */
    xcon->get(); /* xcon->nref == 1 */
    conns_sp.lock();
    conns_list.push_back(*xcon);
    /* XXX we can't put xcon in conns_entity_map because we don't yet know
     * its peer address */
    conns_sp.unlock();

    /* XXXX pre-merge of session startup negotiation ONLY! */
    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);

    ldout(cct,2) << "New connection session " << session
      << " xcon " << xcon << " on msgr: " << this
      << " portal: " << xcon->portal << dendl;
    ldout(cct,2) << "Server: connected from " << s_inst.addr
      << " to " << peer_addr_for_me << dendl;
  }
  break;
  case XIO_SESSION_CONNECTION_ERROR_EVENT:
  case XIO_SESSION_CONNECTION_CLOSED_EVENT: /* orderly discon */
  case XIO_SESSION_CONNECTION_DISCONNECTED_EVENT: /* unexpected discon */
  case XIO_SESSION_CONNECTION_REFUSED_EVENT:
    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
    ldout(cct,2) << xio_session_event_str(event_data->event)
      << " xcon " << xcon << " session " << session << dendl;
    if (likely(!!xcon)) {
      unregister_xcon(xcon);
      xcon->on_disconnect_event();
    }
    break;
  case XIO_SESSION_CONNECTION_TEARDOWN_EVENT:
    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
    ldout(cct,2) << xio_session_event_str(event_data->event)
      << " xcon " << xcon << " session " << session << dendl;
    /*
     * There are flows where Accelio sends a teardown event without going
     * through a disconnect event, so make sure the connection is cleaned up.
     */
    unregister_xcon(xcon);
    xcon->on_teardown_event();
    break;
  case XIO_SESSION_TEARDOWN_EVENT:
    ldout(cct,2) << xio_session_event_str(event_data->event)
      << " session " << session << dendl;
    if (unlikely(XioPool::trace_mempool)) {
      xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session));
    }
    xio_session_destroy(session);
    if (--nsessions == 0) {
      Mutex::Locker lck(sh_mtx);
      if (nsessions == 0)
        sh_cond.Signal();
    }
    break;
  default:
    break;
  };

  return 0;
}

enum bl_type
{
  BUFFER_PAYLOAD,
  BUFFER_MIDDLE,
  BUFFER_DATA
};

#define MAX_XIO_BUF_SIZE 1044480

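/* Outgoing bufferlists are mapped onto xio_msg iovecs in two passes:
 * xio_count_buffers() walks payload/middle/data once to learn how many
 * iov entries (and thus how many chained xio_msg requests) are needed,
 * then xio_place_buffers() walks them again to fill the sglists.  Both
 * passes split at MAX_XIO_BUF_SIZE bytes and XIO_MSGR_IOVLEN entries
 * per request, so they stay in lockstep. */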
static inline int
xio_count_buffers(const buffer::list& bl, int& req_size, int& msg_off,
                  int& req_off)
{
  const std::list<buffer::ptr>& buffers = bl.buffers();
  list<bufferptr>::const_iterator pb;
  size_t size, off;
  int result;
  int first = 1;

  off = size = 0;
  result = 0;
  for (;;) {
    if (off >= size) {
      if (first) pb = buffers.begin(); else ++pb;
      if (pb == buffers.end()) {
        break;
      }
      off = 0;
      size = pb->length();
      first = 0;
    }
    size_t count = size - off;
    if (!count) continue;
    if (req_size + count > MAX_XIO_BUF_SIZE) {
      count = MAX_XIO_BUF_SIZE - req_size;
    }

    ++result;

    /* advance iov and perhaps request */

    off += count;
    req_size += count;
    ++msg_off;
    if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
      ++req_off;
      msg_off = 0;
      req_size = 0;
    }
  }

  return result;
}

static inline void
xio_place_buffers(const buffer::list& bl, XioMsg *xmsg, struct xio_msg*& req,
                  struct xio_iovec_ex*& msg_iov, int& req_size,
                  int ex_cnt, int& msg_off, int& req_off, bl_type type)
{
  const std::list<buffer::ptr>& buffers = bl.buffers();
  list<bufferptr>::const_iterator pb;
  struct xio_iovec_ex* iov;
  size_t size, off;
  const char *data = NULL;
  int first = 1;

  off = size = 0;
  for (;;) {
    if (off >= size) {
      if (first) pb = buffers.begin(); else ++pb;
      if (pb == buffers.end()) {
        break;
      }
      off = 0;
      size = pb->length();
      data = pb->c_str(); // is c_str() efficient?
      first = 0;
    }
    size_t count = size - off;
    if (!count) continue;
    if (req_size + count > MAX_XIO_BUF_SIZE) {
      count = MAX_XIO_BUF_SIZE - req_size;
    }

    /* assign buffer */
    iov = &msg_iov[msg_off];
    iov->iov_base = (void *) (&data[off]);
    iov->iov_len = count;

    switch (type) {
    case BUFFER_DATA:
      //break;
    default:
    {
      struct xio_reg_mem *mp = get_xio_mp(*pb);
      iov->mr = (mp) ? mp->mr : NULL;
    }
    break;
    }

    /* advance iov(s) */

    off += count;
    req_size += count;
    ++msg_off;

    /* next request if necessary */

    if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
      /* finish this request */
      req->out.pdata_iov.nents = msg_off;
      /* advance to next, and write in it if it's not the last one. */
      if (++req_off >= ex_cnt) {
        req = 0; /* poison. trap if we try to use it. */
        msg_iov = NULL;
      } else {
        req = &xmsg->req_arr[req_off].msg;
        msg_iov = req->out.pdata_iov.sglist;
      }
      msg_off = 0;
      req_size = 0;
    }
  }
}

int XioMessenger::bind(const entity_addr_t& addr)
{
  if (addr.is_blank_ip()) {
    lderr(cct) << "ERROR: need rdma ip for remote use! " << dendl;
    cout << "Error: xio bind failed. public/cluster ip not specified"
         << std::endl;
    return -1;
  }

  entity_addr_t shift_addr = addr;
  string base_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
                                        shift_addr, false /* want_port */);
  ldout(cct,4) << "XioMessenger " << this << " bind: xio_uri "
    << base_uri << ':' << shift_addr.get_port() << dendl;

  uint16_t port0;
  int r = portals.bind(&xio_msgr_ops, base_uri, shift_addr.get_port(), &port0);
  if (r == 0) {
    shift_addr.set_port(port0);
    shift_addr.nonce = nonce;
    set_myaddr(shift_addr);
    need_addr = false;
    did_bind = true;
  }
  return r;
} /* bind */

int XioMessenger::rebind(const set<int>& avoid_ports)
{
  ldout(cct,4) << "XioMessenger " << this << " rebind attempt" << dendl;
  return 0;
} /* rebind */

int XioMessenger::start()
{
  portals.start();
  dispatch_strategy->start();
  if (!did_bind) {
    my_inst.addr.nonce = nonce;
  }
  started = true;
  return 0;
}

void XioMessenger::wait()
{
  portals.join();
  dispatch_strategy->wait();
} /* wait */

int XioMessenger::_send_message(Message *m, const entity_inst_t& dest)
{
  ConnectionRef conn = get_connection(dest);
  if (conn)
    return _send_message(m, &(*conn));
  else
    return EINVAL;
} /* send_message(Message *, const entity_inst_t&) */

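/* XioMsg/XioCommand carriers are placement-new constructed in space
 * carved from the unregistered Accelio mempool, so message book-keeping
 * shares the transport's allocator and is handed back to it on the xio
 * completion path. */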
static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
                                         int ex_cnt)
{
  struct xio_reg_mem mp_mem;
  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioMsg), &mp_mem);
  if (!!e)
    return NULL;
  XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
  ceph_assert(!!xmsg);
  new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
  return xmsg;
}

XioCommand* pool_alloc_xio_command(XioConnection *xcon)
{
  struct xio_reg_mem mp_mem;
  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioCommand), &mp_mem);
  if (!!e)
    return NULL;
  XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr);
  ceph_assert(!!xcmd);
  new (xcmd) XioCommand(xcon, mp_mem);
  return xcmd;
}

int XioMessenger::_send_message(Message *m, Connection *con)
{
  if (con == loop_con.get() /* intrusive_ptr get() */) {
    m->set_connection(con);
    m->set_src(get_myinst().name);
    m->set_seq(loop_con->next_seq());
    ds_dispatch(m);
    return 0;
  }

  XioConnection *xcon = static_cast<XioConnection*>(con);

  /* If con is not in READY state, we have to enforce policy */
  if (xcon->cstate.session_state.read() != XioConnection::UP) {
    std::lock_guard<decltype(xcon->sp)> lg(xcon->sp);

    if (xcon->cstate.session_state.read() != XioConnection::UP) {
      xcon->outgoing.mqueue.push_back(*m);
      return 0;
    }
  }

  return _send_message_impl(m, xcon);
} /* send_message(Message* m, Connection *con) */

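/* Send path proper: encode the Message, count how many iov slots its
 * payload/middle/data sections need, allocate an XioMsg with ex_cnt
 * extra chained xio_msg requests when one is not enough, place the
 * buffers, point the xio header at the encoded XioMsgHdr, and hand the
 * chain to the connection's portal for ordered delivery. */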
int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
{
  int code = 0;

  Mutex::Locker l(xcon->lock);
  if (unlikely(XioPool::trace_mempool)) {
    static uint32_t nreqs;
    if (unlikely((++nreqs % 65536) == 0)) {
      xp_stats.dump(__func__, nreqs);
    }
  }

  m->set_seq(xcon->state.next_out_seq());
  m->set_magic(magic); // trace flags and special handling

  m->encode(xcon->get_features(), this->crcflags);

  buffer::list &payload = m->get_payload();
  buffer::list &middle = m->get_middle();
  buffer::list &data = m->get_data();

  int msg_off = 0;
  int req_off = 0;
  int req_size = 0;
  int nbuffers =
    xio_count_buffers(payload, req_size, msg_off, req_off) +
    xio_count_buffers(middle, req_size, msg_off, req_off) +
    xio_count_buffers(data, req_size, msg_off, req_off);

  int ex_cnt = req_off;
  if (msg_off == 0 && ex_cnt > 0) {
    // no buffers for last msg
    ldout(cct,10) << "msg_off 0, ex_cnt " << ex_cnt << " -> "
                  << ex_cnt-1 << dendl;
    ex_cnt--;
  }

  /* get an XioMsg frame */
  XioMsg *xmsg = pool_alloc_xio_msg(m, xcon, ex_cnt);
  if (! xmsg) {
    /* could happen if Accelio has been shut down */
    return ENOMEM;
  }

  ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg
    << " tag " << (int)xmsg->hdr.tag
    << " req_0 " << xmsg->get_xio_msg() << " msg type " << m->get_type()
    << " features: " << xcon->get_features()
    << " conn " << xcon->conn << " sess " << xcon->session << dendl;

  if (magic & (MSG_MAGIC_XIO)) {

    /* XXXX verify */
    switch (m->get_type()) {
    case 43:
    // case 15:
      ldout(cct,4) << __func__ << " stop 43 " << m->get_type()
                   << " " << *m << dendl;
      buffer::list &payload = m->get_payload();
      ldout(cct,4) << __func__ << " payload dump:" << dendl;
      payload.hexdump(cout);
    }
  }

  struct xio_msg *req = xmsg->get_xio_msg();
  struct xio_iovec_ex *msg_iov = req->out.pdata_iov.sglist;

  if (magic & (MSG_MAGIC_XIO)) {
    ldout(cct,4) << "payload: " << payload.buffers().size() <<
      " middle: " << middle.buffers().size() <<
      " data: " << data.buffers().size() <<
      dendl;
  }

  if (unlikely(ex_cnt > 0)) {
    ldout(cct,4) << __func__ << " buffer cnt > XIO_MSGR_IOVLEN (" <<
      ((XIO_MSGR_IOVLEN-1) + nbuffers) << ")" << dendl;
  }

  /* do the invariant part */
  msg_off = 0;
  req_off = -1; /* most often, not used */
  req_size = 0;

  xio_place_buffers(payload, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
                    req_off, BUFFER_PAYLOAD);

  xio_place_buffers(middle, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
                    req_off, BUFFER_MIDDLE);

  xio_place_buffers(data, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
                    req_off, BUFFER_DATA);
  ldout(cct,10) << "ex_cnt " << ex_cnt << ", req_off " << req_off
    << ", msg_cnt " << xmsg->get_msg_count() << dendl;

  /* finalize request */
  if (msg_off)
    req->out.pdata_iov.nents = msg_off;

  /* fixup first msg */
  req = xmsg->get_xio_msg();

  const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers();
  ceph_assert(header.size() == 1); /* XXX */
  list<bufferptr>::const_iterator pb = header.begin();
  req->out.header.iov_base = (char*) pb->c_str();
  req->out.header.iov_len = pb->length();

  /* deliver via xio, preserve ordering */
  if (xmsg->get_msg_count() > 1) {
    struct xio_msg *head = xmsg->get_xio_msg();
    struct xio_msg *tail = head;
    for (req_off = 0; ((unsigned) req_off) < xmsg->get_msg_count()-1;
         ++req_off) {
      req = &xmsg->req_arr[req_off].msg;
      assert(!req->in.pdata_iov.nents);
      assert(req->out.pdata_iov.nents || !nbuffers);
      tail->next = req;
      tail = req;
    }
    tail->next = NULL;
  }
  xmsg->trace = m->trace;
  m->trace.event("xio portal enqueue for send");
  m->trace.keyval("xio message segments", xmsg->hdr.msg_cnt);
  xcon->portal->enqueue_for_send(xcon, xmsg);

  return code;
} /* send_message(Message *, Connection *) */

int XioMessenger::shutdown()
{
  shutdown_called = true;
  conns_sp.lock();
  XioConnection::ConnList::iterator iter;
  for (iter = conns_list.begin(); iter != conns_list.end(); ++iter) {
    (void) iter->disconnect(); // XXX mark down?
  }
  conns_sp.unlock();
  while (nsessions > 0) {
    Mutex::Locker lck(sh_mtx);
    if (nsessions > 0)
      sh_cond.Wait(sh_mtx);
  }
  portals.shutdown();
  dispatch_strategy->shutdown();
  did_bind = false;
  started = false;
  return 0;
} /* shutdown */

ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
{
  if (shutdown_called)
    return NULL;

  const entity_inst_t& self_inst = get_myinst();
  if ((&dest == &self_inst) ||
      (dest == self_inst)) {
    return get_loopback_connection();
  }

  conns_sp.lock();
  XioConnection::EntitySet::iterator conn_iter =
    conns_entity_map.find(dest, XioConnection::EntityComp());
  if (conn_iter != conns_entity_map.end()) {
    ConnectionRef cref = &(*conn_iter);
    conns_sp.unlock();
    return cref;
  }
  else {
    conns_sp.unlock();
    string xio_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
                                         dest.addr, true /* want_port */);

    ldout(cct,4) << "XioMessenger " << this << " get_connection: xio_uri "
      << xio_uri << dendl;

    /* XXX client session creation parameters */
    struct xio_session_params params = {};
    params.type = XIO_SESSION_CLIENT;
    params.ses_ops = &xio_msgr_ops;
    params.user_context = this;
    params.uri = xio_uri.c_str();

    XioConnection *xcon = new XioConnection(this, XioConnection::ACTIVE,
                                            dest);

    xcon->session = xio_session_create(&params);
    if (! xcon->session) {
      delete xcon;
      return NULL;
    }

    /* this should cause callbacks with user context of conn, but
     * we can always set it explicitly */
    struct xio_connection_params xcp = {};
    xcp.session = xcon->session;
    xcp.ctx = xcon->portal->ctx;
    xcp.conn_user_context = xcon;

    xcon->conn = xio_connect(&xcp);
    if (!xcon->conn) {
      xio_session_destroy(xcon->session);
      delete xcon;
      return NULL;
    }

    nsessions++;
    xcon->connected = true;

    /* sentinel ref */
    xcon->get(); /* xcon->nref == 1 */
    conns_sp.lock();
    conns_list.push_back(*xcon);
    conns_entity_map.insert(*xcon);
    conns_sp.unlock();

    /* XXXX pre-merge of session startup negotiation ONLY! */
    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);

    ldout(cct,2) << "New connection xcon: " << xcon <<
      " up_ready on session " << xcon->session <<
      " on msgr: " << this << " portal: " << xcon->portal << dendl;

    return xcon->get(); /* nref +1 */
  }
} /* get_connection */

ConnectionRef XioMessenger::get_loopback_connection()
{
  return (loop_con.get());
} /* get_loopback_connection */

void XioMessenger::unregister_xcon(XioConnection *xcon)
{
  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);

  XioConnection::EntitySet::iterator conn_iter =
    conns_entity_map.find(xcon->peer, XioConnection::EntityComp());
  if (conn_iter != conns_entity_map.end()) {
    XioConnection *xcon2 = &(*conn_iter);
    if (xcon == xcon2) {
      conns_entity_map.erase(conn_iter);
    }
  }

  /* check whether xcon is still linked on conns_list */
  if (xcon->conns_hook.is_linked()) {
    /* now find xcon on conns_list and erase */
    XioConnection::ConnList::iterator citer =
      XioConnection::ConnList::s_iterator_to(*xcon);
    conns_list.erase(citer);
  }
}

void XioMessenger::mark_down(const entity_addr_t& addr)
{
  entity_inst_t inst(entity_name_t(), addr);
  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
  XioConnection::EntitySet::iterator conn_iter =
    conns_entity_map.find(inst, XioConnection::EntityComp());
  if (conn_iter != conns_entity_map.end()) {
    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
  }
} /* mark_down(const entity_addr_t&) */

void XioMessenger::mark_down(Connection* con)
{
  XioConnection *xcon = static_cast<XioConnection*>(con);
  xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
} /* mark_down(Connection*) */

void XioMessenger::mark_down_all()
{
  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
  XioConnection::EntitySet::iterator conn_iter;
  for (conn_iter = conns_entity_map.begin(); conn_iter !=
         conns_entity_map.end(); ++conn_iter) {
    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
  }
} /* mark_down_all */

static inline XioMarkDownHook* pool_alloc_markdown_hook(
  XioConnection *xcon, Message *m)
{
  struct xio_reg_mem mp_mem;
  int e = xio_mempool_alloc(xio_msgr_noreg_mpool,
                            sizeof(XioMarkDownHook), &mp_mem);
  if (!!e)
    return NULL;
  XioMarkDownHook *hook = static_cast<XioMarkDownHook*>(mp_mem.addr);
  new (hook) XioMarkDownHook(xcon, m, mp_mem);
  return hook;
}

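/* Graceful variant of mark_down(): queue an MNop tagged
 * XIO_NOP_TAG_MARKDOWN behind whatever is already in flight and put the
 * session into BARRIER so nothing new is accepted; the completion hook
 * attached to the NOP performs the actual mark-down once the send queue
 * has drained. */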
void XioMessenger::mark_down_on_empty(Connection* con)
{
  XioConnection *xcon = static_cast<XioConnection*>(con);
  MNop* m = new MNop();
  m->tag = XIO_NOP_TAG_MARKDOWN;
  m->set_completion_hook(pool_alloc_markdown_hook(xcon, m));
  // stall new messages
  xcon->cstate.session_state = XioConnection::session_states::BARRIER;
  (void) _send_message_impl(m, xcon);
}

void XioMessenger::mark_disposable(Connection *con)
{
  XioConnection *xcon = static_cast<XioConnection*>(con);
  xcon->_mark_disposable(XioConnection::CState::OP_FLAG_NONE);
}

void XioMessenger::try_insert(XioConnection *xcon)
{
  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
  /* already resident in conns_list */
  conns_entity_map.insert(*xcon);
}

XioMessenger::~XioMessenger()
{
  delete dispatch_strategy;
  nInstances--;
} /* dtor */