// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Portions Copyright (C) 2013 CohortFS, LLC
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include "XioMsg.h"
#include "XioConnection.h"
#include "XioMessenger.h"
#include "messages/MDataPing.h"
#include "msg/msg_types.h"
#include "auth/none/AuthNoneProtocol.h" // XXX

#include "include/ceph_assert.h"
#include "common/dout.h"

extern struct xio_mempool *xio_msgr_mpool;
extern struct xio_mempool *xio_msgr_noreg_mpool;

#define dout_subsys ceph_subsys_xio

void print_xio_msg_hdr(CephContext *cct, const char *tag,
                       const XioMsgHdr &hdr, const struct xio_msg *msg)
{
  if (msg) {
    ldout(cct,4) << tag <<
      " xio msg:" <<
      " sn: " << msg->sn <<
      " timestamp: " << msg->timestamp <<
      dendl;
  }

  ldout(cct,4) << tag <<
    " ceph header: " <<
    " front_len: " << hdr.hdr->front_len <<
    " seq: " << hdr.hdr->seq <<
    " tid: " << hdr.hdr->tid <<
    " type: " << hdr.hdr->type <<
    " prio: " << hdr.hdr->priority <<
    " name type: " << (int) hdr.hdr->src.type <<
    " name num: " << (int) hdr.hdr->src.num <<
    " version: " << hdr.hdr->version <<
    " compat_version: " << hdr.hdr->compat_version <<
    " front_len: " << hdr.hdr->front_len <<
    " middle_len: " << hdr.hdr->middle_len <<
    " data_len: " << hdr.hdr->data_len <<
    " xio header: " <<
    " msg_cnt: " << hdr.msg_cnt <<
    dendl;

  ldout(cct,4) << tag <<
    " ceph footer: " <<
    " front_crc: " << hdr.ftr->front_crc <<
    " middle_crc: " << hdr.ftr->middle_crc <<
    " data_crc: " << hdr.ftr->data_crc <<
    " sig: " << hdr.ftr->sig <<
    " flags: " << (uint32_t) hdr.ftr->flags <<
    dendl;
}

void print_ceph_msg(CephContext *cct, const char *tag, Message *m)
{
  if (m->get_magic() & (MSG_MAGIC_XIO & MSG_MAGIC_TRACE_DTOR)) {
    ceph_msg_header& header = m->get_header();
    ldout(cct,4) << tag << " header version " << header.version <<
      " compat version " << header.compat_version <<
      dendl;
  }
}

#undef dout_prefix
#define dout_prefix conn_prefix(_dout)
ostream& XioConnection::conn_prefix(std::ostream *_dout) {
  return *_dout << "-- " << get_messenger()->get_myinst().addr << " >> " << peer_addr
                << " peer=" << peer.name.type_str()
                << " conn=" << conn << " sess=" << session << " ";
}

XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type,
                             const entity_inst_t& _peer) :
  Connection(m->cct, m),
  xio_conn_type(_type),
  portal(m->get_portal()),
  connected(false),
  peer(_peer),
  session(NULL),
  conn(NULL),
  magic(m->get_magic()),
  scount(0),
  send_ctr(0),
  in_seq(),
  cstate(this)
{
  set_peer_type(peer.name.type());
  set_peer_addr(peer.addr);

  Messenger::Policy policy;
  int64_t max_msgs = 0, max_bytes = 0, bytes_opt = 0;
  int xopt;

  policy = m->get_policy(peer_type);

  if (policy.throttler_messages) {
    max_msgs = policy.throttler_messages->get_max();
    ldout(m->cct,4) << "XioMessenger throttle_msgs: " << max_msgs << dendl;
  }

  xopt = m->cct->_conf->xio_queue_depth;
  if (max_msgs > xopt)
    xopt = max_msgs;

  /* set high mark for send, reserved 20% for credits */
  q_high_mark = xopt * 4 / 5;
  q_low_mark = q_high_mark/2;

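  /*
   * Worked example (numbers illustrative, not shipped defaults): with an
   * effective queue depth of 256, q_high_mark = 256 * 4 / 5 = 204 and
   * q_low_mark = 102; the ~20% above the high mark is the credit reserve
   * mentioned in the comment above.  on_ow_msg_send_complete() appears to
   * use the low mark to move a FLOW_CONTROLLED connection back to UP once
   * enough in-flight sends have completed.
   */
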
  /* set send & receive msgs queue depth */
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS,
              &xopt, sizeof(xopt));
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS,
              &xopt, sizeof(xopt));

  if (policy.throttler_bytes) {
    max_bytes = policy.throttler_bytes->get_max();
    ldout(m->cct,4) << "XioMessenger throttle_bytes: " << max_bytes << dendl;
  }

  bytes_opt = (2 << 28); /* default: 512 MB */
  if (max_bytes > bytes_opt)
    bytes_opt = max_bytes;

  /* set send & receive total bytes throttle */
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_BYTES,
              &bytes_opt, sizeof(bytes_opt));
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_BYTES,
              &bytes_opt, sizeof(bytes_opt));

  ldout(m->cct,4) << "throttle_msgs: " << xopt << " throttle_bytes: " << bytes_opt << dendl;

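  /*
   * Note: no feature exchange happens on this path yet, so the connection
   * simply advertises XIO_ALL_FEATURES below.  Feature-gated branches
   * elsewhere (e.g. the CEPH_FEATURE_MSGR_KEEPALIVE2 check in
   * send_keepalive_or_ack_internal()) therefore presumably behave as if
   * the peer supported everything.
   */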
  /* XXXX fake features, aieee! */
  set_features(XIO_ALL_FEATURES);
}

int XioConnection::send_message(Message *m)
{
  XioMessenger *ms = static_cast<XioMessenger*>(get_messenger());
  return ms->_send_message(m, this);
}

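/*
 * Send a keepalive (or a KEEPALIVE2 ack carrying *tp) immediately if the
 * session is UP; otherwise just record it for later.  The state is checked
 * once without the lock and re-checked under sp (a double-checked pattern),
 * so a connection that comes up concurrently still sends right away, while
 * a non-UP connection only sets the pending keepalive/ack flags for
 * flush_out_queues() to emit once the session is ready.
 */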
void XioConnection::send_keepalive_or_ack(bool ack, const utime_t *tp)
{
  /* If con is not in READY state, we need to queue the request */
  if (cstate.session_state.read() != XioConnection::UP) {
    std::lock_guard<ceph::util::spinlock> lg(sp);
    if (cstate.session_state.read() != XioConnection::UP) {
      if (ack) {
        outgoing.ack = true;
        outgoing.ack_time = *tp;
      }
      else {
        outgoing.keepalive = true;
      }
      return;
    }
  }

  send_keepalive_or_ack_internal(ack, tp);
}

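/*
 * Build and enqueue the keepalive as a header-only Accelio message: a single
 * CEPH_MSGR_TAG_* byte, optionally followed by an encoded ceph_timespec (the
 * echoed peer timestamp for KEEPALIVE2_ACK, or the current time for
 * KEEPALIVE2).  The peer decodes this layout in XioConnection::on_msg().
 */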
void XioConnection::send_keepalive_or_ack_internal(bool ack, const utime_t *tp)
{
  XioCommand *xcmd = pool_alloc_xio_command(this);
  if (! xcmd) {
    /* could happen if Accelio has been shutdown */
    return;
  }

  struct ceph_timespec ts;
  if (ack) {
    ceph_assert(tp);
    tp->encode_timeval(&ts);
    xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
    xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
  } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
    utime_t t = ceph_clock_now();
    t.encode_timeval(&ts);
    xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2);
    xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
  } else {
    xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE);
  }

  const std::list<buffer::ptr>& header = xcmd->get_bl_ref().buffers();
  ceph_assert(header.size() == 1); /* accelio header must be without scatter gather */
  list<bufferptr>::const_iterator pb = header.begin();
  ceph_assert(pb->length() < XioMsgHdr::get_max_encoded_length());

  struct xio_msg * msg = xcmd->get_xio_msg();
  msg->out.header.iov_base = (char*) pb->c_str();
  msg->out.header.iov_len = pb->length();

  ldout(msgr->cct,8) << __func__ << " sending command with tag " << (int)(*(char*)msg->out.header.iov_base)
    << " len " << msg->out.header.iov_len << dendl;

  portal->enqueue(this, xcmd);
}


int XioConnection::passive_setup()
{
  /* XXX passive setup is a placeholder for (potentially active-side
     initiated) feature and auth* negotiation */
  static bufferlist authorizer_reply; /* static because fake */
  static CryptoKey session_key; /* ditto */
  bool authorizer_valid;

  XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());

  // fake an auth buffer
  EntityName name;
  name.set_type(peer.name.type());

  AuthNoneAuthorizer auth;
  auth.build_authorizer(name, peer.name.num());

  /* XXX fake authorizer! */
  msgr->ms_deliver_verify_authorizer(
    this, peer_type, CEPH_AUTH_NONE,
    auth.bl,
    0,
    authorizer_reply,
    authorizer_valid,
    session_key);

  /* notify hook */
  msgr->ms_deliver_handle_accept(this);
  msgr->ms_deliver_handle_fast_accept(this);

  /* try to insert in conns_entity_map */
  msgr->try_insert(this);
  return (0);
}

static inline XioDispatchHook* pool_alloc_xio_dispatch_hook(
  XioConnection *xcon, Message *m, XioInSeq& msg_seq)
{
  struct xio_reg_mem mp_mem;
  int e = xpool_alloc(xio_msgr_noreg_mpool,
                      sizeof(XioDispatchHook), &mp_mem);
  if (!!e)
    return NULL;
  XioDispatchHook *xhook = static_cast<XioDispatchHook*>(mp_mem.addr);
  new (xhook) XioDispatchHook(xcon, m, msg_seq, mp_mem);
  return xhook;
}

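/*
 * Reassemble one Ceph message from a sequence of Accelio messages.  The
 * first xio_msg of a sequence carries the XioMsgHdr (tag, ceph header and
 * footer, and msg_cnt, i.e. how many xio_msgs make up this Ceph message);
 * follow-on fragments arrive with an empty xio header.  Fragments are
 * accumulated in in_seq until the full count is present, then the
 * front/middle/data sections are rebuilt from the fragments' scatter-gather
 * lists and handed to decode_message().
 */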
int XioConnection::handle_data_msg(struct xio_session *session,
                                   struct xio_msg *msg,
                                   int more_in_batch,
                                   void *cb_user_context)
{
  struct xio_msg *tmsg = msg;

  /* XXX Accelio guarantees message ordering at
   * xio_session */

  if (! in_seq.p()) {
    if (!tmsg->in.header.iov_len) {
      ldout(msgr->cct,0) << __func__ << " empty header: packet out of sequence?" << dendl;
      xio_release_msg(msg);
      return 0;
    }
    const size_t sizeof_tag = 1;
    XioMsgCnt msg_cnt(
      buffer::create_static(tmsg->in.header.iov_len-sizeof_tag,
                            ((char*) tmsg->in.header.iov_base)+sizeof_tag));
    ldout(msgr->cct,10) << __func__ << " receive msg " << "tmsg " << tmsg
      << " msg_cnt " << msg_cnt.msg_cnt
      << " iov_base " << tmsg->in.header.iov_base
      << " iov_len " << (int) tmsg->in.header.iov_len
      << " nents " << tmsg->in.pdata_iov.nents
      << " sn " << tmsg->sn << dendl;
    ceph_assert(session == this->session);
    in_seq.set_count(msg_cnt.msg_cnt);
  } else {
    /* XXX major sequence error */
    ceph_assert(! tmsg->in.header.iov_len);
  }

  in_seq.append(msg);
  if (in_seq.count() > 0) {
    return 0;
  }

  XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());
  XioDispatchHook *m_hook =
    pool_alloc_xio_dispatch_hook(this, NULL /* msg */, in_seq);
  XioInSeq& msg_seq = m_hook->msg_seq;
  in_seq.clear();

  ceph_msg_header header;
  ceph_msg_footer footer;
  buffer::list payload, middle, data;

  const utime_t recv_stamp = ceph_clock_now();

  ldout(msgr->cct,4) << __func__ << " " << "msg_seq.size()=" << msg_seq.size() <<
    dendl;

  struct xio_msg* msg_iter = msg_seq.begin();
  tmsg = msg_iter;
  XioMsgHdr hdr(header, footer,
                buffer::create_static(tmsg->in.header.iov_len,
                                      (char*) tmsg->in.header.iov_base));

  if (magic & (MSG_MAGIC_TRACE_XCON)) {
    if (hdr.hdr->type == 43) {
      print_xio_msg_hdr(msgr->cct, "on_msg", hdr, NULL);
    }
  }

  unsigned int ix, blen, iov_len;
  struct xio_iovec_ex *msg_iov, *iovs;
  uint32_t take_len, left_len = 0;
  char *left_base = NULL;

  ix = 0;
  blen = header.front_len;

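  /*
   * Copy loop for the front section.  take_len clamps each iovec to the
   * bytes still owed to the current section; if an iovec straddles a
   * section boundary, the unconsumed tail is remembered in
   * left_base/left_len and prepended to the next section (middle or data)
   * before that section's own loop runs.
   */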
  while (blen && (msg_iter != msg_seq.end())) {
    tmsg = msg_iter;
    iov_len = vmsg_sglist_nents(&tmsg->in);
    iovs = vmsg_sglist(&tmsg->in);
    for (; blen && (ix < iov_len); ++ix) {
      msg_iov = &iovs[ix];

      /* XXX need to detect any buffer which needs to be
       * split due to coalescing of a segment (front, middle,
       * data) boundary */

      take_len = std::min(blen, msg_iov->iov_len);
      payload.append(
        buffer::create_msg(
          take_len, (char*) msg_iov->iov_base, m_hook));
      blen -= take_len;
      if (! blen) {
        left_len = msg_iov->iov_len - take_len;
        if (left_len) {
          left_base = ((char*) msg_iov->iov_base) + take_len;
        }
      }
    }
    /* XXX as above, if a buffer is split, then we needed to track
     * the new start (carry) and not advance */
    if (ix == iov_len) {
      msg_seq.next(&msg_iter);
      ix = 0;
    }
  }

  if (magic & (MSG_MAGIC_TRACE_XCON)) {
    if (hdr.hdr->type == 43) {
      ldout(msgr->cct,4) << "front (payload) dump:";
      payload.hexdump( *_dout );
      *_dout << dendl;
    }
  }

  blen = header.middle_len;

  if (blen && left_len) {
    middle.append(
      buffer::create_msg(left_len, left_base, m_hook));
    left_len = 0;
  }

  while (blen && (msg_iter != msg_seq.end())) {
    tmsg = msg_iter;
    iov_len = vmsg_sglist_nents(&tmsg->in);
    iovs = vmsg_sglist(&tmsg->in);
    for (; blen && (ix < iov_len); ++ix) {
      msg_iov = &iovs[ix];
      take_len = std::min(blen, msg_iov->iov_len);
      middle.append(
        buffer::create_msg(
          take_len, (char*) msg_iov->iov_base, m_hook));
      blen -= take_len;
      if (! blen) {
        left_len = msg_iov->iov_len - take_len;
        if (left_len) {
          left_base = ((char*) msg_iov->iov_base) + take_len;
        }
      }
    }
    if (ix == iov_len) {
      msg_seq.next(&msg_iter);
      ix = 0;
    }
  }

  blen = header.data_len;

  if (blen && left_len) {
    data.append(
      buffer::create_msg(left_len, left_base, m_hook));
    left_len = 0;
  }

  while (blen && (msg_iter != msg_seq.end())) {
    tmsg = msg_iter;
    iov_len = vmsg_sglist_nents(&tmsg->in);
    iovs = vmsg_sglist(&tmsg->in);
    for (; blen && (ix < iov_len); ++ix) {
      msg_iov = &iovs[ix];
      data.append(
        buffer::create_msg(
          msg_iov->iov_len, (char*) msg_iov->iov_base, m_hook));
      blen -= msg_iov->iov_len;
    }
    if (ix == iov_len) {
      msg_seq.next(&msg_iter);
      ix = 0;
    }
  }

  /* update connection timestamp */
  recv = tmsg->timestamp;

  Message *m = decode_message(msgr->cct, msgr->crcflags, header, footer,
                              payload, middle, data, this);

  if (m) {
    /* completion */
    m->set_connection(this);

    /* reply hook */
    m_hook->set_message(m);
    m->set_completion_hook(m_hook);

    /* trace flag */
    m->set_magic(magic);

    /* update timestamps */
    m->set_recv_stamp(recv_stamp);
    m->set_recv_complete_stamp(ceph_clock_now());
    m->set_seq(header.seq);

    /* MP-SAFE */
    state.set_in_seq(header.seq);

    /* XXXX validate peer type */
    if (peer_type != (int) hdr.peer_type) { /* XXX isn't peer_type -1? */
      peer_type = hdr.peer_type;
      peer_addr = hdr.addr;
      peer.addr = peer_addr;
      peer.name = entity_name_t(hdr.hdr->src);
      if (xio_conn_type == XioConnection::PASSIVE) {
        /* XXX kick off feature/authn/authz negotiation
         * nb: very possibly the active side should initiate this, but
         * for now, call a passive hook so OSD and friends can create
         * sessions without actually negotiating
         */
        passive_setup();
      }
    }

    if (magic & (MSG_MAGIC_TRACE_XCON)) {
      ldout(msgr->cct,4) << "decode m is " << m->get_type() << dendl;
    }

    /* dispatch it */
    msgr->ds_dispatch(m);
  } else {
    /* responds for undecoded messages and frees hook */
    ldout(msgr->cct,4) << "decode m failed" << dendl;
    m_hook->on_err_finalize(this);
  }

  return 0;
}

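/*
 * Accelio delivery callback.  The first byte of the xio header is a
 * CEPH_MSGR_TAG_* value: TAG_MSG is routed to handle_data_msg() (which takes
 * ownership of the xio_msg), while keepalive tags are handled inline here
 * and the xio_msg is released before returning.
 */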
int XioConnection::on_msg(struct xio_session *session,
                          struct xio_msg *msg,
                          int more_in_batch,
                          void *cb_user_context)
{
  char tag = CEPH_MSGR_TAG_MSG;
  if (msg->in.header.iov_len)
    tag = *(char*)msg->in.header.iov_base;

  ldout(msgr->cct,8) << __func__ << " receive msg with iov_len "
    << (int) msg->in.header.iov_len << " tag " << (int)tag << dendl;

  // header_len_without_tag is only meaningful if we have a tag
  size_t header_len_without_tag = msg->in.header.iov_len - sizeof(tag);

  switch(tag) {
  case CEPH_MSGR_TAG_MSG:
    ldout(msgr->cct, 20) << __func__ << " got data message" << dendl;
    return handle_data_msg(session, msg, more_in_batch, cb_user_context);

  case CEPH_MSGR_TAG_KEEPALIVE:
    ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
    set_last_keepalive(ceph_clock_now());
    break;

  case CEPH_MSGR_TAG_KEEPALIVE2:
    if (header_len_without_tag < sizeof(ceph_timespec)) {
      lderr(msgr->cct) << __func__ << " too little data for KEEPALIVE2: got " << header_len_without_tag <<
        " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
    }
    else {
      ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
      utime_t kp_t = utime_t(*t);
      ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2 with timestamp " << kp_t << dendl;
      send_keepalive_or_ack(true, &kp_t);
      set_last_keepalive(ceph_clock_now());
    }

    break;

  case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
    if (header_len_without_tag < sizeof(ceph_timespec)) {
      lderr(msgr->cct) << __func__ << " too little data for KEEPALIVE2_ACK: got " << header_len_without_tag <<
        " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
    }
    else {
      ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
      utime_t kp_t(*t);
      ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2_ACK with timestamp " << kp_t << dendl;
      set_last_keepalive_ack(kp_t);
    }
    break;

  default:
    lderr(msgr->cct) << __func__ << " unsupported message tag " << (int) tag << dendl;
    ceph_assert(! "unsupported message tag");
  }

  xio_release_msg(msg);
  return 0;
}


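/*
 * One-way send completion from Accelio.  Drops the in-flight counter
 * (send_ctr), optionally traces, and, if the connection is currently
 * FLOW_CONTROLLED, returns it to UP once the in-flight count has drained to
 * the low-water mark.  Finally releases the XioSend reference taken for the
 * submission.
 */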
int XioConnection::on_ow_msg_send_complete(struct xio_session *session,
                                           struct xio_msg *req,
                                           void *conn_user_context)
{
  /* requester send complete (one-way) */
  uint64_t rc = ++scount;

  XioSend* xsend = static_cast<XioSend*>(req->user_context);
  if (unlikely(magic & MSG_MAGIC_TRACE_CTR)) {
    if (unlikely((rc % 1000000) == 0)) {
      std::cout << "xio finished " << rc << " " << time(0) << std::endl;
    }
  } /* trace ctr */

  ldout(msgr->cct,11) << "on_msg_delivered xcon: " << xsend->xcon <<
    " msg: " << req << " sn: " << req->sn << dendl;

  XioMsg *xmsg = dynamic_cast<XioMsg*>(xsend);
  if (xmsg) {
    ldout(msgr->cct,11) << "on_msg_delivered xcon: " <<
      " type: " << xmsg->m->get_type() << " tid: " << xmsg->m->get_tid() <<
      " seq: " << xmsg->m->get_seq() << dendl;
  }

  --send_ctr; /* atomic, because portal thread */

  /* unblock flow-controlled connections, avoid oscillation */
  if (unlikely(cstate.session_state.read() ==
               XioConnection::FLOW_CONTROLLED)) {
    if ((send_ctr <= uint32_t(xio_qdepth_low_mark())) &&
        (1 /* XXX memory <= memory low-water mark */)) {
      cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
      ldout(msgr->cct,2) << "on_msg_delivered xcon: " << xsend->xcon
        << " up_ready from flow_controlled" << dendl;
    }
  }

  xsend->put();

  return 0;
} /* on_msg_delivered */

void XioConnection::msg_send_fail(XioSend *xsend, int code)
{
  ldout(msgr->cct,2) << "xio_send_msg FAILED xcon: " << this <<
    " msg: " << xsend->get_xio_msg() << " code=" << code <<
    " (" << xio_strerror(code) << ")" << dendl;
  /* return refs taken for each xio_msg */
  xsend->put_msg_refs();
} /* msg_send_fail */

void XioConnection::msg_release_fail(struct xio_msg *msg, int code)
{
  ldout(msgr->cct,2) << "xio_release_msg FAILED xcon: " << this <<
    " msg: " << msg << " code=" << code <<
    " (" << xio_strerror(code) << ")" << dendl;
} /* msg_release_fail */

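/*
 * Emit everything that was deferred while the connection was not UP: a
 * pending keepalive and/or ack first, then the requeue list (XioSend objects
 * bounced back by direct backpressure), then the mqueue of Messages accepted
 * while deferred, which are re-submitted through
 * XioMessenger::_send_message_impl().
 */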
int XioConnection::flush_out_queues(uint32_t flags) {
  XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.lock();

  if (outgoing.keepalive) {
    outgoing.keepalive = false;
    send_keepalive_or_ack_internal();
  }

  if (outgoing.ack) {
    outgoing.ack = false;
    send_keepalive_or_ack_internal(true, &outgoing.ack_time);
  }

  // send deferred 1 (direct backpressure)
  if (outgoing.requeue.size() > 0)
    portal->requeue(this, outgoing.requeue);

  // send deferred 2 (sent while deferred)
  int ix, q_size = outgoing.mqueue.size();
  for (ix = 0; ix < q_size; ++ix) {
    Message::Queue::iterator q_iter = outgoing.mqueue.begin();
    Message* m = &(*q_iter);
    outgoing.mqueue.erase(q_iter);
    msgr->_send_message_impl(m, this);
  }
  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.unlock();

  return 0;
}

int XioConnection::discard_out_queues(uint32_t flags)
{
  Message::Queue disc_q;
  XioSubmit::Queue deferred_q;

  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.lock();

  /* the two send queues contain different objects:
   * - anything on the mqueue is a Message
   * - anything on the requeue is an XioSend
   */
  Message::Queue::const_iterator i1 = disc_q.end();
  disc_q.splice(i1, outgoing.mqueue);

  XioSubmit::Queue::const_iterator i2 = deferred_q.end();
  deferred_q.splice(i2, outgoing.requeue);

  outgoing.keepalive = outgoing.ack = false;

  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.unlock();

  // mqueue
  while (!disc_q.empty()) {
    Message::Queue::iterator q_iter = disc_q.begin();
    Message* m = &(*q_iter);
    disc_q.erase(q_iter);
    m->put();
  }

  // requeue
  while (!deferred_q.empty()) {
    XioSubmit::Queue::iterator q_iter = deferred_q.begin();
    XioSubmit* xs = &(*q_iter);
    XioSend* xsend;
    switch (xs->type) {
    case XioSubmit::OUTGOING_MSG:
      xsend = static_cast<XioSend*>(xs);
      deferred_q.erase(q_iter);
      // release once for each chained xio_msg
      xsend->put(xsend->get_msg_count());
      break;
    case XioSubmit::INCOMING_MSG_RELEASE:
      deferred_q.erase(q_iter);
      portal->release_xio_msg(static_cast<XioCompletion*>(xs));
      break;
    default:
      ldout(msgr->cct,0) << __func__ << ": Unknown Msg type " << xs->type << dendl;
      break;
    }
  }

  return 0;
}

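/*
 * Move this connection to the front of the messenger's connection LRU.
 * Lock ordering is conns_sp before sp, so if the caller already holds sp it
 * is dropped here and re-taken after conns_sp, then conns_sp is released;
 * sp is left in the state the caller expects (held if OP_FLAG_LOCKED was
 * passed, released otherwise).
 */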
int XioConnection::adjust_clru(uint32_t flags)
{
  if (flags & CState::OP_FLAG_LOCKED)
    sp.unlock();

  XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
  msgr->conns_sp.lock();
  sp.lock();

  if (cstate.flags & CState::FLAG_MAPPED) {
    XioConnection::ConnList::iterator citer =
      XioConnection::ConnList::s_iterator_to(*this);
    msgr->conns_list.erase(citer);
    msgr->conns_list.push_front(*this); // LRU
  }

  msgr->conns_sp.unlock();

  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.unlock();

  return 0;
}

int XioConnection::on_msg_error(struct xio_session *session,
                                enum xio_status error,
                                struct xio_msg *msg,
                                void *conn_user_context)
{
  XioSend *xsend = static_cast<XioSend*>(msg->user_context);
  if (xsend)
    xsend->put();

  --send_ctr; /* atomic, because portal thread */
  return 0;
} /* on_msg_error */

void XioConnection::mark_down()
{
  _mark_down(XioConnection::CState::OP_FLAG_NONE);
}

int XioConnection::_mark_down(uint32_t flags)
{
  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.lock();

  // per interface comment, we only stage a remote reset if the
  // current policy requires it
  if (cstate.policy.resetcheck)
    cstate.flags |= CState::FLAG_RESET;

  disconnect();

  /* XXX this will almost certainly be called again from
   * on_disconnect_event() */
  discard_out_queues(flags|CState::OP_FLAG_LOCKED);

  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.unlock();

  return 0;
}

void XioConnection::mark_disposable()
{
  _mark_disposable(XioConnection::CState::OP_FLAG_NONE);
}

int XioConnection::_mark_disposable(uint32_t flags)
{
  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.lock();

  cstate.policy.lossy = true;

  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.unlock();

  return 0;
}

int XioConnection::CState::state_up_ready(uint32_t flags)
{
  if (! (flags & CState::OP_FLAG_LOCKED))
    xcon->sp.lock();

  xcon->flush_out_queues(flags|CState::OP_FLAG_LOCKED);

  session_state = session_states::UP;
  startup_state = session_startup_states::READY;

  if (! (flags & CState::OP_FLAG_LOCKED))
    xcon->sp.unlock();

  return (0);
}

int XioConnection::CState::state_discon()
{
  session_state = session_states::DISCONNECTED;
  startup_state = session_startup_states::IDLE;

  return 0;
}

int XioConnection::CState::state_flow_controlled(uint32_t flags)
{
  if (! (flags & OP_FLAG_LOCKED))
    xcon->sp.lock();

  session_state = session_states::FLOW_CONTROLLED;

  if (! (flags & OP_FLAG_LOCKED))
    xcon->sp.unlock();

  return (0);
}

int XioConnection::CState::state_fail(Message* m, uint32_t flags)
{
  if (! (flags & OP_FLAG_LOCKED))
    xcon->sp.lock();

  // advance to state FAIL, drop queued msgs, adjust LRU
  session_state = session_states::DISCONNECTED;
  startup_state = session_startup_states::FAIL;

  xcon->discard_out_queues(flags|OP_FLAG_LOCKED);
  xcon->adjust_clru(flags|OP_FLAG_LOCKED|OP_FLAG_LRU);

  xcon->disconnect();

  if (! (flags & OP_FLAG_LOCKED))
    xcon->sp.unlock();

  // notify ULP
  XioMessenger* msgr = static_cast<XioMessenger*>(xcon->get_messenger());
  msgr->ms_deliver_handle_reset(xcon);
  m->put();

  return 0;
}

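/*
 * Loopback connections never touch Accelio: a message to self is stamped
 * with the local source name and next sequence number, then dispatched
 * directly to the local messenger.
 */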
int XioLoopbackConnection::send_message(Message *m)
{
  XioMessenger *ms = static_cast<XioMessenger*>(get_messenger());
  m->set_connection(this);
  m->set_seq(next_seq());
  m->set_src(ms->get_myinst().name);
  ms->ds_dispatch(m);
  return 0;
}

void XioLoopbackConnection::send_keepalive()
{
  utime_t t = ceph_clock_now();
  set_last_keepalive(t);
  set_last_keepalive_ack(t);
}