1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "ProtocolV1.h"
6 #include "common/errno.h"
8 #include "AsyncConnection.h"
9 #include "AsyncMessenger.h"
10 #include "common/EventTrace.h"
11 #include "include/random.h"
13 #define dout_subsys ceph_subsys_ms
15 #define dout_prefix _conn_prefix(_dout)
16 ostream
&ProtocolV1::_conn_prefix(std::ostream
*_dout
) {
17 return *_dout
<< "--1- " << messenger
->get_myaddrs() << " >> "
18 << *connection
->peer_addrs
20 << connection
<< " " << this
21 << " :" << connection
->port
<< " s=" << get_state_name(state
)
22 << " pgs=" << peer_global_seq
<< " cs=" << connect_seq
23 << " l=" << connection
->policy
.lossy
<< ").";
26 #define WRITE(B, C) write(CONTINUATION(C), B)
28 #define READ(L, C) read(CONTINUATION(C), L)
30 #define READB(L, B, C) read(CONTINUATION(C), L, B)
32 // Constant to limit starting sequence number to 2^31. Nothing special about
33 // it, just a big number. PLR
34 #define SEQ_MASK 0x7fffffff
// Messages whose encoded payload is at most this many bytes and spans more
// than one buffer get their fragments copied into outcoming_bl instead of
// claim-appended (see ProtocolV1::write_message): one small memcpy in
// exchange for fewer scattered buffers on the wire.
const int ASYNC_COALESCE_THRESHOLD = 256;
40 static void alloc_aligned_buffer(bufferlist
&data
, unsigned len
, unsigned off
) {
41 // create a buffer to read into that matches the data alignment
42 unsigned alloc_len
= 0;
45 if (off
& ~CEPH_PAGE_MASK
) {
47 alloc_len
+= CEPH_PAGE_SIZE
;
48 head
= std::min
<uint64_t>(CEPH_PAGE_SIZE
- (off
& ~CEPH_PAGE_MASK
), left
);
52 bufferptr
ptr(buffer::create_small_page_aligned(alloc_len
));
53 if (head
) ptr
.set_offset(CEPH_PAGE_SIZE
- head
);
54 data
.push_back(std::move(ptr
));
61 ProtocolV1::ProtocolV1(AsyncConnection
*connection
)
62 : Protocol(1, connection
),
64 can_write(WriteStatus::NOWRITE
),
71 is_reset_from_peer(false),
77 temp_buffer
= new char[4096];
80 ProtocolV1::~ProtocolV1() {
81 ceph_assert(out_q
.empty());
82 ceph_assert(sent
.empty());
91 void ProtocolV1::connect() {
92 this->state
= START_CONNECT
;
94 // reset connect state variables
99 authorizer_buf
.clear();
100 memset(&connect_msg
, 0, sizeof(connect_msg
));
101 memset(&connect_reply
, 0, sizeof(connect_reply
));
103 global_seq
= messenger
->get_global_seq();
106 void ProtocolV1::accept() { this->state
= START_ACCEPT
; }
108 bool ProtocolV1::is_connected() {
109 return can_write
.load() == WriteStatus::CANWRITE
;
112 void ProtocolV1::stop() {
113 ldout(cct
, 20) << __func__
<< dendl
;
114 if (state
== CLOSED
) {
118 if (connection
->delay_state
) connection
->delay_state
->flush();
120 ldout(cct
, 2) << __func__
<< dendl
;
121 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
128 can_write
= WriteStatus::CLOSED
;
132 void ProtocolV1::fault() {
133 ldout(cct
, 20) << __func__
<< dendl
;
135 if (state
== CLOSED
|| state
== NONE
) {
136 ldout(cct
, 10) << __func__
<< " connection is already closed" << dendl
;
140 if (connection
->policy
.lossy
&& state
!= START_CONNECT
&&
141 state
!= CONNECTING
) {
142 ldout(cct
, 1) << __func__
<< " on lossy channel, failing" << dendl
;
144 connection
->dispatch_queue
->queue_reset(connection
);
148 connection
->write_lock
.lock();
149 can_write
= WriteStatus::NOWRITE
;
150 is_reset_from_peer
= false;
152 // requeue sent items
155 if (!once_ready
&& out_q
.empty() && state
>= START_ACCEPT
&&
156 state
<= ACCEPTING_WAIT_CONNECT_MSG_AUTH
&& !replacing
) {
157 ldout(cct
, 10) << __func__
<< " with nothing to send and in the half "
158 << " accept state just closed" << dendl
;
159 connection
->write_lock
.unlock();
161 connection
->dispatch_queue
->queue_reset(connection
);
170 if (connection
->policy
.standby
&& out_q
.empty() && !keepalive
&&
172 ldout(cct
, 10) << __func__
<< " with nothing to send, going to standby"
175 connection
->write_lock
.unlock();
179 connection
->write_lock
.unlock();
181 if ((state
>= START_CONNECT
&& state
<= CONNECTING_SEND_CONNECT_MSG
) ||
185 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
186 } else if (backoff
== utime_t()) {
187 backoff
.set_from_double(cct
->_conf
->ms_initial_backoff
);
190 if (backoff
> cct
->_conf
->ms_max_backoff
)
191 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
194 global_seq
= messenger
->get_global_seq();
195 state
= START_CONNECT
;
196 connection
->state
= AsyncConnection::STATE_CONNECTING
;
197 ldout(cct
, 10) << __func__
<< " waiting " << backoff
<< dendl
;
199 connection
->register_time_events
.insert(
200 connection
->center
->create_time_event(backoff
.to_nsec() / 1000,
201 connection
->wakeup_handler
));
203 // policy maybe empty when state is in accept
204 if (connection
->policy
.server
) {
205 ldout(cct
, 0) << __func__
<< " server, going to standby" << dendl
;
208 ldout(cct
, 0) << __func__
<< " initiating reconnect" << dendl
;
210 global_seq
= messenger
->get_global_seq();
211 state
= START_CONNECT
;
212 connection
->state
= AsyncConnection::STATE_CONNECTING
;
215 connection
->center
->dispatch_event_external(connection
->read_handler
);
219 void ProtocolV1::send_message(Message
*m
) {
221 uint64_t f
= connection
->get_features();
223 // TODO: Currently not all messages supports reencode like MOSDMap, so here
224 // only let fast dispatch support messages prepare message
225 bool can_fast_prepare
= messenger
->ms_can_fast_dispatch(m
);
226 if (can_fast_prepare
) {
227 prepare_send_message(f
, m
, bl
);
230 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
231 // "features" changes will change the payload encoding
232 if (can_fast_prepare
&&
233 (can_write
== WriteStatus::NOWRITE
|| connection
->get_features() != f
)) {
234 // ensure the correctness of message encoding
237 ldout(cct
, 5) << __func__
<< " clear encoded buffer previous " << f
238 << " != " << connection
->get_features() << dendl
;
240 if (can_write
== WriteStatus::CLOSED
) {
241 ldout(cct
, 10) << __func__
<< " connection closed."
242 << " Drop message " << m
<< dendl
;
245 m
->trace
.event("async enqueueing message");
246 out_q
[m
->get_priority()].emplace_back(std::move(bl
), m
);
247 ldout(cct
, 15) << __func__
<< " inline write is denied, reschedule m=" << m
249 if (can_write
!= WriteStatus::REPLACING
) {
250 connection
->center
->dispatch_event_external(connection
->write_handler
);
255 void ProtocolV1::prepare_send_message(uint64_t features
, Message
*m
,
257 ldout(cct
, 20) << __func__
<< " m " << *m
<< dendl
;
259 // associate message with Connection (for benefit of encode_payload)
260 if (m
->empty_payload()) {
261 ldout(cct
, 20) << __func__
<< " encoding features " << features
<< " " << m
262 << " " << *m
<< dendl
;
264 ldout(cct
, 20) << __func__
<< " half-reencoding features " << features
265 << " " << m
<< " " << *m
<< dendl
;
268 // encode and copy out of *m
269 m
->encode(features
, messenger
->crcflags
);
271 bl
.append(m
->get_payload());
272 bl
.append(m
->get_middle());
273 bl
.append(m
->get_data());
276 void ProtocolV1::send_keepalive() {
277 ldout(cct
, 10) << __func__
<< dendl
;
278 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
279 if (can_write
!= WriteStatus::CLOSED
) {
281 connection
->center
->dispatch_event_external(connection
->write_handler
);
285 void ProtocolV1::read_event() {
286 ldout(cct
, 20) << __func__
<< dendl
;
289 CONTINUATION_RUN(CONTINUATION(send_client_banner
));
292 CONTINUATION_RUN(CONTINUATION(send_server_banner
));
295 CONTINUATION_RUN(CONTINUATION(wait_message
));
297 case THROTTLE_MESSAGE
:
298 CONTINUATION_RUN(CONTINUATION(throttle_message
));
301 CONTINUATION_RUN(CONTINUATION(throttle_bytes
));
303 case THROTTLE_DISPATCH_QUEUE
:
304 CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue
));
311 void ProtocolV1::write_event() {
312 ldout(cct
, 10) << __func__
<< dendl
;
315 connection
->write_lock
.lock();
316 if (can_write
== WriteStatus::CANWRITE
) {
318 append_keepalive_or_ack();
322 auto start
= ceph::mono_clock::now();
326 Message
*m
= _get_next_outgoing(&data
);
331 if (!connection
->policy
.lossy
) {
336 more
= !out_q
.empty();
337 connection
->write_lock
.unlock();
339 // send_message or requeue messages may not encode message
340 if (!data
.length()) {
341 prepare_send_message(connection
->get_features(), m
, data
);
344 r
= write_message(m
, data
, more
);
346 connection
->write_lock
.lock();
350 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
354 } while (can_write
== WriteStatus::CANWRITE
);
355 connection
->write_lock
.unlock();
357 // if r > 0 mean data still lefted, so no need _try_send.
359 uint64_t left
= ack_left
;
363 connection
->outcoming_bl
.append(CEPH_MSGR_TAG_ACK
);
364 connection
->outcoming_bl
.append((char *)&s
, sizeof(s
));
365 ldout(cct
, 10) << __func__
<< " try send msg ack, acked " << left
366 << " messages" << dendl
;
369 r
= connection
->_try_send(left
);
370 } else if (is_queued()) {
371 r
= connection
->_try_send();
375 connection
->logger
->tinc(l_msgr_running_send_time
,
376 ceph::mono_clock::now() - start
);
378 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
379 connection
->lock
.lock();
381 connection
->lock
.unlock();
385 connection
->write_lock
.unlock();
386 connection
->lock
.lock();
387 connection
->write_lock
.lock();
388 if (state
== STANDBY
&& !connection
->policy
.server
&& is_queued()) {
389 ldout(cct
, 10) << __func__
<< " policy.server is false" << dendl
;
390 connection
->_connect();
391 } else if (connection
->cs
&& state
!= NONE
&& state
!= CLOSED
&&
392 state
!= START_CONNECT
) {
393 r
= connection
->_try_send();
395 ldout(cct
, 1) << __func__
<< " send outcoming bl failed" << dendl
;
396 connection
->write_lock
.unlock();
398 connection
->lock
.unlock();
402 connection
->write_lock
.unlock();
403 connection
->lock
.unlock();
407 bool ProtocolV1::is_queued() {
408 return !out_q
.empty() || connection
->is_queued();
411 void ProtocolV1::run_continuation(CtPtr pcontinuation
) {
413 CONTINUATION_RUN(*pcontinuation
);
417 CtPtr
ProtocolV1::read(CONTINUATION_RX_TYPE
<ProtocolV1
> &next
,
418 int len
, char *buffer
) {
420 buffer
= temp_buffer
;
422 ssize_t r
= connection
->read(len
, buffer
,
423 [&next
, this](char *buffer
, int r
) {
424 next
.setParams(buffer
, r
);
425 CONTINUATION_RUN(next
);
428 next
.setParams(buffer
, r
);
435 CtPtr
ProtocolV1::write(CONTINUATION_TX_TYPE
<ProtocolV1
> &next
,
436 bufferlist
&buffer
) {
437 ssize_t r
= connection
->write(buffer
, [&next
, this](int r
) {
439 CONTINUATION_RUN(next
);
449 CtPtr
ProtocolV1::ready() {
450 ldout(cct
, 25) << __func__
<< dendl
;
452 // make sure no pending tick timer
453 if (connection
->last_tick_id
) {
454 connection
->center
->delete_time_event(connection
->last_tick_id
);
456 connection
->last_tick_id
= connection
->center
->create_time_event(
457 connection
->inactive_timeout_us
, connection
->tick_handler
);
459 connection
->write_lock
.lock();
460 can_write
= WriteStatus::CANWRITE
;
462 connection
->center
->dispatch_event_external(connection
->write_handler
);
464 connection
->write_lock
.unlock();
465 connection
->maybe_start_delay_thread();
468 return wait_message();
471 CtPtr
ProtocolV1::wait_message() {
472 if (state
!= OPENED
) { // must have changed due to a replace
476 ldout(cct
, 20) << __func__
<< dendl
;
478 return READ(sizeof(char), handle_message
);
481 CtPtr
ProtocolV1::handle_message(char *buffer
, int r
) {
482 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
485 ldout(cct
, 1) << __func__
<< " read tag failed" << dendl
;
489 char tag
= buffer
[0];
490 ldout(cct
, 20) << __func__
<< " process tag " << (int)tag
<< dendl
;
492 if (tag
== CEPH_MSGR_TAG_KEEPALIVE
) {
493 ldout(cct
, 20) << __func__
<< " got KEEPALIVE" << dendl
;
494 connection
->set_last_keepalive(ceph_clock_now());
495 } else if (tag
== CEPH_MSGR_TAG_KEEPALIVE2
) {
496 return READ(sizeof(ceph_timespec
), handle_keepalive2
);
497 } else if (tag
== CEPH_MSGR_TAG_KEEPALIVE2_ACK
) {
498 return READ(sizeof(ceph_timespec
), handle_keepalive2_ack
);
499 } else if (tag
== CEPH_MSGR_TAG_ACK
) {
500 return READ(sizeof(ceph_le64
), handle_tag_ack
);
501 } else if (tag
== CEPH_MSGR_TAG_MSG
) {
502 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
503 ltt_recv_stamp
= ceph_clock_now();
505 recv_stamp
= ceph_clock_now();
506 ldout(cct
, 20) << __func__
<< " begin MSG" << dendl
;
507 return READ(sizeof(ceph_msg_header
), handle_message_header
);
508 } else if (tag
== CEPH_MSGR_TAG_CLOSE
) {
509 ldout(cct
, 20) << __func__
<< " got CLOSE" << dendl
;
512 ldout(cct
, 0) << __func__
<< " bad tag " << (int)tag
<< dendl
;
518 CtPtr
ProtocolV1::handle_keepalive2(char *buffer
, int r
) {
519 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
522 ldout(cct
, 1) << __func__
<< " read keeplive timespec failed" << dendl
;
526 ldout(cct
, 30) << __func__
<< " got KEEPALIVE2 tag ..." << dendl
;
529 t
= (ceph_timespec
*)buffer
;
530 utime_t kp_t
= utime_t(*t
);
531 connection
->write_lock
.lock();
532 append_keepalive_or_ack(true, &kp_t
);
533 connection
->write_lock
.unlock();
535 ldout(cct
, 20) << __func__
<< " got KEEPALIVE2 " << kp_t
<< dendl
;
536 connection
->set_last_keepalive(ceph_clock_now());
538 if (is_connected()) {
539 connection
->center
->dispatch_event_external(connection
->write_handler
);
542 return CONTINUE(wait_message
);
545 void ProtocolV1::append_keepalive_or_ack(bool ack
, utime_t
*tp
) {
546 ldout(cct
, 10) << __func__
<< dendl
;
549 struct ceph_timespec ts
;
550 tp
->encode_timeval(&ts
);
551 connection
->outcoming_bl
.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK
);
552 connection
->outcoming_bl
.append((char *)&ts
, sizeof(ts
));
553 } else if (connection
->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
554 struct ceph_timespec ts
;
555 utime_t t
= ceph_clock_now();
556 t
.encode_timeval(&ts
);
557 connection
->outcoming_bl
.append(CEPH_MSGR_TAG_KEEPALIVE2
);
558 connection
->outcoming_bl
.append((char *)&ts
, sizeof(ts
));
560 connection
->outcoming_bl
.append(CEPH_MSGR_TAG_KEEPALIVE
);
564 CtPtr
ProtocolV1::handle_keepalive2_ack(char *buffer
, int r
) {
565 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
568 ldout(cct
, 1) << __func__
<< " read keeplive timespec failed" << dendl
;
573 t
= (ceph_timespec
*)buffer
;
574 connection
->set_last_keepalive_ack(utime_t(*t
));
575 ldout(cct
, 20) << __func__
<< " got KEEPALIVE_ACK" << dendl
;
577 return CONTINUE(wait_message
);
580 CtPtr
ProtocolV1::handle_tag_ack(char *buffer
, int r
) {
581 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
584 ldout(cct
, 1) << __func__
<< " read ack seq failed" << dendl
;
589 seq
= *(ceph_le64
*)buffer
;
590 ldout(cct
, 20) << __func__
<< " got ACK" << dendl
;
592 ldout(cct
, 15) << __func__
<< " got ack seq " << seq
<< dendl
;
594 static const int max_pending
= 128;
596 Message
*pending
[max_pending
];
597 connection
->write_lock
.lock();
598 while (!sent
.empty() && sent
.front()->get_seq() <= seq
&& i
< max_pending
) {
599 Message
*m
= sent
.front();
602 ldout(cct
, 10) << __func__
<< " got ack seq " << seq
603 << " >= " << m
->get_seq() << " on " << m
<< " " << *m
606 connection
->write_lock
.unlock();
607 for (int k
= 0; k
< i
; k
++) {
611 return CONTINUE(wait_message
);
614 CtPtr
ProtocolV1::handle_message_header(char *buffer
, int r
) {
615 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
618 ldout(cct
, 1) << __func__
<< " read message header failed" << dendl
;
622 ldout(cct
, 20) << __func__
<< " got MSG header" << dendl
;
624 current_header
= *((ceph_msg_header
*)buffer
);
626 ldout(cct
, 20) << __func__
<< " got envelope type=" << current_header
.type
<< " src "
627 << entity_name_t(current_header
.src
) << " front=" << current_header
.front_len
628 << " data=" << current_header
.data_len
<< " off " << current_header
.data_off
631 if (messenger
->crcflags
& MSG_CRC_HEADER
) {
632 __u32 header_crc
= 0;
633 header_crc
= ceph_crc32c(0, (unsigned char *)¤t_header
,
634 sizeof(current_header
) - sizeof(current_header
.crc
));
636 if (header_crc
!= current_header
.crc
) {
637 ldout(cct
, 0) << __func__
<< " got bad header crc " << header_crc
638 << " != " << current_header
.crc
<< dendl
;
649 state
= THROTTLE_MESSAGE
;
650 return CONTINUE(throttle_message
);
653 CtPtr
ProtocolV1::throttle_message() {
654 ldout(cct
, 20) << __func__
<< dendl
;
656 if (connection
->policy
.throttler_messages
) {
657 ldout(cct
, 10) << __func__
<< " wants " << 1
658 << " message from policy throttler "
659 << connection
->policy
.throttler_messages
->get_current()
660 << "/" << connection
->policy
.throttler_messages
->get_max()
662 if (!connection
->policy
.throttler_messages
->get_or_fail()) {
663 ldout(cct
, 10) << __func__
<< " wants 1 message from policy throttle "
664 << connection
->policy
.throttler_messages
->get_current()
665 << "/" << connection
->policy
.throttler_messages
->get_max()
666 << " failed, just wait." << dendl
;
667 // following thread pool deal with th full message queue isn't a
668 // short time, so we can wait a ms.
669 if (connection
->register_time_events
.empty()) {
670 connection
->register_time_events
.insert(
671 connection
->center
->create_time_event(1000,
672 connection
->wakeup_handler
));
678 state
= THROTTLE_BYTES
;
679 return CONTINUE(throttle_bytes
);
682 CtPtr
ProtocolV1::throttle_bytes() {
683 ldout(cct
, 20) << __func__
<< dendl
;
685 cur_msg_size
= current_header
.front_len
+ current_header
.middle_len
+
686 current_header
.data_len
;
688 if (connection
->policy
.throttler_bytes
) {
689 ldout(cct
, 10) << __func__
<< " wants " << cur_msg_size
690 << " bytes from policy throttler "
691 << connection
->policy
.throttler_bytes
->get_current() << "/"
692 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
693 if (!connection
->policy
.throttler_bytes
->get_or_fail(cur_msg_size
)) {
694 ldout(cct
, 10) << __func__
<< " wants " << cur_msg_size
695 << " bytes from policy throttler "
696 << connection
->policy
.throttler_bytes
->get_current()
697 << "/" << connection
->policy
.throttler_bytes
->get_max()
698 << " failed, just wait." << dendl
;
699 // following thread pool deal with th full message queue isn't a
700 // short time, so we can wait a ms.
701 if (connection
->register_time_events
.empty()) {
702 connection
->register_time_events
.insert(
703 connection
->center
->create_time_event(
704 1000, connection
->wakeup_handler
));
711 state
= THROTTLE_DISPATCH_QUEUE
;
712 return CONTINUE(throttle_dispatch_queue
);
715 CtPtr
ProtocolV1::throttle_dispatch_queue() {
716 ldout(cct
, 20) << __func__
<< dendl
;
719 if (!connection
->dispatch_queue
->dispatch_throttler
.get_or_fail(
722 << __func__
<< " wants " << cur_msg_size
723 << " bytes from dispatch throttle "
724 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
725 << connection
->dispatch_queue
->dispatch_throttler
.get_max()
726 << " failed, just wait." << dendl
;
727 // following thread pool deal with th full message queue isn't a
728 // short time, so we can wait a ms.
729 if (connection
->register_time_events
.empty()) {
730 connection
->register_time_events
.insert(
731 connection
->center
->create_time_event(1000,
732 connection
->wakeup_handler
));
738 throttle_stamp
= ceph_clock_now();
740 state
= READ_MESSAGE_FRONT
;
741 return read_message_front();
744 CtPtr
ProtocolV1::read_message_front() {
745 ldout(cct
, 20) << __func__
<< dendl
;
747 unsigned front_len
= current_header
.front_len
;
749 if (!front
.length()) {
750 front
.push_back(buffer::create(front_len
));
752 return READB(front_len
, front
.c_str(), handle_message_front
);
754 return read_message_middle();
757 CtPtr
ProtocolV1::handle_message_front(char *buffer
, int r
) {
758 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
761 ldout(cct
, 1) << __func__
<< " read message front failed" << dendl
;
765 ldout(cct
, 20) << __func__
<< " got front " << front
.length() << dendl
;
767 return read_message_middle();
770 CtPtr
ProtocolV1::read_message_middle() {
771 ldout(cct
, 20) << __func__
<< dendl
;
773 if (current_header
.middle_len
) {
774 if (!middle
.length()) {
775 middle
.push_back(buffer::create(current_header
.middle_len
));
777 return READB(current_header
.middle_len
, middle
.c_str(),
778 handle_message_middle
);
781 return read_message_data_prepare();
784 CtPtr
ProtocolV1::handle_message_middle(char *buffer
, int r
) {
785 ldout(cct
, 20) << __func__
<< " r" << r
<< dendl
;
788 ldout(cct
, 1) << __func__
<< " read message middle failed" << dendl
;
792 ldout(cct
, 20) << __func__
<< " got middle " << middle
.length() << dendl
;
794 return read_message_data_prepare();
797 CtPtr
ProtocolV1::read_message_data_prepare() {
798 ldout(cct
, 20) << __func__
<< dendl
;
800 unsigned data_len
= le32_to_cpu(current_header
.data_len
);
801 unsigned data_off
= le32_to_cpu(current_header
.data_off
);
806 // rx_buffers is broken by design... see
807 // http://tracker.ceph.com/issues/22480
808 map
<ceph_tid_t
, pair
<bufferlist
, int> >::iterator p
=
809 connection
->rx_buffers
.find(current_header
.tid
);
810 if (p
!= connection
->rx_buffers
.end()) {
811 ldout(cct
, 10) << __func__
<< " seleting rx buffer v " << p
->second
.second
812 << " at offset " << data_off
<< " len "
813 << p
->second
.first
.length() << dendl
;
814 data_buf
= p
->second
.first
;
815 // make sure it's big enough
816 if (data_buf
.length() < data_len
)
817 data_buf
.push_back(buffer::create(data_len
- data_buf
.length()));
818 data_blp
= data_buf
.begin();
820 ldout(cct
, 20) << __func__
<< " allocating new rx buffer at offset "
821 << data_off
<< dendl
;
822 alloc_aligned_buffer(data_buf
, data_len
, data_off
);
823 data_blp
= data_buf
.begin();
826 ldout(cct
, 20) << __func__
<< " allocating new rx buffer at offset "
827 << data_off
<< dendl
;
828 alloc_aligned_buffer(data_buf
, data_len
, data_off
);
829 data_blp
= data_buf
.begin();
835 return CONTINUE(read_message_data
);
838 CtPtr
ProtocolV1::read_message_data() {
839 ldout(cct
, 20) << __func__
<< " msg_left=" << msg_left
<< dendl
;
842 bufferptr bp
= data_blp
.get_current_ptr();
843 unsigned read_len
= std::min(bp
.length(), msg_left
);
845 return READB(read_len
, bp
.c_str(), handle_message_data
);
848 return read_message_footer();
851 CtPtr
ProtocolV1::handle_message_data(char *buffer
, int r
) {
852 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
855 ldout(cct
, 1) << __func__
<< " read data error " << dendl
;
859 bufferptr bp
= data_blp
.get_current_ptr();
860 unsigned read_len
= std::min(bp
.length(), msg_left
);
861 ceph_assert(read_len
< std::numeric_limits
<int>::max());
862 data_blp
.advance(read_len
);
863 data
.append(bp
, 0, read_len
);
864 msg_left
-= read_len
;
866 return CONTINUE(read_message_data
);
869 CtPtr
ProtocolV1::read_message_footer() {
870 ldout(cct
, 20) << __func__
<< dendl
;
872 state
= READ_FOOTER_AND_DISPATCH
;
875 if (connection
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
876 len
= sizeof(ceph_msg_footer
);
878 len
= sizeof(ceph_msg_footer_old
);
881 return READ(len
, handle_message_footer
);
884 CtPtr
ProtocolV1::handle_message_footer(char *buffer
, int r
) {
885 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
888 ldout(cct
, 1) << __func__
<< " read footer data error " << dendl
;
892 ceph_msg_footer footer
;
893 ceph_msg_footer_old old_footer
;
895 if (connection
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
896 footer
= *((ceph_msg_footer
*)buffer
);
898 old_footer
= *((ceph_msg_footer_old
*)buffer
);
899 footer
.front_crc
= old_footer
.front_crc
;
900 footer
.middle_crc
= old_footer
.middle_crc
;
901 footer
.data_crc
= old_footer
.data_crc
;
903 footer
.flags
= old_footer
.flags
;
906 int aborted
= (footer
.flags
& CEPH_MSG_FOOTER_COMPLETE
) == 0;
907 ldout(cct
, 10) << __func__
<< " aborted = " << aborted
<< dendl
;
909 ldout(cct
, 0) << __func__
<< " got " << front
.length() << " + "
910 << middle
.length() << " + " << data
.length()
911 << " byte message.. ABORTED" << dendl
;
915 ldout(cct
, 20) << __func__
<< " got " << front
.length() << " + "
916 << middle
.length() << " + " << data
.length() << " byte message"
918 Message
*message
= decode_message(cct
, messenger
->crcflags
, current_header
,
919 footer
, front
, middle
, data
, connection
);
921 ldout(cct
, 1) << __func__
<< " decode message failed " << dendl
;
926 // Check the signature if one should be present. A zero return indicates
930 if (session_security
.get() == NULL
) {
931 ldout(cct
, 10) << __func__
<< " no session security set" << dendl
;
933 if (session_security
->check_message_signature(message
)) {
934 ldout(cct
, 0) << __func__
<< " Signature check failed" << dendl
;
939 message
->set_byte_throttler(connection
->policy
.throttler_bytes
);
940 message
->set_message_throttler(connection
->policy
.throttler_messages
);
942 // store reservation size in message, so we don't get confused
943 // by messages entering the dispatch queue through other paths.
944 message
->set_dispatch_throttle_size(cur_msg_size
);
946 message
->set_recv_stamp(recv_stamp
);
947 message
->set_throttle_stamp(throttle_stamp
);
948 message
->set_recv_complete_stamp(ceph_clock_now());
950 // check received seq#. if it is old, drop the message.
951 // note that incoming messages may skip ahead. this is convenient for the
952 // client side queueing because messages can't be renumbered, but the (kernel)
953 // client will occasionally pull a message out of the sent queue to send
954 // elsewhere. in that case it doesn't matter if we "got" it or not.
955 uint64_t cur_seq
= in_seq
;
956 if (message
->get_seq() <= cur_seq
) {
957 ldout(cct
, 0) << __func__
<< " got old message " << message
->get_seq()
958 << " <= " << cur_seq
<< " " << message
<< " " << *message
959 << ", discarding" << dendl
;
961 if (connection
->has_feature(CEPH_FEATURE_RECONNECT_SEQ
) &&
962 cct
->_conf
->ms_die_on_old_message
) {
963 ceph_assert(0 == "old msgs despite reconnect_seq feature");
967 if (message
->get_seq() > cur_seq
+ 1) {
968 ldout(cct
, 0) << __func__
<< " missed message? skipped from seq "
969 << cur_seq
<< " to " << message
->get_seq() << dendl
;
970 if (cct
->_conf
->ms_die_on_skipped_message
) {
971 ceph_assert(0 == "skipped incoming seq");
975 message
->set_connection(connection
);
977 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
978 if (message
->get_type() == CEPH_MSG_OSD_OP
||
979 message
->get_type() == CEPH_MSG_OSD_OPREPLY
) {
980 utime_t ltt_processed_stamp
= ceph_clock_now();
981 double usecs_elapsed
=
982 (ltt_processed_stamp
.to_nsec() - ltt_recv_stamp
.to_nsec()) / 1000;
984 if (message
->get_type() == CEPH_MSG_OSD_OP
)
985 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OP",
988 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OPREPLY",
993 // note last received message.
994 in_seq
= message
->get_seq();
995 ldout(cct
, 5) << " rx " << message
->get_source() << " seq "
996 << message
->get_seq() << " " << message
<< " " << *message
999 bool need_dispatch_writer
= false;
1000 if (!connection
->policy
.lossy
) {
1002 need_dispatch_writer
= true;
1007 connection
->logger
->inc(l_msgr_recv_messages
);
1008 connection
->logger
->inc(
1010 cur_msg_size
+ sizeof(ceph_msg_header
) + sizeof(ceph_msg_footer
));
1012 messenger
->ms_fast_preprocess(message
);
1013 auto fast_dispatch_time
= ceph::mono_clock::now();
1014 connection
->logger
->tinc(l_msgr_running_recv_time
,
1015 fast_dispatch_time
- connection
->recv_start_time
);
1016 if (connection
->delay_state
) {
1017 double delay_period
= 0;
1018 if (rand() % 10000 < cct
->_conf
->ms_inject_delay_probability
* 10000.0) {
1020 cct
->_conf
->ms_inject_delay_max
* (double)(rand() % 10000) / 10000.0;
1021 ldout(cct
, 1) << "queue_received will delay after "
1022 << (ceph_clock_now() + delay_period
) << " on " << message
1023 << " " << *message
<< dendl
;
1025 connection
->delay_state
->queue(delay_period
, message
);
1026 } else if (messenger
->ms_can_fast_dispatch(message
)) {
1027 connection
->lock
.unlock();
1028 connection
->dispatch_queue
->fast_dispatch(message
);
1029 connection
->recv_start_time
= ceph::mono_clock::now();
1030 connection
->logger
->tinc(l_msgr_running_fast_dispatch_time
,
1031 connection
->recv_start_time
- fast_dispatch_time
);
1032 connection
->lock
.lock();
1034 connection
->dispatch_queue
->enqueue(message
, message
->get_priority(),
1035 connection
->conn_id
);
1038 // clean up local buffer references
1044 if (need_dispatch_writer
&& connection
->is_connected()) {
1045 connection
->center
->dispatch_event_external(connection
->write_handler
);
1048 return CONTINUE(wait_message
);
1051 void ProtocolV1::session_reset() {
1052 ldout(cct
, 10) << __func__
<< " started" << dendl
;
1054 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
1055 if (connection
->delay_state
) {
1056 connection
->delay_state
->discard();
1059 connection
->dispatch_queue
->discard_queue(connection
->conn_id
);
1060 discard_out_queue();
1061 // note: we need to clear outcoming_bl here, but session_reset may be
1062 // called by other thread, so let caller clear this itself!
1063 // outcoming_bl.clear();
1065 connection
->dispatch_queue
->queue_remote_reset(connection
);
1067 randomize_out_seq();
1071 // it's safe to directly set 0, double locked
1074 can_write
= WriteStatus::NOWRITE
;
1077 void ProtocolV1::randomize_out_seq() {
1078 if (connection
->get_features() & CEPH_FEATURE_MSG_AUTH
) {
1079 // Set out_seq to a random value, so CRC won't be predictable.
1080 auto rand_seq
= ceph::util::generate_random_number
<uint64_t>(0, SEQ_MASK
);
1081 ldout(cct
, 10) << __func__
<< " randomize_out_seq " << rand_seq
<< dendl
;
1084 // previously, seq #'s always started at 0.
1089 ssize_t
ProtocolV1::write_message(Message
*m
, bufferlist
&bl
, bool more
) {
1091 ceph_assert(connection
->center
->in_thread());
1092 m
->set_seq(++out_seq
);
1094 if (messenger
->crcflags
& MSG_CRC_HEADER
) {
1095 m
->calc_header_crc();
1098 ceph_msg_header
&header
= m
->get_header();
1099 ceph_msg_footer
&footer
= m
->get_footer();
1101 // TODO: let sign_message could be reentry?
1102 // Now that we have all the crcs calculated, handle the
1103 // digital signature for the message, if the AsyncConnection has session
1104 // security set up. Some session security options do not
1105 // actually calculate and check the signature, but they should
1106 // handle the calls to sign_message and check_signature. PLR
1107 if (session_security
.get() == NULL
) {
1108 ldout(cct
, 20) << __func__
<< " no session security" << dendl
;
1110 if (session_security
->sign_message(m
)) {
1111 ldout(cct
, 20) << __func__
<< " failed to sign m=" << m
1112 << "): sig = " << footer
.sig
<< dendl
;
1114 ldout(cct
, 20) << __func__
<< " signed m=" << m
1115 << "): sig = " << footer
.sig
<< dendl
;
1119 connection
->outcoming_bl
.append(CEPH_MSGR_TAG_MSG
);
1120 connection
->outcoming_bl
.append((char *)&header
, sizeof(header
));
1122 ldout(cct
, 20) << __func__
<< " sending message type=" << header
.type
1123 << " src " << entity_name_t(header
.src
)
1124 << " front=" << header
.front_len
<< " data=" << header
.data_len
1125 << " off " << header
.data_off
<< dendl
;
1127 if ((bl
.length() <= ASYNC_COALESCE_THRESHOLD
) && (bl
.buffers().size() > 1)) {
1128 for (const auto &pb
: bl
.buffers()) {
1129 connection
->outcoming_bl
.append((char *)pb
.c_str(), pb
.length());
1132 connection
->outcoming_bl
.claim_append(bl
);
1135 // send footer; if receiver doesn't support signatures, use the old footer
1137 ceph_msg_footer_old old_footer
;
1138 if (connection
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
1139 connection
->outcoming_bl
.append((char *)&footer
, sizeof(footer
));
1141 if (messenger
->crcflags
& MSG_CRC_HEADER
) {
1142 old_footer
.front_crc
= footer
.front_crc
;
1143 old_footer
.middle_crc
= footer
.middle_crc
;
1144 old_footer
.data_crc
= footer
.data_crc
;
1146 old_footer
.front_crc
= old_footer
.middle_crc
= 0;
1148 old_footer
.data_crc
=
1149 messenger
->crcflags
& MSG_CRC_DATA
? footer
.data_crc
: 0;
1150 old_footer
.flags
= footer
.flags
;
1151 connection
->outcoming_bl
.append((char *)&old_footer
, sizeof(old_footer
));
1154 m
->trace
.event("async writing message");
1155 ldout(cct
, 20) << __func__
<< " sending " << m
->get_seq() << " " << m
1157 ssize_t total_send_size
= connection
->outcoming_bl
.length();
1158 ssize_t rc
= connection
->_try_send(more
);
1160 ldout(cct
, 1) << __func__
<< " error sending " << m
<< ", "
1161 << cpp_strerror(rc
) << dendl
;
1163 connection
->logger
->inc(
1164 l_msgr_send_bytes
, total_send_size
- connection
->outcoming_bl
.length());
1165 ldout(cct
, 10) << __func__
<< " sending " << m
1166 << (rc
? " continuely." : " done.") << dendl
;
1168 if (m
->get_type() == CEPH_MSG_OSD_OP
)
1169 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OP_END", false);
1170 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
1171 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OPREPLY_END", false);
// Move messages that were sent but never acknowledged back onto the
// front of the highest-priority out queue so they will be resent after
// a reconnect.  NOTE(review): this excerpt is lossy (embedded original
// line numbers skip); lock acquisition and the loop close are not
// visible here — presumably write_lock is held, confirm in full source.
1177 void ProtocolV1::requeue_sent() {
1182 list
<pair
<bufferlist
, Message
*> > &rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
// Roll out_seq back by the number of unacked messages being requeued.
1183 out_seq
-= sent
.size();
// Walk 'sent' from the back so that push_front restores original order.
1184 while (!sent
.empty()) {
1185 Message
*m
= sent
.back();
1187 ldout(cct
, 10) << __func__
<< " " << *m
<< " for resend "
1188 << " (" << m
->get_seq() << ")" << dendl
;
// Pair with an empty bufferlist so the message is re-encoded on resend.
1189 rq
.push_front(make_pair(bufferlist(), m
));
// Drop requeued messages whose sequence number the peer has already
// acknowledged (seq <= acked seq) and return the updated out_seq.
// Takes write_lock itself; operates only on the HIGHEST-priority queue
// that requeue_sent() populated.
1193 uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq
, uint64_t seq
) {
1194 ldout(cct
, 10) << __func__
<< " " << seq
<< dendl
;
1195 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
// Nothing was requeued: nothing to discard.
1196 if (out_q
.count(CEPH_MSG_PRIO_HIGHEST
) == 0) {
1199 list
<pair
<bufferlist
, Message
*> > &rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1200 uint64_t count
= out_seq
;
1201 while (!rq
.empty()) {
1202 pair
<bufferlist
, Message
*> p
= rq
.front();
// seq 0 means the message was never assigned a sequence number; keep
// it (and everything after it, since the queue is in seq order).
1203 if (p
.second
->get_seq() == 0 || p
.second
->get_seq() > seq
) break;
1204 ldout(cct
, 10) << __func__
<< " " << *(p
.second
) << " for resend seq "
1205 << p
.second
->get_seq() << " <= " << seq
<< ", discarding"
// NOTE(review): lines dropped by extraction here (put/pop and count
// update are not visible) — confirm against the full source.
1211 if (rq
.empty()) out_q
.erase(CEPH_MSG_PRIO_HIGHEST
);
1216 * Tears down the message queues, and removes them from the
1217 * DispatchQueue Must hold write_lock prior to calling.
// Releases (logs and, in the non-visible dropped lines, presumably
// puts) every message in 'sent' and every queued message in 'out_q'.
1219 void ProtocolV1::discard_out_queue() {
1220 ldout(cct
, 10) << __func__
<< " started" << dendl
;
// First pass: messages already written to the wire but unacked.
1222 for (list
<Message
*>::iterator p
= sent
.begin(); p
!= sent
.end(); ++p
) {
1223 ldout(cct
, 20) << __func__
<< " discard " << *p
<< dendl
;
// Second pass: every priority bucket of the outgoing queue.
1227 for (map
<int, list
<pair
<bufferlist
, Message
*> > >::iterator p
=
1229 p
!= out_q
.end(); ++p
) {
1230 for (list
<pair
<bufferlist
, Message
*> >::iterator r
= p
->second
.begin();
1231 r
!= p
->second
.end(); ++r
) {
1232 ldout(cct
, 20) << __func__
<< " discard " << r
->second
<< dendl
;
// Reset all receive-side state for this protocol instance: drop the
// in-flight authorizer, clear pending read/write callbacks, and return
// any throttle budget that was acquired for a message whose read was
// interrupted mid-way (state between the THROTTLE_* stage and
// READ_FOOTER_AND_DISPATCH).
1239 void ProtocolV1::reset_recv_state() {
1240 // clean up state internal variables and states
1241 if (state
== CONNECTING_SEND_CONNECT_MSG
) {
// NOTE(review): the delete/cleanup lines before this assignment are not
// visible in this excerpt — confirm the authorizer is freed, not leaked.
1245 authorizer
= nullptr;
1248 // clean read and write callbacks
1249 connection
->pendingReadLen
.reset();
1250 connection
->writeCallback
.reset();
// Return the one-message budget taken from the policy message throttler.
1252 if (state
> THROTTLE_MESSAGE
&& state
<= READ_FOOTER_AND_DISPATCH
&&
1253 connection
->policy
.throttler_messages
) {
1254 ldout(cct
, 10) << __func__
<< " releasing " << 1
1255 << " message to policy throttler "
1256 << connection
->policy
.throttler_messages
->get_current()
1257 << "/" << connection
->policy
.throttler_messages
->get_max()
1259 connection
->policy
.throttler_messages
->put();
// Return the byte budget taken from the policy byte throttler.
1261 if (state
> THROTTLE_BYTES
&& state
<= READ_FOOTER_AND_DISPATCH
) {
1262 if (connection
->policy
.throttler_bytes
) {
1263 ldout(cct
, 10) << __func__
<< " releasing " << cur_msg_size
1264 << " bytes to policy throttler "
1265 << connection
->policy
.throttler_bytes
->get_current() << "/"
1266 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
1267 connection
->policy
.throttler_bytes
->put(cur_msg_size
);
// Return the byte budget taken from the dispatch-queue throttler.
1270 if (state
> THROTTLE_DISPATCH_QUEUE
&& state
<= READ_FOOTER_AND_DISPATCH
) {
1272 << __func__
<< " releasing " << cur_msg_size
1273 << " bytes to dispatch_queue throttler "
1274 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
1275 << connection
->dispatch_queue
->dispatch_throttler
.get_max() << dendl
;
1276 connection
->dispatch_queue
->dispatch_throttle_release(cur_msg_size
);
// Pop the next message to transmit from the highest-priority non-empty
// out queue.  If 'bl' is non-null the pre-encoded bufferlist (if any)
// is swapped into it.  Returns nullptr when out_q is empty (the return
// path is in lines dropped by this extraction — presumably a Message*
// local initialized to nullptr; confirm in full source).
1280 Message
*ProtocolV1::_get_next_outgoing(bufferlist
*bl
) {
1282 if (!out_q
.empty()) {
// reverse_iterator: highest priority key first.
1283 map
<int, list
<pair
<bufferlist
, Message
*> > >::reverse_iterator it
=
1285 ceph_assert(!it
->second
.empty());
1286 list
<pair
<bufferlist
, Message
*> >::iterator p
= it
->second
.begin();
// Hand the already-encoded bytes to the caller without copying.
1288 if (bl
) bl
->swap(p
->first
);
1289 it
->second
.erase(p
);
// Drop the priority bucket entirely once drained.
1290 if (it
->second
.empty()) out_q
.erase(it
->first
);
1296 * Client Protocol V1
// Client side, step 1: send the CEPH_BANNER string to the server.
// NOTE(review): the declaration of 'bl' (and a state assignment) fall
// in lines dropped by this extraction — confirm in full source.
1299 CtPtr
ProtocolV1::send_client_banner() {
1300 ldout(cct
, 20) << __func__
<< dendl
;
1304 bl
.append(CEPH_BANNER
, strlen(CEPH_BANNER
));
// WRITE schedules the buffer and continues at handle_client_banner_write.
1305 return WRITE(bl
, handle_client_banner_write
);
// Continuation after the client banner write completes; r is the write
// result (the failure branch's return is in dropped lines — presumably
// _fault(); confirm).  On success, proceed to read the server banner.
1308 CtPtr
ProtocolV1::handle_client_banner_write(int r
) {
1309 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1312 ldout(cct
, 1) << __func__
<< " write client banner failed" << dendl
;
1315 ldout(cct
, 10) << __func__
<< " connect write banner done: "
1316 << connection
->get_peer_addr() << dendl
;
1318 return wait_server_banner();
// Client side: read the server's banner plus two encoded legacy
// entity addrs (the server's addr and the server's view of our addr)
// in a single read.
1321 CtPtr
ProtocolV1::wait_server_banner() {
1322 state
= CONNECTING_WAIT_BANNER_AND_IDENTIFY
;
1324 ldout(cct
, 20) << __func__
<< dendl
;
// NOTE(review): 'myaddrbl' is declared but never used in the visible
// code of this function — candidate for removal once confirmed unused
// in the full source.
1326 bufferlist myaddrbl
;
1327 unsigned banner_len
= strlen(CEPH_BANNER
);
// banner + two ceph_entity_addr structures (server addr + our addr).
1328 unsigned need_len
= banner_len
+ sizeof(ceph_entity_addr
) * 2;
1329 return READ(need_len
, handle_server_banner_and_identify
);
// Client side: validate the server banner, decode the server's address
// and the server's view of our own address, sanity-check them against
// what we dialed, optionally learn our own address from the peer (or
// from getsockname), then send our encoded legacy address.
// NOTE(review): several statements (error returns, the 'bl'/'a'/'t'
// declarations, try block open, sleep call, re-check returns) sit in
// lines dropped by this extraction — comments below describe only what
// is visible.
1332 CtPtr
ProtocolV1::handle_server_banner_and_identify(char *buffer
, int r
) {
1333 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1336 ldout(cct
, 1) << __func__
<< " read banner and identify addresses failed"
// Banner must match byte-for-byte or the peer is not speaking msgr v1.
1341 unsigned banner_len
= strlen(CEPH_BANNER
);
1342 if (memcmp(buffer
, CEPH_BANNER
, banner_len
)) {
1343 ldout(cct
, 0) << __func__
<< " connect protocol error (bad banner) on peer "
1344 << connection
->get_peer_addr() << dendl
;
// paddr = the server's own address; peer_addr_for_me = the address the
// server observed us connecting from.
1349 entity_addr_t paddr
, peer_addr_for_me
;
1351 bl
.append(buffer
+ banner_len
, sizeof(ceph_entity_addr
) * 2);
1352 auto p
= bl
.cbegin();
1355 decode(peer_addr_for_me
, p
);
1356 } catch (const buffer::error
&e
) {
1357 lderr(cct
) << __func__
<< " decode peer addr failed " << dendl
;
1360 ldout(cct
, 20) << __func__
<< " connect read peer addr " << paddr
1361 << " on socket " << connection
->cs
.fd() << dendl
;
// Cross-check the server's claimed address against the one we dialed.
1363 entity_addr_t peer_addr
= connection
->peer_addrs
->legacy_addr();
1364 if (peer_addr
!= paddr
) {
// Blank IP with matching port+nonce: same daemon bound to a wildcard
// address — tolerated.
1365 if (paddr
.is_blank_ip() && peer_addr
.get_port() == paddr
.get_port() &&
1366 peer_addr
.get_nonce() == paddr
.get_nonce()) {
1367 ldout(cct
, 0) << __func__
<< " connect claims to be " << paddr
<< " not "
1368 << peer_addr
<< " - presumably this is the same node!"
1371 ldout(cct
, 10) << __func__
<< " connect claims to be " << paddr
<< " not "
1372 << peer_addr
<< dendl
;
1377 ldout(cct
, 20) << __func__
<< " connect peer addr for me is "
1378 << peer_addr_for_me
<< dendl
;
// If we don't yet know our own address, learn it — either from what
// the peer saw (ms_learn_addr_from_peer) or from getsockname().
1379 if (messenger
->get_myaddrs().empty() ||
1380 messenger
->get_myaddrs().front().is_blank_ip()) {
1381 sockaddr_storage ss
;
1382 socklen_t len
= sizeof(ss
);
1383 getsockname(connection
->cs
.fd(), (sockaddr
*)&ss
, &len
);
1385 if (cct
->_conf
->ms_learn_addr_from_peer
) {
1386 ldout(cct
, 1) << __func__
<< " peer " << connection
->target_addr
1387 << " says I am " << peer_addr_for_me
<< " (socket says "
1388 << (sockaddr
*)&ss
<< ")" << dendl
;
1389 a
= peer_addr_for_me
;
1391 ldout(cct
, 1) << __func__
<< " socket to " << connection
->target_addr
1392 << " says I am " << (sockaddr
*)&ss
1393 << " (peer says " << peer_addr_for_me
<< ")" << dendl
;
1394 a
.set_sockaddr((sockaddr
*)&ss
);
1396 a
.set_type(entity_addr_t::TYPE_LEGACY
); // anything but NONE; learned_addr ignores this
// learned_addr() takes messenger-level locks, so drop the connection
// lock around the call and re-validate state afterwards.
1398 connection
->lock
.unlock();
1399 messenger
->learned_addr(a
);
// Test-only failure injection: occasionally sleep while unlocked to
// widen race windows.
1400 if (cct
->_conf
->ms_inject_internal_delays
&&
1401 cct
->_conf
->ms_inject_socket_failures
) {
1402 if (rand() % cct
->_conf
->ms_inject_socket_failures
== 0) {
1403 ldout(cct
, 10) << __func__
<< " sleep for "
1404 << cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1406 t
.set_from_double(cct
->_conf
->ms_inject_internal_delays
);
1410 connection
->lock
.lock();
// Someone (mark_down / replace) changed our state while unlocked.
1411 if (state
!= CONNECTING_WAIT_BANNER_AND_IDENTIFY
) {
1412 ldout(cct
, 1) << __func__
1413 << " state changed while learned_addr, mark_down or "
1414 << " replacing must be happened just now" << dendl
;
// Reply with our own legacy-encoded address.
1419 bufferlist myaddrbl
;
1420 encode(messenger
->get_myaddr_legacy(), myaddrbl
, 0); // legacy
1421 return WRITE(myaddrbl
, handle_my_addr_write
);
// Continuation after writing our own address to the server; on success,
// proceed to the connect-message exchange.  The failure branch's return
// is in lines dropped by this extraction.
1424 CtPtr
ProtocolV1::handle_my_addr_write(int r
) {
1425 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1428 ldout(cct
, 2) << __func__
<< " connect couldn't write my addr, "
1429 << cpp_strerror(r
) << dendl
;
1432 ldout(cct
, 10) << __func__
<< " connect sent my addr "
1433 << messenger
->get_myaddr_legacy() << dendl
;
1435 return CONTINUE(send_connect_message
);
// Client side: build and send the ceph_msg_connect header, followed by
// the authorizer payload when present.  Re-entered on RETRY/RESET/
// CHALLENGE replies, so it (re)fetches the authorizer each time.
// NOTE(review): the 'bl' declaration, the flags assignment around line
// 1465, and the authorizer-presence guard before line 1472 are in lines
// dropped by this extraction.
1438 CtPtr
ProtocolV1::send_connect_message() {
1439 state
= CONNECTING_SEND_CONNECT_MSG
;
1441 ldout(cct
, 20) << __func__
<< dendl
;
// Ask the dispatcher for a (possibly fresh) authorizer for this peer type.
1444 authorizer
= messenger
->ms_deliver_get_authorizer(connection
->peer_type
);
1447 ceph_msg_connect connect
;
1448 connect
.features
= connection
->policy
.features_supported
;
1449 connect
.host_type
= messenger
->get_myname().type();
1450 connect
.global_seq
= global_seq
;
1451 connect
.connect_seq
= connect_seq
;
1452 connect
.protocol_version
=
1453 messenger
->get_proto_version(connection
->peer_type
, true);
1454 connect
.authorizer_protocol
= authorizer
? authorizer
->protocol
: 0;
1455 connect
.authorizer_len
= authorizer
? authorizer
->bl
.length() : 0;
1458 ldout(cct
, 10) << __func__
1459 << " connect_msg.authorizer_len=" << connect
.authorizer_len
1460 << " protocol=" << connect
.authorizer_protocol
<< dendl
;
// Advertise lossy intent; the server makes the final call.
1464 if (connection
->policy
.lossy
) {
1466 CEPH_MSG_CONNECT_LOSSY
; // this is fyi, actually, server decides!
// Raw struct bytes of the connect header, then the authorizer blob.
1470 bl
.append((char *)&connect
, sizeof(connect
));
1472 bl
.append(authorizer
->bl
.c_str(), authorizer
->bl
.length());
1475 ldout(cct
, 10) << __func__
<< " connect sending gseq=" << global_seq
1476 << " cseq=" << connect_seq
1477 << " proto=" << connect
.protocol_version
<< dendl
;
1479 return WRITE(bl
, handle_connect_message_write
);
// Continuation after writing the connect message; on success, wait for
// the server's connect reply.  The failure return is in dropped lines.
1482 CtPtr
ProtocolV1::handle_connect_message_write(int r
) {
1483 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1486 ldout(cct
, 2) << __func__
<< " connect couldn't send reply "
1487 << cpp_strerror(r
) << dendl
;
1491 ldout(cct
, 20) << __func__
1492 << " connect wrote (self +) cseq, waiting for reply" << dendl
;
1494 return wait_connect_reply();
// Client side: zero the reply struct and read a full
// ceph_msg_connect_reply from the wire.
1497 CtPtr
ProtocolV1::wait_connect_reply() {
1498 ldout(cct
, 20) << __func__
<< dendl
;
1500 memset(&connect_reply
, 0, sizeof(connect_reply
));
1501 return READ(sizeof(connect_reply
), handle_connect_reply_1
);
// Copy the raw reply struct out of the read buffer, log it, then either
// read the trailing authorizer payload or go straight to tag handling.
// The read-failure return is in lines dropped by this extraction.
1504 CtPtr
ProtocolV1::handle_connect_reply_1(char *buffer
, int r
) {
1505 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1508 ldout(cct
, 1) << __func__
<< " read connect reply failed" << dendl
;
// Struct copy straight from the wire buffer (v1 is host-endian structs).
1512 connect_reply
= *((ceph_msg_connect_reply
*)buffer
);
1514 ldout(cct
, 20) << __func__
<< " connect got reply tag "
1515 << (int)connect_reply
.tag
<< " connect_seq "
1516 << connect_reply
.connect_seq
<< " global_seq "
1517 << connect_reply
.global_seq
<< " proto "
1518 << connect_reply
.protocol_version
<< " flags "
1519 << (int)connect_reply
.flags
<< " features "
1520 << connect_reply
.features
<< dendl
;
1522 if (connect_reply
.authorizer_len
) {
1523 return wait_connect_reply_auth();
1526 return handle_connect_reply_2();
// Read the authorizer reply payload that follows the connect reply.
// The 4096 cap bounds the read against a hostile/bogus length field.
1529 CtPtr
ProtocolV1::wait_connect_reply_auth() {
1530 ldout(cct
, 20) << __func__
<< dendl
;
1532 ldout(cct
, 10) << __func__
1533 << " reply.authorizer_len=" << connect_reply
.authorizer_len
1536 ceph_assert(connect_reply
.authorizer_len
< 4096);
1538 return READ(connect_reply
.authorizer_len
, handle_connect_reply_auth
);
// Process the authorizer reply: a CHALLENGE tag means add the challenge
// and resend the connect message; otherwise verify the reply before
// continuing to tag handling.  Error-branch returns are in lines
// dropped by this extraction.
1541 CtPtr
ProtocolV1::handle_connect_reply_auth(char *buffer
, int r
) {
1542 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1545 ldout(cct
, 1) << __func__
<< " read connect reply authorizer failed"
1550 bufferlist authorizer_reply
;
1551 authorizer_reply
.append(buffer
, connect_reply
.authorizer_len
);
// CEPHX_V2 challenge round-trip: fold the challenge into the authorizer
// and restart the connect-message exchange.
1553 if (connect_reply
.tag
== CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
) {
1554 ldout(cct
, 10) << __func__
<< " connect got auth challenge" << dendl
;
1555 authorizer
->add_challenge(cct
, authorizer_reply
);
1556 return CONTINUE(send_connect_message
);
1559 auto iter
= authorizer_reply
.cbegin();
1560 if (authorizer
&& !authorizer
->verify_reply(iter
,
1561 nullptr /* connection_secret */)) {
1562 ldout(cct
, 0) << __func__
<< " failed verifying authorize reply" << dendl
;
1566 return handle_connect_reply_2();
// Dispatch on the server's reply tag: hard failures (FEATURES,
// BADPROTOVER, BADAUTHORIZER, WAIT), retry loops (RESETSESSION,
// RETRY_GLOBAL, RETRY_SESSION re-enter send_connect_message), and the
// success paths (SEQ -> exchange ack seqs, READY -> client_ready).
// NOTE(review): the _fault()/return statements on each failure branch
// are in lines dropped by this extraction.
1569 CtPtr
ProtocolV1::handle_connect_reply_2() {
1570 ldout(cct
, 20) << __func__
<< dendl
;
1572 if (connect_reply
.tag
== CEPH_MSGR_TAG_FEATURES
) {
1573 ldout(cct
, 0) << __func__
<< " connect protocol feature mismatch, my "
1574 << std::hex
<< connection
->policy
.features_supported
1575 << " < peer " << connect_reply
.features
<< " missing "
1576 << (connect_reply
.features
&
1577 ~connection
->policy
.features_supported
)
1578 << std::dec
<< dendl
;
1582 if (connect_reply
.tag
== CEPH_MSGR_TAG_BADPROTOVER
) {
1583 ldout(cct
, 0) << __func__
<< " connect protocol version mismatch, my "
1584 << messenger
->get_proto_version(connection
->peer_type
, true)
1585 << " != " << connect_reply
.protocol_version
<< dendl
;
1589 if (connect_reply
.tag
== CEPH_MSGR_TAG_BADAUTHORIZER
) {
1590 ldout(cct
, 0) << __func__
<< " connect got BADAUTHORIZER" << dendl
;
// Server dropped our old session: clear pending output and reconnect
// with a fresh connect_seq (reset handling in dropped lines).
1594 if (connect_reply
.tag
== CEPH_MSGR_TAG_RESETSESSION
) {
1595 ldout(cct
, 0) << __func__
<< " connect got RESETSESSION" << dendl
;
1599 // see session_reset
1600 connection
->outcoming_bl
.clear();
1602 return CONTINUE(send_connect_message
);
// Server saw a newer global_seq: bump ours above it and retry.
1605 if (connect_reply
.tag
== CEPH_MSGR_TAG_RETRY_GLOBAL
) {
1606 global_seq
= messenger
->get_global_seq(connect_reply
.global_seq
);
1607 ldout(cct
, 5) << __func__
<< " connect got RETRY_GLOBAL "
1608 << connect_reply
.global_seq
<< " chose new " << global_seq
1610 return CONTINUE(send_connect_message
);
// Server has a later connect_seq for this session: adopt it and retry.
1613 if (connect_reply
.tag
== CEPH_MSGR_TAG_RETRY_SESSION
) {
1614 ceph_assert(connect_reply
.connect_seq
> connect_seq
);
1615 ldout(cct
, 5) << __func__
<< " connect got RETRY_SESSION " << connect_seq
1616 << " -> " << connect_reply
.connect_seq
<< dendl
;
1617 connect_seq
= connect_reply
.connect_seq
;
1618 return CONTINUE(send_connect_message
);
// Connection race lost: the server kept its own outgoing connection.
1621 if (connect_reply
.tag
== CEPH_MSGR_TAG_WAIT
) {
1622 ldout(cct
, 1) << __func__
<< " connect got WAIT (connection race)" << dendl
;
// Local sanity check: did the server offer every feature we require?
1627 uint64_t feat_missing
;
1629 connection
->policy
.features_required
& ~(uint64_t)connect_reply
.features
;
1631 ldout(cct
, 1) << __func__
<< " missing required features " << std::hex
1632 << feat_missing
<< std::dec
<< dendl
;
1636 if (connect_reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1639 << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq"
1642 return wait_ack_seq();
1645 if (connect_reply
.tag
== CEPH_MSGR_TAG_READY
) {
1646 ldout(cct
, 10) << __func__
<< " got CEPH_MSGR_TAG_READY " << dendl
;
1649 return client_ready();
// After TAG_SEQ: read the server's acked sequence number (a raw
// uint64_t) so we can discard already-delivered requeued messages.
1652 CtPtr
ProtocolV1::wait_ack_seq() {
1653 ldout(cct
, 20) << __func__
<< dendl
;
1655 return READ(sizeof(uint64_t), handle_ack_seq
);
// Drop requeued messages up to the server's acked seq, then send back
// our own in_seq so the server can do the same.  The read-failure
// return and the 'bl' declaration are in lines dropped by this
// extraction.
1658 CtPtr
ProtocolV1::handle_ack_seq(char *buffer
, int r
) {
1659 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1662 ldout(cct
, 1) << __func__
<< " read connect ack seq failed" << dendl
;
1666 uint64_t newly_acked_seq
= 0;
// Raw host-endian uint64 straight from the wire (v1 protocol).
1668 newly_acked_seq
= *((uint64_t *)buffer
);
1669 ldout(cct
, 2) << __func__
<< " got newly_acked_seq " << newly_acked_seq
1670 << " vs out_seq " << out_seq
<< dendl
;
1671 out_seq
= discard_requeued_up_to(out_seq
, newly_acked_seq
);
// Reply with the highest sequence number we have received.
1674 uint64_t s
= in_seq
;
1675 bl
.append((char *)&s
, sizeof(s
));
1677 return WRITE(bl
, handle_in_seq_write
);
// Continuation after sending our in_seq; on success the session is
// fully negotiated and we become ready.  Failure return is in dropped
// lines.
1680 CtPtr
ProtocolV1::handle_in_seq_write(int r
) {
1681 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1684 ldout(cct
, 10) << __func__
<< " failed to send in_seq " << dendl
;
1688 ldout(cct
, 10) << __func__
<< " send in_seq done " << dendl
;
1690 return client_ready();
// Finalize a successful client-side handshake: record the peer's
// global_seq, adopt the server's lossy decision, negotiate the feature
// set, install (or clear) session security from the authorizer, and
// notify the dispatch queue / fast dispatchers.  The final transition
// to the ready/open state is in lines dropped by this extraction.
1693 CtPtr
ProtocolV1::client_ready() {
1694 ldout(cct
, 20) << __func__
<< dendl
;
1697 peer_global_seq
= connect_reply
.global_seq
;
// Server decides lossiness; overwrite whatever we advertised.
1698 connection
->policy
.lossy
= connect_reply
.flags
& CEPH_MSG_CONNECT_LOSSY
;
1702 ceph_assert(connect_seq
== connect_reply
.connect_seq
);
1703 backoff
= utime_t();
// Effective features = intersection of theirs and ours.
1704 connection
->set_features((uint64_t)connect_reply
.features
&
1705 (uint64_t)connection
->policy
.features_supported
);
1706 ldout(cct
, 10) << __func__
<< " connect success " << connect_seq
1707 << ", lossy = " << connection
->policy
.lossy
<< ", features "
1708 << connection
->get_features() << dendl
;
1710 // If we have an authorizer, get a new AuthSessionHandler to deal with
1711 // ongoing security of the connection. PLR
1712 if (authorizer
!= NULL
) {
1713 ldout(cct
, 10) << __func__
<< " setting up session_security with auth "
1714 << authorizer
<< dendl
;
1715 session_security
.reset(get_auth_session_handler(
1716 cct
, authorizer
->protocol
,
1717 authorizer
->session_key
,
1718 connection
->get_features()));
1720 // We have no authorizer, so we shouldn't be applying security to messages
1721 // in this AsyncConnection. PLR
1722 ldout(cct
, 10) << __func__
<< " no authorizer, clearing session_security"
1724 session_security
.reset();
1727 if (connection
->delay_state
) {
1728 ceph_assert(connection
->delay_state
->ready());
// Tell dispatchers the connection is established (fast path + queued).
1730 connection
->dispatch_queue
->queue_connect(connection
);
1731 messenger
->ms_deliver_handle_fast_connect(connection
);
1737 * Server Protocol V1
// Server side, step 1: send CEPH_BANNER, our legacy address, and the
// peer's address as we observed it.  The 'bl' declaration and state
// assignment are in lines dropped by this extraction.
1740 CtPtr
ProtocolV1::send_server_banner() {
1741 ldout(cct
, 20) << __func__
<< dendl
;
1746 bl
.append(CEPH_BANNER
, strlen(CEPH_BANNER
));
1748 // as a server, we should have a legacy addr if we accepted this connection.
1749 auto legacy
= messenger
->get_myaddrs().legacy_addr();
1750 encode(legacy
, bl
, 0); // legacy
1751 connection
->port
= legacy
.get_port();
// Echo back the address the client connected from (its view of itself).
1752 encode(connection
->target_addr
, bl
, 0); // legacy
1754 ldout(cct
, 1) << __func__
<< " sd=" << connection
->cs
.fd()
1755 << " legacy " << legacy
1756 << " socket_addr " << connection
->socket_addr
1757 << " target_addr " << connection
->target_addr
1760 return WRITE(bl
, handle_server_banner_write
);
// Continuation after the server banner write; on success, wait for the
// client's banner.  Failure return is in dropped lines.
1763 CtPtr
ProtocolV1::handle_server_banner_write(int r
) {
1764 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1767 ldout(cct
, 1) << " write server banner failed" << dendl
;
1770 ldout(cct
, 10) << __func__
<< " write banner and addr done: "
1771 << connection
->get_peer_addr() << dendl
;
1773 return wait_client_banner();
// Server side: read the client's banner plus one encoded legacy
// entity addr (the client's self-reported address) in a single read.
1776 CtPtr
ProtocolV1::wait_client_banner() {
1777 ldout(cct
, 20) << __func__
<< dendl
;
1779 return READ(strlen(CEPH_BANNER
) + sizeof(ceph_entity_addr
),
1780 handle_client_banner
);
// Server side: validate the client's banner, decode its self-reported
// address, fill in the IP from the socket if the client sent a blank
// IP, then record the peer address and proceed to the connect-message
// exchange.  Error-branch returns, the 'addr_bl' declaration, and the
// try-block open are in lines dropped by this extraction.
1783 CtPtr
ProtocolV1::handle_client_banner(char *buffer
, int r
) {
1784 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1787 ldout(cct
, 1) << __func__
<< " read peer banner and addr failed" << dendl
;
1791 if (memcmp(buffer
, CEPH_BANNER
, strlen(CEPH_BANNER
))) {
1792 ldout(cct
, 1) << __func__
<< " accept peer sent bad banner '" << buffer
1793 << "' (should be '" << CEPH_BANNER
<< "')" << dendl
;
1798 entity_addr_t peer_addr
;
1800 addr_bl
.append(buffer
+ strlen(CEPH_BANNER
), sizeof(ceph_entity_addr
));
1802 auto ti
= addr_bl
.cbegin();
1803 decode(peer_addr
, ti
);
1804 } catch (const buffer::error
&e
) {
1805 lderr(cct
) << __func__
<< " decode peer_addr failed " << dendl
;
1809 ldout(cct
, 10) << __func__
<< " accept peer addr is " << peer_addr
<< dendl
;
1810 if (peer_addr
.is_blank_ip()) {
1811 // peer apparently doesn't know what ip they have; figure it out for them.
// Keep the client's port/nonce but substitute the socket's source IP.
1812 int port
= peer_addr
.get_port();
1813 peer_addr
.set_sockaddr(connection
->target_addr
.get_sockaddr());
1814 peer_addr
.set_port(port
);
1816 ldout(cct
, 0) << __func__
<< " accept peer addr is really " << peer_addr
1817 << " (socket is " << connection
->target_addr
<< ")" << dendl
;
1819 connection
->set_peer_addr(peer_addr
); // so that connection_state gets set up
1820 connection
->target_addr
= peer_addr
;
1822 return CONTINUE(wait_connect_message
);
// Server side: zero the connect struct and read a full
// ceph_msg_connect from the wire.  Re-entered after every reply that
// keeps the handshake going.
1825 CtPtr
ProtocolV1::wait_connect_message() {
1826 ldout(cct
, 20) << __func__
<< dendl
;
1828 memset(&connect_msg
, 0, sizeof(connect_msg
));
1829 return READ(sizeof(connect_msg
), handle_connect_message_1
);
// Copy the raw connect struct out of the read buffer, then either read
// the trailing authorizer payload or go straight to policy handling.
// The read-failure return is in lines dropped by this extraction.
1832 CtPtr
ProtocolV1::handle_connect_message_1(char *buffer
, int r
) {
1833 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1836 ldout(cct
, 1) << __func__
<< " read connect msg failed" << dendl
;
// Struct copy straight from the wire buffer (v1 is host-endian structs).
1840 connect_msg
= *((ceph_msg_connect
*)buffer
);
1842 state
= ACCEPTING_WAIT_CONNECT_MSG_AUTH
;
1844 if (connect_msg
.authorizer_len
) {
1845 return wait_connect_message_auth();
1848 return handle_connect_message_2();
// Allocate a buffer sized by the client's declared authorizer length
// and read the authorizer payload directly into it.
// NOTE(review): unlike the client path, no upper bound on
// authorizer_len is visible here — confirm validation in full source.
1851 CtPtr
ProtocolV1::wait_connect_message_auth() {
1852 ldout(cct
, 20) << __func__
<< dendl
;
1853 authorizer_buf
.clear();
1854 authorizer_buf
.push_back(buffer::create(connect_msg
.authorizer_len
));
1855 return READB(connect_msg
.authorizer_len
, authorizer_buf
.c_str(),
1856 handle_connect_message_auth
);
// Continuation after reading the authorizer payload; on success, fall
// through to the main accept logic.  Failure return is in dropped lines.
1859 CtPtr
ProtocolV1::handle_connect_message_auth(char *buffer
, int r
) {
1860 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1863 ldout(cct
, 1) << __func__
<< " read connect authorizer failed" << dendl
;
1867 return handle_connect_message_2();
// Server-side core of the accept path: apply the messenger policy for
// the peer type, check protocol version and required features, verify
// the authorizer (with the CEPHX_V2 challenge round-trip), then resolve
// any race against an existing connection to the same peer — deciding
// between open(), replace(), RETRY_GLOBAL/RETRY_SESSION/RESETSESSION/
// WAIT replies.  NOTE(review): this excerpt is lossy (embedded original
// line numbers skip), so several returns, ldout openings, and closing
// braces are not visible; comments describe only the visible logic.
1870 CtPtr
ProtocolV1::handle_connect_message_2() {
1871 ldout(cct
, 20) << __func__
<< dendl
;
1873 ldout(cct
, 20) << __func__
<< " accept got peer connect_seq "
1874 << connect_msg
.connect_seq
<< " global_seq "
1875 << connect_msg
.global_seq
<< dendl
;
// Adopt the policy configured for this peer's host type.
1877 connection
->set_peer_type(connect_msg
.host_type
);
1878 connection
->policy
= messenger
->get_policy(connect_msg
.host_type
);
1880 ldout(cct
, 10) << __func__
<< " accept of host_type " << connect_msg
.host_type
1881 << ", policy.lossy=" << connection
->policy
.lossy
1882 << " policy.server=" << connection
->policy
.server
1883 << " policy.standby=" << connection
->policy
.standby
1884 << " policy.resetcheck=" << connection
->policy
.resetcheck
1885 << " features 0x" << std::hex
<< (uint64_t)connect_msg
.features
1889 ceph_msg_connect_reply reply
;
1890 bufferlist authorizer_reply
;
1892 memset(&reply
, 0, sizeof(reply
));
1893 reply
.protocol_version
=
1894 messenger
->get_proto_version(connection
->peer_type
, false);
1897 ldout(cct
, 10) << __func__
<< " accept my proto " << reply
.protocol_version
1898 << ", their proto " << connect_msg
.protocol_version
<< dendl
;
// Protocol version must match exactly for msgr v1.
1900 if (connect_msg
.protocol_version
!= reply
.protocol_version
) {
1901 return send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER
, reply
,
1905 // require signatures for cephx?
// When cephx is in use and configuration demands signatures, force the
// MSG_AUTH feature bit into the required set (cluster vs. service
// daemons use different config options).
1906 if (connect_msg
.authorizer_protocol
== CEPH_AUTH_CEPHX
) {
1907 if (connection
->peer_type
== CEPH_ENTITY_TYPE_OSD
||
1908 connection
->peer_type
== CEPH_ENTITY_TYPE_MDS
) {
1909 if (cct
->_conf
->cephx_require_signatures
||
1910 cct
->_conf
->cephx_cluster_require_signatures
) {
1913 << " using cephx, requiring MSG_AUTH feature bit for cluster"
1915 connection
->policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
1918 if (cct
->_conf
->cephx_require_signatures
||
1919 cct
->_conf
->cephx_service_require_signatures
) {
1922 << " using cephx, requiring MSG_AUTH feature bit for service"
1924 connection
->policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
// Reject peers that lack any feature we require.
1929 uint64_t feat_missing
=
1930 connection
->policy
.features_required
& ~(uint64_t)connect_msg
.features
;
1932 ldout(cct
, 1) << __func__
<< " peer missing required features " << std::hex
1933 << feat_missing
<< std::dec
<< dendl
;
1934 return send_connect_message_reply(CEPH_MSGR_TAG_FEATURES
, reply
,
// Copy the authorizer and drop the connection lock: verification may
// call into the auth service and must not hold it.
1938 bufferlist auth_bl_copy
= authorizer_buf
;
1939 connection
->lock
.unlock();
1940 ldout(cct
,10) << __func__
<< " authorizor_protocol "
1941 << connect_msg
.authorizer_protocol
1942 << " len " << auth_bl_copy
.length()
1944 bool authorizer_valid
;
// CEPHX_V2 peers get a challenge on the first attempt.
1945 bool need_challenge
= HAVE_FEATURE(connect_msg
.features
, CEPHX_V2
);
1946 bool had_challenge
= (bool)authorizer_challenge
;
1947 if (!messenger
->ms_deliver_verify_authorizer(
1948 connection
, connection
->peer_type
, connect_msg
.authorizer_protocol
,
1949 auth_bl_copy
, authorizer_reply
, authorizer_valid
, session_key
,
1950 nullptr /* connection_secret */,
1951 need_challenge
? &authorizer_challenge
: nullptr) ||
1952 !authorizer_valid
) {
1953 connection
->lock
.lock();
// We were unlocked: the connection may have been marked down meanwhile.
1954 if (state
!= ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
1955 ldout(cct
, 1) << __func__
1956 << " state changed while accept, it must be mark_down"
1958 ceph_assert(state
== CLOSED
);
// First-round CEPHX_V2 failure with a fresh challenge: ask the client
// to retry with the challenge folded into its authorizer.
1962 if (need_challenge
&& !had_challenge
&& authorizer_challenge
) {
1963 ldout(cct
, 10) << __func__
<< ": challenging authorizer" << dendl
;
1964 ceph_assert(authorizer_reply
.length());
1965 return send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
,
1966 reply
, authorizer_reply
);
1968 ldout(cct
, 0) << __func__
<< ": got bad authorizer, auth_reply_len="
1969 << authorizer_reply
.length() << dendl
;
1970 session_security
.reset();
1971 return send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER
, reply
,
1976 // We've verified the authorizer for this AsyncConnection, so set up the
1977 // session security structure. PLR
1978 ldout(cct
, 10) << __func__
<< " accept setting up session_security." << dendl
;
// Race resolution: is there already a connection to this peer?
1981 AsyncConnectionRef existing
= messenger
->lookup_conn(*connection
->peer_addrs
);
1983 connection
->inject_delay();
1985 connection
->lock
.lock();
1986 if (state
!= ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
1987 ldout(cct
, 1) << __func__
1988 << " state changed while accept, it must be mark_down"
1990 ceph_assert(state
== CLOSED
);
1994 if (existing
== connection
) {
// An existing connection that already upgraded to a newer protocol
// version cannot be raced against; tear it down.
1997 if (existing
&& existing
->protocol
->proto_type
!= 1) {
1998 ldout(cct
,1) << __func__
<< " existing " << existing
<< " proto "
1999 << existing
->protocol
.get() << " version is "
2000 << existing
->protocol
->proto_type
<< ", marking down" << dendl
;
2001 existing
->mark_down();
2006 // There is no possible that existing connection will acquire this
2007 // connection's lock
2008 existing
->lock
.lock(); // skip lockdep check (we are locking a second
2009 // AsyncConnection here)
2011 ldout(cct
,10) << __func__
<< " existing=" << existing
<< " exproto="
2012 << existing
->protocol
.get() << dendl
;
2013 ProtocolV1
*exproto
= dynamic_cast<ProtocolV1
*>(existing
->protocol
.get());
2014 ceph_assert(exproto
);
2015 ceph_assert(exproto
->proto_type
== 1);
2017 if (exproto
->state
== CLOSED
) {
2018 ldout(cct
, 1) << __func__
<< " existing " << existing
2019 << " already closed." << dendl
;
2020 existing
->lock
.unlock();
2023 return open(reply
, authorizer_reply
);
// Another thread is mid-replace on the existing connection: tell the
// peer to retry with a higher global_seq.
2026 if (exproto
->replacing
) {
2027 ldout(cct
, 1) << __func__
2028 << " existing racing replace happened while replacing."
2029 << " existing_state="
2030 << connection
->get_state_name(existing
->state
) << dendl
;
2031 reply
.global_seq
= exproto
->peer_global_seq
;
2032 existing
->lock
.unlock();
2033 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL
, reply
,
// Stale connect attempt (older global_seq than we have already seen).
2037 if (connect_msg
.global_seq
< exproto
->peer_global_seq
) {
2038 ldout(cct
, 10) << __func__
<< " accept existing " << existing
<< ".gseq "
2039 << exproto
->peer_global_seq
<< " > "
2040 << connect_msg
.global_seq
<< ", RETRY_GLOBAL" << dendl
;
2041 reply
.global_seq
= exproto
->peer_global_seq
; // so we can send it below..
2042 existing
->lock
.unlock();
2043 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL
, reply
,
2046 ldout(cct
, 10) << __func__
<< " accept existing " << existing
<< ".gseq "
2047 << exproto
->peer_global_seq
2048 << " <= " << connect_msg
.global_seq
<< ", looks ok"
// Lossy existing connection: just reset it and take over.
2052 if (existing
->policy
.lossy
) {
2055 << " accept replacing existing (lossy) channel (new one lossy="
2056 << connection
->policy
.lossy
<< ")" << dendl
;
2057 exproto
->session_reset();
2058 return replace(existing
, reply
, authorizer_reply
);
2061 ldout(cct
, 1) << __func__
<< " accept connect_seq "
2062 << connect_msg
.connect_seq
2063 << " vs existing csq=" << exproto
->connect_seq
2064 << " existing_state="
2065 << connection
->get_state_name(existing
->state
) << dendl
;
// cseq 0 from a peer we had a session with = the peer hard-reset.
2067 if (connect_msg
.connect_seq
== 0 && exproto
->connect_seq
> 0) {
2070 << " accept peer reset, then tried to connect to us, replacing"
2072 // this is a hard reset from peer
2073 is_reset_from_peer
= true;
2074 if (connection
->policy
.resetcheck
) {
2075 exproto
->session_reset(); // this resets out_queue, msg_ and
2078 return replace(existing
, reply
, authorizer_reply
);
2081 if (connect_msg
.connect_seq
< exproto
->connect_seq
) {
2082 // old attempt, or we sent READY but they didn't get it.
2083 ldout(cct
, 10) << __func__
<< " accept existing " << existing
<< ".cseq "
2084 << exproto
->connect_seq
<< " > " << connect_msg
.connect_seq
2085 << ", RETRY_SESSION" << dendl
;
2086 reply
.connect_seq
= exproto
->connect_seq
+ 1;
2087 existing
->lock
.unlock();
2088 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION
, reply
,
2092 if (connect_msg
.connect_seq
== exproto
->connect_seq
) {
2093 // if the existing connection successfully opened, and/or
2094 // subsequently went to standby, then the peer should bump
2095 // their connect_seq and retry: this is not a connection race
2096 // we need to resolve here.
2097 if (exproto
->state
== OPENED
|| exproto
->state
== STANDBY
) {
2098 ldout(cct
, 10) << __func__
<< " accept connection race, existing "
2099 << existing
<< ".cseq " << exproto
->connect_seq
2100 << " == " << connect_msg
.connect_seq
2101 << ", OPEN|STANDBY, RETRY_SESSION " << dendl
;
2102 // if connect_seq both zero, dont stuck into dead lock. it's ok to
2104 if (connection
->policy
.resetcheck
&& exproto
->connect_seq
== 0) {
2105 return replace(existing
, reply
, authorizer_reply
);
2108 reply
.connect_seq
= exproto
->connect_seq
+ 1;
2109 existing
->lock
.unlock();
2110 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION
, reply
,
// True connection race: the side with the lower address (or a server
// policy) yields its own outgoing attempt and accepts the incoming one.
2115 if (connection
->peer_addrs
->legacy_addr() < messenger
->get_myaddr_legacy() ||
2116 existing
->policy
.server
) {
2118 ldout(cct
, 10) << __func__
<< " accept connection race, existing "
2119 << existing
<< ".cseq " << exproto
->connect_seq
2120 << " == " << connect_msg
.connect_seq
2121 << ", or we are server, replacing my attempt" << dendl
;
2122 return replace(existing
, reply
, authorizer_reply
);
2124 // our existing outgoing wins
2125 ldout(messenger
->cct
, 10)
2126 << __func__
<< " accept connection race, existing " << existing
2127 << ".cseq " << exproto
->connect_seq
2128 << " == " << connect_msg
.connect_seq
<< ", sending WAIT" << dendl
;
2129 ceph_assert(connection
->peer_addrs
->legacy_addr() >
2130 messenger
->get_myaddr_legacy());
2131 existing
->lock
.unlock();
2132 // make sure we follow through with opening the existing
2133 // connection (if it isn't yet open) since we know the peer
2134 // has something to send to us.
2135 existing
->send_keepalive();
2136 return send_connect_message_reply(CEPH_MSGR_TAG_WAIT
, reply
,
// Beyond here the peer's cseq is strictly newer than ours.
2141 ceph_assert(connect_msg
.connect_seq
> exproto
->connect_seq
);
2142 ceph_assert(connect_msg
.global_seq
>= exproto
->peer_global_seq
);
2143 if (connection
->policy
.resetcheck
&& // RESETSESSION only used by servers;
2144 // peers do not reset each other
2145 exproto
->connect_seq
== 0) {
2146 ldout(cct
, 0) << __func__
<< " accept we reset (peer sent cseq "
2147 << connect_msg
.connect_seq
<< ", " << existing
2148 << ".cseq = " << exproto
->connect_seq
2149 << "), sending RESETSESSION " << dendl
;
2150 existing
->lock
.unlock();
2151 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION
, reply
,
2156 ldout(cct
, 10) << __func__
<< " accept peer sent cseq "
2157 << connect_msg
.connect_seq
<< " > " << exproto
->connect_seq
2159 return replace(existing
, reply
, authorizer_reply
);
// No existing connection (and we did not reset): brand-new session.
2161 else if (!replacing
&& connect_msg
.connect_seq
> 0) {
2162 // we reset, and they are opening a new session
2163 ldout(cct
, 0) << __func__
<< " accept we reset (peer sent cseq "
2164 << connect_msg
.connect_seq
<< "), sending RESETSESSION"
2166 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION
, reply
,
2170 ldout(cct
, 10) << __func__
<< " accept new session" << dendl
;
2172 return open(reply
, authorizer_reply
);
2176 CtPtr
ProtocolV1::send_connect_message_reply(char tag
,
2177 ceph_msg_connect_reply
&reply
,
2178 bufferlist
&authorizer_reply
) {
2179 ldout(cct
, 20) << __func__
<< dendl
;
2180 bufferlist reply_bl
;
2183 ((uint64_t)connect_msg
.features
& connection
->policy
.features_supported
) |
2184 connection
->policy
.features_required
;
2185 reply
.authorizer_len
= authorizer_reply
.length();
2186 reply_bl
.append((char *)&reply
, sizeof(reply
));
2188 ldout(cct
, 10) << __func__
<< " reply features 0x" << std::hex
2189 << reply
.features
<< " = (policy sup 0x"
2190 << connection
->policy
.features_supported
2191 << " & connect 0x" << (uint64_t)connect_msg
.features
2192 << ") | policy req 0x"
2193 << connection
->policy
.features_required
2196 if (reply
.authorizer_len
) {
2197 reply_bl
.append(authorizer_reply
.c_str(), authorizer_reply
.length());
2198 authorizer_reply
.clear();
2201 return WRITE(reply_bl
, handle_connect_message_reply_write
);
2204 CtPtr
ProtocolV1::handle_connect_message_reply_write(int r
) {
2205 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
2208 ldout(cct
, 1) << " write connect message reply failed" << dendl
;
2209 connection
->inject_delay();
2213 return CONTINUE(wait_connect_message
);
2216 CtPtr
ProtocolV1::replace(AsyncConnectionRef existing
,
2217 ceph_msg_connect_reply
&reply
,
2218 bufferlist
&authorizer_reply
) {
2219 ldout(cct
, 10) << __func__
<< " accept replacing " << existing
<< dendl
;
2221 connection
->inject_delay();
2222 if (existing
->policy
.lossy
) {
2223 // disconnect from the Connection
2224 ldout(cct
, 1) << __func__
<< " replacing on lossy channel, failing existing"
2226 existing
->protocol
->stop();
2227 existing
->dispatch_queue
->queue_reset(existing
.get());
2229 ceph_assert(can_write
== WriteStatus::NOWRITE
);
2230 existing
->write_lock
.lock();
2232 ProtocolV1
*exproto
= dynamic_cast<ProtocolV1
*>(existing
->protocol
.get());
2234 // reset the in_seq if this is a hard reset from peer,
2235 // otherwise we respect our original connection's value
2236 if (is_reset_from_peer
) {
2237 exproto
->is_reset_from_peer
= true;
2240 connection
->center
->delete_file_event(connection
->cs
.fd(),
2241 EVENT_READABLE
| EVENT_WRITABLE
);
2243 if (existing
->delay_state
) {
2244 existing
->delay_state
->flush();
2245 ceph_assert(!connection
->delay_state
);
2247 exproto
->reset_recv_state();
2249 exproto
->connect_msg
.features
= connect_msg
.features
;
2251 auto temp_cs
= std::move(connection
->cs
);
2252 EventCenter
*new_center
= connection
->center
;
2253 Worker
*new_worker
= connection
->worker
;
2254 // avoid _stop shutdown replacing socket
2255 // queue a reset on the new connection, which we're dumping for the old
2258 connection
->dispatch_queue
->queue_reset(connection
);
2259 ldout(messenger
->cct
, 1)
2260 << __func__
<< " stop myself to swap existing" << dendl
;
2261 exproto
->can_write
= WriteStatus::REPLACING
;
2262 exproto
->replacing
= true;
2263 existing
->state_offset
= 0;
2264 // avoid previous thread modify event
2265 exproto
->state
= NONE
;
2266 existing
->state
= AsyncConnection::STATE_NONE
;
2267 // Discard existing prefetch buffer in `recv_buf`
2268 existing
->recv_start
= existing
->recv_end
= 0;
2269 // there shouldn't exist any buffer
2270 ceph_assert(connection
->recv_start
== connection
->recv_end
);
2272 exproto
->authorizer_challenge
.reset();
2274 auto deactivate_existing
= std::bind(
2275 [existing
, new_worker
, new_center
, exproto
, reply
,
2276 authorizer_reply
](ConnectedSocket
&cs
) mutable {
2277 // we need to delete time event in original thread
2279 std::lock_guard
<std::mutex
> l(existing
->lock
);
2280 existing
->write_lock
.lock();
2281 exproto
->requeue_sent();
2282 existing
->outcoming_bl
.clear();
2283 existing
->open_write
= false;
2284 existing
->write_lock
.unlock();
2285 if (exproto
->state
== NONE
) {
2286 existing
->shutdown_socket();
2287 existing
->cs
= std::move(cs
);
2288 existing
->worker
->references
--;
2289 new_worker
->references
++;
2290 existing
->logger
= new_worker
->get_perf_counter();
2291 existing
->worker
= new_worker
;
2292 existing
->center
= new_center
;
2293 if (existing
->delay_state
)
2294 existing
->delay_state
->set_center(new_center
);
2295 } else if (exproto
->state
== CLOSED
) {
2296 auto back_to_close
=
2297 std::bind([](ConnectedSocket
&cs
) mutable { cs
.close(); },
2299 new_center
->submit_to(new_center
->get_id(),
2300 std::move(back_to_close
), true);
2307 // Before changing existing->center, it may already exists some
2308 // events in existing->center's queue. Then if we mark down
2309 // `existing`, it will execute in another thread and clean up
2310 // connection. Previous event will result in segment fault
2311 auto transfer_existing
= [existing
, exproto
, reply
,
2312 authorizer_reply
]() mutable {
2313 std::lock_guard
<std::mutex
> l(existing
->lock
);
2314 if (exproto
->state
== CLOSED
) return;
2315 ceph_assert(exproto
->state
== NONE
);
2317 // we have called shutdown_socket above
2318 ceph_assert(existing
->last_tick_id
== 0);
2319 // restart timer since we are going to re-build connection
2320 existing
->last_connect_started
= ceph::coarse_mono_clock::now();
2321 existing
->last_tick_id
= existing
->center
->create_time_event(
2322 existing
->connect_timeout_us
, existing
->tick_handler
);
2323 existing
->state
= AsyncConnection::STATE_CONNECTION_ESTABLISHED
;
2324 exproto
->state
= ACCEPTING
;
2326 existing
->center
->create_file_event(
2327 existing
->cs
.fd(), EVENT_READABLE
, existing
->read_handler
);
2328 reply
.global_seq
= exproto
->peer_global_seq
;
2329 exproto
->run_continuation(exproto
->send_connect_message_reply(
2330 CEPH_MSGR_TAG_RETRY_GLOBAL
, reply
, authorizer_reply
));
2332 if (existing
->center
->in_thread())
2333 transfer_existing();
2335 existing
->center
->submit_to(existing
->center
->get_id(),
2336 std::move(transfer_existing
), true);
2338 std::move(temp_cs
));
2340 existing
->center
->submit_to(existing
->center
->get_id(),
2341 std::move(deactivate_existing
), true);
2342 existing
->write_lock
.unlock();
2343 existing
->lock
.unlock();
2346 existing
->lock
.unlock();
2348 return open(reply
, authorizer_reply
);
2351 CtPtr
ProtocolV1::open(ceph_msg_connect_reply
&reply
,
2352 bufferlist
&authorizer_reply
) {
2353 ldout(cct
, 20) << __func__
<< dendl
;
2355 connect_seq
= connect_msg
.connect_seq
+ 1;
2356 peer_global_seq
= connect_msg
.global_seq
;
2357 ldout(cct
, 10) << __func__
<< " accept success, connect_seq = " << connect_seq
2358 << " in_seq=" << in_seq
<< ", sending READY" << dendl
;
2360 // if it is a hard reset from peer, we don't need a round-trip to negotiate
2362 if ((connect_msg
.features
& CEPH_FEATURE_RECONNECT_SEQ
) &&
2363 !is_reset_from_peer
) {
2364 reply
.tag
= CEPH_MSGR_TAG_SEQ
;
2365 wait_for_seq
= true;
2367 reply
.tag
= CEPH_MSGR_TAG_READY
;
2368 wait_for_seq
= false;
2369 out_seq
= discard_requeued_up_to(out_seq
, 0);
2370 is_reset_from_peer
= false;
2375 reply
.features
= connection
->policy
.features_supported
;
2376 reply
.global_seq
= messenger
->get_global_seq();
2377 reply
.connect_seq
= connect_seq
;
2379 reply
.authorizer_len
= authorizer_reply
.length();
2380 if (connection
->policy
.lossy
) {
2381 reply
.flags
= reply
.flags
| CEPH_MSG_CONNECT_LOSSY
;
2384 connection
->set_features((uint64_t)reply
.features
&
2385 (uint64_t)connect_msg
.features
);
2386 ldout(cct
, 10) << __func__
<< " accept features "
2387 << connection
->get_features()
2388 << " authorizer_protocol "
2389 << connect_msg
.authorizer_protocol
<< dendl
;
2391 session_security
.reset(
2392 get_auth_session_handler(cct
, connect_msg
.authorizer_protocol
,
2394 connection
->get_features()));
2396 bufferlist reply_bl
;
2397 reply_bl
.append((char *)&reply
, sizeof(reply
));
2399 if (reply
.authorizer_len
) {
2400 reply_bl
.append(authorizer_reply
.c_str(), authorizer_reply
.length());
2403 if (reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
2404 uint64_t s
= in_seq
;
2405 reply_bl
.append((char *)&s
, sizeof(s
));
2408 connection
->lock
.unlock();
2409 // Because "replacing" will prevent other connections preempt this addr,
2410 // it's safe that here we don't acquire Connection's lock
2411 ssize_t r
= messenger
->accept_conn(connection
);
2413 connection
->inject_delay();
2415 connection
->lock
.lock();
2418 ldout(cct
, 1) << __func__
<< " existing race replacing process for addr = "
2419 << connection
->peer_addrs
->legacy_addr()
2420 << " just fail later one(this)" << dendl
;
2421 ldout(cct
, 10) << "accept fault after register" << dendl
;
2422 connection
->inject_delay();
2425 if (state
!= ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
2426 ldout(cct
, 1) << __func__
2427 << " state changed while accept_conn, it must be mark_down"
2429 ceph_assert(state
== CLOSED
|| state
== NONE
);
2430 ldout(cct
, 10) << "accept fault after register" << dendl
;
2431 messenger
->unregister_conn(connection
);
2432 connection
->inject_delay();
2436 return WRITE(reply_bl
, handle_ready_connect_message_reply_write
);
2439 CtPtr
ProtocolV1::handle_ready_connect_message_reply_write(int r
) {
2440 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
2443 ldout(cct
, 1) << __func__
<< " write ready connect message reply failed"
2449 connection
->dispatch_queue
->queue_accept(connection
);
2450 messenger
->ms_deliver_handle_fast_accept(connection
);
2453 state
= ACCEPTING_HANDLED_CONNECT_MSG
;
2459 return server_ready();
2462 CtPtr
ProtocolV1::wait_seq() {
2463 ldout(cct
, 20) << __func__
<< dendl
;
2465 return READ(sizeof(uint64_t), handle_seq
);
2468 CtPtr
ProtocolV1::handle_seq(char *buffer
, int r
) {
2469 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
2472 ldout(cct
, 1) << __func__
<< " read ack seq failed" << dendl
;
2476 uint64_t newly_acked_seq
= *(uint64_t *)buffer
;
2477 ldout(cct
, 2) << __func__
<< " accept get newly_acked_seq " << newly_acked_seq
2479 out_seq
= discard_requeued_up_to(out_seq
, newly_acked_seq
);
2481 return server_ready();
2484 CtPtr
ProtocolV1::server_ready() {
2485 ldout(cct
, 20) << __func__
<< " session_security is "
2489 ldout(cct
, 20) << __func__
<< " accept done" << dendl
;
2490 memset(&connect_msg
, 0, sizeof(connect_msg
));
2492 if (connection
->delay_state
) {
2493 ceph_assert(connection
->delay_state
->ready());