// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Portions Copyright (C) 2013 CohortFS, LLC
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
#include <algorithm>
#include <mutex>

#include "XioConnection.h"
#include "XioMessenger.h"
#include "messages/MDataPing.h"
#include "msg/msg_types.h"
#include "auth/none/AuthNoneProtocol.h" // XXX

#include "include/ceph_assert.h"
#include "common/dout.h"
/* Accelio memory pools shared by the xio messenger (defined elsewhere in
 * the xio messenger): registered-memory pool and non-registered pool. */
extern struct xio_mempool *xio_msgr_mpool;
extern struct xio_mempool *xio_msgr_noreg_mpool;

#define dout_subsys ceph_subsys_xio
31 void print_xio_msg_hdr(CephContext
*cct
, const char *tag
,
32 const XioMsgHdr
&hdr
, const struct xio_msg
*msg
)
35 ldout(cct
,4) << tag
<<
38 " timestamp: " << msg
->timestamp
<<
42 ldout(cct
,4) << tag
<<
44 " front_len: " << hdr
.hdr
->front_len
<<
45 " seq: " << hdr
.hdr
->seq
<<
46 " tid: " << hdr
.hdr
->tid
<<
47 " type: " << hdr
.hdr
->type
<<
48 " prio: " << hdr
.hdr
->priority
<<
49 " name type: " << (int) hdr
.hdr
->src
.type
<<
50 " name num: " << (int) hdr
.hdr
->src
.num
<<
51 " version: " << hdr
.hdr
->version
<<
52 " compat_version: " << hdr
.hdr
->compat_version
<<
53 " front_len: " << hdr
.hdr
->front_len
<<
54 " middle_len: " << hdr
.hdr
->middle_len
<<
55 " data_len: " << hdr
.hdr
->data_len
<<
57 " msg_cnt: " << hdr
.msg_cnt
<<
60 ldout(cct
,4) << tag
<<
62 " front_crc: " << hdr
.ftr
->front_crc
<<
63 " middle_crc: " << hdr
.ftr
->middle_crc
<<
64 " data_crc: " << hdr
.ftr
->data_crc
<<
65 " sig: " << hdr
.ftr
->sig
<<
66 " flags: " << (uint32_t) hdr
.ftr
->flags
<<
70 void print_ceph_msg(CephContext
*cct
, const char *tag
, Message
*m
)
72 if (m
->get_magic() & (MSG_MAGIC_XIO
& MSG_MAGIC_TRACE_DTOR
)) {
73 ceph_msg_header
& header
= m
->get_header();
74 ldout(cct
,4) << tag
<< " header version " << header
.version
<<
75 " compat version " << header
.compat_version
<<
81 #define dout_prefix conn_prefix(_dout)
82 ostream
& XioConnection::conn_prefix(std::ostream
*_dout
) {
83 return *_dout
<< "-- " << get_messenger()->get_myinst().addr
<< " >> " << peer_addr
84 << " peer=" << peer
.name
.type_str()
85 << " conn=" << conn
<< " sess=" << session
<< " ";
88 XioConnection::XioConnection(XioMessenger
*m
, XioConnection::type _type
,
89 const entity_inst_t
& _peer
) :
90 Connection(m
->cct
, m
),
92 portal(m
->get_portal()),
97 magic(m
->get_magic()),
103 set_peer_type(peer
.name
.type());
104 set_peer_addr(peer
.addr
);
106 Messenger::Policy policy
;
107 int64_t max_msgs
= 0, max_bytes
= 0, bytes_opt
= 0;
110 policy
= m
->get_policy(peer_type
);
112 if (policy
.throttler_messages
) {
113 max_msgs
= policy
.throttler_messages
->get_max();
114 ldout(m
->cct
,4) << "XioMessenger throttle_msgs: " << max_msgs
<< dendl
;
117 xopt
= m
->cct
->_conf
->xio_queue_depth
;
121 /* set high mark for send, reserved 20% for credits */
122 q_high_mark
= xopt
* 4 / 5;
123 q_low_mark
= q_high_mark
/2;
125 /* set send & receive msgs queue depth */
126 xio_set_opt(NULL
, XIO_OPTLEVEL_ACCELIO
, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS
,
127 &xopt
, sizeof(xopt
));
128 xio_set_opt(NULL
, XIO_OPTLEVEL_ACCELIO
, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS
,
129 &xopt
, sizeof(xopt
));
131 if (policy
.throttler_bytes
) {
132 max_bytes
= policy
.throttler_bytes
->get_max();
133 ldout(m
->cct
,4) << "XioMessenger throttle_bytes: " << max_bytes
<< dendl
;
136 bytes_opt
= (2 << 28); /* default: 512 MB */
137 if (max_bytes
> bytes_opt
)
138 bytes_opt
= max_bytes
;
140 /* set send & receive total bytes throttle */
141 xio_set_opt(NULL
, XIO_OPTLEVEL_ACCELIO
, XIO_OPTNAME_SND_QUEUE_DEPTH_BYTES
,
142 &bytes_opt
, sizeof(bytes_opt
));
143 xio_set_opt(NULL
, XIO_OPTLEVEL_ACCELIO
, XIO_OPTNAME_RCV_QUEUE_DEPTH_BYTES
,
144 &bytes_opt
, sizeof(bytes_opt
));
146 ldout(m
->cct
,4) << "throttle_msgs: " << xopt
<< " throttle_bytes: " << bytes_opt
<< dendl
;
148 /* XXXX fake features, aieee! */
149 set_features(XIO_ALL_FEATURES
);
152 int XioConnection::send_message(Message
*m
)
154 XioMessenger
*ms
= static_cast<XioMessenger
*>(get_messenger());
155 return ms
->_send_message(m
, this);
158 void XioConnection::send_keepalive_or_ack(bool ack
, const utime_t
*tp
)
160 /* If con is not in READY state, we need to queue the request */
161 if (cstate
.session_state
.read() != XioConnection::UP
) {
162 std::lock_guad
<ceph::util::spinlock
> lg(sp
);
163 if (cstate
.session_state
.read() != XioConnection::UP
) {
166 outgoing
.ack_time
= *tp
;
169 outgoing
.keepalive
= true;
175 send_keepalive_or_ack_internal(ack
, tp
);
178 void XioConnection::send_keepalive_or_ack_internal(bool ack
, const utime_t
*tp
)
180 XioCommand
*xcmd
= pool_alloc_xio_command(this);
182 /* could happen if Accelio has been shutdown */
186 struct ceph_timespec ts
;
189 tp
->encode_timeval(&ts
);
190 xcmd
->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2_ACK
);
191 xcmd
->get_bl_ref().append((char*)&ts
, sizeof(ts
));
192 } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
193 utime_t t
= ceph_clock_now();
194 t
.encode_timeval(&ts
);
195 xcmd
->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2
);
196 xcmd
->get_bl_ref().append((char*)&ts
, sizeof(ts
));
198 xcmd
->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE
);
201 const std::list
<buffer::ptr
>& header
= xcmd
->get_bl_ref().buffers();
202 ceph_assert(header
.size() == 1); /* accelio header must be without scatter gather */
203 list
<bufferptr
>::const_iterator pb
= header
.begin();
204 ceph_assert(pb
->length() < XioMsgHdr::get_max_encoded_length());
205 struct xio_msg
* msg
= xcmd
->get_xio_msg();
206 msg
->out
.header
.iov_base
= (char*) pb
->c_str();
207 msg
->out
.header
.iov_len
= pb
->length();
209 ldout(msgr
->cct
,8) << __func__
<< " sending command with tag " << (int)(*(char*)msg
->out
.header
.iov_base
)
210 << " len " << msg
->out
.header
.iov_len
<< dendl
;
212 portal
->enqueue(this, xcmd
);
216 int XioConnection::passive_setup()
218 /* XXX passive setup is a placeholder for (potentially active-side
219 initiated) feature and auth* negotiation */
220 static bufferlist authorizer_reply
; /* static because fake */
221 static CryptoKey session_key
; /* ditto */
222 bool authorizer_valid
;
224 XioMessenger
*msgr
= static_cast<XioMessenger
*>(get_messenger());
226 // fake an auth buffer
228 name
.set_type(peer
.name
.type());
230 AuthNoneAuthorizer auth
;
231 auth
.build_authorizer(name
, peer
.name
.num());
233 /* XXX fake authorizer! */
234 msgr
->ms_deliver_verify_authorizer(
235 this, peer_type
, CEPH_AUTH_NONE
,
243 msgr
->ms_deliver_handle_accept(this);
244 msgr
->ms_deliver_handle_fast_accept(this);
246 /* try to insert in conns_entity_map */
247 msgr
->try_insert(this);
251 static inline XioDispatchHook
* pool_alloc_xio_dispatch_hook(
252 XioConnection
*xcon
, Message
*m
, XioInSeq
& msg_seq
)
254 struct xio_reg_mem mp_mem
;
255 int e
= xpool_alloc(xio_msgr_noreg_mpool
,
256 sizeof(XioDispatchHook
), &mp_mem
);
259 XioDispatchHook
*xhook
= static_cast<XioDispatchHook
*>(mp_mem
.addr
);
260 new (xhook
) XioDispatchHook(xcon
, m
, msg_seq
, mp_mem
);
264 int XioConnection::handle_data_msg(struct xio_session
*session
,
267 void *cb_user_context
)
269 struct xio_msg
*tmsg
= msg
;
271 /* XXX Accelio guarantees message ordering at
275 if (!tmsg
->in
.header
.iov_len
) {
276 ldout(msgr
->cct
,0) << __func__
<< " empty header: packet out of sequence?" << dendl
;
277 xio_release_msg(msg
);
280 const size_t sizeof_tag
= 1;
282 buffer::create_static(tmsg
->in
.header
.iov_len
-sizeof_tag
,
283 ((char*) tmsg
->in
.header
.iov_base
)+sizeof_tag
));
284 ldout(msgr
->cct
,10) << __func__
<< " receive msg " << "tmsg " << tmsg
285 << " msg_cnt " << msg_cnt
.msg_cnt
286 << " iov_base " << tmsg
->in
.header
.iov_base
287 << " iov_len " << (int) tmsg
->in
.header
.iov_len
288 << " nents " << tmsg
->in
.pdata_iov
.nents
289 << " sn " << tmsg
->sn
<< dendl
;
290 ceph_assert(session
== this->session
);
291 in_seq
.set_count(msg_cnt
.msg_cnt
);
293 /* XXX major sequence error */
294 ceph_assert(! tmsg
->in
.header
.iov_len
);
298 if (in_seq
.count() > 0) {
302 XioMessenger
*msgr
= static_cast<XioMessenger
*>(get_messenger());
303 XioDispatchHook
*m_hook
=
304 pool_alloc_xio_dispatch_hook(this, NULL
/* msg */, in_seq
);
305 XioInSeq
& msg_seq
= m_hook
->msg_seq
;
308 ceph_msg_header header
;
309 ceph_msg_footer footer
;
310 buffer::list payload
, middle
, data
;
312 const utime_t recv_stamp
= ceph_clock_now();
314 ldout(msgr
->cct
,4) << __func__
<< " " << "msg_seq.size()=" << msg_seq
.size() <<
317 struct xio_msg
* msg_iter
= msg_seq
.begin();
319 XioMsgHdr
hdr(header
, footer
,
320 buffer::create_static(tmsg
->in
.header
.iov_len
,
321 (char*) tmsg
->in
.header
.iov_base
));
323 if (magic
& (MSG_MAGIC_TRACE_XCON
)) {
324 if (hdr
.hdr
->type
== 43) {
325 print_xio_msg_hdr(msgr
->cct
, "on_msg", hdr
, NULL
);
329 unsigned int ix
, blen
, iov_len
;
330 struct xio_iovec_ex
*msg_iov
, *iovs
;
331 uint32_t take_len
, left_len
= 0;
332 char *left_base
= NULL
;
335 blen
= header
.front_len
;
337 while (blen
&& (msg_iter
!= msg_seq
.end())) {
339 iov_len
= vmsg_sglist_nents(&tmsg
->in
);
340 iovs
= vmsg_sglist(&tmsg
->in
);
341 for (; blen
&& (ix
< iov_len
); ++ix
) {
344 /* XXX need to detect any buffer which needs to be
345 * split due to coalescing of a segment (front, middle,
348 take_len
= std::min(blen
, msg_iov
->iov_len
);
351 take_len
, (char*) msg_iov
->iov_base
, m_hook
));
354 left_len
= msg_iov
->iov_len
- take_len
;
356 left_base
= ((char*) msg_iov
->iov_base
) + take_len
;
360 /* XXX as above, if a buffer is split, then we needed to track
361 * the new start (carry) and not advance */
363 msg_seq
.next(&msg_iter
);
368 if (magic
& (MSG_MAGIC_TRACE_XCON
)) {
369 if (hdr
.hdr
->type
== 43) {
370 ldout(msgr
->cct
,4) << "front (payload) dump:";
371 payload
.hexdump( *_dout
);
376 blen
= header
.middle_len
;
378 if (blen
&& left_len
) {
380 buffer::create_msg(left_len
, left_base
, m_hook
));
384 while (blen
&& (msg_iter
!= msg_seq
.end())) {
386 iov_len
= vmsg_sglist_nents(&tmsg
->in
);
387 iovs
= vmsg_sglist(&tmsg
->in
);
388 for (; blen
&& (ix
< iov_len
); ++ix
) {
390 take_len
= std::min(blen
, msg_iov
->iov_len
);
393 take_len
, (char*) msg_iov
->iov_base
, m_hook
));
396 left_len
= msg_iov
->iov_len
- take_len
;
398 left_base
= ((char*) msg_iov
->iov_base
) + take_len
;
403 msg_seq
.next(&msg_iter
);
408 blen
= header
.data_len
;
410 if (blen
&& left_len
) {
412 buffer::create_msg(left_len
, left_base
, m_hook
));
416 while (blen
&& (msg_iter
!= msg_seq
.end())) {
418 iov_len
= vmsg_sglist_nents(&tmsg
->in
);
419 iovs
= vmsg_sglist(&tmsg
->in
);
420 for (; blen
&& (ix
< iov_len
); ++ix
) {
424 msg_iov
->iov_len
, (char*) msg_iov
->iov_base
, m_hook
));
425 blen
-= msg_iov
->iov_len
;
428 msg_seq
.next(&msg_iter
);
433 /* update connection timestamp */
434 recv
= tmsg
->timestamp
;
436 Message
*m
= decode_message(msgr
->cct
, msgr
->crcflags
, header
, footer
,
437 payload
, middle
, data
, this);
441 m
->set_connection(this);
444 m_hook
->set_message(m
);
445 m
->set_completion_hook(m_hook
);
450 /* update timestamps */
451 m
->set_recv_stamp(recv_stamp
);
452 m
->set_recv_complete_stamp(ceph_clock_now());
453 m
->set_seq(header
.seq
);
456 state
.set_in_seq(header
.seq
);
458 /* XXXX validate peer type */
459 if (peer_type
!= (int) hdr
.peer_type
) { /* XXX isn't peer_type -1? */
460 peer_type
= hdr
.peer_type
;
461 peer_addr
= hdr
.addr
;
462 peer
.addr
= peer_addr
;
463 peer
.name
= entity_name_t(hdr
.hdr
->src
);
464 if (xio_conn_type
== XioConnection::PASSIVE
) {
465 /* XXX kick off feature/authn/authz negotiation
466 * nb: very possibly the active side should initiate this, but
467 * for now, call a passive hook so OSD and friends can create
468 * sessions without actually negotiating
474 if (magic
& (MSG_MAGIC_TRACE_XCON
)) {
475 ldout(msgr
->cct
,4) << "decode m is " << m
->get_type() << dendl
;
479 msgr
->ds_dispatch(m
);
481 /* responds for undecoded messages and frees hook */
482 ldout(msgr
->cct
,4) << "decode m failed" << dendl
;
483 m_hook
->on_err_finalize(this);
489 int XioConnection::on_msg(struct xio_session
*session
,
492 void *cb_user_context
)
494 char tag
= CEPH_MSGR_TAG_MSG
;
495 if (msg
->in
.header
.iov_len
)
496 tag
= *(char*)msg
->in
.header
.iov_base
;
498 ldout(msgr
->cct
,8) << __func__
<< " receive msg with iov_len "
499 << (int) msg
->in
.header
.iov_len
<< " tag " << (int)tag
<< dendl
;
501 //header_len_without_tag is only meaningful in case we have tag
502 size_t header_len_without_tag
= msg
->in
.header
.iov_len
- sizeof(tag
);
505 case CEPH_MSGR_TAG_MSG
:
506 ldout(msgr
->cct
, 20) << __func__
<< " got data message" << dendl
;
507 return handle_data_msg(session
, msg
, more_in_batch
, cb_user_context
);
509 case CEPH_MSGR_TAG_KEEPALIVE
:
510 ldout(msgr
->cct
, 20) << __func__
<< " got KEEPALIVE" << dendl
;
511 set_last_keepalive(ceph_clock_now());
514 case CEPH_MSGR_TAG_KEEPALIVE2
:
515 if (header_len_without_tag
< sizeof(ceph_timespec
)) {
516 lderr(msgr
->cct
) << __func__
<< " too few data for KEEPALIVE2: got " << header_len_without_tag
<<
517 " bytes instead of " << sizeof(ceph_timespec
) << " bytes" << dendl
;
520 ceph_timespec
*t
= (ceph_timespec
*) ((char*)msg
->in
.header
.iov_base
+ sizeof(tag
));
521 utime_t kp_t
= utime_t(*t
);
522 ldout(msgr
->cct
, 20) << __func__
<< " got KEEPALIVE2 with timestamp" << kp_t
<< dendl
;
523 send_keepalive_or_ack(true, &kp_t
);
524 set_last_keepalive(ceph_clock_now());
529 case CEPH_MSGR_TAG_KEEPALIVE2_ACK
:
530 if (header_len_without_tag
< sizeof(ceph_timespec
)) {
531 lderr(msgr
->cct
) << __func__
<< " too few data for KEEPALIVE2_ACK: got " << header_len_without_tag
<<
532 " bytes instead of " << sizeof(ceph_timespec
) << " bytes" << dendl
;
535 ceph_timespec
*t
= (ceph_timespec
*) ((char*)msg
->in
.header
.iov_base
+ sizeof(tag
));
537 ldout(msgr
->cct
, 20) << __func__
<< " got KEEPALIVE2_ACK with timestamp" << kp_t
<< dendl
;
538 set_last_keepalive_ack(kp_t
);
543 lderr(msgr
->cct
) << __func__
<< " unsupported message tag " << (int) tag
<< dendl
;
544 ceph_assert(! "unsupported message tag");
547 xio_release_msg(msg
);
552 int XioConnection::on_ow_msg_send_complete(struct xio_session
*session
,
554 void *conn_user_context
)
556 /* requester send complete (one-way) */
557 uint64_t rc
= ++scount
;
559 XioSend
* xsend
= static_cast<XioSend
*>(req
->user_context
);
560 if (unlikely(magic
& MSG_MAGIC_TRACE_CTR
)) {
561 if (unlikely((rc
% 1000000) == 0)) {
562 std::cout
<< "xio finished " << rc
<< " " << time(0) << std::endl
;
566 ldout(msgr
->cct
,11) << "on_msg_delivered xcon: " << xsend
->xcon
<<
567 " msg: " << req
<< " sn: " << req
->sn
<< dendl
;
569 XioMsg
*xmsg
= dynamic_cast<XioMsg
*>(xsend
);
571 ldout(msgr
->cct
,11) << "on_msg_delivered xcon: " <<
572 " type: " << xmsg
->m
->get_type() << " tid: " << xmsg
->m
->get_tid() <<
573 " seq: " << xmsg
->m
->get_seq() << dendl
;
576 --send_ctr
; /* atomic, because portal thread */
578 /* unblock flow-controlled connections, avoid oscillation */
579 if (unlikely(cstate
.session_state
.read() ==
580 XioConnection::FLOW_CONTROLLED
)) {
581 if ((send_ctr
<= uint32_t(xio_qdepth_low_mark())) &&
582 (1 /* XXX memory <= memory low-water mark */)) {
583 cstate
.state_up_ready(XioConnection::CState::OP_FLAG_NONE
);
584 ldout(msgr
->cct
,2) << "on_msg_delivered xcon: " << xsend
->xcon
585 << " up_ready from flow_controlled" << dendl
;
592 } /* on_msg_delivered */
594 void XioConnection::msg_send_fail(XioSend
*xsend
, int code
)
596 ldout(msgr
->cct
,2) << "xio_send_msg FAILED xcon: " << this <<
597 " msg: " << xsend
->get_xio_msg() << " code=" << code
<<
598 " (" << xio_strerror(code
) << ")" << dendl
;
599 /* return refs taken for each xio_msg */
600 xsend
->put_msg_refs();
601 } /* msg_send_fail */
603 void XioConnection::msg_release_fail(struct xio_msg
*msg
, int code
)
605 ldout(msgr
->cct
,2) << "xio_release_msg FAILED xcon: " << this <<
606 " msg: " << msg
<< "code=" << code
<<
607 " (" << xio_strerror(code
) << ")" << dendl
;
608 } /* msg_release_fail */
610 int XioConnection::flush_out_queues(uint32_t flags
) {
611 XioMessenger
* msgr
= static_cast<XioMessenger
*>(get_messenger());
612 if (! (flags
& CState::OP_FLAG_LOCKED
))
615 if (outgoing
.keepalive
) {
616 outgoing
.keepalive
= false;
617 send_keepalive_or_ack_internal();
621 outgoing
.ack
= false;
622 send_keepalive_or_ack_internal(true, &outgoing
.ack_time
);
625 // send deferred 1 (direct backpresssure)
626 if (outgoing
.requeue
.size() > 0)
627 portal
->requeue(this, outgoing
.requeue
);
629 // send deferred 2 (sent while deferred)
630 int ix
, q_size
= outgoing
.mqueue
.size();
631 for (ix
= 0; ix
< q_size
; ++ix
) {
632 Message::Queue::iterator q_iter
= outgoing
.mqueue
.begin();
633 Message
* m
= &(*q_iter
);
634 outgoing
.mqueue
.erase(q_iter
);
635 msgr
->_send_message_impl(m
, this);
637 if (! (flags
& CState::OP_FLAG_LOCKED
))
642 int XioConnection::discard_out_queues(uint32_t flags
)
644 Message::Queue disc_q
;
645 XioSubmit::Queue deferred_q
;
647 if (! (flags
& CState::OP_FLAG_LOCKED
))
650 /* the two send queues contain different objects:
651 * - anything on the mqueue is a Message
652 * - anything on the requeue is an XioSend
654 Message::Queue::const_iterator i1
= disc_q
.end();
655 disc_q
.splice(i1
, outgoing
.mqueue
);
657 XioSubmit::Queue::const_iterator i2
= deferred_q
.end();
658 deferred_q
.splice(i2
, outgoing
.requeue
);
660 outgoing
.keepalive
= outgoing
.ack
= false;
662 if (! (flags
& CState::OP_FLAG_LOCKED
))
666 while (!disc_q
.empty()) {
667 Message::Queue::iterator q_iter
= disc_q
.begin();
668 Message
* m
= &(*q_iter
);
669 disc_q
.erase(q_iter
);
674 while (!deferred_q
.empty()) {
675 XioSubmit::Queue::iterator q_iter
= deferred_q
.begin();
676 XioSubmit
* xs
= &(*q_iter
);
679 case XioSubmit::OUTGOING_MSG
:
680 xsend
= static_cast<XioSend
*>(xs
);
681 deferred_q
.erase(q_iter
);
682 // release once for each chained xio_msg
683 xsend
->put(xsend
->get_msg_count());
685 case XioSubmit::INCOMING_MSG_RELEASE
:
686 deferred_q
.erase(q_iter
);
687 portal
->release_xio_msg(static_cast<XioCompletion
*>(xs
));
690 ldout(msgr
->cct
,0) << __func__
<< ": Unknown Msg type " << xs
->type
<< dendl
;
698 int XioConnection::adjust_clru(uint32_t flags
)
700 if (flags
& CState::OP_FLAG_LOCKED
)
703 XioMessenger
* msgr
= static_cast<XioMessenger
*>(get_messenger());
704 msgr
->conns_sp
.lock();
707 if (cstate
.flags
& CState::FLAG_MAPPED
) {
708 XioConnection::ConnList::iterator citer
=
709 XioConnection::ConnList::s_iterator_to(*this);
710 msgr
->conns_list
.erase(citer
);
711 msgr
->conns_list
.push_front(*this); // LRU
714 msgr
->conns_sp
.unlock();
716 if (! (flags
& CState::OP_FLAG_LOCKED
))
722 int XioConnection::on_msg_error(struct xio_session
*session
,
723 enum xio_status error
,
725 void *conn_user_context
)
727 XioSend
*xsend
= static_cast<XioSend
*>(msg
->user_context
);
731 --send_ctr
; /* atomic, because portal thread */
735 void XioConnection::mark_down()
737 _mark_down(XioConnection::CState::OP_FLAG_NONE
);
740 int XioConnection::_mark_down(uint32_t flags
)
742 if (! (flags
& CState::OP_FLAG_LOCKED
))
745 // per interface comment, we only stage a remote reset if the
746 // current policy required it
747 if (cstate
.policy
.resetcheck
)
748 cstate
.flags
|= CState::FLAG_RESET
;
752 /* XXX this will almost certainly be called again from
753 * on_disconnect_event() */
754 discard_out_queues(flags
|CState::OP_FLAG_LOCKED
);
756 if (! (flags
& CState::OP_FLAG_LOCKED
))
762 void XioConnection::mark_disposable()
764 _mark_disposable(XioConnection::CState::OP_FLAG_NONE
);
767 int XioConnection::_mark_disposable(uint32_t flags
)
769 if (! (flags
& CState::OP_FLAG_LOCKED
))
772 cstate
.policy
.lossy
= true;
774 if (! (flags
& CState::OP_FLAG_LOCKED
))
780 int XioConnection::CState::state_up_ready(uint32_t flags
)
782 if (! (flags
& CState::OP_FLAG_LOCKED
))
785 xcon
->flush_out_queues(flags
|CState::OP_FLAG_LOCKED
);
787 session_state
= session_states::UP
;
788 startup_state
= session_startup_states::READY
;
790 if (! (flags
& CState::OP_FLAG_LOCKED
))
796 int XioConnection::CState::state_discon()
798 session_state
= session_states::DISCONNECTED
;
799 startup_state
= session_startup_states::IDLE
;
804 int XioConnection::CState::state_flow_controlled(uint32_t flags
)
806 if (! (flags
& OP_FLAG_LOCKED
))
809 session_state
= session_states::FLOW_CONTROLLED
;
811 if (! (flags
& OP_FLAG_LOCKED
))
817 int XioConnection::CState::state_fail(Message
* m
, uint32_t flags
)
819 if (! (flags
& OP_FLAG_LOCKED
))
822 // advance to state FAIL, drop queued, msgs, adjust LRU
823 session_state
= session_states::DISCONNECTED
;
824 startup_state
= session_startup_states::FAIL
;
826 xcon
->discard_out_queues(flags
|OP_FLAG_LOCKED
);
827 xcon
->adjust_clru(flags
|OP_FLAG_LOCKED
|OP_FLAG_LRU
);
831 if (! (flags
& OP_FLAG_LOCKED
))
835 XioMessenger
* msgr
= static_cast<XioMessenger
*>(xcon
->get_messenger());
836 msgr
->ms_deliver_handle_reset(xcon
);
843 int XioLoopbackConnection::send_message(Message
*m
)
845 XioMessenger
*ms
= static_cast<XioMessenger
*>(get_messenger());
846 m
->set_connection(this);
847 m
->set_seq(next_seq());
848 m
->set_src(ms
->get_myinst().name
);
853 void XioLoopbackConnection::send_keepalive()
855 utime_t t
= ceph_clock_now();
856 set_last_keepalive(t
);
857 set_last_keepalive_ack(t
);