// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Portions Copyright (C) 2013 CohortFS, LLC
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#ifndef XIO_PORTAL_H
#define XIO_PORTAL_H

#include <string>

extern "C" {
#include "libxio.h"
}
#include "XioInSeq.h"
#include <boost/lexical_cast.hpp>
#include "msg/SimplePolicyMessenger.h"
#include "XioConnection.h"
#include "XioMsg.h"

#include "include/spinlock.h"

#include "include/ceph_assert.h"
#include "common/dout.h"

#ifndef CACHE_LINE_SIZE
#define CACHE_LINE_SIZE 64 /* XXX arch-specific define */
#endif
#define CACHE_PAD(_n) char __pad ## _n [CACHE_LINE_SIZE]
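/*
 * A portal is one Accelio event-loop thread: it owns an xio_context,
 * drains a lane-striped queue of outgoing submissions and message
 * releases, and drives xio_context_run_loop() until shut down.
 */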
class XioPortal : public Thread
{
private:

  struct SubmitQueue
  {
    const static int nlanes = 7;

    struct Lane
    {
      uint32_t size;
      XioSubmit::Queue q;
      ceph::spinlock sp;
      CACHE_PAD(0);
    };

    Lane qlane[nlanes];

    int ix; /* atomicity by portal thread */

    SubmitQueue() : ix(0)
    {
      int ix;
      Lane* lane;

      for (ix = 0; ix < nlanes; ++ix) {
        lane = &qlane[ix];
        lane->size = 0;
      }
    }
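    /* Map a connection to a lane: drop the pointer's low (alignment)
     * bits, then take the remainder modulo nlanes. */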
    inline Lane* get_lane(XioConnection *xcon)
    {
      return &qlane[(((uint64_t) xcon) / 16) % nlanes];
    }

    void enq(XioConnection *xcon, XioSubmit* xs)
    {
      Lane* lane = get_lane(xcon);
      std::lock_guard<decltype(lane->sp)> lg(lane->sp);
      lane->q.push_back(*xs);
      ++(lane->size);
    }

    void enq(XioConnection *xcon, XioSubmit::Queue& requeue_q)
    {
      int size = requeue_q.size();
      Lane* lane = get_lane(xcon);
      std::lock_guard<decltype(lane->sp)> lg(lane->sp);
      XioSubmit::Queue::const_iterator i1 = lane->q.end();
      lane->q.splice(i1, requeue_q);
      lane->size += size;
    }
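    /* Drain at most one non-empty lane into send_q, scanning round-robin
     * from the last position; only the portal thread calls this. */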
    void deq(XioSubmit::Queue& send_q)
    {
      Lane* lane;
      int cnt;

      for (cnt = 0; cnt < nlanes; ++cnt, ++ix, ix = ix % nlanes) {
        lane = &qlane[ix];
        std::lock_guard<decltype(lane->sp)> lg(lane->sp);
        if (lane->size > 0) {
          XioSubmit::Queue::const_iterator i1 = send_q.end();
          send_q.splice(i1, lane->q);
          lane->size = 0;
          ++ix, ix = ix % nlanes;
          break;
        }
      }
    }

  }; /* SubmitQueue */
  Messenger *msgr;
  struct xio_context *ctx;
  struct xio_server *server;
  SubmitQueue submit_q;
  ceph::spinlock sp;
  void *ev_loop;
  string xio_uri;
  char *portal_id;
  bool _shutdown;
  bool drained;
  uint32_t magic;
  uint32_t special_handling;

  friend class XioPortals;
  friend class XioMessenger;

public:
  explicit XioPortal(Messenger *_msgr, int max_conns) :
    msgr(_msgr), ctx(NULL), server(NULL), submit_q(), xio_uri(""),
    portal_id(NULL), _shutdown(false), drained(false),
    magic(0),
    special_handling(0)
  {
    struct xio_context_params ctx_params;
    memset(&ctx_params, 0, sizeof(ctx_params));
    ctx_params.user_context = this;
    /*
     * hint to Accelio the total number of connections that will share
     * this context's resources: internal primary task pool...
     */
    ctx_params.max_conns_per_ctx = max_conns;

    /* a portal is an xio_context and event loop */
    ctx = xio_context_create(&ctx_params, 0 /* poll timeout */, -1 /* cpu hint */);
    ceph_assert(ctx && "Whoops, failed to create portal/ctx");
  }

  int bind(struct xio_session_ops *ops, const string &base_uri,
           uint16_t port, uint16_t *assigned_port);
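  /* Hand a consumed (possibly chained) request back to Accelio; chained
   * xio_msg segments are linked through user_context. */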
  inline void release_xio_msg(XioCompletion* xcmp) {
    struct xio_msg *msg = xcmp->dequeue();
    struct xio_msg *next_msg = NULL;
    int code;
    if (unlikely(!xcmp->xcon->conn)) {
      // NOTE: msg is not safe to dereference if the connection was torn down
      xcmp->xcon->msg_release_fail(msg, ENOTCONN);
    }
    else while (msg) {
      next_msg = static_cast<struct xio_msg *>(msg->user_context);
      code = xio_release_msg(msg);
      if (unlikely(code)) /* very unlikely, so log it */
        xcmp->xcon->msg_release_fail(msg, code);
      msg = next_msg;
    }
    xcmp->trace.event("xio_release_msg");
    xcmp->finalize(); /* unconditional finalize */
  }
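  /* Queue an outgoing submission and kick the portal's event loop so the
   * portal thread picks it up; after shutdown, dispose of it instead. */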
  void enqueue(XioConnection *xcon, XioSubmit *xs)
  {
    if (!_shutdown) {
      submit_q.enq(xcon, xs);
      xio_context_stop_loop(ctx);
      return;
    }

    /* dispose xs */
    switch (xs->type) {
    case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
      {
        XioSend* xsend = static_cast<XioSend*>(xs);
        xs->xcon->msg_send_fail(xsend, -EINVAL);
      }
      break;
    default:
      /* INCOMING_MSG_RELEASE */
      release_xio_msg(static_cast<XioCompletion*>(xs));
      break;
    };
  }

  void requeue(XioConnection* xcon, XioSubmit::Queue& send_q) {
    submit_q.enq(xcon, send_q);
  }

  void requeue_all_xcon(XioConnection* xcon,
                        XioSubmit::Queue::iterator& q_iter,
                        XioSubmit::Queue& send_q) {
    // XXX gather all already-dequeued outgoing messages for xcon
    // and push them in FIFO order to front of the input queue,
    // and mark the connection as flow-controlled
    XioSubmit::Queue requeue_q;

    while (q_iter != send_q.end()) {
      XioSubmit *xs = &(*q_iter);
      // skip retires and anything for other connections
      if (xs->xcon != xcon) {
        q_iter++;
        continue;
      }
      q_iter = send_q.erase(q_iter);
      requeue_q.push_back(*xs);
    }
    std::lock_guard<decltype(xcon->sp)> lg(xcon->sp);
    XioSubmit::Queue::const_iterator i1 = xcon->outgoing.requeue.begin();
    xcon->outgoing.requeue.splice(i1, requeue_q);
    xcon->cstate.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED);
  }
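  /* Portal thread body: drain the submit queue, push outgoing messages to
   * Accelio (re-queueing a connection when its send queue-depth high-water
   * mark is hit), release consumed messages, then run the xio event loop
   * with a 300 ms timeout, repeating until shutdown and drained. */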
  void *entry()
  {
    int size, code = 0;
    uint32_t xio_qdepth_high;
    XioSubmit::Queue send_q;
    XioSubmit::Queue::iterator q_iter;
    struct xio_msg *msg = NULL;
    XioConnection *xcon;
    XioSubmit *xs;
    XioSend *xsend;

    do {
      submit_q.deq(send_q);

      /* shutdown() barrier */
      std::lock_guard<decltype(sp)> lg(sp);

    restart:
      size = send_q.size();

      if (_shutdown) {
        // XXX XioSend queues for flow-controlled connections may require
        // cleanup
        drained = true;
      }

      if (size > 0) {
        q_iter = send_q.begin();
        while (q_iter != send_q.end()) {
          xs = &(*q_iter);
          xcon = xs->xcon;

          switch (xs->type) {
          case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
            xsend = static_cast<XioSend*>(xs);
            if (unlikely(!xcon->conn || !xcon->is_connected()))
              code = ENOTCONN;
            else {
              /* XXX guard Accelio send queue (should be safe to rely
               * on Accelio's check below, but this assures that all
               * chained xio_msg are accounted) */
              xio_qdepth_high = xcon->xio_qdepth_high_mark();
              if (unlikely((xcon->send_ctr + xsend->get_msg_count()) >
                           xio_qdepth_high)) {
                requeue_all_xcon(xcon, q_iter, send_q);
                goto restart;
              }

              xs->trace.event("xio_send_msg");
              msg = xsend->get_xio_msg();
              code = xio_send_msg(xcon->conn, msg);
              /* header trace moved here to capture the xio serial# */
              if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
                xsend->print_debug(msgr->cct, "xio_send_msg");
              }
              /* translate to the right Accelio errno code */
              if (unlikely(code)) {
                if ((code == -1) && (xio_errno() == -1)) {
                  /* In case XIO does not have any credits to send,
                   * it would still queue up the message(s) for transmission,
                   * but would return -1 and errno would also be set to -1.
                   * This needs to be treated as a success.
                   */
                  code = 0;
                }
                else {
                  code = xio_errno();
                }
              }
            } /* !ENOTCONN */
            if (unlikely(code)) {
              switch (code) {
              case XIO_E_TX_QUEUE_OVERFLOW:
                {
                  requeue_all_xcon(xcon, q_iter, send_q);
                  goto restart;
                }
                break;
              default:
                q_iter = send_q.erase(q_iter);
                xcon->msg_send_fail(xsend, code);
                continue;
                break;
              };
            } else {
              xcon->send.set(msg->timestamp); // need atomic?
              xcon->send_ctr += xsend->get_msg_count(); // only inc if cb promised
            }
            break;
          default:
            /* INCOMING_MSG_RELEASE */
            q_iter = send_q.erase(q_iter);
            release_xio_msg(static_cast<XioCompletion*>(xs));
            continue;
          } /* switch (xs->type) */
          q_iter = send_q.erase(q_iter);
        } /* while */
      } /* size > 0 */

      xio_context_run_loop(ctx, 300);

    } while ((!_shutdown) || (!drained));

    /* shutting down */
    if (server) {
      xio_unbind(server);
    }
    xio_context_destroy(ctx);
    return NULL;
  }
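  /* Request shutdown under the same lock entry() holds per iteration; the
   * portal thread then drains its queues and destroys the xio context. */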
  void shutdown()
  {
    std::lock_guard<decltype(sp)> lg(sp);
    _shutdown = true;
  }
};
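/*
 * XioPortals owns the set of portal threads for a messenger, hands them
 * out round-robin, and publishes their URIs so accepted sessions can be
 * redirected across portals.
 */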
class XioPortals
{
private:
  vector<XioPortal*> portals;
  char **p_vec;
  int n;
  int last_unused;

public:
  XioPortals(Messenger *msgr, int _n, int nconns) : p_vec(NULL), last_unused(0)
  {
    n = max(_n, 1);

    portals.resize(n);
    for (int i = 0; i < n; i++) {
      if (!portals[i]) {
        portals[i] = new XioPortal(msgr, nconns);
        ceph_assert(portals[i] != nullptr);
      }
    }
  }

  vector<XioPortal*>& get() { return portals; }

  const char **get_vec()
  {
    return (const char **) p_vec;
  }

  int get_portals_len()
  {
    return n;
  }
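  /* Round-robin index of the next portal to hand out; there is no locking
   * here, so callers are presumably serialized elsewhere. */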
  int get_last_unused()
  {
    int pix = last_unused;
    if (++last_unused >= get_portals_len())
      last_unused = 0;
    return pix;
  }

  XioPortal* get_next_portal()
  {
    int pix = get_last_unused();
    return portals[pix];
  }

  int bind(struct xio_session_ops *ops, const string& base_uri,
           uint16_t port, uint16_t *port0);
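  /* Accept a new session, redirecting it to the next portal's URI unless
   * round-robin selection lands on portal 0, the originally bound listener. */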
  int accept(struct xio_session *session,
             struct xio_new_session_req *req,
             void *cb_user_context)
  {
    const char **portals_vec = get_vec();
    int pix = get_last_unused();

    if (pix == 0) {
      return xio_accept(session, NULL, 0, NULL, 0);
    } else {
      return xio_accept(session,
                        (const char **)&(portals_vec[pix]),
                        1, NULL, 0);
    }
  }
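  /* Publish each portal's bound id into p_vec (consumed by accept() for
   * redirects), then spawn one "ms_xio_<n>" thread per portal. */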
  void start()
  {
    XioPortal *portal;
    int p_ix, nportals = portals.size();

    p_vec = new char*[nportals];
    for (p_ix = 0; p_ix < nportals; ++p_ix) {
      portal = portals[p_ix];
      p_vec[p_ix] = (char*) /* portal->xio_uri.c_str() */
        portal->portal_id;
    }

    for (p_ix = 0; p_ix < nportals; ++p_ix) {
      string thread_name = "ms_xio_";
      thread_name.append(std::to_string(p_ix));
      portal = portals[p_ix];
      portal->create(thread_name.c_str());
    }
  }

  void shutdown()
  {
    int nportals = portals.size();
    for (int p_ix = 0; p_ix < nportals; ++p_ix) {
      XioPortal *portal = portals[p_ix];
      portal->shutdown();
    }
  }

  void join()
  {
    int nportals = portals.size();
    for (int p_ix = 0; p_ix < nportals; ++p_ix) {
      XioPortal *portal = portals[p_ix];
      portal->join();
    }
  }

  ~XioPortals()
  {
    int nportals = portals.size();
    for (int ix = 0; ix < nportals; ++ix)
      delete(portals[ix]);
    portals.clear();
    if (p_vec)
      delete[] p_vec;
  }
};

#endif /* XIO_PORTAL_H */