// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Portions Copyright (C) 2013 CohortFS, LLC
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
25 #include <boost/lexical_cast.hpp>
26 #include "msg/SimplePolicyMessenger.h"
27 #include "XioConnection.h"
30 #include "include/spinlock.h"
32 #include "include/ceph_assert.h"
33 #include "common/dout.h"
/* Cache-line geometry helpers. CACHE_PAD(_n) emits a one-cache-line-wide
 * padding member named __pad<_n>; presumably used to keep adjacent per-lane
 * state off the same cache line (the consuming struct is not visible in
 * this chunk — TODO confirm against the Lane declaration upstream).
 * NOTE(review): the closing #endif of the #ifndef guard was lost in
 * extraction; restored here. */
#ifndef CACHE_LINE_SIZE
#define CACHE_LINE_SIZE 64 /* XXX arch-specific define */
#endif

#define CACHE_PAD(_n) char __pad ## _n [CACHE_LINE_SIZE]
// NOTE(review): garbled extraction. Between the fragments below the
// original file's lines are missing (embedded numbering jumps 41-45,
// 47-57, 59-64, 66-70) — presumably the SubmitQueue/Lane declarations
// and the start of a constructor; restore from upstream before compiling.
// XioPortal runs as its own thread (derives from Thread).
40 class XioPortal
: public Thread
// Fixed number of submission lanes (sharded input queues); presumably a
// member of the elided SubmitQueue — TODO confirm upstream.
46 const static int nlanes
= 7;
// Round-robin lane cursor; per the original comment, atomicity is
// provided by having only the portal thread touch it.
58 int ix
; /* atomicity by portal thread */
// Constructor fragment: loop over all lanes (loop body not visible here;
// presumably per-lane initialization — TODO confirm upstream).
65 for (ix
= 0; ix
< nlanes
; ++ix
) {
71 inline Lane
* get_lane(XioConnection
*xcon
)
73 return &qlane
[(((uint64_t) xcon
) / 16) % nlanes
];
// NOTE(review): garbled extraction of enq(xcon, xs). The opening/closing
// braces and the tail of the body (original lines 81-83) are missing;
// deq() below tests lane->size, so a size increment presumably lives in
// the elided lines — TODO restore from upstream.
// Queue a single submission on the lane owned by its connection, under
// that lane's spinlock.
76 void enq(XioConnection
*xcon
, XioSubmit
* xs
)
78 Lane
* lane
= get_lane(xcon
);
// RAII lock scoped to the (missing) function body.
79 std::lock_guard
<decltype(lane
->sp
)> lg(lane
->sp
);
80 lane
->q
.push_back(*xs
);
// NOTE(review): garbled extraction of the queue-splice overload of enq.
// Braces and trailing body lines (original 91+) are missing; `size` is
// computed but its use (presumably lane->size += size) is in the elided
// lines — TODO restore from upstream.
// Move an entire requeue list onto the connection's lane in one splice,
// under the lane spinlock.
84 void enq(XioConnection
*xcon
, XioSubmit::Queue
& requeue_q
)
86 int size
= requeue_q
.size();
87 Lane
* lane
= get_lane(xcon
);
88 std::lock_guard
<decltype(lane
->sp
)> lg(lane
->sp
);
// Splice the whole requeue_q before end() — appends in FIFO order.
89 XioSubmit::Queue::const_iterator i1
= lane
->q
.end();
90 lane
->q
.splice(i1
, requeue_q
);
// NOTE(review): garbled extraction of deq(). The declarations of `cnt`
// and `lane` (original lines 95-98) and parts of the loop interior
// (101, 105, 107-114) are missing — restore from upstream before
// compiling.
// Drain submissions into send_q: scan lanes round-robin starting at the
// cursor `ix`, wrapping modulo nlanes.
94 void deq(XioSubmit::Queue
& send_q
)
99 for (cnt
= 0; cnt
< nlanes
; ++cnt
, ++ix
, ix
= ix
% nlanes
) {
// Per-lane spinlock held while inspecting/splicing the lane.
100 std::lock_guard
<decltype(lane
->sp
)> lg(lane
->sp
);
// Only non-empty lanes are drained; the whole lane queue is spliced out.
102 if (lane
->size
> 0) {
103 XioSubmit::Queue::const_iterator i1
= send_q
.end();
104 send_q
.splice(i1
, lane
->q
);
// Advance the cursor past the drained lane (with wrap) — presumably
// followed by a break in the elided lines; TODO confirm upstream.
106 ++ix
, ix
= ix
% nlanes
;
// NOTE(review): garbled extraction of XioPortal's data members; several
// member declarations between the fragments (original lines 118-124,
// 126) are missing — restore from upstream.
// Accelio event-loop context owned by this portal.
115 struct xio_context
*ctx
;
// Accelio listening endpoint (bound in bind()).
116 struct xio_server
*server
;
// Sharded input queue drained by the portal thread.
117 SubmitQueue submit_q
;
125 uint32_t special_handling
;
// XioPortals and XioMessenger manage portal lifecycle directly.
127 friend class XioPortals
;
128 friend class XioMessenger
;
// NOTE(review): garbled extraction of the XioPortal constructor. Parts
// of the member-init list (original lines 134-136) and of the body
// (140, 143, 145, 149-150) are missing — restore from upstream.
// Builds the Accelio context for this portal; max_conns sizes the
// context's shared resources.
131 explicit XioPortal(Messenger
*_msgr
, int max_conns
) :
132 msgr(_msgr
), ctx(NULL
), server(NULL
), submit_q(), xio_uri(""),
133 portal_id(NULL
), _shutdown(false), drained(false),
// Zero-initialize the param struct before selectively filling it.
137 struct xio_context_params ctx_params
;
138 memset(&ctx_params
, 0, sizeof(ctx_params
));
// Stash this portal as the context's user cookie for callbacks.
139 ctx_params
.user_context
= this;
141 * hint to Accelio the total number of connections that will share
142 * this context's resources: internal primary task pool...
144 ctx_params
.max_conns_per_ctx
= max_conns
;
146 /* a portal is an xio_context and event loop */
147 ctx
= xio_context_create(&ctx_params
, 0 /* poll timeout */, -1 /* cpu hint */);
148 ceph_assert(ctx
&& "Whoops, failed to create portal/ctx");
151 int bind(struct xio_session_ops
*ops
, const string
&base_uri
,
152 uint16_t port
, uint16_t *assigned_port
);
// NOTE(review): garbled extraction of release_xio_msg(). The control
// structure between the fragments (original lines 157, 161-162, 167-168)
// is missing — next_msg/user_context strongly suggest the elided lines
// form a loop over a chained xio_msg list with an else branch; restore
// from upstream before compiling.
// Return Accelio message buffers for a completed receive, then finalize
// the completion.
154 inline void release_xio_msg(XioCompletion
* xcmp
) {
155 struct xio_msg
*msg
= xcmp
->dequeue();
156 struct xio_msg
*next_msg
= NULL
;
// If the connection is already torn down, msg memory may be gone.
158 if (unlikely(!xcmp
->xcon
->conn
)) {
159 // NOTE: msg is not safe to dereference if the connection was torn down
160 xcmp
->xcon
->msg_release_fail(msg
, ENOTCONN
);
// Chained messages are linked through user_context.
163 next_msg
= static_cast<struct xio_msg
*>(msg
->user_context
);
164 code
= xio_release_msg(msg
);
165 if (unlikely(code
)) /* very unlikely, so log it */
166 xcmp
->xcon
->msg_release_fail(msg
, code
);
169 xcmp
->trace
.event("xio_release_msg");
170 xcmp
->finalize(); /* unconditional finalize */
// NOTE(review): garbled extraction of enqueue(). The surrounding control
// flow (original lines 174-175, 178-182, 184, 187-189) is missing; the
// case labels below presumably belong to a switch on xs->type taken when
// the portal is shutting down — TODO restore from upstream.
// Normal path: queue the submission and kick the event loop so the
// portal thread wakes up and drains it.
173 void enqueue(XioConnection
*xcon
, XioSubmit
*xs
)
176 submit_q
.enq(xcon
, xs
);
177 xio_context_stop_loop(ctx
);
// Shutdown path fragment: fail outgoing sends back to the connection...
183 case XioSubmit::OUTGOING_MSG
: /* it was an outgoing 1-way */
185 XioSend
* xsend
= static_cast<XioSend
*>(xs
);
186 xs
->xcon
->msg_send_fail(xsend
, -EINVAL
);
// ...and release buffers for incoming completions.
190 /* INCOMING_MSG_RELEASE */
191 release_xio_msg(static_cast<XioCompletion
*>(xs
));
196 void requeue(XioConnection
* xcon
, XioSubmit::Queue
& send_q
) {
197 submit_q
.enq(xcon
, send_q
);
// NOTE(review): garbled extraction of requeue_all_xcon(). The else/loop
// advance for the non-matching case (original lines 212-214) and the
// closing braces (222+) are missing — restore from upstream before
// compiling.
// Gather every already-dequeued submission belonging to xcon out of
// send_q, prepend them (FIFO) to the connection's requeue list, and mark
// the connection flow-controlled.
200 void requeue_all_xcon(XioConnection
* xcon
,
201 XioSubmit::Queue::iterator
& q_iter
,
202 XioSubmit::Queue
& send_q
) {
203 // XXX gather all already-dequeued outgoing messages for xcon
204 // and push them in FIFO order to front of the input queue,
205 // and mark the connection as flow-controlled
206 XioSubmit::Queue requeue_q
;
208 while (q_iter
!= send_q
.end()) {
209 XioSubmit
*xs
= &(*q_iter
);
210 // skip retires and anything for other connections
// (the skip/advance branch itself is in the elided lines)
211 if (xs
->xcon
!= xcon
) {
// Matching submissions move from send_q to the local requeue_q.
215 q_iter
= send_q
.erase(q_iter
);
216 requeue_q
.push_back(*xs
);
// Under the connection spinlock, splice to the FRONT of the
// connection's requeue list so original order is preserved.
218 std::lock_guard
<decltype(xcon
->sp
)> lg(xcon
->sp
);
219 XioSubmit::Queue::const_iterator i1
= xcon
->outgoing
.requeue
.begin();
220 xcon
->outgoing
.requeue
.splice(i1
, requeue_q
);
// Caller already holds the lock, hence OP_FLAG_LOCKED.
221 xcon
->cstate
.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED
);
// NOTE(review): garbled extraction of the portal thread's main loop
// (presumably Thread::entry(); the signature and many interior lines —
// original 231-235, 237, 240-250, 253-256, 260-261, 267, 269-271, 278,
// 286-293, 295, 297, 299-302, 305-308, 311-313, 317, 320-330, 332-336 —
// are missing). Restore from upstream before compiling.
// Locals for the drain/send loop.
227 uint32_t xio_qdepth_high
;
228 XioSubmit::Queue send_q
;
229 XioSubmit::Queue::iterator q_iter
;
230 struct xio_msg
*msg
= NULL
;
// Pull all pending submissions from the lanes.
236 submit_q
.deq(send_q
);
238 /* shutdown() barrier */
239 std::lock_guard
<decltype(sp
)> lg(sp
);
242 size
= send_q
.size();
245 // XXX XioSend queues for flow-controlled connections may require
// Walk the drained queue, dispatching by submission type.
251 q_iter
= send_q
.begin();
252 while (q_iter
!= send_q
.end()) {
257 case XioSubmit::OUTGOING_MSG
: /* it was an outgoing 1-way */
258 xsend
= static_cast<XioSend
*>(xs
);
// Drop sends for dead/disconnected connections.
259 if (unlikely(!xcon
->conn
|| !xcon
->is_connected()))
262 /* XXX guard Accelio send queue (should be safe to rely
263 * on Accelio's check on below, but this assures that
264 * all chained xio_msg are accounted) */
265 xio_qdepth_high
= xcon
->xio_qdepth_high_mark();
// Flow control: if this send would exceed the connection's qdepth
// high-water mark, requeue everything for this connection.
266 if (unlikely((xcon
->send_ctr
+ xsend
->get_msg_count()) >
268 requeue_all_xcon(xcon
, q_iter
, send_q
);
272 xs
->trace
.event("xio_send_msg");
273 msg
= xsend
->get_xio_msg();
274 code
= xio_send_msg(xcon
->conn
, msg
);
275 /* header trace moved here to capture xio serial# */
276 if (ldlog_p1(msgr
->cct
, ceph_subsys_xio
, 11)) {
277 xsend
->print_debug(msgr
->cct
, "xio_send_msg");
279 /* get the right Accelio's errno code */
280 if (unlikely(code
)) {
// Accelio quirk: code==-1 with xio_errno()==-1 means "queued for
// later transmission", i.e. success.
281 if ((code
== -1) && (xio_errno() == -1)) {
282 /* In case XIO does not have any credits to send,
283 * it would still queue up the message(s) for transmission,
284 * but would return -1 and errno would also be set to -1.
285 * This needs to be treated as a success.
// Genuine send failure: overflow triggers requeue+flow control,
// anything else fails the message back to the connection.
294 if (unlikely(code
)) {
296 case XIO_E_TX_QUEUE_OVERFLOW
:
298 requeue_all_xcon(xcon
, q_iter
, send_q
);
303 q_iter
= send_q
.erase(q_iter
);
304 xcon
->msg_send_fail(xsend
, code
);
// Successful send: record timestamp and bump the in-flight counter.
309 xcon
->send
.set(msg
->timestamp
); // need atomic?
310 xcon
->send_ctr
+= xsend
->get_msg_count(); // only inc if cb promised
314 /* INCOMING_MSG_RELEASE */
315 q_iter
= send_q
.erase(q_iter
);
316 release_xio_msg(static_cast<XioCompletion
*>(xs
));
318 } /* switch (xs->type) */
319 q_iter
= send_q
.erase(q_iter
);
// Run the Accelio event loop with a 300 ms timeout each iteration.
323 xio_context_run_loop(ctx
, 300);
// Loop until shutdown is requested AND the queues have drained.
325 } while ((!_shutdown
) || (!drained
));
331 xio_context_destroy(ctx
);
// Fragment of shutdown(): serializes with the loop's barrier above.
337 std::lock_guard
<decltype(sp
)> lg(sp
);
// NOTE(review): garbled extraction of the XioPortals container class;
// surrounding declarations (original lines 338-344, 346-350, 352-355,
// 357, 360-363) are missing — restore from upstream.
// Owned set of portals (raw pointers; created in the ctor fragment
// below, presumably deleted in the elided destructor — TODO confirm).
345 vector
<XioPortal
*> portals
;
// Ctor fragment: note the loop bound `n` vs the parameter `_n` — the
// assignment presumably happens in the missing body lines.
351 XioPortals(Messenger
*msgr
, int _n
, int nconns
) : p_vec(NULL
), last_unused(0)
356 for (int i
= 0; i
< n
; i
++) {
358 portals
[i
] = new XioPortal(msgr
, nconns
);
359 ceph_assert(portals
[i
] != nullptr);
// Accessor for the portal vector.
364 vector
<XioPortal
*>& get() { return portals
; }
366 const char **get_vec()
368 return (const char **) p_vec
;
// NOTE(review): garbled extraction of three small accessors; their body
// interiors (original lines 372-375, 380-383, 385, 387-389) are missing
// — restore from upstream before compiling.
// Number of portals (body elided; presumably portals.size()).
371 int get_portals_len()
// Round-robin cursor: returns the current index and advances, wrapping
// when it reaches the portal count (the reset/return are elided).
376 int get_last_unused()
378 int pix
= last_unused
;
379 if (++last_unused
>= get_portals_len())
// Pick the next portal round-robin (return statement elided).
384 XioPortal
* get_next_portal()
386 int pix
= get_last_unused();
390 int bind(struct xio_session_ops
*ops
, const string
& base_uri
,
391 uint16_t port
, uint16_t *port0
);
// NOTE(review): garbled extraction of accept()/start()/shutdown()/join()
// and the uri-vector setup; many interior lines (original 396, 399-400,
// 402, 405-411, 413, 418-420, 426-430, 434-439, 443-448, 451-457) are
// missing — restore from upstream before compiling.
// Accept a new Accelio session, steering it to a portal chosen
// round-robin; the condition selecting between the two xio_accept
// forms is in the elided lines.
393 int accept(struct xio_session
*session
,
394 struct xio_new_session_req
*req
,
395 void *cb_user_context
)
397 const char **portals_vec
= get_vec();
398 int pix
= get_last_unused();
// Plain accept with no redirect list...
401 return xio_accept(session
, NULL
, 0, NULL
, 0);
// ...or accept redirecting the client to the chosen portal's URI.
403 return xio_accept(session
,
404 (const char **)&(portals_vec
[pix
]),
// start() fragment: build the C-string URI array, then spawn one named
// thread per portal ("ms_xio_<index>").
412 int p_ix
, nportals
= portals
.size();
414 p_vec
= new char*[nportals
];
415 for (p_ix
= 0; p_ix
< nportals
; ++p_ix
) {
416 portal
= portals
[p_ix
];
417 p_vec
[p_ix
] = (char*) /* portal->xio_uri.c_str() */
421 for (p_ix
= 0; p_ix
< nportals
; ++p_ix
) {
422 string thread_name
= "ms_xio_";
423 thread_name
.append(std::to_string(p_ix
));
424 portal
= portals
[p_ix
];
425 portal
->create(thread_name
.c_str());
// shutdown() fragment: iterate portals (per-portal action elided).
431 int nportals
= portals
.size();
432 for (int p_ix
= 0; p_ix
< nportals
; ++p_ix
) {
433 XioPortal
*portal
= portals
[p_ix
];
// join() fragment: iterate portals (presumably joining each thread).
440 int nportals
= portals
.size();
441 for (int p_ix
= 0; p_ix
< nportals
; ++p_ix
) {
442 XioPortal
*portal
= portals
[p_ix
];
// Final fragment (presumably the destructor or a cleanup loop).
449 int nportals
= portals
.size();
450 for (int ix
= 0; ix
< nportals
; ++ix
)
458 #endif /* XIO_PORTAL_H */