1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Portions Copyright (C) 2013 CohortFS, LLC
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
17 #include "XioConnection.h"
18 #include "XioMessenger.h"
19 #include "messages/MDataPing.h"
20 #include "msg/msg_types.h"
21 #include "auth/none/AuthNoneProtocol.h" // XXX
23 #include "include/assert.h"
24 #include "common/dout.h"
/* Accelio memory pools defined elsewhere in the xio messenger code;
 * this translation unit only references them. */
extern struct xio_mempool *xio_msgr_mpool;
/* Pool for allocations that (judging by the name) are not RDMA-registered
 * -- TODO confirm against the pool's definition. */
extern struct xio_mempool *xio_msgr_noreg_mpool;

/* Route dout()/ldout() logging from this file to the xio subsystem. */
#define dout_subsys ceph_subsys_xio
/* Debug helper: dump a decoded XioMsgHdr (embedded ceph_msg_header and
 * ceph_msg_footer) and, when supplied, the carrying xio_msg, at debug
 * level 4.
 * NOTE(review): the opening brace, the apparent msg != NULL guard, and
 * the dendl stream terminators are elided from this excerpt of the file. */
void print_xio_msg_hdr(CephContext *cct, const char *tag,
                       const XioMsgHdr &hdr, const struct xio_msg *msg)
  /* Accelio transport-level timestamp of the raw message. */
  ldout(cct,4) << tag <<
    " timestamp: " << msg->timestamp <<
  /* ceph_msg_header fields carried in the xio header. */
  ldout(cct,4) << tag <<
    " front_len: " << hdr.hdr->front_len <<
    " seq: " << hdr.hdr->seq <<
    " tid: " << hdr.hdr->tid <<
    " type: " << hdr.hdr->type <<
    " prio: " << hdr.hdr->priority <<
    " name type: " << (int) hdr.hdr->src.type <<
    " name num: " << (int) hdr.hdr->src.num <<
    " version: " << hdr.hdr->version <<
    " compat_version: " << hdr.hdr->compat_version <<
    " front_len: " << hdr.hdr->front_len <<
    " middle_len: " << hdr.hdr->middle_len <<
    " data_len: " << hdr.hdr->data_len <<
    /* number of xio_msgs this ceph message spans */
    " msg_cnt: " << hdr.msg_cnt <<
  /* ceph_msg_footer: per-section checksums, signature and flags. */
  ldout(cct,4) << tag <<
    " front_crc: " << hdr.ftr->front_crc <<
    " middle_crc: " << hdr.ftr->middle_crc <<
    " data_crc: " << hdr.ftr->data_crc <<
    " sig: " << hdr.ftr->sig <<
    " flags: " << (uint32_t) hdr.ftr->flags <<
/* Debug helper: when the Message's magic matches the trace mask, dump its
 * header version fields at debug level 4.
 * NOTE(review): the mask uses '&' between MSG_MAGIC_XIO and
 * MSG_MAGIC_TRACE_DTOR -- if these are distinct single-bit flags the
 * intersection is 0 and the branch never fires; confirm '&' vs '|'
 * against upstream intent.  Closing braces/terminators are elided from
 * this excerpt. */
void print_ceph_msg(CephContext *cct, const char *tag, Message *m)
  if (m->get_magic() & (MSG_MAGIC_XIO & MSG_MAGIC_TRACE_DTOR)) {
    ceph_msg_header& header = m->get_header();
    ldout(cct,4) << tag << " header version " << header.version <<
      " compat version " << header.compat_version <<
/* Every dout/ldout in the remainder of this file is prefixed with the
 * connection identity produced by conn_prefix() below. */
#define dout_prefix conn_prefix(_dout)
/* Build the standard log-line prefix for this connection:
 * "-- <my addr> >> <peer addr> peer=<type> conn=<xio conn> sess=<xio session> "
 * (closing brace elided from this excerpt). */
ostream& XioConnection::conn_prefix(std::ostream *_dout) {
  return *_dout << "-- " << get_messenger()->get_myinst().addr
                << " >> " << peer_addr
                << " peer=" << peer.name.type_str()
                << " conn=" << conn << " sess=" << session << " ";
/* Construct a connection owned by XioMessenger m toward _peer.
 * Initializes the per-connection spinlock, records peer identity, and
 * derives Accelio send/receive queue depths and byte throttles from the
 * messenger policy for the peer type.
 * NOTE(review): several initializer-list entries, the declaration of
 * 'xopt', and a few closing braces are elided from this excerpt. */
XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type,
                             const entity_inst_t& _peer) :
  Connection(m->cct, m),
  portal(m->get_portal()),
  magic(m->get_magic()),
  pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
  set_peer_type(peer.name.type());
  set_peer_addr(peer.addr);

  Messenger::Policy policy;
  int64_t max_msgs = 0, max_bytes = 0, bytes_opt = 0;

  /* policy for this peer type drives the throttle limits below */
  policy = m->get_policy(peer_type);

  if (policy.throttler_messages) {
    max_msgs = policy.throttler_messages->get_max();
    ldout(m->cct,4) << "XioMessenger throttle_msgs: " << max_msgs << dendl;

  /* base queue depth comes from configuration */
  xopt = m->cct->_conf->xio_queue_depth;

  /* set high mark for send, reserved 20% for credits */
  q_high_mark = xopt * 4 / 5;
  q_low_mark = q_high_mark/2;

  /* set send & receive msgs queue depth */
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS,
              &xopt, sizeof(xopt));
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS,
              &xopt, sizeof(xopt));

  if (policy.throttler_bytes) {
    max_bytes = policy.throttler_bytes->get_max();
    ldout(m->cct,4) << "XioMessenger throttle_bytes: " << max_bytes << dendl;

  /* use the larger of the policy byte limit and the default */
  bytes_opt = (2 << 28); /* default: 512 MB */
  if (max_bytes > bytes_opt)
    bytes_opt = max_bytes;

  /* set send & receive total bytes throttle */
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_BYTES,
              &bytes_opt, sizeof(bytes_opt));
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_BYTES,
              &bytes_opt, sizeof(bytes_opt));

  ldout(m->cct,4) << "throttle_msgs: " << xopt << " throttle_bytes: " << bytes_opt << dendl;

  /* XXXX fake features, aieee! */
  set_features(XIO_ALL_FEATURES);
153 int XioConnection::send_message(Message
*m
)
155 XioMessenger
*ms
= static_cast<XioMessenger
*>(get_messenger());
156 return ms
->_send_message(m
, this);
/* Send a keepalive (ack=false) or a KEEPALIVE2 ack (ack=true, tp = peer
 * timestamp).  If the connection is not UP the request is recorded on
 * 'outgoing' for later flushing instead of being sent immediately.
 * NOTE(review): the if/else lines selecting between recording the ack
 * time and setting the keepalive flag, and the early return, are elided
 * from this excerpt. */
void XioConnection::send_keepalive_or_ack(bool ack, const utime_t *tp)
  /* If con is not in READY state, we need to queue the request */
  if (cstate.session_state.read() != XioConnection::UP) {
    pthread_spin_lock(&sp);
    /* re-check state under the spinlock before queueing */
    if (cstate.session_state.read() != XioConnection::UP) {
      /* ack path: remember the timestamp to acknowledge later */
      outgoing.ack_time = *tp;
      /* keepalive path: mark a pending keepalive */
      outgoing.keepalive = true;
      pthread_spin_unlock(&sp);
    pthread_spin_unlock(&sp);

  /* connection is UP: send immediately */
  send_keepalive_or_ack_internal(ack, tp);
/* Build and enqueue the actual keepalive/ack command message.  The one-byte
 * tag (plus, for KEEPALIVE2/ACK, an encoded ceph_timespec) is placed in the
 * Accelio header iov and handed to the portal thread for transmission.
 * NOTE(review): the NULL-check of the pool allocation, the 'if (ack)'
 * branch header, and some closing braces are elided from this excerpt. */
void XioConnection::send_keepalive_or_ack_internal(bool ack, const utime_t *tp)
  XioCommand *xcmd = pool_alloc_xio_command(this);
  /* could happen if Accelio has been shutdown */

  struct ceph_timespec ts;
  /* ack path: echo the peer-supplied timestamp back */
  tp->encode_timeval(&ts);
  xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
  xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
  } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
    /* KEEPALIVE2: send our current time */
    utime_t t = ceph_clock_now();
    t.encode_timeval(&ts);
    xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2);
    xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
    /* legacy keepalive: tag only */
    xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE);

  const std::list<buffer::ptr>& header = xcmd->get_bl_ref().buffers();
  assert(header.size() == 1); /* accelio header must be without scatter gather */
  list<bufferptr>::const_iterator pb = header.begin();
  assert(pb->length() < XioMsgHdr::get_max_encoded_length());
  struct xio_msg * msg = xcmd->get_xio_msg();
  /* point the xio header iov at the single contiguous bufferptr */
  msg->out.header.iov_base = (char*) pb->c_str();
  msg->out.header.iov_len = pb->length();

  ldout(msgr->cct,8) << __func__ << " sending command with tag " << (int)(*(char*)msg->out.header.iov_base)
    << " len " << msg->out.header.iov_len << dendl;

  /* hand off to the portal (send) thread */
  portal->enqueue(this, xcmd);
/* Placeholder "accept" handling for the passive side of a connection:
 * fabricates a CEPH_AUTH_NONE authorizer, runs the verify/accept dispatch
 * hooks, and registers the connection in the messenger's entity map.
 * NOTE(review): the verify_authorizer argument list, the return statement,
 * and a few surrounding lines are elided from this excerpt. */
int XioConnection::passive_setup()
  /* XXX passive setup is a placeholder for (potentially active-side
     initiated) feature and auth* negotiation */
  static bufferlist authorizer_reply; /* static because fake */
  static CryptoKey session_key; /* ditto */
  bool authorizer_valid;

  XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());

  // fake an auth buffer
  name.set_type(peer.name.type());

  AuthNoneAuthorizer auth;
  auth.build_authorizer(name, peer.name.num());

  /* XXX fake authorizer! */
  msgr->ms_deliver_verify_authorizer(
    this, peer_type, CEPH_AUTH_NONE,

  /* notify dispatchers (normal and fast paths) of the new connection */
  msgr->ms_deliver_handle_accept(this);
  msgr->ms_deliver_handle_fast_accept(this);

  /* try to insert in conns_entity_map */
  msgr->try_insert(this);
/* Allocate an XioDispatchHook from the non-registered Accelio mempool and
 * placement-construct it; the hook later returns its own pool memory
 * (mp_mem is passed into the constructor for that purpose).
 * NOTE(review): the error check on 'e' and the return statement are
 * elided from this excerpt. */
static inline XioDispatchHook* pool_alloc_xio_dispatch_hook(
  XioConnection *xcon, Message *m, XioInSeq& msg_seq)
  struct xio_reg_mem mp_mem;
  int e = xpool_alloc(xio_msgr_noreg_mpool,
                      sizeof(XioDispatchHook), &mp_mem);
  XioDispatchHook *xhook = static_cast<XioDispatchHook*>(mp_mem.addr);
  /* placement-new into the pool allocation */
  new (xhook) XioDispatchHook(xcon, m, msg_seq, mp_mem);
/* Reassemble a Ceph Message from one or more Accelio messages and dispatch
 * it.  The first xio_msg of a sequence carries the encoded XioMsgHdr in its
 * header iov (after a one-byte tag); front/middle/data payload sections are
 * then gathered from the scatter-gather lists of the queued xio_msgs.
 * NOTE(review): this excerpt elides many lines (parameter 'msg' and
 * 'more_in_batch' in the signature, loop-body advances such as
 * 'tmsg = msg_iter', buffer append calls, else-branches and closing
 * braces); comments below describe only what the visible code shows. */
int XioConnection::handle_data_msg(struct xio_session *session,
                                   void *cb_user_context)
  struct xio_msg *tmsg = msg;

  /* XXX Accelio guarantees message ordering at ... */

  /* a data message must open with a header iov */
  if (!tmsg->in.header.iov_len) {
    ldout(msgr->cct,0) << __func__ << " empty header: packet out of sequence?" << dendl;
    xio_release_msg(msg);

  /* skip the one-byte messenger tag when decoding the header */
  const size_t sizeof_tag = 1;
  buffer::create_static(tmsg->in.header.iov_len-sizeof_tag,
                        ((char*) tmsg->in.header.iov_base)+sizeof_tag));
  ldout(msgr->cct,10) << __func__ << " receive msg " << "tmsg " << tmsg
    << " msg_cnt " << msg_cnt.msg_cnt
    << " iov_base " << tmsg->in.header.iov_base
    << " iov_len " << (int) tmsg->in.header.iov_len
    << " nents " << tmsg->in.pdata_iov.nents
    << " sn " << tmsg->sn << dendl;
  assert(session == this->session);
  /* remember how many xio_msgs make up this ceph message */
  in_seq.set_count(msg_cnt.msg_cnt);
  /* XXX major sequence error */
  assert(! tmsg->in.header.iov_len);

  /* once the full sequence has been accumulated, reassemble */
  if (in_seq.count() > 0) {

  XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());
  /* dispatch hook owns the in-sequence and frees the xio_msgs later */
  XioDispatchHook *m_hook =
    pool_alloc_xio_dispatch_hook(this, NULL /* msg */, in_seq);
  XioInSeq& msg_seq = m_hook->msg_seq;

  ceph_msg_header header;
  ceph_msg_footer footer;
  buffer::list payload, middle, data;

  const utime_t recv_stamp = ceph_clock_now();

  ldout(msgr->cct,4) << __func__ << " " << "msg_seq.size()=" << msg_seq.size() <<

  struct xio_msg* msg_iter = msg_seq.begin();
  /* decode header+footer from the first xio_msg's header iov */
  XioMsgHdr hdr(header, footer,
    buffer::create_static(tmsg->in.header.iov_len,
                          (char*) tmsg->in.header.iov_base));

  if (magic & (MSG_MAGIC_TRACE_XCON)) {
    if (hdr.hdr->type == 43) {
      print_xio_msg_hdr(msgr->cct, "on_msg", hdr, NULL);

  unsigned int ix, blen, iov_len;
  struct xio_iovec_ex *msg_iov, *iovs;
  uint32_t take_len, left_len = 0;
  char *left_base = NULL;

  /* -------- front (payload) section -------- */
  blen = header.front_len;

  while (blen && (msg_iter != msg_seq.end())) {
    iov_len = vmsg_sglist_nents(&tmsg->in);
    iovs = vmsg_sglist(&tmsg->in);
    for (; blen && (ix < iov_len); ++ix) {
      /* XXX need to detect any buffer which needs to be
       * split due to coalescing of a segment (front, middle, */
      take_len = MIN(blen, msg_iov->iov_len);
        take_len, (char*) msg_iov->iov_base, m_hook));
      /* remember the unconsumed tail of a split iov for the next section */
      left_len = msg_iov->iov_len - take_len;
      left_base = ((char*) msg_iov->iov_base) + take_len;

    /* XXX as above, if a buffer is split, then we needed to track
     * the new start (carry) and not advance */
    msg_seq.next(&msg_iter);

  if (magic & (MSG_MAGIC_TRACE_XCON)) {
    if (hdr.hdr->type == 43) {
      ldout(msgr->cct,4) << "front (payload) dump:";
      payload.hexdump( *_dout );

  /* -------- middle section -------- */
  blen = header.middle_len;

  /* first consume any leftover split from the previous section */
  if (blen && left_len) {
      buffer::create_msg(left_len, left_base, m_hook));

  while (blen && (msg_iter != msg_seq.end())) {
    iov_len = vmsg_sglist_nents(&tmsg->in);
    iovs = vmsg_sglist(&tmsg->in);
    for (; blen && (ix < iov_len); ++ix) {
      take_len = MIN(blen, msg_iov->iov_len);
        take_len, (char*) msg_iov->iov_base, m_hook));
      left_len = msg_iov->iov_len - take_len;
      left_base = ((char*) msg_iov->iov_base) + take_len;
    msg_seq.next(&msg_iter);

  /* -------- data section -------- */
  blen = header.data_len;

  if (blen && left_len) {
      buffer::create_msg(left_len, left_base, m_hook));

  while (blen && (msg_iter != msg_seq.end())) {
    iov_len = vmsg_sglist_nents(&tmsg->in);
    iovs = vmsg_sglist(&tmsg->in);
    for (; blen && (ix < iov_len); ++ix) {
      /* data iovs are consumed whole (no split tracking here) */
      msg_iov->iov_len, (char*) msg_iov->iov_base, m_hook));
      blen -= msg_iov->iov_len;
    msg_seq.next(&msg_iter);

  /* update connection timestamp */
  recv = tmsg->timestamp;

  Message *m = decode_message(msgr->cct, msgr->crcflags, header, footer,
                              payload, middle, data, this);

  /* success path: attach connection and completion hook */
  m->set_connection(this);
  m_hook->set_message(m);
  m->set_completion_hook(m_hook);

  /* update timestamps */
  m->set_recv_stamp(recv_stamp);
  m->set_recv_complete_stamp(ceph_clock_now());
  m->set_seq(header.seq);

  state.set_in_seq(header.seq);

  /* XXXX validate peer type */
  if (peer_type != (int) hdr.peer_type) { /* XXX isn't peer_type -1? */
    /* first message from this peer: learn its identity */
    peer_type = hdr.peer_type;
    peer_addr = hdr.addr;
    peer.addr = peer_addr;
    peer.name = entity_name_t(hdr.hdr->src);
    if (xio_conn_type == XioConnection::PASSIVE) {
      /* XXX kick off feature/authn/authz negotiation
       * nb: very possibly the active side should initiate this, but
       * for now, call a passive hook so OSD and friends can create
       * sessions without actually negotiating */

  if (magic & (MSG_MAGIC_TRACE_XCON)) {
    ldout(msgr->cct,4) << "decode m is " << m->get_type() << dendl;

  /* hand the decoded message to the dispatch queue */
  msgr->ds_dispatch(m);

  /* failure path: responds for undecoded messages and frees hook */
  ldout(msgr->cct,4) << "decode m failed" << dendl;
  m_hook->on_err_finalize(this);
/* Accelio receive callback: peek the one-byte messenger tag at the front of
 * the header iov and route the message -- data messages to
 * handle_data_msg(), keepalive variants handled inline.
 * NOTE(review): the 'switch (tag)' header, 'break' statements, else-branch
 * braces, the KEEPALIVE2_ACK 'kp_t' declaration, the return, and the
 * 'msg'/'more_in_batch' parameter lines are elided from this excerpt. */
int XioConnection::on_msg(struct xio_session *session,
                          void *cb_user_context)
  char tag = CEPH_MSGR_TAG_MSG;
  if (msg->in.header.iov_len)
    tag = *(char*)msg->in.header.iov_base;

  ldout(msgr->cct,8) << __func__ << " receive msg with iov_len "
    << (int) msg->in.header.iov_len << " tag " << (int)tag << dendl;

  //header_len_without_tag is only meaningful in case we have tag
  size_t header_len_without_tag = msg->in.header.iov_len - sizeof(tag);

  case CEPH_MSGR_TAG_MSG:
    ldout(msgr->cct, 20) << __func__ << " got data message" << dendl;
    return handle_data_msg(session, msg, more_in_batch, cb_user_context);

  case CEPH_MSGR_TAG_KEEPALIVE:
    ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
    set_last_keepalive(ceph_clock_now());

  case CEPH_MSGR_TAG_KEEPALIVE2:
    /* payload must at least hold the peer's ceph_timespec */
    if (header_len_without_tag < sizeof(ceph_timespec)) {
      lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2: got " << header_len_without_tag <<
        " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
    ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
    utime_t kp_t = utime_t(*t);
    ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2 with timestamp" << kp_t << dendl;
    /* echo the peer's timestamp back as an ack */
    send_keepalive_or_ack(true, &kp_t);
    set_last_keepalive(ceph_clock_now());

  case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
    if (header_len_without_tag < sizeof(ceph_timespec)) {
      lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2_ACK: got " << header_len_without_tag <<
        " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
    ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
    ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2_ACK with timestamp" << kp_t << dendl;
    set_last_keepalive_ack(kp_t);

  /* default: unknown tag is a protocol violation */
  lderr(msgr->cct) << __func__ << " unsupported message tag " << (int) tag << dendl;
  assert(! "unsupported message tag");

  /* non-data messages are consumed here; return the xio_msg to Accelio */
  xio_release_msg(msg);
/* Accelio one-way send-completion callback: logs delivery, decrements the
 * outstanding-send counter, and lifts flow control when the queue drains
 * below the low-water mark.
 * NOTE(review): the 'req' parameter line, the xmsg NULL-guard, closing
 * braces, and the return are elided from this excerpt. */
int XioConnection::on_ow_msg_send_complete(struct xio_session *session,
                                           void *conn_user_context)
  /* requester send complete (one-way) */
  uint64_t rc = ++scount;

  XioSend* xsend = static_cast<XioSend*>(req->user_context);
  if (unlikely(magic & MSG_MAGIC_TRACE_CTR)) {
    /* coarse progress trace: one line per million completions */
    if (unlikely((rc % 1000000) == 0)) {
      std::cout << "xio finished " << rc << " " << time(0) << std::endl;

  ldout(msgr->cct,11) << "on_msg_delivered xcon: " << xsend->xcon <<
    " msg: " << req << " sn: " << req->sn << dendl;

  /* only XioMsg (a ceph Message carrier) has m to report on */
  XioMsg *xmsg = dynamic_cast<XioMsg*>(xsend);
  ldout(msgr->cct,11) << "on_msg_delivered xcon: " <<
    " type: " << xmsg->m->get_type() << " tid: " << xmsg->m->get_tid() <<
    " seq: " << xmsg->m->get_seq() << dendl;

  --send_ctr; /* atomic, because portal thread */

  /* unblock flow-controlled connections, avoid oscillation */
  if (unlikely(cstate.session_state.read() ==
               XioConnection::FLOW_CONTROLLED)) {
    if ((send_ctr <= uint32_t(xio_qdepth_low_mark())) &&
        (1 /* XXX memory <= memory low-water mark */)) {
      cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
      ldout(msgr->cct,2) << "on_msg_delivered xcon: " << xsend->xcon
        << " up_ready from flow_controlled" << dendl;
} /* on_msg_delivered */
596 void XioConnection::msg_send_fail(XioSend
*xsend
, int code
)
598 ldout(msgr
->cct
,2) << "xio_send_msg FAILED xcon: " << this <<
599 " msg: " << xsend
->get_xio_msg() << " code=" << code
<<
600 " (" << xio_strerror(code
) << ")" << dendl
;
601 /* return refs taken for each xio_msg */
602 xsend
->put_msg_refs();
603 } /* msg_send_fail */
/* Log a failure to return an xio_msg to Accelio.  Diagnostic only: no
 * recovery is attempted here.
 * (Opening brace elided from this excerpt.) */
void XioConnection::msg_release_fail(struct xio_msg *msg, int code)
  ldout(msgr->cct,2) << "xio_release_msg FAILED xcon: " << this <<
    " msg: " << msg << "code=" << code <<
    " (" << xio_strerror(code) << ")" << dendl;
} /* msg_release_fail */
/* Flush everything queued while the connection was not UP: pending
 * keepalive/ack, requeued XioSends, and deferred Messages.  Takes 'sp'
 * unless the caller signals it already holds it via OP_FLAG_LOCKED.
 * NOTE(review): the 'if (outgoing.ack)' branch header, closing braces and
 * the return are elided from this excerpt; the zero-argument
 * send_keepalive_or_ack_internal() call presumably relies on declaration
 * defaults -- confirm against the header. */
int XioConnection::flush_out_queues(uint32_t flags) {
  XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
  if (! (flags & CState::OP_FLAG_LOCKED))
    pthread_spin_lock(&sp);

  if (outgoing.keepalive) {
    outgoing.keepalive = false;
    send_keepalive_or_ack_internal();

  /* pending ack recorded while not UP */
  outgoing.ack = false;
  send_keepalive_or_ack_internal(true, &outgoing.ack_time);

  // send deferred 1 (direct backpresssure)
  if (outgoing.requeue.size() > 0)
    portal->requeue(this, outgoing.requeue);

  // send deferred 2 (sent while deferred)
  int ix, q_size = outgoing.mqueue.size();
  for (ix = 0; ix < q_size; ++ix) {
    Message::Queue::iterator q_iter = outgoing.mqueue.begin();
    Message* m = &(*q_iter);
    outgoing.mqueue.erase(q_iter);
    msgr->_send_message_impl(m, this);

  if (! (flags & CState::OP_FLAG_LOCKED))
    pthread_spin_unlock(&sp);
/* Drop everything queued for sending.  The queues are spliced into local
 * queues under 'sp' so that the (potentially expensive) reference drops
 * happen outside the spinlock.
 * NOTE(review): 'm->put()' on discarded Messages, the switch header on
 * xs->type, 'break' statements and the return are elided from this
 * excerpt. */
int XioConnection::discard_out_queues(uint32_t flags)
  Message::Queue disc_q;
  XioSubmit::Queue deferred_q;

  if (! (flags & CState::OP_FLAG_LOCKED))
    pthread_spin_lock(&sp);

  /* the two send queues contain different objects:
   * - anything on the mqueue is a Message
   * - anything on the requeue is an XioSend */
  Message::Queue::const_iterator i1 = disc_q.end();
  disc_q.splice(i1, outgoing.mqueue);

  XioSubmit::Queue::const_iterator i2 = deferred_q.end();
  deferred_q.splice(i2, outgoing.requeue);

  /* cancel any pending keepalive/ack as well */
  outgoing.keepalive = outgoing.ack = false;

  if (! (flags & CState::OP_FLAG_LOCKED))
    pthread_spin_unlock(&sp);

  /* drop the deferred messages (lock no longer held) */
  while (!disc_q.empty()) {
    Message::Queue::iterator q_iter = disc_q.begin();
    Message* m = &(*q_iter);
    disc_q.erase(q_iter);

  /* drop refs on the deferred xio submissions, by submission type */
  while (!deferred_q.empty()) {
    XioSubmit::Queue::iterator q_iter = deferred_q.begin();
    XioSubmit* xs = &(*q_iter);
    case XioSubmit::OUTGOING_MSG:
      xsend = static_cast<XioSend*>(xs);
      deferred_q.erase(q_iter);
      // release once for each chained xio_msg
      xsend->put(xsend->get_msg_count());
    case XioSubmit::INCOMING_MSG_RELEASE:
      deferred_q.erase(q_iter);
      portal->release_xio_msg(static_cast<XioCompletion*>(xs));
      ldout(msgr->cct,0) << __func__ << ": Unknown Msg type " << xs->type << dendl;
/* Move this connection to the front of the messenger's connection list
 * (LRU position).  Lock ordering: the messenger-wide conns_sp must be
 * taken before the per-connection 'sp' -- so if the caller already holds
 * 'sp' (OP_FLAG_LOCKED) we release it first, take conns_sp, then re-take
 * 'sp'.  On exit 'sp' is held iff the caller held it on entry.
 * NOTE(review): a closing brace and the return are elided from this
 * excerpt. */
int XioConnection::adjust_clru(uint32_t flags)
  if (flags & CState::OP_FLAG_LOCKED)
    pthread_spin_unlock(&sp);

  XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
  msgr->conns_sp.lock();
  pthread_spin_lock(&sp);

  /* only re-position if this conn is actually on the list */
  if (cstate.flags & CState::FLAG_MAPPED) {
    XioConnection::ConnList::iterator citer =
      XioConnection::ConnList::s_iterator_to(*this);
    msgr->conns_list.erase(citer);
    msgr->conns_list.push_front(*this); // LRU

  msgr->conns_sp.unlock();

  if (! (flags & CState::OP_FLAG_LOCKED))
    pthread_spin_unlock(&sp);
/* Accelio send-error callback: recover the XioSend from the failed
 * xio_msg's user context and decrement the outstanding-send counter.
 * NOTE(review): the 'msg' parameter line, the error-handling lines
 * between the cast and the decrement, and the return are elided from
 * this excerpt. */
int XioConnection::on_msg_error(struct xio_session *session,
                                enum xio_status error,
                                void *conn_user_context)
  XioSend *xsend = static_cast<XioSend*>(msg->user_context);

  --send_ctr; /* atomic, because portal thread */
737 void XioConnection::mark_down()
739 _mark_down(XioConnection::CState::OP_FLAG_NONE
);
/* Internal mark-down: optionally stage a remote-reset flag (when the
 * policy requires reset checking) and discard all queued output.  Takes
 * 'sp' unless the caller holds it (OP_FLAG_LOCKED).
 * NOTE(review): the opening brace, some lines between the flag update and
 * the queue discard, and the return are elided from this excerpt. */
int XioConnection::_mark_down(uint32_t flags)
  if (! (flags & CState::OP_FLAG_LOCKED))
    pthread_spin_lock(&sp);

  // per interface comment, we only stage a remote reset if the
  // current policy required it
  if (cstate.policy.resetcheck)
    cstate.flags |= CState::FLAG_RESET;

  /* XXX this will almost certainly be called again from
   * on_disconnect_event() */
  discard_out_queues(flags|CState::OP_FLAG_LOCKED);

  if (! (flags & CState::OP_FLAG_LOCKED))
    pthread_spin_unlock(&sp);
764 void XioConnection::mark_disposable()
766 _mark_disposable(XioConnection::CState::OP_FLAG_NONE
);
/* Internal mark-disposable: set the lossy policy bit under 'sp' (taken
 * here unless the caller holds it via OP_FLAG_LOCKED).
 * NOTE(review): the opening brace and the return are elided from this
 * excerpt. */
int XioConnection::_mark_disposable(uint32_t flags)
  if (! (flags & CState::OP_FLAG_LOCKED))
    pthread_spin_lock(&sp);

  /* lossy connections are dropped rather than reconnected on failure */
  cstate.policy.lossy = true;

  if (! (flags & CState::OP_FLAG_LOCKED))
    pthread_spin_unlock(&sp);
/* Transition the connection state machine to UP/READY, first flushing any
 * output that was deferred while the connection was not up.  Takes the
 * connection spinlock unless the caller holds it (OP_FLAG_LOCKED).
 * NOTE(review): the opening brace and the return are elided from this
 * excerpt. */
int XioConnection::CState::state_up_ready(uint32_t flags)
  if (! (flags & CState::OP_FLAG_LOCKED))
    pthread_spin_lock(&xcon->sp);

  /* drain deferred sends before declaring the connection ready */
  xcon->flush_out_queues(flags|CState::OP_FLAG_LOCKED);

  session_state = session_states::UP;
  startup_state = session_startup_states::READY;

  if (! (flags & CState::OP_FLAG_LOCKED))
    pthread_spin_unlock(&xcon->sp);
/* Transition the connection state machine to DISCONNECTED/IDLE.  No
 * locking here -- presumably callers hold the relevant lock; TODO confirm.
 * (Opening brace and return elided from this excerpt.) */
int XioConnection::CState::state_discon()
  session_state = session_states::DISCONNECTED;
  startup_state = session_startup_states::IDLE;
/* Transition the session state to FLOW_CONTROLLED (send queue above the
 * high-water mark); lifted again by on_ow_msg_send_complete() once the
 * queue drains.  Takes the connection spinlock unless the caller holds it.
 * NOTE(review): the opening brace and the return are elided from this
 * excerpt. */
int XioConnection::CState::state_flow_controlled(uint32_t flags)
  if (! (flags & OP_FLAG_LOCKED))
    pthread_spin_lock(&xcon->sp);

  session_state = session_states::FLOW_CONTROLLED;

  if (! (flags & OP_FLAG_LOCKED))
    pthread_spin_unlock(&xcon->sp);
819 int XioConnection::CState::state_fail(Message
* m
, uint32_t flags
)
821 if (! (flags
& OP_FLAG_LOCKED
))
822 pthread_spin_lock(&xcon
->sp
);
824 // advance to state FAIL, drop queued, msgs, adjust LRU
825 session_state
= session_states::DISCONNECTED
);
826 startup_state
= session_startup_states::FAIL
);
828 xcon
->discard_out_queues(flags
|OP_FLAG_LOCKED
);
829 xcon
->adjust_clru(flags
|OP_FLAG_LOCKED
|OP_FLAG_LRU
);
833 if (! (flags
& OP_FLAG_LOCKED
))
834 pthread_spin_unlock(&xcon
->sp
);
837 XioMessenger
* msgr
= static_cast<XioMessenger
*>(xcon
->get_messenger());
838 msgr
->ms_deliver_handle_reset(xcon
);
/* Loopback send: stamp the message with this connection, a local sequence
 * number and our own source identity before local delivery.
 * NOTE(review): the opening brace, the delivery/dispatch call and the
 * return are elided from this excerpt. */
int XioLoopbackConnection::send_message(Message *m)
  XioMessenger *ms = static_cast<XioMessenger*>(get_messenger());
  m->set_connection(this);
  m->set_seq(next_seq());
  m->set_src(ms->get_myinst().name);
855 void XioLoopbackConnection::send_keepalive()
857 utime_t t
= ceph_clock_now();
858 set_last_keepalive(t
);
859 set_last_keepalive_ack(t
);