1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "ProtocolV1.h"
6 #include "common/errno.h"
8 #include "AsyncConnection.h"
9 #include "AsyncMessenger.h"
10 #include "common/EventTrace.h"
11 #include "include/random.h"
// Debug-output configuration for this translation unit: route dout/ldout
// through the messenger subsystem, and prefix every line with this
// connection's identity (see _conn_prefix below).
#define dout_subsys ceph_subsys_ms
// dout_prefix is already defined by the dout machinery; it must be
// #undef'd before redefinition to avoid a macro-redefinition diagnostic.
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
// Build the log-line prefix for this ProtocolV1 instance: local messenger
// address, peer address, connection and protocol pointers, remote port,
// current state name, peer-global/connect sequence numbers and the lossy
// policy flag.
// NOTE(review): this excerpt is line-shredded; an interior line (original 19)
// and the closing brace are missing — code kept byte-identical.
16 ostream
&ProtocolV1::_conn_prefix(std::ostream
*_dout
) {
17 return *_dout
<< "--1- " << messenger
->get_myaddrs() << " >> "
18 << *connection
->peer_addrs
20 << connection
<< " " << this
21 << " :" << connection
->port
<< " s=" << get_state_name(state
)
22 << " pgs=" << peer_global_seq
<< " cs=" << connect_seq
23 << " l=" << connection
->policy
.lossy
<< ").";
// Continuation-passing helpers: schedule a socket write/read whose completion
// resumes the named protocol-state handler C.
#define WRITE(B, C) write(CONTINUATION(C), B)
#define READ(L, C) read(CONTINUATION(C), L)
#define READB(L, B, C) read(CONTINUATION(C), L, B)

// Constant to limit starting sequence number to 2^31. Nothing special about
// it, just a big number. PLR
#define SEQ_MASK 0x7fffffff
// Outgoing payloads at or below this size that are fragmented into multiple
// buffers are flattened (byte-copied) into the outgoing buffer instead of
// being claim-appended, avoiding many tiny iovecs (see write_message()).
const int ASYNC_COALESCE_THRESHOLD = 256;
// Allocate a small-page-aligned receive buffer of len bytes into `data`,
// offsetting the first bufferptr so that the payload lands at the same
// page offset (`off & ~CEPH_PAGE_MASK`) it will have in memory.
// NOTE(review): the declarations of `head`, `left` and several statements
// (original lines 43-44, 46, 49-51) plus the closing brace are missing from
// this excerpt — code kept byte-identical.
40 static void alloc_aligned_buffer(bufferlist
&data
, unsigned len
, unsigned off
) {
41 // create a buffer to read into that matches the data alignment
42 unsigned alloc_len
= 0;
45 if (off
& ~CEPH_PAGE_MASK
) {
47 alloc_len
+= CEPH_PAGE_SIZE
;
48 head
= std::min
<uint64_t>(CEPH_PAGE_SIZE
- (off
& ~CEPH_PAGE_MASK
), left
);
52 bufferptr
ptr(buffer::create_small_page_aligned(alloc_len
));
53 if (head
) ptr
.set_offset(CEPH_PAGE_SIZE
- head
);
54 data
.push_back(std::move(ptr
));
// Construct a v1 protocol handler bound to one AsyncConnection; writes are
// disabled (NOWRITE) until the handshake completes, and a 4 KiB scratch
// buffer is allocated for fixed-size wire reads (tags, headers, footers).
// NOTE(review): most member initializers (original lines 63, 65-70, 72-76)
// are missing from this excerpt — code kept byte-identical.
61 ProtocolV1::ProtocolV1(AsyncConnection
*connection
)
62 : Protocol(1, connection
),
64 can_write(WriteStatus::NOWRITE
),
71 is_reset_from_peer(false),
77 temp_buffer
= new char[4096];
// Destructor: by the time the protocol is destroyed both the outgoing
// priority queue and the sent (awaiting-ack) list must already be empty.
// NOTE(review): the remainder of the destructor (original lines 83+,
// including the temp_buffer release) is missing from this excerpt.
80 ProtocolV1::~ProtocolV1() {
81 ceph_assert(out_q
.empty());
82 ceph_assert(sent
.empty());
// Enter the active (client) side of the handshake: set START_CONNECT, clear
// the per-attempt connect state (authorizer buffer, connect_msg/reply), and
// take a fresh global sequence number from the messenger.
// NOTE(review): several reset statements (original lines 95-98) and the
// closing brace are missing from this excerpt — code kept byte-identical.
91 void ProtocolV1::connect() {
92 this->state
= START_CONNECT
;
94 // reset connect state variables
99 authorizer_buf
.clear();
100 memset(&connect_msg
, 0, sizeof(connect_msg
));
101 memset(&connect_reply
, 0, sizeof(connect_reply
));
103 global_seq
= messenger
->get_global_seq();
106 void ProtocolV1::accept() { this->state
= START_ACCEPT
; }
108 bool ProtocolV1::is_connected() {
109 return can_write
.load() == WriteStatus::CANWRITE
;
// Tear the protocol down: no-op if already CLOSED, otherwise flush any
// delayed-delivery state and, under write_lock, mark the writer CLOSED.
// NOTE(review): interior lines (original 115-117, 119, 122-127, 129) and the
// closing brace are missing from this excerpt — code kept byte-identical.
112 void ProtocolV1::stop() {
113 ldout(cct
, 20) << __func__
<< dendl
;
114 if (state
== CLOSED
) {
118 if (connection
->delay_state
) connection
->delay_state
->flush();
120 ldout(cct
, 2) << __func__
<< dendl
;
121 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
128 can_write
= WriteStatus::CLOSED
;
// Central failure handler. Visible policy, in order: ignore if CLOSED/NONE;
// lossy channels (past the connecting states) are reset outright; half-open
// accept sides with nothing queued are reset; standby policy with an empty
// queue goes to standby; otherwise compute an exponential backoff between
// ms_initial_backoff and ms_max_backoff, re-enter START_CONNECT with a fresh
// global_seq, and schedule a wakeup — unless policy.server, which goes to
// standby instead of reconnecting.
// NOTE(review): many interior lines (e.g. original 134, 137-139, 143, 145-147,
// 151, 153-154, 160, 162-169, 171, 173-174, 176-178, 180, 182-184, 188-189,
// 192-193, 198, 202, 206-207, 209, 213-214, 216) are missing from this
// excerpt — code kept byte-identical.
132 void ProtocolV1::fault() {
133 ldout(cct
, 20) << __func__
<< dendl
;
135 if (state
== CLOSED
|| state
== NONE
) {
136 ldout(cct
, 10) << __func__
<< " connection is already closed" << dendl
;
140 if (connection
->policy
.lossy
&& state
!= START_CONNECT
&&
141 state
!= CONNECTING
) {
142 ldout(cct
, 1) << __func__
<< " on lossy channel, failing" << dendl
;
144 connection
->dispatch_queue
->queue_reset(connection
);
148 connection
->write_lock
.lock();
149 can_write
= WriteStatus::NOWRITE
;
150 is_reset_from_peer
= false;
152 // requeue sent items
155 if (!once_ready
&& out_q
.empty() && state
>= START_ACCEPT
&&
156 state
<= ACCEPTING_WAIT_CONNECT_MSG_AUTH
&& !replacing
) {
157 ldout(cct
, 10) << __func__
<< " with nothing to send and in the half "
158 << " accept state just closed" << dendl
;
159 connection
->write_lock
.unlock();
161 connection
->dispatch_queue
->queue_reset(connection
);
170 if (connection
->policy
.standby
&& out_q
.empty() && !keepalive
&&
172 ldout(cct
, 10) << __func__
<< " with nothing to send, going to standby"
175 connection
->write_lock
.unlock();
179 connection
->write_lock
.unlock();
181 if ((state
>= START_CONNECT
&& state
<= CONNECTING_SEND_CONNECT_MSG
) ||
185 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
186 } else if (backoff
== utime_t()) {
187 backoff
.set_from_double(cct
->_conf
->ms_initial_backoff
);
190 if (backoff
> cct
->_conf
->ms_max_backoff
)
191 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
194 global_seq
= messenger
->get_global_seq();
195 state
= START_CONNECT
;
196 connection
->state
= AsyncConnection::STATE_CONNECTING
;
197 ldout(cct
, 10) << __func__
<< " waiting " << backoff
<< dendl
;
199 connection
->register_time_events
.insert(
200 connection
->center
->create_time_event(backoff
.to_nsec() / 1000,
201 connection
->wakeup_handler
));
203 // policy maybe empty when state is in accept
204 if (connection
->policy
.server
) {
205 ldout(cct
, 0) << __func__
<< " server, going to standby" << dendl
;
208 ldout(cct
, 0) << __func__
<< " initiating reconnect" << dendl
;
210 global_seq
= messenger
->get_global_seq();
211 state
= START_CONNECT
;
212 connection
->state
= AsyncConnection::STATE_CONNECTING
;
215 connection
->center
->dispatch_event_external(connection
->read_handler
);
// Queue a message for sending. Fast-dispatchable messages are pre-encoded
// outside write_lock against the current feature set; under write_lock the
// pre-encoded buffer is discarded if features changed or the writer is not
// yet writable (re-encoded later on the write path). CLOSED connections drop
// the message; otherwise it is enqueued by priority and, unless a replace is
// in progress (REPLACING), the write handler is kicked.
// NOTE(review): interior lines (original 220, 222, 228-229, 235-236, 239,
// 243-244, 248, 251-252) are missing from this excerpt — code kept
// byte-identical.
219 void ProtocolV1::send_message(Message
*m
) {
221 uint64_t f
= connection
->get_features();
223 // TODO: Currently not all messages supports reencode like MOSDMap, so here
224 // only let fast dispatch support messages prepare message
225 bool can_fast_prepare
= messenger
->ms_can_fast_dispatch(m
);
226 if (can_fast_prepare
) {
227 prepare_send_message(f
, m
, bl
);
230 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
231 // "features" changes will change the payload encoding
232 if (can_fast_prepare
&&
233 (can_write
== WriteStatus::NOWRITE
|| connection
->get_features() != f
)) {
234 // ensure the correctness of message encoding
237 ldout(cct
, 5) << __func__
<< " clear encoded buffer previous " << f
238 << " != " << connection
->get_features() << dendl
;
240 if (can_write
== WriteStatus::CLOSED
) {
241 ldout(cct
, 10) << __func__
<< " connection closed."
242 << " Drop message " << m
<< dendl
;
245 m
->trace
.event("async enqueueing message");
246 out_q
[m
->get_priority()].emplace_back(std::move(bl
), m
);
247 ldout(cct
, 15) << __func__
<< " inline write is denied, reschedule m=" << m
249 if (can_write
!= WriteStatus::REPLACING
) {
250 connection
->center
->dispatch_event_external(connection
->write_handler
);
// Encode message m against the given feature bits and CRC flags, then copy
// its payload/middle/data sections into the supplied bufferlist so the write
// path can send without touching the Message again.
// NOTE(review): the second signature line (original 256, the bufferlist
// parameter), several interior lines (258, 263, 266-267, 270, 274) and the
// closing brace are missing from this excerpt — code kept byte-identical.
255 void ProtocolV1::prepare_send_message(uint64_t features
, Message
*m
,
257 ldout(cct
, 20) << __func__
<< " m " << *m
<< dendl
;
259 // associate message with Connection (for benefit of encode_payload)
260 if (m
->empty_payload()) {
261 ldout(cct
, 20) << __func__
<< " encoding features " << features
<< " " << m
262 << " " << *m
<< dendl
;
264 ldout(cct
, 20) << __func__
<< " half-reencoding features " << features
265 << " " << m
<< " " << *m
<< dendl
;
268 // encode and copy out of *m
269 m
->encode(features
, messenger
->crcflags
);
271 bl
.append(m
->get_payload());
272 bl
.append(m
->get_middle());
273 bl
.append(m
->get_data());
// Request a keepalive: under write_lock, unless the writer is CLOSED, flag
// the keepalive (interior line elided here) and kick the write handler so
// append_keepalive_or_ack() runs on the write path.
// NOTE(review): original line 280 (the flag assignment) and the closing
// braces are missing from this excerpt — code kept byte-identical.
276 void ProtocolV1::send_keepalive() {
277 ldout(cct
, 10) << __func__
<< dendl
;
278 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
279 if (can_write
!= WriteStatus::CLOSED
) {
281 connection
->center
->dispatch_event_external(connection
->write_handler
);
// Readable-event entry point: dispatch on the protocol state and resume the
// matching continuation (client/server banner exchange, message wait, or one
// of the three throttle stages).
// NOTE(review): the switch statement itself and several case labels
// (original lines 287-288, 290-291, 293-294, 296, 299-300, 302, 305-308) are
// missing from this excerpt — code kept byte-identical.
285 void ProtocolV1::read_event() {
286 ldout(cct
, 20) << __func__
<< dendl
;
289 CONTINUATION_RUN(CONTINUATION(send_client_banner
));
292 CONTINUATION_RUN(CONTINUATION(send_server_banner
));
295 CONTINUATION_RUN(CONTINUATION(wait_message
));
297 case THROTTLE_MESSAGE
:
298 CONTINUATION_RUN(CONTINUATION(throttle_message
));
301 CONTINUATION_RUN(CONTINUATION(throttle_bytes
));
303 case THROTTLE_DISPATCH_QUEUE
:
304 CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue
));
// Writable-event entry point. When CANWRITE: append any pending
// keepalive/ack, then loop draining out_q — encoding on demand when the
// queued bufferlist is empty — via write_message(); afterwards flush any
// accumulated ACKs (ack_left) or remaining queued bytes with _try_send and
// record the send time in perf counters. When not CANWRITE: under
// connection->lock, a STANDBY client with queued data reconnects, otherwise
// a live session just flushes the outgoing buffer; send errors fall through
// to the fault path.
// NOTE(review): heavy interior elision (loop/brace lines, error branches,
// the ack_left exchange at original 360-362, fault() calls) — code kept
// byte-identical.
311 void ProtocolV1::write_event() {
312 ldout(cct
, 10) << __func__
<< dendl
;
315 connection
->write_lock
.lock();
316 if (can_write
== WriteStatus::CANWRITE
) {
318 append_keepalive_or_ack();
322 auto start
= ceph::mono_clock::now();
326 Message
*m
= _get_next_outgoing(&data
);
331 if (!connection
->policy
.lossy
) {
336 more
= !out_q
.empty();
337 connection
->write_lock
.unlock();
339 // send_message or requeue messages may not encode message
340 if (!data
.length()) {
341 prepare_send_message(connection
->get_features(), m
, data
);
344 r
= write_message(m
, data
, more
);
346 connection
->write_lock
.lock();
350 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
354 } while (can_write
== WriteStatus::CANWRITE
);
355 connection
->write_lock
.unlock();
357 // if r > 0 mean data still lefted, so no need _try_send.
359 uint64_t left
= ack_left
;
363 connection
->outcoming_bl
.append(CEPH_MSGR_TAG_ACK
);
364 connection
->outcoming_bl
.append((char *)&s
, sizeof(s
));
365 ldout(cct
, 10) << __func__
<< " try send msg ack, acked " << left
366 << " messages" << dendl
;
369 r
= connection
->_try_send(left
);
370 } else if (is_queued()) {
371 r
= connection
->_try_send();
375 connection
->logger
->tinc(l_msgr_running_send_time
,
376 ceph::mono_clock::now() - start
);
378 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
379 connection
->lock
.lock();
381 connection
->lock
.unlock();
385 connection
->write_lock
.unlock();
386 connection
->lock
.lock();
387 connection
->write_lock
.lock();
388 if (state
== STANDBY
&& !connection
->policy
.server
&& is_queued()) {
389 ldout(cct
, 10) << __func__
<< " policy.server is false" << dendl
;
390 connection
->_connect();
391 } else if (connection
->cs
&& state
!= NONE
&& state
!= CLOSED
&&
392 state
!= START_CONNECT
) {
393 r
= connection
->_try_send();
395 ldout(cct
, 1) << __func__
<< " send outcoming bl failed" << dendl
;
396 connection
->write_lock
.unlock();
398 connection
->lock
.unlock();
402 connection
->write_lock
.unlock();
403 connection
->lock
.unlock();
407 bool ProtocolV1::is_queued() {
408 return !out_q
.empty() || connection
->is_queued();
// Resume a stored protocol continuation (used by the event handlers to
// re-enter the state machine).
// NOTE(review): an interior line (original 412, likely a null guard) and the
// closing brace are missing from this excerpt — code kept byte-identical.
411 void ProtocolV1::run_continuation(CtPtr pcontinuation
) {
413 CONTINUATION_RUN(*pcontinuation
);
// Asynchronous read helper: read `len` bytes into `buffer` (defaulting to
// the internal temp_buffer), resuming `next` either from the completion
// callback (async path) or inline when the read finished immediately.
// NOTE(review): interior lines (original 419, 421, 426-427, 429-432) are
// missing from this excerpt — code kept byte-identical.
417 CtPtr
ProtocolV1::read(CONTINUATION_RX_TYPE
<ProtocolV1
> &next
,
418 int len
, char *buffer
) {
420 buffer
= temp_buffer
;
422 ssize_t r
= connection
->read(len
, buffer
,
423 [&next
, this](char *buffer
, int r
) {
424 next
.setParams(buffer
, r
);
425 CONTINUATION_RUN(next
);
428 next
.setParams(buffer
, r
);
// Asynchronous write helper: hand `buffer` to the connection and resume
// `next` from the completion callback.
// NOTE(review): interior lines (original 438, 440-446) are missing from this
// excerpt — code kept byte-identical.
435 CtPtr
ProtocolV1::write(CONTINUATION_TX_TYPE
<ProtocolV1
> &next
,
436 bufferlist
&buffer
) {
437 ssize_t r
= connection
->write(buffer
, [&next
, this](int r
) {
439 CONTINUATION_RUN(next
);
// Handshake completed: replace any pending tick timer with a fresh
// inactivity tick, release the writer (CANWRITE) under write_lock — kicking
// the write handler if work is queued — start the optional delay thread,
// and fall into the message-wait loop.
// NOTE(review): interior lines (original 455, 461, 463, 466-467, 469) are
// missing from this excerpt — code kept byte-identical.
449 CtPtr
ProtocolV1::ready() {
450 ldout(cct
, 25) << __func__
<< dendl
;
452 // make sure no pending tick timer
453 if (connection
->last_tick_id
) {
454 connection
->center
->delete_time_event(connection
->last_tick_id
);
456 connection
->last_tick_id
= connection
->center
->create_time_event(
457 connection
->inactive_timeout_us
, connection
->tick_handler
);
459 connection
->write_lock
.lock();
460 can_write
= WriteStatus::CANWRITE
;
462 connection
->center
->dispatch_event_external(connection
->write_handler
);
464 connection
->write_lock
.unlock();
465 connection
->maybe_start_delay_thread();
468 return wait_message();
// Idle state of an OPENED session: read one tag byte and dispatch it in
// handle_message(); bail out if the state changed (connection replaced).
// NOTE(review): interior lines (original 473-475, 479) are missing from this
// excerpt — code kept byte-identical.
471 CtPtr
ProtocolV1::wait_message() {
472 if (state
!= OPENED
) { // must have changed due to a replace
476 ldout(cct
, 20) << __func__
<< dendl
;
478 return READ(sizeof(char), handle_message
);
// Dispatch on the received tag byte: KEEPALIVE updates the keepalive stamp;
// KEEPALIVE2 / KEEPALIVE2_ACK read a ceph_timespec; ACK reads a ceph_le64
// sequence; MSG stamps receive time and reads the message header; CLOSE and
// unknown tags fall through to the (elided) error handling.
// NOTE(review): error-branch lines (original 483-484, 486-488, 491, 504,
// 510-511, 513-515) are missing from this excerpt — code kept byte-identical.
481 CtPtr
ProtocolV1::handle_message(char *buffer
, int r
) {
482 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
485 ldout(cct
, 1) << __func__
<< " read tag failed" << dendl
;
489 char tag
= buffer
[0];
490 ldout(cct
, 20) << __func__
<< " process tag " << (int)tag
<< dendl
;
492 if (tag
== CEPH_MSGR_TAG_KEEPALIVE
) {
493 ldout(cct
, 20) << __func__
<< " got KEEPALIVE" << dendl
;
494 connection
->set_last_keepalive(ceph_clock_now());
495 } else if (tag
== CEPH_MSGR_TAG_KEEPALIVE2
) {
496 return READ(sizeof(ceph_timespec
), handle_keepalive2
);
497 } else if (tag
== CEPH_MSGR_TAG_KEEPALIVE2_ACK
) {
498 return READ(sizeof(ceph_timespec
), handle_keepalive2_ack
);
499 } else if (tag
== CEPH_MSGR_TAG_ACK
) {
500 return READ(sizeof(ceph_le64
), handle_tag_ack
);
501 } else if (tag
== CEPH_MSGR_TAG_MSG
) {
502 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
503 ltt_recv_stamp
= ceph_clock_now();
505 recv_stamp
= ceph_clock_now();
506 ldout(cct
, 20) << __func__
<< " begin MSG" << dendl
;
507 return READ(sizeof(ceph_msg_header
), handle_message_header
);
508 } else if (tag
== CEPH_MSGR_TAG_CLOSE
) {
509 ldout(cct
, 20) << __func__
<< " got CLOSE" << dendl
;
512 ldout(cct
, 0) << __func__
<< " bad tag " << (int)tag
<< dendl
;
// Peer sent KEEPALIVE2 with its timestamp: echo it back as a
// KEEPALIVE2_ACK (queued under write_lock), record the keepalive time, kick
// the writer if connected, and return to the message-wait loop.
// NOTE(review): interior lines (original 520-521, 523-525, 527-528, 534, 537,
// 540-541, 543) are missing from this excerpt — code kept byte-identical.
518 CtPtr
ProtocolV1::handle_keepalive2(char *buffer
, int r
) {
519 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
522 ldout(cct
, 1) << __func__
<< " read keeplive timespec failed" << dendl
;
526 ldout(cct
, 30) << __func__
<< " got KEEPALIVE2 tag ..." << dendl
;
529 t
= (ceph_timespec
*)buffer
;
530 utime_t kp_t
= utime_t(*t
);
531 connection
->write_lock
.lock();
532 append_keepalive_or_ack(true, &kp_t
);
533 connection
->write_lock
.unlock();
535 ldout(cct
, 20) << __func__
<< " got KEEPALIVE2 " << kp_t
<< dendl
;
536 connection
->set_last_keepalive(ceph_clock_now());
538 if (is_connected()) {
539 connection
->center
->dispatch_event_external(connection
->write_handler
);
542 return CONTINUE(wait_message
);
// Append one keepalive unit to outcoming_bl (caller holds write_lock):
// with ack=true, a KEEPALIVE2_ACK echoing *tp; otherwise a KEEPALIVE2 with
// the current time when the peer supports it, or a bare legacy KEEPALIVE.
// NOTE(review): the opening `if (ack)` (original 547-548) and the trailing
// brace lines (559, 561-562) are missing from this excerpt — code kept
// byte-identical.
545 void ProtocolV1::append_keepalive_or_ack(bool ack
, utime_t
*tp
) {
546 ldout(cct
, 10) << __func__
<< dendl
;
549 struct ceph_timespec ts
;
550 tp
->encode_timeval(&ts
);
551 connection
->outcoming_bl
.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK
);
552 connection
->outcoming_bl
.append((char *)&ts
, sizeof(ts
));
553 } else if (connection
->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
554 struct ceph_timespec ts
;
555 utime_t t
= ceph_clock_now();
556 t
.encode_timeval(&ts
);
557 connection
->outcoming_bl
.append(CEPH_MSGR_TAG_KEEPALIVE2
);
558 connection
->outcoming_bl
.append((char *)&ts
, sizeof(ts
));
560 connection
->outcoming_bl
.append(CEPH_MSGR_TAG_KEEPALIVE
);
// Peer acknowledged our KEEPALIVE2: record the echoed timestamp as the last
// keepalive-ack time and return to the message-wait loop.
// NOTE(review): interior lines (original 566-567, 569-572, 576, 578) are
// missing from this excerpt — code kept byte-identical.
564 CtPtr
ProtocolV1::handle_keepalive2_ack(char *buffer
, int r
) {
565 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
568 ldout(cct
, 1) << __func__
<< " read keeplive timespec failed" << dendl
;
573 t
= (ceph_timespec
*)buffer
;
574 connection
->set_last_keepalive_ack(utime_t(*t
));
575 ldout(cct
, 20) << __func__
<< " got KEEPALIVE_ACK" << dendl
;
577 return CONTINUE(wait_message
);
// Peer acknowledged messages up to `seq`: under write_lock pull acked
// messages off the `sent` list into a bounded local array (max_pending=128
// per pass), then release them outside the lock, and return to the
// message-wait loop.
// NOTE(review): interior lines (original 582-583, 585-588, 591, 593, 595,
// 600-601, 604-605, 608-610, 612) are missing from this excerpt — code kept
// byte-identical.
580 CtPtr
ProtocolV1::handle_tag_ack(char *buffer
, int r
) {
581 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
584 ldout(cct
, 1) << __func__
<< " read ack seq failed" << dendl
;
589 seq
= *(ceph_le64
*)buffer
;
590 ldout(cct
, 20) << __func__
<< " got ACK" << dendl
;
592 ldout(cct
, 15) << __func__
<< " got ack seq " << seq
<< dendl
;
594 static const int max_pending
= 128;
596 Message
*pending
[max_pending
];
597 connection
->write_lock
.lock();
598 while (!sent
.empty() && sent
.front()->get_seq() <= seq
&& i
< max_pending
) {
599 Message
*m
= sent
.front();
602 ldout(cct
, 10) << __func__
<< " got ack seq " << seq
603 << " >= " << m
->get_seq() << " on " << m
<< " " << *m
606 connection
->write_lock
.unlock();
607 for (int k
= 0; k
< i
; k
++) {
611 return CONTINUE(wait_message
);
// Received the fixed-size ceph_msg_header: copy it into current_header,
// verify the header CRC when MSG_CRC_HEADER is enabled (CRC is computed over
// the header minus its own crc field), then advance to THROTTLE_MESSAGE.
// NOTE(review): interior lines (original 616-617, 619-621, 623, 625, 629-630,
// 635, 639-648, 651) are missing from this excerpt, and line 633 contains a
// mojibake artifact `¤t_header` (originally `&current_header`) —
// code kept byte-identical.
614 CtPtr
ProtocolV1::handle_message_header(char *buffer
, int r
) {
615 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
618 ldout(cct
, 1) << __func__
<< " read message header failed" << dendl
;
622 ldout(cct
, 20) << __func__
<< " got MSG header" << dendl
;
624 current_header
= *((ceph_msg_header
*)buffer
);
626 ldout(cct
, 20) << __func__
<< " got envelope type=" << current_header
.type
<< " src "
627 << entity_name_t(current_header
.src
) << " front=" << current_header
.front_len
628 << " data=" << current_header
.data_len
<< " off " << current_header
.data_off
631 if (messenger
->crcflags
& MSG_CRC_HEADER
) {
632 __u32 header_crc
= 0;
633 header_crc
= ceph_crc32c(0, (unsigned char *)&current_header
,
634 sizeof(current_header
) - sizeof(current_header
.crc
));
636 if (header_crc
!= current_header
.crc
) {
637 ldout(cct
, 0) << __func__
<< " got bad header crc " << header_crc
638 << " != " << current_header
.crc
<< dendl
;
649 state
= THROTTLE_MESSAGE
;
650 return CONTINUE(throttle_message
);
// First throttle stage: take one slot from the policy message throttler.
// If the throttler is full, schedule a 1 ms wakeup (only when no time event
// is already registered) and retry; on success advance to THROTTLE_BYTES.
// NOTE(review): interior lines (original 655, 661, 673-677, 680) are missing
// from this excerpt — code kept byte-identical.
653 CtPtr
ProtocolV1::throttle_message() {
654 ldout(cct
, 20) << __func__
<< dendl
;
656 if (connection
->policy
.throttler_messages
) {
657 ldout(cct
, 10) << __func__
<< " wants " << 1
658 << " message from policy throttler "
659 << connection
->policy
.throttler_messages
->get_current()
660 << "/" << connection
->policy
.throttler_messages
->get_max()
662 if (!connection
->policy
.throttler_messages
->get_or_fail()) {
663 ldout(cct
, 10) << __func__
<< " wants 1 message from policy throttle "
664 << connection
->policy
.throttler_messages
->get_current()
665 << "/" << connection
->policy
.throttler_messages
->get_max()
666 << " failed, just wait." << dendl
;
667 // following thread pool deal with th full message queue isn't a
668 // short time, so we can wait a ms.
669 if (connection
->register_time_events
.empty()) {
670 connection
->register_time_events
.insert(
671 connection
->center
->create_time_event(1000,
672 connection
->wakeup_handler
));
678 state
= THROTTLE_BYTES
;
679 return CONTINUE(throttle_bytes
);
// Second throttle stage: compute cur_msg_size from the header section
// lengths and take that many bytes from the policy byte throttler; on
// failure schedule a 1 ms wakeup and retry, on success advance to
// THROTTLE_DISPATCH_QUEUE.
// NOTE(review): interior lines (original 684, 687, 705-710, 713) are missing
// from this excerpt — code kept byte-identical.
682 CtPtr
ProtocolV1::throttle_bytes() {
683 ldout(cct
, 20) << __func__
<< dendl
;
685 cur_msg_size
= current_header
.front_len
+ current_header
.middle_len
+
686 current_header
.data_len
;
688 if (connection
->policy
.throttler_bytes
) {
689 ldout(cct
, 10) << __func__
<< " wants " << cur_msg_size
690 << " bytes from policy throttler "
691 << connection
->policy
.throttler_bytes
->get_current() << "/"
692 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
693 if (!connection
->policy
.throttler_bytes
->get_or_fail(cur_msg_size
)) {
694 ldout(cct
, 10) << __func__
<< " wants " << cur_msg_size
695 << " bytes from policy throttler "
696 << connection
->policy
.throttler_bytes
->get_current()
697 << "/" << connection
->policy
.throttler_bytes
->get_max()
698 << " failed, just wait." << dendl
;
699 // following thread pool deal with th full message queue isn't a
700 // short time, so we can wait a ms.
701 if (connection
->register_time_events
.empty()) {
702 connection
->register_time_events
.insert(
703 connection
->center
->create_time_event(
704 1000, connection
->wakeup_handler
));
711 state
= THROTTLE_DISPATCH_QUEUE
;
712 return CONTINUE(throttle_dispatch_queue
);
// Third throttle stage: reserve cur_msg_size bytes in the dispatch-queue
// throttler; on failure schedule a 1 ms wakeup and retry. On success stamp
// throttle_stamp and proceed to reading the message front section.
// NOTE(review): interior lines (original 717-718, 720-721, 733-737, 739) are
// missing from this excerpt — code kept byte-identical.
715 CtPtr
ProtocolV1::throttle_dispatch_queue() {
716 ldout(cct
, 20) << __func__
<< dendl
;
719 if (!connection
->dispatch_queue
->dispatch_throttler
.get_or_fail(
722 << __func__
<< " wants " << cur_msg_size
723 << " bytes from dispatch throttle "
724 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
725 << connection
->dispatch_queue
->dispatch_throttler
.get_max()
726 << " failed, just wait." << dendl
;
727 // following thread pool deal with th full message queue isn't a
728 // short time, so we can wait a ms.
729 if (connection
->register_time_events
.empty()) {
730 connection
->register_time_events
.insert(
731 connection
->center
->create_time_event(1000,
732 connection
->wakeup_handler
));
738 throttle_stamp
= ceph_clock_now();
740 state
= READ_MESSAGE_FRONT
;
741 return read_message_front();
// Read the front section: lazily allocate the `front` buffer to front_len
// bytes, read into it, and continue in handle_message_front; a zero-length
// front skips straight to the middle section.
// NOTE(review): interior lines (original 746, 748, 751, 753, 755) are
// missing from this excerpt — code kept byte-identical.
744 CtPtr
ProtocolV1::read_message_front() {
745 ldout(cct
, 20) << __func__
<< dendl
;
747 unsigned front_len
= current_header
.front_len
;
749 if (!front
.length()) {
750 front
.push_back(buffer::create(front_len
));
752 return READB(front_len
, front
.c_str(), handle_message_front
);
754 return read_message_middle();
// Front section received; log its length and move on to the middle section.
// NOTE(review): error-branch lines (original 759-760, 762-764, 766, 768) are
// missing from this excerpt — code kept byte-identical.
757 CtPtr
ProtocolV1::handle_message_front(char *buffer
, int r
) {
758 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
761 ldout(cct
, 1) << __func__
<< " read message front failed" << dendl
;
765 ldout(cct
, 20) << __func__
<< " got front " << front
.length() << dendl
;
767 return read_message_middle();
// Read the optional middle section (lazily allocated to middle_len) or, if
// the header declares none, go straight to preparing the data read.
// NOTE(review): interior lines (original 772, 776, 779-780, 782) are missing
// from this excerpt — code kept byte-identical.
770 CtPtr
ProtocolV1::read_message_middle() {
771 ldout(cct
, 20) << __func__
<< dendl
;
773 if (current_header
.middle_len
) {
774 if (!middle
.length()) {
775 middle
.push_back(buffer::create(current_header
.middle_len
));
777 return READB(current_header
.middle_len
, middle
.c_str(),
778 handle_message_middle
);
781 return read_message_data_prepare();
// Middle section received; log its length and prepare the data read.
// (The log format uses " r" without '=' — inconsistent with the sibling
// handlers' " r=", preserved as-is.)
// NOTE(review): error-branch lines (original 786-787, 789-791, 793, 795) are
// missing from this excerpt — code kept byte-identical.
784 CtPtr
ProtocolV1::handle_message_middle(char *buffer
, int r
) {
785 ldout(cct
, 20) << __func__
<< " r" << r
<< dendl
;
788 ldout(cct
, 1) << __func__
<< " read message middle failed" << dendl
;
792 ldout(cct
, 20) << __func__
<< " got middle " << middle
.length() << dendl
;
794 return read_message_data_prepare();
// Prepare data_buf / data_blp for the data section: reuse a caller-provided
// rx buffer from connection->rx_buffers when one is registered for this tid
// (growing it if too small), otherwise allocate a fresh page-aligned buffer
// matching data_off; both code paths end by resetting the bufferlist
// iterator. The rx_buffers path is known-broken by design (tracker #22480).
// NOTE(review): interior lines (original 799, 802-805, 819, 824-825, 830-834,
// 836) are missing from this excerpt — code kept byte-identical (including
// the "seleting" typo in a log message, which is runtime text).
797 CtPtr
ProtocolV1::read_message_data_prepare() {
798 ldout(cct
, 20) << __func__
<< dendl
;
800 unsigned data_len
= le32_to_cpu(current_header
.data_len
);
801 unsigned data_off
= le32_to_cpu(current_header
.data_off
);
806 // rx_buffers is broken by design... see
807 // http://tracker.ceph.com/issues/22480
808 map
<ceph_tid_t
, pair
<bufferlist
, int> >::iterator p
=
809 connection
->rx_buffers
.find(current_header
.tid
);
810 if (p
!= connection
->rx_buffers
.end()) {
811 ldout(cct
, 10) << __func__
<< " seleting rx buffer v " << p
->second
.second
812 << " at offset " << data_off
<< " len "
813 << p
->second
.first
.length() << dendl
;
814 data_buf
= p
->second
.first
;
815 // make sure it's big enough
816 if (data_buf
.length() < data_len
)
817 data_buf
.push_back(buffer::create(data_len
- data_buf
.length()));
818 data_blp
= data_buf
.begin();
820 ldout(cct
, 20) << __func__
<< " allocating new rx buffer at offset "
821 << data_off
<< dendl
;
822 alloc_aligned_buffer(data_buf
, data_len
, data_off
);
823 data_blp
= data_buf
.begin();
826 ldout(cct
, 20) << __func__
<< " allocating new rx buffer at offset "
827 << data_off
<< dendl
;
828 alloc_aligned_buffer(data_buf
, data_len
, data_off
);
829 data_blp
= data_buf
.begin();
835 return CONTINUE(read_message_data
);
// Data-read loop head: while bytes remain (msg_left), read up to the current
// bufferptr's capacity directly into it and continue in handle_message_data;
// when done, move on to the footer.
// NOTE(review): interior lines (original 840-841, 844, 846-847, 849) are
// missing from this excerpt — code kept byte-identical.
838 CtPtr
ProtocolV1::read_message_data() {
839 ldout(cct
, 20) << __func__
<< " msg_left=" << msg_left
<< dendl
;
842 bufferptr bp
= data_blp
.get_current_ptr();
843 unsigned read_len
= std::min(bp
.length(), msg_left
);
845 return READB(read_len
, bp
.c_str(), handle_message_data
);
848 return read_message_footer();
// One data chunk received: advance the bufferlist iterator by the chunk
// size, append the filled bufferptr slice to `data`, decrement msg_left, and
// loop back into read_message_data for the next chunk.
// NOTE(review): error-branch and trailing lines (original 853-854, 856-858,
// 865, 867) are missing from this excerpt — code kept byte-identical.
851 CtPtr
ProtocolV1::handle_message_data(char *buffer
, int r
) {
852 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
855 ldout(cct
, 1) << __func__
<< " read data error " << dendl
;
859 bufferptr bp
= data_blp
.get_current_ptr();
860 unsigned read_len
= std::min(bp
.length(), msg_left
);
861 ceph_assert(read_len
< std::numeric_limits
<int>::max());
862 data_blp
.advance(read_len
);
863 data
.append(bp
, 0, read_len
);
864 msg_left
-= read_len
;
866 return CONTINUE(read_message_data
);
// Read the message footer: new-style ceph_msg_footer when the peer supports
// CEPH_FEATURE_MSG_AUTH (carries a signature), otherwise the shorter legacy
// ceph_msg_footer_old; sets READ_FOOTER_AND_DISPATCH first.
// NOTE(review): interior lines (original 871, 873-874, 877, 879-880, 882)
// are missing from this excerpt — code kept byte-identical.
869 CtPtr
ProtocolV1::read_message_footer() {
870 ldout(cct
, 20) << __func__
<< dendl
;
872 state
= READ_FOOTER_AND_DISPATCH
;
875 if (connection
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
876 len
= sizeof(ceph_msg_footer
);
878 len
= sizeof(ceph_msg_footer_old
);
881 return READ(len
, handle_message_footer
);
// Final receive stage: normalize the footer (upgrading a legacy footer into
// a ceph_msg_footer), detect aborted messages via CEPH_MSG_FOOTER_COMPLETE,
// decode the Message from header/footer/front/middle/data, verify its
// signature through session_security, attach throttlers and timestamps,
// enforce sequence-number ordering (drop old, warn/assert on skipped),
// update perf counters, then either delay-queue, fast-dispatch (dropping
// connection->lock around the dispatch), or enqueue the message — finally
// kicking the writer if an ack should go out, and looping to wait_message.
// NOTE(review): heavy interior elision (error returns, brace lines, the
// in_seq ack bookkeeping around original 1001-1006, buffer cleanup at
// 1039-1043) — code kept byte-identical.
884 CtPtr
ProtocolV1::handle_message_footer(char *buffer
, int r
) {
885 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
888 ldout(cct
, 1) << __func__
<< " read footer data error " << dendl
;
892 ceph_msg_footer footer
;
893 ceph_msg_footer_old old_footer
;
895 if (connection
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
896 footer
= *((ceph_msg_footer
*)buffer
);
898 old_footer
= *((ceph_msg_footer_old
*)buffer
);
899 footer
.front_crc
= old_footer
.front_crc
;
900 footer
.middle_crc
= old_footer
.middle_crc
;
901 footer
.data_crc
= old_footer
.data_crc
;
903 footer
.flags
= old_footer
.flags
;
906 int aborted
= (footer
.flags
& CEPH_MSG_FOOTER_COMPLETE
) == 0;
907 ldout(cct
, 10) << __func__
<< " aborted = " << aborted
<< dendl
;
909 ldout(cct
, 0) << __func__
<< " got " << front
.length() << " + "
910 << middle
.length() << " + " << data
.length()
911 << " byte message.. ABORTED" << dendl
;
915 ldout(cct
, 20) << __func__
<< " got " << front
.length() << " + "
916 << middle
.length() << " + " << data
.length() << " byte message"
918 Message
*message
= decode_message(cct
, messenger
->crcflags
, current_header
,
919 footer
, front
, middle
, data
, connection
);
921 ldout(cct
, 1) << __func__
<< " decode message failed " << dendl
;
926 // Check the signature if one should be present. A zero return indicates
930 if (session_security
.get() == NULL
) {
931 ldout(cct
, 10) << __func__
<< " no session security set" << dendl
;
933 if (session_security
->check_message_signature(message
)) {
934 ldout(cct
, 0) << __func__
<< " Signature check failed" << dendl
;
939 message
->set_byte_throttler(connection
->policy
.throttler_bytes
);
940 message
->set_message_throttler(connection
->policy
.throttler_messages
);
942 // store reservation size in message, so we don't get confused
943 // by messages entering the dispatch queue through other paths.
944 message
->set_dispatch_throttle_size(cur_msg_size
);
946 message
->set_recv_stamp(recv_stamp
);
947 message
->set_throttle_stamp(throttle_stamp
);
948 message
->set_recv_complete_stamp(ceph_clock_now());
950 // check received seq#. if it is old, drop the message.
951 // note that incoming messages may skip ahead. this is convenient for the
952 // client side queueing because messages can't be renumbered, but the (kernel)
953 // client will occasionally pull a message out of the sent queue to send
954 // elsewhere. in that case it doesn't matter if we "got" it or not.
955 uint64_t cur_seq
= in_seq
;
956 if (message
->get_seq() <= cur_seq
) {
957 ldout(cct
, 0) << __func__
<< " got old message " << message
->get_seq()
958 << " <= " << cur_seq
<< " " << message
<< " " << *message
959 << ", discarding" << dendl
;
961 if (connection
->has_feature(CEPH_FEATURE_RECONNECT_SEQ
) &&
962 cct
->_conf
->ms_die_on_old_message
) {
963 ceph_assert(0 == "old msgs despite reconnect_seq feature");
967 if (message
->get_seq() > cur_seq
+ 1) {
968 ldout(cct
, 0) << __func__
<< " missed message? skipped from seq "
969 << cur_seq
<< " to " << message
->get_seq() << dendl
;
970 if (cct
->_conf
->ms_die_on_skipped_message
) {
971 ceph_assert(0 == "skipped incoming seq");
975 message
->set_connection(connection
);
977 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
978 if (message
->get_type() == CEPH_MSG_OSD_OP
||
979 message
->get_type() == CEPH_MSG_OSD_OPREPLY
) {
980 utime_t ltt_processed_stamp
= ceph_clock_now();
981 double usecs_elapsed
=
982 (ltt_processed_stamp
.to_nsec() - ltt_recv_stamp
.to_nsec()) / 1000;
984 if (message
->get_type() == CEPH_MSG_OSD_OP
)
985 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OP",
988 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OPREPLY",
993 // note last received message.
994 in_seq
= message
->get_seq();
995 ldout(cct
, 5) << " rx " << message
->get_source() << " seq "
996 << message
->get_seq() << " " << message
<< " " << *message
999 bool need_dispatch_writer
= false;
1000 if (!connection
->policy
.lossy
) {
1002 need_dispatch_writer
= true;
1007 connection
->logger
->inc(l_msgr_recv_messages
);
1008 connection
->logger
->inc(
1010 cur_msg_size
+ sizeof(ceph_msg_header
) + sizeof(ceph_msg_footer
));
1012 messenger
->ms_fast_preprocess(message
);
1013 auto fast_dispatch_time
= ceph::mono_clock::now();
1014 connection
->logger
->tinc(l_msgr_running_recv_time
,
1015 fast_dispatch_time
- connection
->recv_start_time
);
1016 if (connection
->delay_state
) {
1017 double delay_period
= 0;
1018 if (rand() % 10000 < cct
->_conf
->ms_inject_delay_probability
* 10000.0) {
1020 cct
->_conf
->ms_inject_delay_max
* (double)(rand() % 10000) / 10000.0;
1021 ldout(cct
, 1) << "queue_received will delay after "
1022 << (ceph_clock_now() + delay_period
) << " on " << message
1023 << " " << *message
<< dendl
;
1025 connection
->delay_state
->queue(delay_period
, message
);
1026 } else if (messenger
->ms_can_fast_dispatch(message
)) {
1027 connection
->lock
.unlock();
1028 connection
->dispatch_queue
->fast_dispatch(message
);
1029 connection
->recv_start_time
= ceph::mono_clock::now();
1030 connection
->logger
->tinc(l_msgr_running_fast_dispatch_time
,
1031 connection
->recv_start_time
- fast_dispatch_time
);
1032 connection
->lock
.lock();
1034 connection
->dispatch_queue
->enqueue(message
, message
->get_priority(),
1035 connection
->conn_id
);
1038 // clean up local buffer references
1044 if (need_dispatch_writer
&& connection
->is_connected()) {
1045 connection
->center
->dispatch_event_external(connection
->write_handler
);
1048 return CONTINUE(wait_message
);
// Reset the session after a peer-side reset: under write_lock discard
// delayed deliveries, drop everything queued for this connection in the
// dispatch queue, clear the outgoing priority queue, notify dispatchers of
// the remote reset, re-randomize out_seq, and disable the writer.
// outcoming_bl is deliberately left for the caller to clear (may race).
// NOTE(review): interior lines (original 1053, 1057-1058, 1064, 1066,
// 1068-1070, 1072-1073, 1075) are missing from this excerpt — code kept
// byte-identical.
1051 void ProtocolV1::session_reset() {
1052 ldout(cct
, 10) << __func__
<< " started" << dendl
;
1054 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
1055 if (connection
->delay_state
) {
1056 connection
->delay_state
->discard();
1059 connection
->dispatch_queue
->discard_queue(connection
->conn_id
);
1060 discard_out_queue();
1061 // note: we need to clear outcoming_bl here, but session_reset may be
1062 // called by other thread, so let caller clear this itself!
1063 // outcoming_bl.clear();
1065 connection
->dispatch_queue
->queue_remote_reset(connection
);
1067 randomize_out_seq();
1071 // it's safe to directly set 0, double locked
1074 can_write
= WriteStatus::NOWRITE
;
// Pick the starting outgoing sequence number: random in [0, SEQ_MASK] when
// the peer supports MSG_AUTH (so CRCs are not predictable), otherwise the
// historical behavior (sequence starting at 0) applies.
// NOTE(review): the assignment lines after 1080 and the else branch body
// (original 1082-1083, 1085-1087) are missing from this excerpt — code kept
// byte-identical.
1077 void ProtocolV1::randomize_out_seq() {
1078 if (connection
->get_features() & CEPH_FEATURE_MSG_AUTH
) {
1079 // Set out_seq to a random value, so CRC won't be predictable.
1080 auto rand_seq
= ceph::util::generate_random_number
<uint64_t>(0, SEQ_MASK
);
1081 ldout(cct
, 10) << __func__
<< " randomize_out_seq " << rand_seq
<< dendl
;
1084 // previously, seq #'s always started at 0.
// Serialize one message onto the wire (event-center thread only): assign
// the next out_seq, recompute the header CRC when enabled, sign via
// session_security, append TAG_MSG + header + payload — flattening small
// multi-buffer payloads (<= ASYNC_COALESCE_THRESHOLD) byte-by-byte instead
// of claim-appending — then the footer (legacy ceph_msg_footer_old, with
// CRCs zeroed as dictated by crcflags, when the peer lacks MSG_AUTH).
// Finally _try_send(more) flushes; bytes actually sent are counted into
// l_msgr_send_bytes, and OSD op/opreply sends are event-traced.
// NOTE(review): interior lines (original 1090, 1093, 1096-1097, 1100,
// 1109, 1113, 1116-1118, 1121, 1126, 1130-1131, 1133-1134, 1136, 1140,
// 1145, 1147, 1152-1153, 1156, 1159, 1162, 1167, 1172-1174) are missing
// from this excerpt — code kept byte-identical.
1089 ssize_t
ProtocolV1::write_message(Message
*m
, bufferlist
&bl
, bool more
) {
1091 ceph_assert(connection
->center
->in_thread());
1092 m
->set_seq(++out_seq
);
1094 if (messenger
->crcflags
& MSG_CRC_HEADER
) {
1095 m
->calc_header_crc();
1098 ceph_msg_header
&header
= m
->get_header();
1099 ceph_msg_footer
&footer
= m
->get_footer();
1101 // TODO: let sign_message could be reentry?
1102 // Now that we have all the crcs calculated, handle the
1103 // digital signature for the message, if the AsyncConnection has session
1104 // security set up. Some session security options do not
1105 // actually calculate and check the signature, but they should
1106 // handle the calls to sign_message and check_signature. PLR
1107 if (session_security
.get() == NULL
) {
1108 ldout(cct
, 20) << __func__
<< " no session security" << dendl
;
1110 if (session_security
->sign_message(m
)) {
1111 ldout(cct
, 20) << __func__
<< " failed to sign m=" << m
1112 << "): sig = " << footer
.sig
<< dendl
;
1114 ldout(cct
, 20) << __func__
<< " signed m=" << m
1115 << "): sig = " << footer
.sig
<< dendl
;
1119 connection
->outcoming_bl
.append(CEPH_MSGR_TAG_MSG
);
1120 connection
->outcoming_bl
.append((char *)&header
, sizeof(header
));
1122 ldout(cct
, 20) << __func__
<< " sending message type=" << header
.type
1123 << " src " << entity_name_t(header
.src
)
1124 << " front=" << header
.front_len
<< " data=" << header
.data_len
1125 << " off " << header
.data_off
<< dendl
;
1127 if ((bl
.length() <= ASYNC_COALESCE_THRESHOLD
) && (bl
.buffers().size() > 1)) {
1128 for (const auto &pb
: bl
.buffers()) {
1129 connection
->outcoming_bl
.append((char *)pb
.c_str(), pb
.length());
1132 connection
->outcoming_bl
.claim_append(bl
);
1135 // send footer; if receiver doesn't support signatures, use the old footer
1137 ceph_msg_footer_old old_footer
;
1138 if (connection
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
1139 connection
->outcoming_bl
.append((char *)&footer
, sizeof(footer
));
1141 if (messenger
->crcflags
& MSG_CRC_HEADER
) {
1142 old_footer
.front_crc
= footer
.front_crc
;
1143 old_footer
.middle_crc
= footer
.middle_crc
;
1144 old_footer
.data_crc
= footer
.data_crc
;
1146 old_footer
.front_crc
= old_footer
.middle_crc
= 0;
1148 old_footer
.data_crc
=
1149 messenger
->crcflags
& MSG_CRC_DATA
? footer
.data_crc
: 0;
1150 old_footer
.flags
= footer
.flags
;
1151 connection
->outcoming_bl
.append((char *)&old_footer
, sizeof(old_footer
));
1154 m
->trace
.event("async writing message");
1155 ldout(cct
, 20) << __func__
<< " sending " << m
->get_seq() << " " << m
1157 ssize_t total_send_size
= connection
->outcoming_bl
.length();
1158 ssize_t rc
= connection
->_try_send(more
);
1160 ldout(cct
, 1) << __func__
<< " error sending " << m
<< ", "
1161 << cpp_strerror(rc
) << dendl
;
1163 connection
->logger
->inc(
1164 l_msgr_send_bytes
, total_send_size
- connection
->outcoming_bl
.length());
1165 ldout(cct
, 10) << __func__
<< " sending " << m
1166 << (rc
? " continuely." : " done.") << dendl
;
1168 if (m
->get_type() == CEPH_MSG_OSD_OP
)
1169 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OP_END", false);
1170 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
1171 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OPREPLY_END", false);
1177 void ProtocolV1::requeue_sent() {
1182 list
<pair
<bufferlist
, Message
*> > &rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1183 out_seq
-= sent
.size();
1184 while (!sent
.empty()) {
1185 Message
*m
= sent
.back();
1187 ldout(cct
, 10) << __func__
<< " " << *m
<< " for resend "
1188 << " (" << m
->get_seq() << ")" << dendl
;
1189 rq
.push_front(make_pair(bufferlist(), m
));
1193 uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq
, uint64_t seq
) {
1194 ldout(cct
, 10) << __func__
<< " " << seq
<< dendl
;
1195 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
1196 if (out_q
.count(CEPH_MSG_PRIO_HIGHEST
) == 0) {
1199 list
<pair
<bufferlist
, Message
*> > &rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1200 uint64_t count
= out_seq
;
1201 while (!rq
.empty()) {
1202 pair
<bufferlist
, Message
*> p
= rq
.front();
1203 if (p
.second
->get_seq() == 0 || p
.second
->get_seq() > seq
) break;
1204 ldout(cct
, 10) << __func__
<< " " << *(p
.second
) << " for resend seq "
1205 << p
.second
->get_seq() << " <= " << seq
<< ", discarding"
1211 if (rq
.empty()) out_q
.erase(CEPH_MSG_PRIO_HIGHEST
);
1216 * Tears down the message queues, and removes them from the
1217 * DispatchQueue Must hold write_lock prior to calling.
1219 void ProtocolV1::discard_out_queue() {
1220 ldout(cct
, 10) << __func__
<< " started" << dendl
;
1222 for (list
<Message
*>::iterator p
= sent
.begin(); p
!= sent
.end(); ++p
) {
1223 ldout(cct
, 20) << __func__
<< " discard " << *p
<< dendl
;
1227 for (map
<int, list
<pair
<bufferlist
, Message
*> > >::iterator p
=
1229 p
!= out_q
.end(); ++p
) {
1230 for (list
<pair
<bufferlist
, Message
*> >::iterator r
= p
->second
.begin();
1231 r
!= p
->second
.end(); ++r
) {
1232 ldout(cct
, 20) << __func__
<< " discard " << r
->second
<< dendl
;
1239 void ProtocolV1::reset_recv_state() {
1240 // clean up state internal variables and states
1241 if (state
== CONNECTING_SEND_CONNECT_MSG
) {
1245 authorizer
= nullptr;
1248 // clean read and write callbacks
1249 connection
->pendingReadLen
.reset();
1250 connection
->writeCallback
.reset();
1252 if (state
> THROTTLE_MESSAGE
&& state
<= READ_FOOTER_AND_DISPATCH
&&
1253 connection
->policy
.throttler_messages
) {
1254 ldout(cct
, 10) << __func__
<< " releasing " << 1
1255 << " message to policy throttler "
1256 << connection
->policy
.throttler_messages
->get_current()
1257 << "/" << connection
->policy
.throttler_messages
->get_max()
1259 connection
->policy
.throttler_messages
->put();
1261 if (state
> THROTTLE_BYTES
&& state
<= READ_FOOTER_AND_DISPATCH
) {
1262 if (connection
->policy
.throttler_bytes
) {
1263 ldout(cct
, 10) << __func__
<< " releasing " << cur_msg_size
1264 << " bytes to policy throttler "
1265 << connection
->policy
.throttler_bytes
->get_current() << "/"
1266 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
1267 connection
->policy
.throttler_bytes
->put(cur_msg_size
);
1270 if (state
> THROTTLE_DISPATCH_QUEUE
&& state
<= READ_FOOTER_AND_DISPATCH
) {
1272 << __func__
<< " releasing " << cur_msg_size
1273 << " bytes to dispatch_queue throttler "
1274 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
1275 << connection
->dispatch_queue
->dispatch_throttler
.get_max() << dendl
;
1276 connection
->dispatch_queue
->dispatch_throttle_release(cur_msg_size
);
1280 Message
*ProtocolV1::_get_next_outgoing(bufferlist
*bl
) {
1282 if (!out_q
.empty()) {
1283 map
<int, list
<pair
<bufferlist
, Message
*> > >::reverse_iterator it
=
1285 ceph_assert(!it
->second
.empty());
1286 list
<pair
<bufferlist
, Message
*> >::iterator p
= it
->second
.begin();
1288 if (bl
) bl
->swap(p
->first
);
1289 it
->second
.erase(p
);
1290 if (it
->second
.empty()) out_q
.erase(it
->first
);
1296 * Client Protocol V1
1299 CtPtr
ProtocolV1::send_client_banner() {
1300 ldout(cct
, 20) << __func__
<< dendl
;
1304 bl
.append(CEPH_BANNER
, strlen(CEPH_BANNER
));
1305 return WRITE(bl
, handle_client_banner_write
);
1308 CtPtr
ProtocolV1::handle_client_banner_write(int r
) {
1309 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1312 ldout(cct
, 1) << __func__
<< " write client banner failed" << dendl
;
1315 ldout(cct
, 10) << __func__
<< " connect write banner done: "
1316 << connection
->get_peer_addr() << dendl
;
1318 return wait_server_banner();
1321 CtPtr
ProtocolV1::wait_server_banner() {
1322 state
= CONNECTING_WAIT_BANNER_AND_IDENTIFY
;
1324 ldout(cct
, 20) << __func__
<< dendl
;
1326 bufferlist myaddrbl
;
1327 unsigned banner_len
= strlen(CEPH_BANNER
);
1328 unsigned need_len
= banner_len
+ sizeof(ceph_entity_addr
) * 2;
1329 return READ(need_len
, handle_server_banner_and_identify
);
1332 CtPtr
ProtocolV1::handle_server_banner_and_identify(char *buffer
, int r
) {
1333 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1336 ldout(cct
, 1) << __func__
<< " read banner and identify addresses failed"
1341 unsigned banner_len
= strlen(CEPH_BANNER
);
1342 if (memcmp(buffer
, CEPH_BANNER
, banner_len
)) {
1343 ldout(cct
, 0) << __func__
<< " connect protocol error (bad banner) on peer "
1344 << connection
->get_peer_addr() << dendl
;
1349 entity_addr_t paddr
, peer_addr_for_me
;
1351 bl
.append(buffer
+ banner_len
, sizeof(ceph_entity_addr
) * 2);
1352 auto p
= bl
.cbegin();
1355 decode(peer_addr_for_me
, p
);
1356 } catch (const buffer::error
&e
) {
1357 lderr(cct
) << __func__
<< " decode peer addr failed " << dendl
;
1360 ldout(cct
, 20) << __func__
<< " connect read peer addr " << paddr
1361 << " on socket " << connection
->cs
.fd() << dendl
;
1363 entity_addr_t peer_addr
= connection
->peer_addrs
->legacy_addr();
1364 if (peer_addr
!= paddr
) {
1365 if (paddr
.is_blank_ip() && peer_addr
.get_port() == paddr
.get_port() &&
1366 peer_addr
.get_nonce() == paddr
.get_nonce()) {
1367 ldout(cct
, 0) << __func__
<< " connect claims to be " << paddr
<< " not "
1368 << peer_addr
<< " - presumably this is the same node!"
1371 ldout(cct
, 10) << __func__
<< " connect claims to be " << paddr
<< " not "
1372 << peer_addr
<< dendl
;
1377 ldout(cct
, 20) << __func__
<< " connect peer addr for me is "
1378 << peer_addr_for_me
<< dendl
;
1379 connection
->lock
.unlock();
1380 messenger
->learned_addr(peer_addr_for_me
);
1381 if (cct
->_conf
->ms_inject_internal_delays
&&
1382 cct
->_conf
->ms_inject_socket_failures
) {
1383 if (rand() % cct
->_conf
->ms_inject_socket_failures
== 0) {
1384 ldout(cct
, 10) << __func__
<< " sleep for "
1385 << cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1387 t
.set_from_double(cct
->_conf
->ms_inject_internal_delays
);
1392 connection
->lock
.lock();
1393 if (state
!= CONNECTING_WAIT_BANNER_AND_IDENTIFY
) {
1394 ldout(cct
, 1) << __func__
1395 << " state changed while learned_addr, mark_down or "
1396 << " replacing must be happened just now" << dendl
;
1400 bufferlist myaddrbl
;
1401 encode(messenger
->get_myaddr_legacy(), myaddrbl
, 0); // legacy
1402 return WRITE(myaddrbl
, handle_my_addr_write
);
1405 CtPtr
ProtocolV1::handle_my_addr_write(int r
) {
1406 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1409 ldout(cct
, 2) << __func__
<< " connect couldn't write my addr, "
1410 << cpp_strerror(r
) << dendl
;
1413 ldout(cct
, 10) << __func__
<< " connect sent my addr "
1414 << messenger
->get_myaddr_legacy() << dendl
;
1416 return CONTINUE(send_connect_message
);
1419 CtPtr
ProtocolV1::send_connect_message() {
1420 state
= CONNECTING_SEND_CONNECT_MSG
;
1422 ldout(cct
, 20) << __func__
<< dendl
;
1425 authorizer
= messenger
->ms_deliver_get_authorizer(connection
->peer_type
);
1428 ceph_msg_connect connect
;
1429 connect
.features
= connection
->policy
.features_supported
;
1430 connect
.host_type
= messenger
->get_myname().type();
1431 connect
.global_seq
= global_seq
;
1432 connect
.connect_seq
= connect_seq
;
1433 connect
.protocol_version
=
1434 messenger
->get_proto_version(connection
->peer_type
, true);
1435 connect
.authorizer_protocol
= authorizer
? authorizer
->protocol
: 0;
1436 connect
.authorizer_len
= authorizer
? authorizer
->bl
.length() : 0;
1439 ldout(cct
, 10) << __func__
1440 << " connect_msg.authorizer_len=" << connect
.authorizer_len
1441 << " protocol=" << connect
.authorizer_protocol
<< dendl
;
1445 if (connection
->policy
.lossy
) {
1447 CEPH_MSG_CONNECT_LOSSY
; // this is fyi, actually, server decides!
1451 bl
.append((char *)&connect
, sizeof(connect
));
1453 bl
.append(authorizer
->bl
.c_str(), authorizer
->bl
.length());
1456 ldout(cct
, 10) << __func__
<< " connect sending gseq=" << global_seq
1457 << " cseq=" << connect_seq
1458 << " proto=" << connect
.protocol_version
<< dendl
;
1460 return WRITE(bl
, handle_connect_message_write
);
1463 CtPtr
ProtocolV1::handle_connect_message_write(int r
) {
1464 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1467 ldout(cct
, 2) << __func__
<< " connect couldn't send reply "
1468 << cpp_strerror(r
) << dendl
;
1472 ldout(cct
, 20) << __func__
1473 << " connect wrote (self +) cseq, waiting for reply" << dendl
;
1475 return wait_connect_reply();
1478 CtPtr
ProtocolV1::wait_connect_reply() {
1479 ldout(cct
, 20) << __func__
<< dendl
;
1481 memset(&connect_reply
, 0, sizeof(connect_reply
));
1482 return READ(sizeof(connect_reply
), handle_connect_reply_1
);
1485 CtPtr
ProtocolV1::handle_connect_reply_1(char *buffer
, int r
) {
1486 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1489 ldout(cct
, 1) << __func__
<< " read connect reply failed" << dendl
;
1493 connect_reply
= *((ceph_msg_connect_reply
*)buffer
);
1495 ldout(cct
, 20) << __func__
<< " connect got reply tag "
1496 << (int)connect_reply
.tag
<< " connect_seq "
1497 << connect_reply
.connect_seq
<< " global_seq "
1498 << connect_reply
.global_seq
<< " proto "
1499 << connect_reply
.protocol_version
<< " flags "
1500 << (int)connect_reply
.flags
<< " features "
1501 << connect_reply
.features
<< dendl
;
1503 if (connect_reply
.authorizer_len
) {
1504 return wait_connect_reply_auth();
1507 return handle_connect_reply_2();
1510 CtPtr
ProtocolV1::wait_connect_reply_auth() {
1511 ldout(cct
, 20) << __func__
<< dendl
;
1513 ldout(cct
, 10) << __func__
1514 << " reply.authorizer_len=" << connect_reply
.authorizer_len
1517 ceph_assert(connect_reply
.authorizer_len
< 4096);
1519 return READ(connect_reply
.authorizer_len
, handle_connect_reply_auth
);
1522 CtPtr
ProtocolV1::handle_connect_reply_auth(char *buffer
, int r
) {
1523 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1526 ldout(cct
, 1) << __func__
<< " read connect reply authorizer failed"
1531 bufferlist authorizer_reply
;
1532 authorizer_reply
.append(buffer
, connect_reply
.authorizer_len
);
1534 if (connect_reply
.tag
== CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
) {
1535 ldout(cct
, 10) << __func__
<< " connect got auth challenge" << dendl
;
1536 authorizer
->add_challenge(cct
, authorizer_reply
);
1537 return CONTINUE(send_connect_message
);
1540 auto iter
= authorizer_reply
.cbegin();
1541 if (authorizer
&& !authorizer
->verify_reply(iter
,
1542 nullptr /* connection_secret */)) {
1543 ldout(cct
, 0) << __func__
<< " failed verifying authorize reply" << dendl
;
1547 return handle_connect_reply_2();
1550 CtPtr
ProtocolV1::handle_connect_reply_2() {
1551 ldout(cct
, 20) << __func__
<< dendl
;
1553 if (connect_reply
.tag
== CEPH_MSGR_TAG_FEATURES
) {
1554 ldout(cct
, 0) << __func__
<< " connect protocol feature mismatch, my "
1555 << std::hex
<< connection
->policy
.features_supported
1556 << " < peer " << connect_reply
.features
<< " missing "
1557 << (connect_reply
.features
&
1558 ~connection
->policy
.features_supported
)
1559 << std::dec
<< dendl
;
1563 if (connect_reply
.tag
== CEPH_MSGR_TAG_BADPROTOVER
) {
1564 ldout(cct
, 0) << __func__
<< " connect protocol version mismatch, my "
1565 << messenger
->get_proto_version(connection
->peer_type
, true)
1566 << " != " << connect_reply
.protocol_version
<< dendl
;
1570 if (connect_reply
.tag
== CEPH_MSGR_TAG_BADAUTHORIZER
) {
1571 ldout(cct
, 0) << __func__
<< " connect got BADAUTHORIZER" << dendl
;
1575 if (connect_reply
.tag
== CEPH_MSGR_TAG_RESETSESSION
) {
1576 ldout(cct
, 0) << __func__
<< " connect got RESETSESSION" << dendl
;
1580 // see session_reset
1581 connection
->outcoming_bl
.clear();
1583 return CONTINUE(send_connect_message
);
1586 if (connect_reply
.tag
== CEPH_MSGR_TAG_RETRY_GLOBAL
) {
1587 global_seq
= messenger
->get_global_seq(connect_reply
.global_seq
);
1588 ldout(cct
, 5) << __func__
<< " connect got RETRY_GLOBAL "
1589 << connect_reply
.global_seq
<< " chose new " << global_seq
1591 return CONTINUE(send_connect_message
);
1594 if (connect_reply
.tag
== CEPH_MSGR_TAG_RETRY_SESSION
) {
1595 ceph_assert(connect_reply
.connect_seq
> connect_seq
);
1596 ldout(cct
, 5) << __func__
<< " connect got RETRY_SESSION " << connect_seq
1597 << " -> " << connect_reply
.connect_seq
<< dendl
;
1598 connect_seq
= connect_reply
.connect_seq
;
1599 return CONTINUE(send_connect_message
);
1602 if (connect_reply
.tag
== CEPH_MSGR_TAG_WAIT
) {
1603 ldout(cct
, 1) << __func__
<< " connect got WAIT (connection race)" << dendl
;
1608 uint64_t feat_missing
;
1610 connection
->policy
.features_required
& ~(uint64_t)connect_reply
.features
;
1612 ldout(cct
, 1) << __func__
<< " missing required features " << std::hex
1613 << feat_missing
<< std::dec
<< dendl
;
1617 if (connect_reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1620 << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq"
1623 return wait_ack_seq();
1626 if (connect_reply
.tag
== CEPH_MSGR_TAG_READY
) {
1627 ldout(cct
, 10) << __func__
<< " got CEPH_MSGR_TAG_READY " << dendl
;
1630 return client_ready();
1633 CtPtr
ProtocolV1::wait_ack_seq() {
1634 ldout(cct
, 20) << __func__
<< dendl
;
1636 return READ(sizeof(uint64_t), handle_ack_seq
);
1639 CtPtr
ProtocolV1::handle_ack_seq(char *buffer
, int r
) {
1640 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1643 ldout(cct
, 1) << __func__
<< " read connect ack seq failed" << dendl
;
1647 uint64_t newly_acked_seq
= 0;
1649 newly_acked_seq
= *((uint64_t *)buffer
);
1650 ldout(cct
, 2) << __func__
<< " got newly_acked_seq " << newly_acked_seq
1651 << " vs out_seq " << out_seq
<< dendl
;
1652 out_seq
= discard_requeued_up_to(out_seq
, newly_acked_seq
);
1655 uint64_t s
= in_seq
;
1656 bl
.append((char *)&s
, sizeof(s
));
1658 return WRITE(bl
, handle_in_seq_write
);
1661 CtPtr
ProtocolV1::handle_in_seq_write(int r
) {
1662 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1665 ldout(cct
, 10) << __func__
<< " failed to send in_seq " << dendl
;
1669 ldout(cct
, 10) << __func__
<< " send in_seq done " << dendl
;
1671 return client_ready();
1674 CtPtr
ProtocolV1::client_ready() {
1675 ldout(cct
, 20) << __func__
<< dendl
;
1678 peer_global_seq
= connect_reply
.global_seq
;
1679 connection
->policy
.lossy
= connect_reply
.flags
& CEPH_MSG_CONNECT_LOSSY
;
1683 ceph_assert(connect_seq
== connect_reply
.connect_seq
);
1684 backoff
= utime_t();
1685 connection
->set_features((uint64_t)connect_reply
.features
&
1686 (uint64_t)connection
->policy
.features_supported
);
1687 ldout(cct
, 10) << __func__
<< " connect success " << connect_seq
1688 << ", lossy = " << connection
->policy
.lossy
<< ", features "
1689 << connection
->get_features() << dendl
;
1691 // If we have an authorizer, get a new AuthSessionHandler to deal with
1692 // ongoing security of the connection. PLR
1693 if (authorizer
!= NULL
) {
1694 ldout(cct
, 10) << __func__
<< " setting up session_security with auth "
1695 << authorizer
<< dendl
;
1696 session_security
.reset(get_auth_session_handler(
1697 cct
, authorizer
->protocol
,
1698 authorizer
->session_key
,
1699 connection
->get_features()));
1701 // We have no authorizer, so we shouldn't be applying security to messages
1702 // in this AsyncConnection. PLR
1703 ldout(cct
, 10) << __func__
<< " no authorizer, clearing session_security"
1705 session_security
.reset();
1708 if (connection
->delay_state
) {
1709 ceph_assert(connection
->delay_state
->ready());
1711 connection
->dispatch_queue
->queue_connect(connection
);
1712 messenger
->ms_deliver_handle_fast_connect(connection
);
1718 * Server Protocol V1
1721 CtPtr
ProtocolV1::send_server_banner() {
1722 ldout(cct
, 20) << __func__
<< dendl
;
1727 bl
.append(CEPH_BANNER
, strlen(CEPH_BANNER
));
1729 // as a server, we should have a legacy addr if we accepted this connection.
1730 auto legacy
= messenger
->get_myaddrs().legacy_addr();
1731 encode(legacy
, bl
, 0); // legacy
1732 connection
->port
= legacy
.get_port();
1733 encode(connection
->target_addr
, bl
, 0); // legacy
1735 ldout(cct
, 1) << __func__
<< " sd=" << connection
->cs
.fd()
1736 << " legacy " << legacy
1737 << " socket_addr " << connection
->socket_addr
1738 << " target_addr " << connection
->target_addr
1741 return WRITE(bl
, handle_server_banner_write
);
1744 CtPtr
ProtocolV1::handle_server_banner_write(int r
) {
1745 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1748 ldout(cct
, 1) << " write server banner failed" << dendl
;
1751 ldout(cct
, 10) << __func__
<< " write banner and addr done: "
1752 << connection
->get_peer_addr() << dendl
;
1754 return wait_client_banner();
1757 CtPtr
ProtocolV1::wait_client_banner() {
1758 ldout(cct
, 20) << __func__
<< dendl
;
1760 return READ(strlen(CEPH_BANNER
) + sizeof(ceph_entity_addr
),
1761 handle_client_banner
);
1764 CtPtr
ProtocolV1::handle_client_banner(char *buffer
, int r
) {
1765 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1768 ldout(cct
, 1) << __func__
<< " read peer banner and addr failed" << dendl
;
1772 if (memcmp(buffer
, CEPH_BANNER
, strlen(CEPH_BANNER
))) {
1773 ldout(cct
, 1) << __func__
<< " accept peer sent bad banner '" << buffer
1774 << "' (should be '" << CEPH_BANNER
<< "')" << dendl
;
1779 entity_addr_t peer_addr
;
1781 addr_bl
.append(buffer
+ strlen(CEPH_BANNER
), sizeof(ceph_entity_addr
));
1783 auto ti
= addr_bl
.cbegin();
1784 decode(peer_addr
, ti
);
1785 } catch (const buffer::error
&e
) {
1786 lderr(cct
) << __func__
<< " decode peer_addr failed " << dendl
;
1790 ldout(cct
, 10) << __func__
<< " accept peer addr is " << peer_addr
<< dendl
;
1791 if (peer_addr
.is_blank_ip()) {
1792 // peer apparently doesn't know what ip they have; figure it out for them.
1793 int port
= peer_addr
.get_port();
1794 peer_addr
.set_sockaddr(connection
->target_addr
.get_sockaddr());
1795 peer_addr
.set_port(port
);
1797 ldout(cct
, 0) << __func__
<< " accept peer addr is really " << peer_addr
1798 << " (socket is " << connection
->target_addr
<< ")" << dendl
;
1800 connection
->set_peer_addr(peer_addr
); // so that connection_state gets set up
1801 connection
->target_addr
= peer_addr
;
1803 return CONTINUE(wait_connect_message
);
1806 CtPtr
ProtocolV1::wait_connect_message() {
1807 ldout(cct
, 20) << __func__
<< dendl
;
1809 memset(&connect_msg
, 0, sizeof(connect_msg
));
1810 return READ(sizeof(connect_msg
), handle_connect_message_1
);
1813 CtPtr
ProtocolV1::handle_connect_message_1(char *buffer
, int r
) {
1814 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1817 ldout(cct
, 1) << __func__
<< " read connect msg failed" << dendl
;
1821 connect_msg
= *((ceph_msg_connect
*)buffer
);
1823 state
= ACCEPTING_WAIT_CONNECT_MSG_AUTH
;
1825 if (connect_msg
.authorizer_len
) {
1826 return wait_connect_message_auth();
1829 return handle_connect_message_2();
1832 CtPtr
ProtocolV1::wait_connect_message_auth() {
1833 ldout(cct
, 20) << __func__
<< dendl
;
1834 authorizer_buf
.clear();
1835 authorizer_buf
.push_back(buffer::create(connect_msg
.authorizer_len
));
1836 return READB(connect_msg
.authorizer_len
, authorizer_buf
.c_str(),
1837 handle_connect_message_auth
);
1840 CtPtr
ProtocolV1::handle_connect_message_auth(char *buffer
, int r
) {
1841 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1844 ldout(cct
, 1) << __func__
<< " read connect authorizer failed" << dendl
;
1848 return handle_connect_message_2();
1851 CtPtr
ProtocolV1::handle_connect_message_2() {
1852 ldout(cct
, 20) << __func__
<< dendl
;
1854 ldout(cct
, 20) << __func__
<< " accept got peer connect_seq "
1855 << connect_msg
.connect_seq
<< " global_seq "
1856 << connect_msg
.global_seq
<< dendl
;
1858 connection
->set_peer_type(connect_msg
.host_type
);
1859 connection
->policy
= messenger
->get_policy(connect_msg
.host_type
);
1861 ldout(cct
, 10) << __func__
<< " accept of host_type " << connect_msg
.host_type
1862 << ", policy.lossy=" << connection
->policy
.lossy
1863 << " policy.server=" << connection
->policy
.server
1864 << " policy.standby=" << connection
->policy
.standby
1865 << " policy.resetcheck=" << connection
->policy
.resetcheck
1866 << " features 0x" << std::hex
<< (uint64_t)connect_msg
.features
1870 ceph_msg_connect_reply reply
;
1871 bufferlist authorizer_reply
;
1873 memset(&reply
, 0, sizeof(reply
));
1874 reply
.protocol_version
=
1875 messenger
->get_proto_version(connection
->peer_type
, false);
1878 ldout(cct
, 10) << __func__
<< " accept my proto " << reply
.protocol_version
1879 << ", their proto " << connect_msg
.protocol_version
<< dendl
;
1881 if (connect_msg
.protocol_version
!= reply
.protocol_version
) {
1882 return send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER
, reply
,
1886 // require signatures for cephx?
1887 if (connect_msg
.authorizer_protocol
== CEPH_AUTH_CEPHX
) {
1888 if (connection
->peer_type
== CEPH_ENTITY_TYPE_OSD
||
1889 connection
->peer_type
== CEPH_ENTITY_TYPE_MDS
) {
1890 if (cct
->_conf
->cephx_require_signatures
||
1891 cct
->_conf
->cephx_cluster_require_signatures
) {
1894 << " using cephx, requiring MSG_AUTH feature bit for cluster"
1896 connection
->policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
1899 if (cct
->_conf
->cephx_require_signatures
||
1900 cct
->_conf
->cephx_service_require_signatures
) {
1903 << " using cephx, requiring MSG_AUTH feature bit for service"
1905 connection
->policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
1910 uint64_t feat_missing
=
1911 connection
->policy
.features_required
& ~(uint64_t)connect_msg
.features
;
1913 ldout(cct
, 1) << __func__
<< " peer missing required features " << std::hex
1914 << feat_missing
<< std::dec
<< dendl
;
1915 return send_connect_message_reply(CEPH_MSGR_TAG_FEATURES
, reply
,
1919 bufferlist auth_bl_copy
= authorizer_buf
;
1920 connection
->lock
.unlock();
1921 ldout(cct
,10) << __func__
<< " authorizor_protocol "
1922 << connect_msg
.authorizer_protocol
1923 << " len " << auth_bl_copy
.length()
1925 bool authorizer_valid
;
1926 bool need_challenge
= HAVE_FEATURE(connect_msg
.features
, CEPHX_V2
);
1927 bool had_challenge
= (bool)authorizer_challenge
;
1928 if (!messenger
->ms_deliver_verify_authorizer(
1929 connection
, connection
->peer_type
, connect_msg
.authorizer_protocol
,
1930 auth_bl_copy
, authorizer_reply
, authorizer_valid
, session_key
,
1931 nullptr /* connection_secret */,
1932 need_challenge
? &authorizer_challenge
: nullptr) ||
1933 !authorizer_valid
) {
1934 connection
->lock
.lock();
1935 if (state
!= ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
1936 ldout(cct
, 1) << __func__
1937 << " state changed while accept, it must be mark_down"
1939 ceph_assert(state
== CLOSED
);
1943 if (need_challenge
&& !had_challenge
&& authorizer_challenge
) {
1944 ldout(cct
, 10) << __func__
<< ": challenging authorizer" << dendl
;
1945 ceph_assert(authorizer_reply
.length());
1946 return send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
,
1947 reply
, authorizer_reply
);
1949 ldout(cct
, 0) << __func__
<< ": got bad authorizer, auth_reply_len="
1950 << authorizer_reply
.length() << dendl
;
1951 session_security
.reset();
1952 return send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER
, reply
,
1957 // We've verified the authorizer for this AsyncConnection, so set up the
1958 // session security structure. PLR
1959 ldout(cct
, 10) << __func__
<< " accept setting up session_security." << dendl
;
1962 AsyncConnectionRef existing
= messenger
->lookup_conn(*connection
->peer_addrs
);
1964 connection
->inject_delay();
1966 connection
->lock
.lock();
1967 if (state
!= ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
1968 ldout(cct
, 1) << __func__
1969 << " state changed while accept, it must be mark_down"
1971 ceph_assert(state
== CLOSED
);
1975 if (existing
== connection
) {
1978 if (existing
&& existing
->protocol
->proto_type
!= 1) {
1979 ldout(cct
,1) << __func__
<< " existing " << existing
<< " proto "
1980 << existing
->protocol
.get() << " version is "
1981 << existing
->protocol
->proto_type
<< ", marking down" << dendl
;
1982 existing
->mark_down();
1987 // There is no possible that existing connection will acquire this
1988 // connection's lock
1989 existing
->lock
.lock(); // skip lockdep check (we are locking a second
1990 // AsyncConnection here)
1992 ldout(cct
,10) << __func__
<< " existing=" << existing
<< " exproto="
1993 << existing
->protocol
.get() << dendl
;
1994 ProtocolV1
*exproto
= dynamic_cast<ProtocolV1
*>(existing
->protocol
.get());
1995 ceph_assert(exproto
);
1996 ceph_assert(exproto
->proto_type
== 1);
1998 if (exproto
->state
== CLOSED
) {
1999 ldout(cct
, 1) << __func__
<< " existing " << existing
2000 << " already closed." << dendl
;
2001 existing
->lock
.unlock();
2004 return open(reply
, authorizer_reply
);
2007 if (exproto
->replacing
) {
2008 ldout(cct
, 1) << __func__
2009 << " existing racing replace happened while replacing."
2010 << " existing_state="
2011 << connection
->get_state_name(existing
->state
) << dendl
;
2012 reply
.global_seq
= exproto
->peer_global_seq
;
2013 existing
->lock
.unlock();
2014 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL
, reply
,
2018 if (connect_msg
.global_seq
< exproto
->peer_global_seq
) {
2019 ldout(cct
, 10) << __func__
<< " accept existing " << existing
<< ".gseq "
2020 << exproto
->peer_global_seq
<< " > "
2021 << connect_msg
.global_seq
<< ", RETRY_GLOBAL" << dendl
;
2022 reply
.global_seq
= exproto
->peer_global_seq
; // so we can send it below..
2023 existing
->lock
.unlock();
2024 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL
, reply
,
2027 ldout(cct
, 10) << __func__
<< " accept existing " << existing
<< ".gseq "
2028 << exproto
->peer_global_seq
2029 << " <= " << connect_msg
.global_seq
<< ", looks ok"
2033 if (existing
->policy
.lossy
) {
2036 << " accept replacing existing (lossy) channel (new one lossy="
2037 << connection
->policy
.lossy
<< ")" << dendl
;
2038 exproto
->session_reset();
2039 return replace(existing
, reply
, authorizer_reply
);
2042 ldout(cct
, 1) << __func__
<< " accept connect_seq "
2043 << connect_msg
.connect_seq
2044 << " vs existing csq=" << exproto
->connect_seq
2045 << " existing_state="
2046 << connection
->get_state_name(existing
->state
) << dendl
;
2048 if (connect_msg
.connect_seq
== 0 && exproto
->connect_seq
> 0) {
2051 << " accept peer reset, then tried to connect to us, replacing"
2053 // this is a hard reset from peer
2054 is_reset_from_peer
= true;
2055 if (connection
->policy
.resetcheck
) {
2056 exproto
->session_reset(); // this resets out_queue, msg_ and
2059 return replace(existing
, reply
, authorizer_reply
);
2062 if (connect_msg
.connect_seq
< exproto
->connect_seq
) {
2063 // old attempt, or we sent READY but they didn't get it.
2064 ldout(cct
, 10) << __func__
<< " accept existing " << existing
<< ".cseq "
2065 << exproto
->connect_seq
<< " > " << connect_msg
.connect_seq
2066 << ", RETRY_SESSION" << dendl
;
2067 reply
.connect_seq
= exproto
->connect_seq
+ 1;
2068 existing
->lock
.unlock();
2069 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION
, reply
,
2073 if (connect_msg
.connect_seq
== exproto
->connect_seq
) {
2074 // if the existing connection successfully opened, and/or
2075 // subsequently went to standby, then the peer should bump
2076 // their connect_seq and retry: this is not a connection race
2077 // we need to resolve here.
2078 if (exproto
->state
== OPENED
|| exproto
->state
== STANDBY
) {
2079 ldout(cct
, 10) << __func__
<< " accept connection race, existing "
2080 << existing
<< ".cseq " << exproto
->connect_seq
2081 << " == " << connect_msg
.connect_seq
2082 << ", OPEN|STANDBY, RETRY_SESSION " << dendl
;
2083 // if connect_seq both zero, dont stuck into dead lock. it's ok to
2085 if (connection
->policy
.resetcheck
&& exproto
->connect_seq
== 0) {
2086 return replace(existing
, reply
, authorizer_reply
);
2089 reply
.connect_seq
= exproto
->connect_seq
+ 1;
2090 existing
->lock
.unlock();
2091 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION
, reply
,
2096 if (connection
->peer_addrs
->legacy_addr() < messenger
->get_myaddr_legacy() ||
2097 existing
->policy
.server
) {
2099 ldout(cct
, 10) << __func__
<< " accept connection race, existing "
2100 << existing
<< ".cseq " << exproto
->connect_seq
2101 << " == " << connect_msg
.connect_seq
2102 << ", or we are server, replacing my attempt" << dendl
;
2103 return replace(existing
, reply
, authorizer_reply
);
2105 // our existing outgoing wins
2106 ldout(messenger
->cct
, 10)
2107 << __func__
<< " accept connection race, existing " << existing
2108 << ".cseq " << exproto
->connect_seq
2109 << " == " << connect_msg
.connect_seq
<< ", sending WAIT" << dendl
;
2110 ceph_assert(connection
->peer_addrs
->legacy_addr() >
2111 messenger
->get_myaddr_legacy());
2112 existing
->lock
.unlock();
2113 // make sure we follow through with opening the existing
2114 // connection (if it isn't yet open) since we know the peer
2115 // has something to send to us.
2116 existing
->send_keepalive();
2117 return send_connect_message_reply(CEPH_MSGR_TAG_WAIT
, reply
,
2122 ceph_assert(connect_msg
.connect_seq
> exproto
->connect_seq
);
2123 ceph_assert(connect_msg
.global_seq
>= exproto
->peer_global_seq
);
2124 if (connection
->policy
.resetcheck
&& // RESETSESSION only used by servers;
2125 // peers do not reset each other
2126 exproto
->connect_seq
== 0) {
2127 ldout(cct
, 0) << __func__
<< " accept we reset (peer sent cseq "
2128 << connect_msg
.connect_seq
<< ", " << existing
2129 << ".cseq = " << exproto
->connect_seq
2130 << "), sending RESETSESSION " << dendl
;
2131 existing
->lock
.unlock();
2132 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION
, reply
,
2137 ldout(cct
, 10) << __func__
<< " accept peer sent cseq "
2138 << connect_msg
.connect_seq
<< " > " << exproto
->connect_seq
2140 return replace(existing
, reply
, authorizer_reply
);
2142 else if (!replacing
&& connect_msg
.connect_seq
> 0) {
2143 // we reset, and they are opening a new session
2144 ldout(cct
, 0) << __func__
<< " accept we reset (peer sent cseq "
2145 << connect_msg
.connect_seq
<< "), sending RESETSESSION"
2147 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION
, reply
,
2151 ldout(cct
, 10) << __func__
<< " accept new session" << dendl
;
2153 return open(reply
, authorizer_reply
);
// Serialize a ceph_msg_connect_reply (tag + negotiated feature bits +
// optional authorizer payload) into a bufferlist and write it to the peer.
// On write completion, handle_connect_message_reply_write() runs.
// NOTE(review): this capture is truncated — the `reply.tag = tag;` and
// `reply.features =` assignment head, the `std::dec << dendl` terminator of
// the debug print, and several closing braces are missing from the visible
// text; verify against the upstream source before building.
CtPtr ProtocolV1::send_connect_message_reply(char tag,
                                             ceph_msg_connect_reply &reply,
                                             bufferlist &authorizer_reply) {
  ldout(cct, 20) << __func__ << dendl;
  bufferlist reply_bl;
  // negotiated feature set: (peer's offered features AND what our policy
  // supports) OR'd with the features our policy requires
  ((uint64_t)connect_msg.features & connection->policy.features_supported) |
      connection->policy.features_required;
  reply.authorizer_len = authorizer_reply.length();
  // raw struct copy onto the wire buffer (legacy msgr v1 encoding)
  reply_bl.append((char *)&reply, sizeof(reply));

  ldout(cct, 10) << __func__ << " reply features 0x" << std::hex
                 << reply.features << " = (policy sup 0x"
                 << connection->policy.features_supported
                 << " & connect 0x" << (uint64_t)connect_msg.features
                 << ") | policy req 0x"
                 << connection->policy.features_required

  // append the authorizer reply, if any, after the fixed-size header
  if (reply.authorizer_len) {
    reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
    authorizer_reply.clear();

  return WRITE(reply_bl, handle_connect_message_reply_write);
// Completion callback for the connect-message reply write; on success the
// server loops back to waiting for the peer's next connect message.
// NOTE(review): the error branch (`if (r < 0) { ... }`) is missing from this
// capture — the "failed" log line and inject_delay() below clearly belong to
// it; confirm against upstream.
CtPtr ProtocolV1::handle_connect_message_reply_write(int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;
  ldout(cct, 1) << " write connect message reply failed" << dendl;
  connection->inject_delay();
  return CONTINUE(wait_connect_message);
// Take over an existing (racing) connection to the same peer using this
// newly accepted socket.  On a lossy channel the old connection is simply
// failed; otherwise this connection's socket is handed off to the existing
// connection's protocol, and worker/center migration plus the RETRY_GLOBAL
// reply are executed via closures submitted to the original event threads.
// NOTE(review): numerous lines are missing from this capture (closing
// braces, else-branches, the tail of the inner std::bind argument lists,
// the `return nullptr;` paths); verify against upstream before building.
CtPtr ProtocolV1::replace(AsyncConnectionRef existing,
                          ceph_msg_connect_reply &reply,
                          bufferlist &authorizer_reply) {
  ldout(cct, 10) << __func__ << " accept replacing " << existing << dendl;

  connection->inject_delay();
  if (existing->policy.lossy) {
    // disconnect from the Connection
    ldout(cct, 1) << __func__ << " replacing on lossy channel, failing existing"
    existing->protocol->stop();
    existing->dispatch_queue->queue_reset(existing.get());

  ceph_assert(can_write == WriteStatus::NOWRITE);
  existing->write_lock.lock();

  ProtocolV1 *exproto = dynamic_cast<ProtocolV1 *>(existing->protocol.get());

  // reset the in_seq if this is a hard reset from peer,
  // otherwise we respect our original connection's value
  if (is_reset_from_peer) {
    exproto->is_reset_from_peer = true;

  // stop polling this socket on the accepting thread; its ownership is
  // transferred below via temp_cs
  connection->center->delete_file_event(connection->cs.fd(),
                                        EVENT_READABLE | EVENT_WRITABLE);

  if (existing->delay_state) {
    existing->delay_state->flush();
    ceph_assert(!connection->delay_state);

  exproto->reset_recv_state();

  exproto->connect_msg.features = connect_msg.features;

  auto temp_cs = std::move(connection->cs);
  EventCenter *new_center = connection->center;
  Worker *new_worker = connection->worker;
  // avoid _stop shutdown replacing socket
  // queue a reset on the new connection, which we're dumping for the old
  connection->dispatch_queue->queue_reset(connection);
  ldout(messenger->cct, 1)
      << __func__ << " stop myself to swap existing" << dendl;
  exproto->can_write = WriteStatus::REPLACING;
  exproto->replacing = true;
  existing->state_offset = 0;
  // avoid previous thread modify event
  exproto->state = NONE;
  existing->state = AsyncConnection::STATE_NONE;
  // Discard existing prefetch buffer in `recv_buf`
  existing->recv_start = existing->recv_end = 0;
  // there shouldn't exist any buffer
  ceph_assert(connection->recv_start == connection->recv_end);

  exproto->authorizer_challenge.reset();

  // Runs on the OLD connection's thread: quiesce its write side, splice in
  // the new socket (cs == temp_cs), and migrate worker/center bookkeeping
  // if the existing protocol is still in the NONE state set above.
  auto deactivate_existing = std::bind(
      [existing, new_worker, new_center, exproto, reply,
       authorizer_reply](ConnectedSocket &cs) mutable {
        // we need to delete time event in original thread
        std::lock_guard<std::mutex> l(existing->lock);
        existing->write_lock.lock();
        exproto->requeue_sent();
        existing->outcoming_bl.clear();
        existing->open_write = false;
        existing->write_lock.unlock();
        if (exproto->state == NONE) {
          existing->shutdown_socket();
          existing->cs = std::move(cs);
          existing->worker->references--;
          new_worker->references++;
          existing->logger = new_worker->get_perf_counter();
          existing->worker = new_worker;
          existing->center = new_center;
          if (existing->delay_state)
            existing->delay_state->set_center(new_center);
        } else if (exproto->state == CLOSED) {
          // existing already closed: just close the incoming socket on its
          // own center thread
          auto back_to_close =
              std::bind([](ConnectedSocket &cs) mutable { cs.close(); },
          new_center->submit_to(new_center->get_id(),
                                std::move(back_to_close), true);

        // Before changing existing->center, it may already exists some
        // events in existing->center's queue. Then if we mark down
        // `existing`, it will execute in another thread and clean up
        // connection. Previous event will result in segment fault
        auto transfer_existing = [existing, exproto, reply,
                                  authorizer_reply]() mutable {
          std::lock_guard<std::mutex> l(existing->lock);
          if (exproto->state == CLOSED) return;
          ceph_assert(exproto->state == NONE);
          // resume the accept handshake on the surviving connection with a
          // RETRY_GLOBAL reply carrying its peer_global_seq
          existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
          exproto->state = ACCEPTING;

          existing->center->create_file_event(
              existing->cs.fd(), EVENT_READABLE, existing->read_handler);
          reply.global_seq = exproto->peer_global_seq;
          exproto->run_continuation(exproto->send_connect_message_reply(
              CEPH_MSGR_TAG_RETRY_GLOBAL, reply, authorizer_reply));
        if (existing->center->in_thread())
          transfer_existing();
        existing->center->submit_to(existing->center->get_id(),
                                    std::move(transfer_existing), true);
      std::move(temp_cs));

  existing->center->submit_to(existing->center->get_id(),
                              std::move(deactivate_existing), true);
  existing->write_lock.unlock();
  existing->lock.unlock();
  existing->lock.unlock();
  return open(reply, authorizer_reply);
// Finalize the server side of the accept handshake: advance sequence state,
// build the READY (or SEQ) reply — header, optional authorizer payload, and
// optional in_seq trailer — register the connection with the messenger, and
// write the reply to the peer.
// NOTE(review): this capture is missing else-branches and closing braces
// (e.g. the TAG_READY assignments are the `else` of the RECONNECT_SEQ check,
// and the "existing race" logging below is guarded by `if (r < 0)` on the
// accept_conn result); verify against upstream before building.
CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
                       bufferlist &authorizer_reply) {
  ldout(cct, 20) << __func__ << dendl;

  connect_seq = connect_msg.connect_seq + 1;
  peer_global_seq = connect_msg.global_seq;
  ldout(cct, 10) << __func__ << " accept success, connect_seq = " << connect_seq
                 << " in_seq=" << in_seq << ", sending READY" << dendl;

  // if it is a hard reset from peer, we don't need a round-trip to negotiate
  // what sequence number the peer should resume from
  if ((connect_msg.features & CEPH_FEATURE_RECONNECT_SEQ) &&
      !is_reset_from_peer) {
    reply.tag = CEPH_MSGR_TAG_SEQ;
    wait_for_seq = true;
    reply.tag = CEPH_MSGR_TAG_READY;
    wait_for_seq = false;
    out_seq = discard_requeued_up_to(out_seq, 0);
    is_reset_from_peer = false;

  reply.features = connection->policy.features_supported;
  reply.global_seq = messenger->get_global_seq();
  reply.connect_seq = connect_seq;

  reply.authorizer_len = authorizer_reply.length();
  if (connection->policy.lossy) {
    reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;

  // the effective feature set is the intersection of what we offered and
  // what the peer offered
  connection->set_features((uint64_t)reply.features &
                           (uint64_t)connect_msg.features);
  ldout(cct, 10) << __func__ << " accept features "
                 << connection->get_features()
                 << " authorizer_protocol "
                 << connect_msg.authorizer_protocol << dendl;

  session_security.reset(
      get_auth_session_handler(cct, connect_msg.authorizer_protocol,
                               connection->get_features()));

  bufferlist reply_bl;
  // raw struct copy onto the wire buffer (legacy msgr v1 encoding)
  reply_bl.append((char *)&reply, sizeof(reply));

  if (reply.authorizer_len) {
    reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());

  // TAG_SEQ replies additionally carry our in_seq so the peer knows which
  // messages to requeue
  if (reply.tag == CEPH_MSGR_TAG_SEQ) {
    uint64_t s = in_seq;
    reply_bl.append((char *)&s, sizeof(s));

  connection->lock.unlock();
  // Because "replacing" will prevent other connections preempt this addr,
  // it's safe that here we don't acquire Connection's lock
  ssize_t r = messenger->accept_conn(connection);

  connection->inject_delay();

  connection->lock.lock();
  ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
                << connection->peer_addrs->legacy_addr()
                << " just fail later one(this)" << dendl;
  ldout(cct, 10) << "accept fault after register" << dendl;
  connection->inject_delay();

  if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
    // connection was torn down while we dropped the lock for accept_conn
    ldout(cct, 1) << __func__
                  << " state changed while accept_conn, it must be mark_down"
    ceph_assert(state == CLOSED || state == NONE);
    ldout(cct, 10) << "accept fault after register" << dendl;
    messenger->unregister_conn(connection);
    connection->inject_delay();

  return WRITE(reply_bl, handle_ready_connect_message_reply_write);
// Completion callback for the READY/SEQ reply write.  On success, deliver
// the accept events and proceed with the handshake.
// NOTE(review): this capture is missing the `if (r < 0)` error branch (the
// "failed" log belongs to it) and the `wait_for_seq` branch that would go
// to wait_seq() instead of server_ready(); verify against upstream.
CtPtr ProtocolV1::handle_ready_connect_message_reply_write(int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;
  ldout(cct, 1) << __func__ << " write ready connect message reply failed"
  connection->dispatch_queue->queue_accept(connection);
  messenger->ms_deliver_handle_fast_accept(connection);

  state = ACCEPTING_HANDLED_CONNECT_MSG;

  return server_ready();
// After sending a TAG_SEQ reply, read the 8-byte sequence number the peer
// acks; handle_seq() consumes it.
// NOTE(review): the function's closing brace is missing from this capture.
CtPtr ProtocolV1::wait_seq() {
  ldout(cct, 20) << __func__ << dendl;

  return READ(sizeof(uint64_t), handle_seq);
// Handle the sequence number acked by the peer: discard queued outgoing
// messages it already received, then finish the accept.
// NOTE(review): the `if (r < 0)` error branch is missing from this capture —
// the "read ack seq failed" log belongs to it; verify against upstream.
CtPtr ProtocolV1::handle_seq(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;
  ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;

  // reinterpret the raw 8 bytes just read as the acked sequence number
  uint64_t newly_acked_seq = *(uint64_t *)buffer;
  ldout(cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq
  out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);

  return server_ready();
2459 CtPtr
ProtocolV1::server_ready() {
2460 ldout(cct
, 20) << __func__
<< " session_security is "
2464 ldout(cct
, 20) << __func__
<< " accept done" << dendl
;
2465 memset(&connect_msg
, 0, sizeof(connect_msg
));
2467 if (connection
->delay_state
) {
2468 ceph_assert(connection
->delay_state
->ready());