1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "ProtocolV1.h"
6 #include "common/errno.h"
8 #include "AsyncConnection.h"
9 #include "AsyncMessenger.h"
10 #include "common/EventTrace.h"
11 #include "include/random.h"
12 #include "auth/AuthClient.h"
13 #include "auth/AuthServer.h"
15 #define dout_subsys ceph_subsys_ms
17 #define dout_prefix _conn_prefix(_dout)
// Log-line prefix for this protocol instance: local/peer addresses plus the
// connection pointer, state name, and the peer-global / connect sequence
// counters used by the v1 reconnect logic.
std::ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
  return *_dout << "--1- " << messenger->get_myaddrs() << " >> "
                << *connection->peer_addrs
                // NOTE(review): a literal fragment (the " conn(" opener) was
                // lost in extraction between the two operands below.
                << connection << " " << this
                << " :" << connection->port
                << " s=" << get_state_name(state)
                << " pgs=" << peer_global_seq << " cs=" << connect_seq
                << " l=" << connection->policy.lossy << ").";
// Convenience wrappers binding a buffer (and length) to the continuation that
// runs when the underlying socket operation completes.
#define WRITE(B, C) write(CONTINUATION(C), B)

#define READ(L, C) read(CONTINUATION(C), L)

#define READB(L, B, C) read(CONTINUATION(C), L, B)

// Constant to limit starting sequence number to 2^31. Nothing special about
// it, just a big number. PLR
#define SEQ_MASK 0x7fffffff

// Below this byte count, fragmented bufferlists are flattened by copying
// rather than appended segment-by-segment (see write_message's use).
const int ASYNC_COALESCE_THRESHOLD = 256;
42 static void alloc_aligned_buffer(ceph::buffer::list
&data
, unsigned len
, unsigned off
) {
43 // create a buffer to read into that matches the data alignment
44 unsigned alloc_len
= 0;
47 if (off
& ~CEPH_PAGE_MASK
) {
49 alloc_len
+= CEPH_PAGE_SIZE
;
50 head
= std::min
<uint64_t>(CEPH_PAGE_SIZE
- (off
& ~CEPH_PAGE_MASK
), left
);
54 ceph::bufferptr
ptr(ceph::buffer::create_small_page_aligned(alloc_len
));
55 if (head
) ptr
.set_offset(CEPH_PAGE_SIZE
- head
);
56 data
.push_back(std::move(ptr
));
// Construct a v1 protocol bound to its AsyncConnection (protocol id 1).
ProtocolV1::ProtocolV1(AsyncConnection *connection)
    : Protocol(1, connection),
      can_write(WriteStatus::NOWRITE),
      is_reset_from_peer(false),
  // NOTE(review): the remaining member initializers and the constructor's
  // opening brace were lost in extraction.
  // scratch buffer used for small reads (tags, headers) that need no
  // caller-supplied buffer
  temp_buffer = new char[4096];
// The protocol must be fully drained before destruction: both the pending
// output queue and the unacked sent list have to be empty by now.
ProtocolV1::~ProtocolV1() {
  ceph_assert(out_q.empty());
  ceph_assert(sent.empty());
// Enter the active (client) side of the handshake: reset per-attempt state
// and grab a fresh global sequence number for this connection attempt.
void ProtocolV1::connect() {
  this->state = START_CONNECT;

  // reset connect state variables
  authorizer_buf.clear();
  // FIPS zeroization audit 20191115: these memsets are not security related.
  memset(&connect_msg, 0, sizeof(connect_msg));
  memset(&connect_reply, 0, sizeof(connect_reply));

  // new global_seq so the peer can order competing connection attempts
  global_seq = messenger->get_global_seq();
  // NOTE(review): the function's closing brace was lost in extraction.
100 void ProtocolV1::accept() { this->state
= START_ACCEPT
; }
102 bool ProtocolV1::is_connected() {
103 return can_write
.load() == WriteStatus::CANWRITE
;
// Shut this protocol down: flush any delayed-delivery state and mark the
// write path closed so no further sends are attempted.
void ProtocolV1::stop() {
  ldout(cct, 20) << __func__ << dendl;
  if (state == CLOSED) {
  // NOTE(review): the body of the already-closed early-return branch was
  // lost in extraction.

  if (connection->delay_state) connection->delay_state->flush();

  ldout(cct, 2) << __func__ << dendl;
  std::lock_guard<std::mutex> l(connection->write_lock);
  // NOTE(review): lines between taking the write lock and this status
  // update were lost in extraction.
  can_write = WriteStatus::CLOSED;
126 void ProtocolV1::fault() {
127 ldout(cct
, 20) << __func__
<< dendl
;
129 if (state
== CLOSED
|| state
== NONE
) {
130 ldout(cct
, 10) << __func__
<< " connection is already closed" << dendl
;
134 if (connection
->policy
.lossy
&& state
!= START_CONNECT
&&
135 state
!= CONNECTING
) {
136 ldout(cct
, 1) << __func__
<< " on lossy channel, failing" << dendl
;
138 connection
->dispatch_queue
->queue_reset(connection
);
142 connection
->write_lock
.lock();
143 can_write
= WriteStatus::NOWRITE
;
144 is_reset_from_peer
= false;
146 // requeue sent items
149 if (!once_ready
&& out_q
.empty() && state
>= START_ACCEPT
&&
150 state
<= ACCEPTING_WAIT_CONNECT_MSG_AUTH
&& !replacing
) {
151 ldout(cct
, 10) << __func__
<< " with nothing to send and in the half "
152 << " accept state just closed" << dendl
;
153 connection
->write_lock
.unlock();
155 connection
->dispatch_queue
->queue_reset(connection
);
164 if (connection
->policy
.standby
&& out_q
.empty() && !keepalive
&&
166 ldout(cct
, 10) << __func__
<< " with nothing to send, going to standby"
169 connection
->write_lock
.unlock();
173 connection
->write_lock
.unlock();
175 if ((state
>= START_CONNECT
&& state
<= CONNECTING_SEND_CONNECT_MSG
) ||
179 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
180 } else if (backoff
== utime_t()) {
181 backoff
.set_from_double(cct
->_conf
->ms_initial_backoff
);
184 if (backoff
> cct
->_conf
->ms_max_backoff
)
185 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
188 global_seq
= messenger
->get_global_seq();
189 state
= START_CONNECT
;
190 connection
->state
= AsyncConnection::STATE_CONNECTING
;
191 ldout(cct
, 10) << __func__
<< " waiting " << backoff
<< dendl
;
193 connection
->register_time_events
.insert(
194 connection
->center
->create_time_event(backoff
.to_nsec() / 1000,
195 connection
->wakeup_handler
));
197 // policy maybe empty when state is in accept
198 if (connection
->policy
.server
) {
199 ldout(cct
, 0) << __func__
<< " server, going to standby" << dendl
;
202 ldout(cct
, 0) << __func__
<< " initiating reconnect" << dendl
;
204 global_seq
= messenger
->get_global_seq();
205 state
= START_CONNECT
;
206 connection
->state
= AsyncConnection::STATE_CONNECTING
;
209 connection
->center
->dispatch_event_external(connection
->read_handler
);
213 void ProtocolV1::send_message(Message
*m
) {
214 ceph::buffer::list bl
;
215 uint64_t f
= connection
->get_features();
217 // TODO: Currently not all messages supports reencode like MOSDMap, so here
218 // only let fast dispatch support messages prepare message
219 bool can_fast_prepare
= messenger
->ms_can_fast_dispatch(m
);
220 if (can_fast_prepare
) {
221 prepare_send_message(f
, m
, bl
);
224 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
225 // "features" changes will change the payload encoding
226 if (can_fast_prepare
&&
227 (can_write
== WriteStatus::NOWRITE
|| connection
->get_features() != f
)) {
228 // ensure the correctness of message encoding
231 ldout(cct
, 5) << __func__
<< " clear encoded buffer previous " << f
232 << " != " << connection
->get_features() << dendl
;
234 if (can_write
== WriteStatus::CLOSED
) {
235 ldout(cct
, 10) << __func__
<< " connection closed."
236 << " Drop message " << m
<< dendl
;
239 m
->queue_start
= ceph::mono_clock::now();
240 m
->trace
.event("async enqueueing message");
241 out_q
[m
->get_priority()].emplace_back(std::move(bl
), m
);
242 ldout(cct
, 15) << __func__
<< " inline write is denied, reschedule m=" << m
244 if (can_write
!= WriteStatus::REPLACING
&& !write_in_progress
) {
245 write_in_progress
= true;
246 connection
->center
->dispatch_event_external(connection
->write_handler
);
251 void ProtocolV1::prepare_send_message(uint64_t features
, Message
*m
,
252 ceph::buffer::list
&bl
) {
253 ldout(cct
, 20) << __func__
<< " m " << *m
<< dendl
;
255 // associate message with Connection (for benefit of encode_payload)
256 ldout(cct
, 20) << __func__
<< (m
->empty_payload() ? " encoding features " : " half-reencoding features ")
257 << features
<< " " << m
<< " " << *m
<< dendl
;
259 // encode and copy out of *m
260 // in write_message we update header.seq and need recalc crc
261 // so skip calc header in encode function.
262 m
->encode(features
, messenger
->crcflags
, true);
264 bl
.append(m
->get_payload());
265 bl
.append(m
->get_middle());
266 bl
.append(m
->get_data());
// Ask the write path to emit a keepalive tag on its next cycle (no-op once
// the connection's write side is closed).
void ProtocolV1::send_keepalive() {
  ldout(cct, 10) << __func__ << dendl;
  std::lock_guard<std::mutex> l(connection->write_lock);
  if (can_write != WriteStatus::CLOSED) {
    // NOTE(review): the line flagging the pending keepalive was lost in
    // extraction; only the writer wakeup survives.
    connection->center->dispatch_event_external(connection->write_handler);
278 void ProtocolV1::read_event() {
279 ldout(cct
, 20) << __func__
<< dendl
;
282 CONTINUATION_RUN(CONTINUATION(send_client_banner
));
285 CONTINUATION_RUN(CONTINUATION(send_server_banner
));
288 CONTINUATION_RUN(CONTINUATION(wait_message
));
290 case THROTTLE_MESSAGE
:
291 CONTINUATION_RUN(CONTINUATION(throttle_message
));
294 CONTINUATION_RUN(CONTINUATION(throttle_bytes
));
296 case THROTTLE_DISPATCH_QUEUE
:
297 CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue
));
304 void ProtocolV1::write_event() {
305 ldout(cct
, 10) << __func__
<< dendl
;
308 connection
->write_lock
.lock();
309 if (can_write
== WriteStatus::CANWRITE
) {
311 append_keepalive_or_ack();
315 auto start
= ceph::mono_clock::now();
318 ceph::buffer::list data
;
319 Message
*m
= _get_next_outgoing(&data
);
324 if (!connection
->policy
.lossy
) {
329 more
= !out_q
.empty();
330 connection
->write_lock
.unlock();
332 // send_message or requeue messages may not encode message
333 if (!data
.length()) {
334 prepare_send_message(connection
->get_features(), m
, data
);
337 if (m
->queue_start
!= ceph::mono_time()) {
338 connection
->logger
->tinc(l_msgr_send_messages_queue_lat
,
339 ceph::mono_clock::now() - m
->queue_start
);
342 r
= write_message(m
, data
, more
);
344 connection
->write_lock
.lock();
348 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
351 // Outbound message in-progress, thread will be re-awoken
352 // when the outbound socket is writeable again
355 } while (can_write
== WriteStatus::CANWRITE
);
356 write_in_progress
= false;
357 connection
->write_lock
.unlock();
359 // if r > 0 mean data still lefted, so no need _try_send.
361 uint64_t left
= ack_left
;
365 connection
->outgoing_bl
.append(CEPH_MSGR_TAG_ACK
);
366 connection
->outgoing_bl
.append((char *)&s
, sizeof(s
));
367 ldout(cct
, 10) << __func__
<< " try send msg ack, acked " << left
368 << " messages" << dendl
;
371 r
= connection
->_try_send(left
);
372 } else if (is_queued()) {
373 r
= connection
->_try_send();
377 connection
->logger
->tinc(l_msgr_running_send_time
,
378 ceph::mono_clock::now() - start
);
380 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
381 connection
->lock
.lock();
383 connection
->lock
.unlock();
387 write_in_progress
= false;
388 connection
->write_lock
.unlock();
389 connection
->lock
.lock();
390 connection
->write_lock
.lock();
391 if (state
== STANDBY
&& !connection
->policy
.server
&& is_queued()) {
392 ldout(cct
, 10) << __func__
<< " policy.server is false" << dendl
;
393 connection
->_connect();
394 } else if (connection
->cs
&& state
!= NONE
&& state
!= CLOSED
&&
395 state
!= START_CONNECT
) {
396 r
= connection
->_try_send();
398 ldout(cct
, 1) << __func__
<< " send outcoming bl failed" << dendl
;
399 connection
->write_lock
.unlock();
401 connection
->lock
.unlock();
405 connection
->write_lock
.unlock();
406 connection
->lock
.unlock();
410 bool ProtocolV1::is_queued() {
411 return !out_q
.empty() || connection
->is_queued();
414 void ProtocolV1::run_continuation(CtPtr pcontinuation
) {
416 CONTINUATION_RUN(*pcontinuation
);
420 CtPtr
ProtocolV1::read(CONTINUATION_RX_TYPE
<ProtocolV1
> &next
,
421 int len
, char *buffer
) {
423 buffer
= temp_buffer
;
425 ssize_t r
= connection
->read(len
, buffer
,
426 [&next
, this](char *buffer
, int r
) {
427 next
.setParams(buffer
, r
);
428 CONTINUATION_RUN(next
);
431 next
.setParams(buffer
, r
);
438 CtPtr
ProtocolV1::write(CONTINUATION_TX_TYPE
<ProtocolV1
> &next
,
439 ceph::buffer::list
&buffer
) {
440 ssize_t r
= connection
->write(buffer
, [&next
, this](int r
) {
442 CONTINUATION_RUN(next
);
452 CtPtr
ProtocolV1::ready() {
453 ldout(cct
, 25) << __func__
<< dendl
;
455 // make sure no pending tick timer
456 if (connection
->last_tick_id
) {
457 connection
->center
->delete_time_event(connection
->last_tick_id
);
459 connection
->last_tick_id
= connection
->center
->create_time_event(
460 connection
->inactive_timeout_us
, connection
->tick_handler
);
462 connection
->write_lock
.lock();
463 can_write
= WriteStatus::CANWRITE
;
465 connection
->center
->dispatch_event_external(connection
->write_handler
);
467 connection
->write_lock
.unlock();
468 connection
->maybe_start_delay_thread();
471 return wait_message();
// Idle read state: block on a single tag byte and dispatch on it in
// handle_message.
CtPtr ProtocolV1::wait_message() {
  if (state != OPENED) {  // must have changed due to a replace
  // NOTE(review): the body of this replaced-connection branch was lost in
  // extraction.

  ldout(cct, 20) << __func__ << dendl;

  // every v1 exchange starts with a one-byte tag
  return READ(sizeof(char), handle_message);
484 CtPtr
ProtocolV1::handle_message(char *buffer
, int r
) {
485 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
488 ldout(cct
, 1) << __func__
<< " read tag failed" << dendl
;
492 char tag
= buffer
[0];
493 ldout(cct
, 20) << __func__
<< " process tag " << (int)tag
<< dendl
;
495 if (tag
== CEPH_MSGR_TAG_KEEPALIVE
) {
496 ldout(cct
, 20) << __func__
<< " got KEEPALIVE" << dendl
;
497 connection
->set_last_keepalive(ceph_clock_now());
498 } else if (tag
== CEPH_MSGR_TAG_KEEPALIVE2
) {
499 return READ(sizeof(ceph_timespec
), handle_keepalive2
);
500 } else if (tag
== CEPH_MSGR_TAG_KEEPALIVE2_ACK
) {
501 return READ(sizeof(ceph_timespec
), handle_keepalive2_ack
);
502 } else if (tag
== CEPH_MSGR_TAG_ACK
) {
503 return READ(sizeof(ceph_le64
), handle_tag_ack
);
504 } else if (tag
== CEPH_MSGR_TAG_MSG
) {
505 recv_stamp
= ceph_clock_now();
506 ldout(cct
, 20) << __func__
<< " begin MSG" << dendl
;
507 return READ(sizeof(ceph_msg_header
), handle_message_header
);
508 } else if (tag
== CEPH_MSGR_TAG_CLOSE
) {
509 ldout(cct
, 20) << __func__
<< " got CLOSE" << dendl
;
512 ldout(cct
, 0) << __func__
<< " bad tag " << (int)tag
<< dendl
;
518 CtPtr
ProtocolV1::handle_keepalive2(char *buffer
, int r
) {
519 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
522 ldout(cct
, 1) << __func__
<< " read keeplive timespec failed" << dendl
;
526 ldout(cct
, 30) << __func__
<< " got KEEPALIVE2 tag ..." << dendl
;
529 t
= (ceph_timespec
*)buffer
;
530 utime_t kp_t
= utime_t(*t
);
531 connection
->write_lock
.lock();
532 append_keepalive_or_ack(true, &kp_t
);
533 connection
->write_lock
.unlock();
535 ldout(cct
, 20) << __func__
<< " got KEEPALIVE2 " << kp_t
<< dendl
;
536 connection
->set_last_keepalive(ceph_clock_now());
538 if (is_connected()) {
539 connection
->center
->dispatch_event_external(connection
->write_handler
);
542 return CONTINUE(wait_message
);
545 void ProtocolV1::append_keepalive_or_ack(bool ack
, utime_t
*tp
) {
546 ldout(cct
, 10) << __func__
<< dendl
;
549 struct ceph_timespec ts
;
550 tp
->encode_timeval(&ts
);
551 connection
->outgoing_bl
.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK
);
552 connection
->outgoing_bl
.append((char *)&ts
, sizeof(ts
));
553 } else if (connection
->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
554 struct ceph_timespec ts
;
555 utime_t t
= ceph_clock_now();
556 t
.encode_timeval(&ts
);
557 connection
->outgoing_bl
.append(CEPH_MSGR_TAG_KEEPALIVE2
);
558 connection
->outgoing_bl
.append((char *)&ts
, sizeof(ts
));
560 connection
->outgoing_bl
.append(CEPH_MSGR_TAG_KEEPALIVE
);
// Continuation: the peer acknowledged our KEEPALIVE2 with its timestamp;
// record it for timeout accounting and go back to waiting for a tag.
CtPtr ProtocolV1::handle_keepalive2_ack(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  // NOTE(review): the error-check guard around this failure branch was lost
  // in extraction.
  ldout(cct, 1) << __func__ << " read keeplive timespec failed" << dendl;

  // NOTE(review): the declaration of "t" was lost in extraction.
  t = (ceph_timespec *)buffer;
  connection->set_last_keepalive_ack(utime_t(*t));
  ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;

  return CONTINUE(wait_message);
580 CtPtr
ProtocolV1::handle_tag_ack(char *buffer
, int r
) {
581 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
584 ldout(cct
, 1) << __func__
<< " read ack seq failed" << dendl
;
589 seq
= *(ceph_le64
*)buffer
;
590 ldout(cct
, 20) << __func__
<< " got ACK" << dendl
;
592 ldout(cct
, 15) << __func__
<< " got ack seq " << seq
<< dendl
;
594 static const int max_pending
= 128;
596 auto now
= ceph::mono_clock::now();
597 Message
*pending
[max_pending
];
598 connection
->write_lock
.lock();
599 while (!sent
.empty() && sent
.front()->get_seq() <= seq
&& i
< max_pending
) {
600 Message
*m
= sent
.front();
603 ldout(cct
, 10) << __func__
<< " got ack seq " << seq
604 << " >= " << m
->get_seq() << " on " << m
<< " " << *m
607 connection
->write_lock
.unlock();
608 connection
->logger
->tinc(l_msgr_handle_ack_lat
, ceph::mono_clock::now() - now
);
609 for (int k
= 0; k
< i
; k
++) {
613 return CONTINUE(wait_message
);
616 CtPtr
ProtocolV1::handle_message_header(char *buffer
, int r
) {
617 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
620 ldout(cct
, 1) << __func__
<< " read message header failed" << dendl
;
624 ldout(cct
, 20) << __func__
<< " got MSG header" << dendl
;
626 current_header
= *((ceph_msg_header
*)buffer
);
628 ldout(cct
, 20) << __func__
<< " got envelope type=" << current_header
.type
<< " src "
629 << entity_name_t(current_header
.src
) << " front=" << current_header
.front_len
630 << " data=" << current_header
.data_len
<< " off " << current_header
.data_off
633 if (messenger
->crcflags
& MSG_CRC_HEADER
) {
634 __u32 header_crc
= 0;
635 header_crc
= ceph_crc32c(0, (unsigned char *)¤t_header
,
636 sizeof(current_header
) - sizeof(current_header
.crc
));
638 if (header_crc
!= current_header
.crc
) {
639 ldout(cct
, 0) << __func__
<< " got bad header crc " << header_crc
640 << " != " << current_header
.crc
<< dendl
;
651 state
= THROTTLE_MESSAGE
;
652 return CONTINUE(throttle_message
);
655 CtPtr
ProtocolV1::throttle_message() {
656 ldout(cct
, 20) << __func__
<< dendl
;
658 if (connection
->policy
.throttler_messages
) {
659 ldout(cct
, 10) << __func__
<< " wants " << 1
660 << " message from policy throttler "
661 << connection
->policy
.throttler_messages
->get_current()
662 << "/" << connection
->policy
.throttler_messages
->get_max()
664 if (!connection
->policy
.throttler_messages
->get_or_fail()) {
665 ldout(cct
, 10) << __func__
<< " wants 1 message from policy throttle "
666 << connection
->policy
.throttler_messages
->get_current()
667 << "/" << connection
->policy
.throttler_messages
->get_max()
668 << " failed, just wait." << dendl
;
669 // following thread pool deal with th full message queue isn't a
670 // short time, so we can wait a ms.
671 if (connection
->register_time_events
.empty()) {
672 connection
->register_time_events
.insert(
673 connection
->center
->create_time_event(1000,
674 connection
->wakeup_handler
));
680 state
= THROTTLE_BYTES
;
681 return CONTINUE(throttle_bytes
);
684 CtPtr
ProtocolV1::throttle_bytes() {
685 ldout(cct
, 20) << __func__
<< dendl
;
687 cur_msg_size
= current_header
.front_len
+ current_header
.middle_len
+
688 current_header
.data_len
;
690 if (connection
->policy
.throttler_bytes
) {
691 ldout(cct
, 10) << __func__
<< " wants " << cur_msg_size
692 << " bytes from policy throttler "
693 << connection
->policy
.throttler_bytes
->get_current() << "/"
694 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
695 if (!connection
->policy
.throttler_bytes
->get_or_fail(cur_msg_size
)) {
696 ldout(cct
, 10) << __func__
<< " wants " << cur_msg_size
697 << " bytes from policy throttler "
698 << connection
->policy
.throttler_bytes
->get_current()
699 << "/" << connection
->policy
.throttler_bytes
->get_max()
700 << " failed, just wait." << dendl
;
701 // following thread pool deal with th full message queue isn't a
702 // short time, so we can wait a ms.
703 if (connection
->register_time_events
.empty()) {
704 connection
->register_time_events
.insert(
705 connection
->center
->create_time_event(
706 1000, connection
->wakeup_handler
));
713 state
= THROTTLE_DISPATCH_QUEUE
;
714 return CONTINUE(throttle_dispatch_queue
);
717 CtPtr
ProtocolV1::throttle_dispatch_queue() {
718 ldout(cct
, 20) << __func__
<< dendl
;
721 if (!connection
->dispatch_queue
->dispatch_throttler
.get_or_fail(
724 << __func__
<< " wants " << cur_msg_size
725 << " bytes from dispatch throttle "
726 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
727 << connection
->dispatch_queue
->dispatch_throttler
.get_max()
728 << " failed, just wait." << dendl
;
729 // following thread pool deal with th full message queue isn't a
730 // short time, so we can wait a ms.
731 if (connection
->register_time_events
.empty()) {
732 connection
->register_time_events
.insert(
733 connection
->center
->create_time_event(1000,
734 connection
->wakeup_handler
));
740 throttle_stamp
= ceph_clock_now();
742 state
= READ_MESSAGE_FRONT
;
743 return read_message_front();
// Read the message front section, allocating the front buffer lazily on
// first use; with no front payload fall through to the middle section.
CtPtr ProtocolV1::read_message_front() {
  ldout(cct, 20) << __func__ << dendl;

  unsigned front_len = current_header.front_len;
  // NOTE(review): the "if (front_len)" guard line was lost in extraction.
  if (!front.length()) {
    front.push_back(ceph::buffer::create(front_len));

  return READB(front_len, front.c_str(), handle_message_front);

  // zero-length front: skip straight to the middle section
  return read_message_middle();
// Continuation: front section read completed (or failed with r < 0).
CtPtr ProtocolV1::handle_message_front(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  // NOTE(review): the error-check guard around this failure branch was lost
  // in extraction.
  ldout(cct, 1) << __func__ << " read message front failed" << dendl;

  ldout(cct, 20) << __func__ << " got front " << front.length() << dendl;

  return read_message_middle();
// Read the optional middle section of the current message; when the header
// declares none, go straight to data-buffer preparation.
CtPtr ProtocolV1::read_message_middle() {
  ldout(cct, 20) << __func__ << dendl;

  if (current_header.middle_len) {
    if (!middle.length()) {
      middle.push_back(ceph::buffer::create(current_header.middle_len));

    return READB(current_header.middle_len, middle.c_str(),
                 handle_message_middle);

  return read_message_data_prepare();
// Continuation: middle section read completed (or failed with r < 0).
CtPtr ProtocolV1::handle_message_middle(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r" << r << dendl;

  // NOTE(review): the error-check guard around this failure branch was lost
  // in extraction.
  ldout(cct, 1) << __func__ << " read message middle failed" << dendl;

  ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl;

  return read_message_data_prepare();
799 CtPtr
ProtocolV1::read_message_data_prepare() {
800 ldout(cct
, 20) << __func__
<< dendl
;
802 unsigned data_len
= current_header
.data_len
;
803 unsigned data_off
= current_header
.data_off
;
808 // rx_buffers is broken by design... see
809 // http://tracker.ceph.com/issues/22480
810 map
<ceph_tid_t
, pair
<ceph::buffer::list
, int> >::iterator p
=
811 connection
->rx_buffers
.find(current_header
.tid
);
812 if (p
!= connection
->rx_buffers
.end()) {
813 ldout(cct
, 10) << __func__
<< " seleting rx buffer v " << p
->second
.second
814 << " at offset " << data_off
<< " len "
815 << p
->second
.first
.length() << dendl
;
816 data_buf
= p
->second
.first
;
817 // make sure it's big enough
818 if (data_buf
.length() < data_len
)
819 data_buf
.push_back(buffer::create(data_len
- data_buf
.length()));
820 data_blp
= data_buf
.begin();
822 ldout(cct
, 20) << __func__
<< " allocating new rx buffer at offset "
823 << data_off
<< dendl
;
824 alloc_aligned_buffer(data_buf
, data_len
, data_off
);
825 data_blp
= data_buf
.begin();
828 ldout(cct
, 20) << __func__
<< " allocating new rx buffer at offset "
829 << data_off
<< dendl
;
830 alloc_aligned_buffer(data_buf
, data_len
, data_off
);
831 data_blp
= data_buf
.begin();
837 return CONTINUE(read_message_data
);
// Read message data in chunks bounded by the current bufferlist segment;
// loops via handle_message_data until msg_left is exhausted.
CtPtr ProtocolV1::read_message_data() {
  ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;

  // NOTE(review): the "msg_left > 0" guard line was lost in extraction.
  auto bp = data_blp.get_current_ptr();
  // never read past either the segment boundary or the remaining payload
  unsigned read_len = std::min(bp.length(), msg_left);

  return READB(read_len, bp.c_str(), handle_message_data);

  // all payload consumed: move on to the footer
  return read_message_footer();
// Continuation: one chunk of message data has arrived; account for it and
// keep looping until the full data_len has been consumed.
CtPtr ProtocolV1::handle_message_data(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  // NOTE(review): the error-check guard around this failure branch was lost
  // in extraction.
  ldout(cct, 1) << __func__ << " read data error " << dendl;

  auto bp = data_blp.get_current_ptr();
  unsigned read_len = std::min(bp.length(), msg_left);
  // chunk sizes feed int-based socket APIs; guard against overflow
  ceph_assert(read_len <
              static_cast<unsigned>(std::numeric_limits<int>::max()));
  // advance the cursor past the bytes just filled in
  data_blp += read_len;
  data.append(bp, 0, read_len);
  msg_left -= read_len;

  // loop until msg_left reaches zero
  return CONTINUE(read_message_data);
// Read the message footer; its size depends on whether the peer supports
// signed (CEPH_FEATURE_MSG_AUTH) footers.
CtPtr ProtocolV1::read_message_footer() {
  ldout(cct, 20) << __func__ << dendl;

  state = READ_FOOTER_AND_DISPATCH;

  // NOTE(review): the declaration of "len" and the else of this branch were
  // lost in extraction; new peers use the full footer, old ones the short one.
  if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    len = sizeof(ceph_msg_footer);

    len = sizeof(ceph_msg_footer_old);

  return READ(len, handle_message_footer);
887 CtPtr
ProtocolV1::handle_message_footer(char *buffer
, int r
) {
888 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
891 ldout(cct
, 1) << __func__
<< " read footer data error " << dendl
;
895 ceph_msg_footer footer
;
896 ceph_msg_footer_old old_footer
;
898 if (connection
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
899 footer
= *((ceph_msg_footer
*)buffer
);
901 old_footer
= *((ceph_msg_footer_old
*)buffer
);
902 footer
.front_crc
= old_footer
.front_crc
;
903 footer
.middle_crc
= old_footer
.middle_crc
;
904 footer
.data_crc
= old_footer
.data_crc
;
906 footer
.flags
= old_footer
.flags
;
909 int aborted
= (footer
.flags
& CEPH_MSG_FOOTER_COMPLETE
) == 0;
910 ldout(cct
, 10) << __func__
<< " aborted = " << aborted
<< dendl
;
912 ldout(cct
, 0) << __func__
<< " got " << front
.length() << " + "
913 << middle
.length() << " + " << data
.length()
914 << " byte message.. ABORTED" << dendl
;
918 ldout(cct
, 20) << __func__
<< " got " << front
.length() << " + "
919 << middle
.length() << " + " << data
.length() << " byte message"
921 Message
*message
= decode_message(cct
, messenger
->crcflags
, current_header
,
922 footer
, front
, middle
, data
, connection
);
924 ldout(cct
, 1) << __func__
<< " decode message failed " << dendl
;
929 // Check the signature if one should be present. A zero return indicates
933 if (session_security
.get() == NULL
) {
934 ldout(cct
, 10) << __func__
<< " no session security set" << dendl
;
936 if (session_security
->check_message_signature(message
)) {
937 ldout(cct
, 0) << __func__
<< " Signature check failed" << dendl
;
942 message
->set_byte_throttler(connection
->policy
.throttler_bytes
);
943 message
->set_message_throttler(connection
->policy
.throttler_messages
);
945 // store reservation size in message, so we don't get confused
946 // by messages entering the dispatch queue through other paths.
947 message
->set_dispatch_throttle_size(cur_msg_size
);
949 message
->set_recv_stamp(recv_stamp
);
950 message
->set_throttle_stamp(throttle_stamp
);
951 message
->set_recv_complete_stamp(ceph_clock_now());
953 // check received seq#. if it is old, drop the message.
954 // note that incoming messages may skip ahead. this is convenient for the
955 // client side queueing because messages can't be renumbered, but the (kernel)
956 // client will occasionally pull a message out of the sent queue to send
957 // elsewhere. in that case it doesn't matter if we "got" it or not.
958 uint64_t cur_seq
= in_seq
;
959 if (message
->get_seq() <= cur_seq
) {
960 ldout(cct
, 0) << __func__
<< " got old message " << message
->get_seq()
961 << " <= " << cur_seq
<< " " << message
<< " " << *message
962 << ", discarding" << dendl
;
964 if (connection
->has_feature(CEPH_FEATURE_RECONNECT_SEQ
) &&
965 cct
->_conf
->ms_die_on_old_message
) {
966 ceph_assert(0 == "old msgs despite reconnect_seq feature");
970 if (message
->get_seq() > cur_seq
+ 1) {
971 ldout(cct
, 0) << __func__
<< " missed message? skipped from seq "
972 << cur_seq
<< " to " << message
->get_seq() << dendl
;
973 if (cct
->_conf
->ms_die_on_skipped_message
) {
974 ceph_assert(0 == "skipped incoming seq");
978 #if defined(WITH_EVENTTRACE)
979 if (message
->get_type() == CEPH_MSG_OSD_OP
||
980 message
->get_type() == CEPH_MSG_OSD_OPREPLY
) {
981 utime_t ltt_processed_stamp
= ceph_clock_now();
982 double usecs_elapsed
=
983 ((double)(ltt_processed_stamp
.to_nsec() - recv_stamp
.to_nsec())) / 1000;
985 if (message
->get_type() == CEPH_MSG_OSD_OP
)
986 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OP",
989 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OPREPLY",
994 // note last received message.
995 in_seq
= message
->get_seq();
996 ldout(cct
, 5) << " rx " << message
->get_source() << " seq "
997 << message
->get_seq() << " " << message
<< " " << *message
1000 bool need_dispatch_writer
= false;
1001 if (!connection
->policy
.lossy
) {
1003 need_dispatch_writer
= true;
1008 ceph::mono_time fast_dispatch_time
;
1010 if (connection
->is_blackhole()) {
1011 ldout(cct
, 10) << __func__
<< " blackhole " << *message
<< dendl
;
1016 connection
->logger
->inc(l_msgr_recv_messages
);
1017 connection
->logger
->inc(
1019 cur_msg_size
+ sizeof(ceph_msg_header
) + sizeof(ceph_msg_footer
));
1021 messenger
->ms_fast_preprocess(message
);
1022 fast_dispatch_time
= ceph::mono_clock::now();
1023 connection
->logger
->tinc(l_msgr_running_recv_time
,
1024 fast_dispatch_time
- connection
->recv_start_time
);
1025 if (connection
->delay_state
) {
1026 double delay_period
= 0;
1027 if (rand() % 10000 < cct
->_conf
->ms_inject_delay_probability
* 10000.0) {
1029 cct
->_conf
->ms_inject_delay_max
* (double)(rand() % 10000) / 10000.0;
1030 ldout(cct
, 1) << "queue_received will delay after "
1031 << (ceph_clock_now() + delay_period
) << " on " << message
1032 << " " << *message
<< dendl
;
1034 connection
->delay_state
->queue(delay_period
, message
);
1035 } else if (messenger
->ms_can_fast_dispatch(message
)) {
1036 connection
->lock
.unlock();
1037 connection
->dispatch_queue
->fast_dispatch(message
);
1038 connection
->recv_start_time
= ceph::mono_clock::now();
1039 connection
->logger
->tinc(l_msgr_running_fast_dispatch_time
,
1040 connection
->recv_start_time
- fast_dispatch_time
);
1041 connection
->lock
.lock();
1043 connection
->dispatch_queue
->enqueue(message
, message
->get_priority(),
1044 connection
->conn_id
);
1048 // clean up local buffer references
1054 if (need_dispatch_writer
&& connection
->is_connected()) {
1055 connection
->center
->dispatch_event_external(connection
->write_handler
);
1058 return CONTINUE(wait_message
);
// Drop all per-session state after the peer reset the session: discard
// delayed/queued messages, notify dispatch of the remote reset, and
// re-randomize the outgoing sequence number.
void ProtocolV1::session_reset() {
  ldout(cct, 10) << __func__ << " started" << dendl;

  std::lock_guard<std::mutex> l(connection->write_lock);
  if (connection->delay_state) {
    connection->delay_state->discard();

  connection->dispatch_queue->discard_queue(connection->conn_id);
  discard_out_queue();
  // note: we need to clear outgoing_bl here, but session_reset may be
  // called by other thread, so let caller clear this itself!
  // outgoing_bl.clear();

  connection->dispatch_queue->queue_remote_reset(connection);

  randomize_out_seq();

  // NOTE(review): the reset of the incoming sequence counter was lost in
  // extraction.
  // it's safe to directly set 0, double locked
  can_write = WriteStatus::NOWRITE;
// Choose the initial outgoing sequence number: random (bounded by SEQ_MASK)
// when MSG_AUTH is negotiated so CRCs are not predictable; otherwise the
// legacy behavior applies.
void ProtocolV1::randomize_out_seq() {
  if (connection->get_features() & CEPH_FEATURE_MSG_AUTH) {
    // Set out_seq to a random value, so CRC won't be predictable.
    auto rand_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
    ldout(cct, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
    // NOTE(review): the assignment of rand_seq into out_seq and the else
    // branch were lost in extraction.

    // previously, seq #'s always started at 0.
1099 ssize_t
ProtocolV1::write_message(Message
*m
, ceph::buffer::list
&bl
, bool more
) {
1101 ceph_assert(connection
->center
->in_thread());
1102 m
->set_seq(++out_seq
);
1104 if (messenger
->crcflags
& MSG_CRC_HEADER
) {
1105 m
->calc_header_crc();
1108 ceph_msg_header
&header
= m
->get_header();
1109 ceph_msg_footer
&footer
= m
->get_footer();
1111 // TODO: let sign_message could be reentry?
1112 // Now that we have all the crcs calculated, handle the
1113 // digital signature for the message, if the AsyncConnection has session
1114 // security set up. Some session security options do not
1115 // actually calculate and check the signature, but they should
1116 // handle the calls to sign_message and check_signature. PLR
1117 if (session_security
.get() == NULL
) {
1118 ldout(cct
, 20) << __func__
<< " no session security" << dendl
;
1120 if (session_security
->sign_message(m
)) {
1121 ldout(cct
, 20) << __func__
<< " failed to sign m=" << m
1122 << "): sig = " << footer
.sig
<< dendl
;
1124 ldout(cct
, 20) << __func__
<< " signed m=" << m
1125 << "): sig = " << footer
.sig
<< dendl
;
1129 connection
->outgoing_bl
.append(CEPH_MSGR_TAG_MSG
);
1130 connection
->outgoing_bl
.append((char *)&header
, sizeof(header
));
1132 ldout(cct
, 20) << __func__
<< " sending message type=" << header
.type
1133 << " src " << entity_name_t(header
.src
)
1134 << " front=" << header
.front_len
<< " data=" << header
.data_len
1135 << " off " << header
.data_off
<< dendl
;
1137 if ((bl
.length() <= ASYNC_COALESCE_THRESHOLD
) && (bl
.get_num_buffers() > 1)) {
1138 for (const auto &pb
: bl
.buffers()) {
1139 connection
->outgoing_bl
.append((char *)pb
.c_str(), pb
.length());
1142 connection
->outgoing_bl
.claim_append(bl
);
1145 // send footer; if receiver doesn't support signatures, use the old footer
1147 ceph_msg_footer_old old_footer
;
1148 if (connection
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
1149 connection
->outgoing_bl
.append((char *)&footer
, sizeof(footer
));
1151 if (messenger
->crcflags
& MSG_CRC_HEADER
) {
1152 old_footer
.front_crc
= footer
.front_crc
;
1153 old_footer
.middle_crc
= footer
.middle_crc
;
1155 old_footer
.front_crc
= old_footer
.middle_crc
= 0;
1157 old_footer
.data_crc
=
1158 messenger
->crcflags
& MSG_CRC_DATA
? footer
.data_crc
: 0;
1159 old_footer
.flags
= footer
.flags
;
1160 connection
->outgoing_bl
.append((char *)&old_footer
, sizeof(old_footer
));
1163 m
->trace
.event("async writing message");
1164 ldout(cct
, 20) << __func__
<< " sending " << m
->get_seq() << " " << m
1166 ssize_t total_send_size
= connection
->outgoing_bl
.length();
1167 ssize_t rc
= connection
->_try_send(more
);
1169 ldout(cct
, 1) << __func__
<< " error sending " << m
<< ", "
1170 << cpp_strerror(rc
) << dendl
;
1172 connection
->logger
->inc(
1173 l_msgr_send_bytes
, total_send_size
- connection
->outgoing_bl
.length());
1174 ldout(cct
, 10) << __func__
<< " sending " << m
1175 << (rc
? " continuely." : " done.") << dendl
;
1178 #if defined(WITH_EVENTTRACE)
1179 if (m
->get_type() == CEPH_MSG_OSD_OP
)
1180 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OP_END", false);
1181 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
1182 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OPREPLY_END", false);
1189 void ProtocolV1::requeue_sent() {
1190 write_in_progress
= false;
1195 list
<pair
<ceph::buffer::list
, Message
*> > &rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1196 out_seq
-= sent
.size();
1197 while (!sent
.empty()) {
1198 Message
*m
= sent
.back();
1200 ldout(cct
, 10) << __func__
<< " " << *m
<< " for resend "
1201 << " (" << m
->get_seq() << ")" << dendl
;
1203 rq
.push_front(make_pair(ceph::buffer::list(), m
));
1207 uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq
, uint64_t seq
) {
1208 ldout(cct
, 10) << __func__
<< " " << seq
<< dendl
;
1209 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
1210 if (out_q
.count(CEPH_MSG_PRIO_HIGHEST
) == 0) {
1213 list
<pair
<ceph::buffer::list
, Message
*> > &rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1214 uint64_t count
= out_seq
;
1215 while (!rq
.empty()) {
1216 pair
<ceph::buffer::list
, Message
*> p
= rq
.front();
1217 if (p
.second
->get_seq() == 0 || p
.second
->get_seq() > seq
) break;
1218 ldout(cct
, 10) << __func__
<< " " << *(p
.second
) << " for resend seq "
1219 << p
.second
->get_seq() << " <= " << seq
<< ", discarding"
1225 if (rq
.empty()) out_q
.erase(CEPH_MSG_PRIO_HIGHEST
);
1230 * Tears down the message queues, and removes them from the
1231 * DispatchQueue Must hold write_lock prior to calling.
1233 void ProtocolV1::discard_out_queue() {
1234 ldout(cct
, 10) << __func__
<< " started" << dendl
;
1236 for (list
<Message
*>::iterator p
= sent
.begin(); p
!= sent
.end(); ++p
) {
1237 ldout(cct
, 20) << __func__
<< " discard " << *p
<< dendl
;
1241 for (map
<int, list
<pair
<ceph::buffer::list
, Message
*> > >::iterator p
=
1243 p
!= out_q
.end(); ++p
) {
1244 for (list
<pair
<ceph::buffer::list
, Message
*> >::iterator r
= p
->second
.begin();
1245 r
!= p
->second
.end(); ++r
) {
1246 ldout(cct
, 20) << __func__
<< " discard " << r
->second
<< dendl
;
1251 write_in_progress
= false;
1254 void ProtocolV1::reset_security()
1256 ldout(cct
, 5) << __func__
<< dendl
;
1258 auth_meta
.reset(new AuthConnectionMeta
);
1259 authorizer_more
.clear();
1260 session_security
.reset();
1263 void ProtocolV1::reset_recv_state()
1265 ldout(cct
, 5) << __func__
<< dendl
;
1267 // execute in the same thread that uses the `session_security`.
1268 // We need to do the warp because holding `write_lock` is not
1269 // enough as `write_event()` releases it just before calling
1270 // `write_message()`. `submit_to()` here is NOT blocking.
1271 if (!connection
->center
->in_thread()) {
1272 connection
->center
->submit_to(connection
->center
->get_id(), [this] {
1273 ldout(cct
, 5) << "reset_recv_state (warped) reseting security handlers"
1275 // Possibly unnecessary. See the comment in `deactivate_existing`.
1276 std::lock_guard
<std::mutex
> l(connection
->lock
);
1277 std::lock_guard
<std::mutex
> wl(connection
->write_lock
);
1279 }, /* always_async = */true);
1284 // clean read and write callbacks
1285 connection
->pendingReadLen
.reset();
1286 connection
->writeCallback
.reset();
1288 if (state
> THROTTLE_MESSAGE
&& state
<= READ_FOOTER_AND_DISPATCH
&&
1289 connection
->policy
.throttler_messages
) {
1290 ldout(cct
, 10) << __func__
<< " releasing " << 1
1291 << " message to policy throttler "
1292 << connection
->policy
.throttler_messages
->get_current()
1293 << "/" << connection
->policy
.throttler_messages
->get_max()
1295 connection
->policy
.throttler_messages
->put();
1297 if (state
> THROTTLE_BYTES
&& state
<= READ_FOOTER_AND_DISPATCH
) {
1298 if (connection
->policy
.throttler_bytes
) {
1299 ldout(cct
, 10) << __func__
<< " releasing " << cur_msg_size
1300 << " bytes to policy throttler "
1301 << connection
->policy
.throttler_bytes
->get_current() << "/"
1302 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
1303 connection
->policy
.throttler_bytes
->put(cur_msg_size
);
1306 if (state
> THROTTLE_DISPATCH_QUEUE
&& state
<= READ_FOOTER_AND_DISPATCH
) {
1308 << __func__
<< " releasing " << cur_msg_size
1309 << " bytes to dispatch_queue throttler "
1310 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
1311 << connection
->dispatch_queue
->dispatch_throttler
.get_max() << dendl
;
1312 connection
->dispatch_queue
->dispatch_throttle_release(cur_msg_size
);
1316 Message
*ProtocolV1::_get_next_outgoing(ceph::buffer::list
*bl
) {
1318 if (!out_q
.empty()) {
1319 map
<int, list
<pair
<ceph::buffer::list
, Message
*> > >::reverse_iterator it
=
1321 ceph_assert(!it
->second
.empty());
1322 list
<pair
<ceph::buffer::list
, Message
*> >::iterator p
= it
->second
.begin();
1324 if (p
->first
.length() && bl
) {
1325 assert(bl
->length() == 0);
1328 it
->second
.erase(p
);
1329 if (it
->second
.empty()) out_q
.erase(it
->first
);
1335 * Client Protocol V1
1338 CtPtr
ProtocolV1::send_client_banner() {
1339 ldout(cct
, 20) << __func__
<< dendl
;
1342 ceph::buffer::list bl
;
1343 bl
.append(CEPH_BANNER
, strlen(CEPH_BANNER
));
1344 return WRITE(bl
, handle_client_banner_write
);
1347 CtPtr
ProtocolV1::handle_client_banner_write(int r
) {
1348 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1351 ldout(cct
, 1) << __func__
<< " write client banner failed" << dendl
;
1354 ldout(cct
, 10) << __func__
<< " connect write banner done: "
1355 << connection
->get_peer_addr() << dendl
;
1357 return wait_server_banner();
1360 CtPtr
ProtocolV1::wait_server_banner() {
1361 state
= CONNECTING_WAIT_BANNER_AND_IDENTIFY
;
1363 ldout(cct
, 20) << __func__
<< dendl
;
1365 ceph::buffer::list myaddrbl
;
1366 unsigned banner_len
= strlen(CEPH_BANNER
);
1367 unsigned need_len
= banner_len
+ sizeof(ceph_entity_addr
) * 2;
1368 return READ(need_len
, handle_server_banner_and_identify
);
1371 CtPtr
ProtocolV1::handle_server_banner_and_identify(char *buffer
, int r
) {
1372 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1375 ldout(cct
, 1) << __func__
<< " read banner and identify addresses failed"
1380 unsigned banner_len
= strlen(CEPH_BANNER
);
1381 if (memcmp(buffer
, CEPH_BANNER
, banner_len
)) {
1382 ldout(cct
, 0) << __func__
<< " connect protocol error (bad banner) on peer "
1383 << connection
->get_peer_addr() << dendl
;
1387 ceph::buffer::list bl
;
1388 entity_addr_t paddr
, peer_addr_for_me
;
1390 bl
.append(buffer
+ banner_len
, sizeof(ceph_entity_addr
) * 2);
1391 auto p
= bl
.cbegin();
1394 decode(peer_addr_for_me
, p
);
1395 } catch (const ceph::buffer::error
&e
) {
1396 lderr(cct
) << __func__
<< " decode peer addr failed " << dendl
;
1399 ldout(cct
, 20) << __func__
<< " connect read peer addr " << paddr
1400 << " on socket " << connection
->cs
.fd() << dendl
;
1402 entity_addr_t peer_addr
= connection
->peer_addrs
->legacy_addr();
1403 if (peer_addr
!= paddr
) {
1404 if (paddr
.is_blank_ip() && peer_addr
.get_port() == paddr
.get_port() &&
1405 peer_addr
.get_nonce() == paddr
.get_nonce()) {
1406 ldout(cct
, 0) << __func__
<< " connect claims to be " << paddr
<< " not "
1407 << peer_addr
<< " - presumably this is the same node!"
1410 ldout(cct
, 10) << __func__
<< " connect claims to be " << paddr
<< " not "
1411 << peer_addr
<< dendl
;
1416 ldout(cct
, 20) << __func__
<< " connect peer addr for me is "
1417 << peer_addr_for_me
<< dendl
;
1418 if (messenger
->get_myaddrs().empty() ||
1419 messenger
->get_myaddrs().front().is_blank_ip()) {
1420 sockaddr_storage ss
;
1421 socklen_t len
= sizeof(ss
);
1422 getsockname(connection
->cs
.fd(), (sockaddr
*)&ss
, &len
);
1424 if (cct
->_conf
->ms_learn_addr_from_peer
) {
1425 ldout(cct
, 1) << __func__
<< " peer " << connection
->target_addr
1426 << " says I am " << peer_addr_for_me
<< " (socket says "
1427 << (sockaddr
*)&ss
<< ")" << dendl
;
1428 a
= peer_addr_for_me
;
1430 ldout(cct
, 1) << __func__
<< " socket to " << connection
->target_addr
1431 << " says I am " << (sockaddr
*)&ss
1432 << " (peer says " << peer_addr_for_me
<< ")" << dendl
;
1433 a
.set_sockaddr((sockaddr
*)&ss
);
1435 a
.set_type(entity_addr_t::TYPE_LEGACY
); // anything but NONE; learned_addr ignores this
1437 connection
->lock
.unlock();
1438 messenger
->learned_addr(a
);
1439 if (cct
->_conf
->ms_inject_internal_delays
&&
1440 cct
->_conf
->ms_inject_socket_failures
) {
1441 if (rand() % cct
->_conf
->ms_inject_socket_failures
== 0) {
1442 ldout(cct
, 10) << __func__
<< " sleep for "
1443 << cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1445 t
.set_from_double(cct
->_conf
->ms_inject_internal_delays
);
1449 connection
->lock
.lock();
1450 if (state
!= CONNECTING_WAIT_BANNER_AND_IDENTIFY
) {
1451 ldout(cct
, 1) << __func__
1452 << " state changed while learned_addr, mark_down or "
1453 << " replacing must be happened just now" << dendl
;
1458 ceph::buffer::list myaddrbl
;
1459 encode(messenger
->get_myaddr_legacy(), myaddrbl
, 0); // legacy
1460 return WRITE(myaddrbl
, handle_my_addr_write
);
1463 CtPtr
ProtocolV1::handle_my_addr_write(int r
) {
1464 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1467 ldout(cct
, 2) << __func__
<< " connect couldn't write my addr, "
1468 << cpp_strerror(r
) << dendl
;
1471 ldout(cct
, 10) << __func__
<< " connect sent my addr "
1472 << messenger
->get_myaddr_legacy() << dendl
;
1474 return CONTINUE(send_connect_message
);
1477 CtPtr
ProtocolV1::send_connect_message()
1479 state
= CONNECTING_SEND_CONNECT_MSG
;
1481 ldout(cct
, 20) << __func__
<< dendl
;
1482 ceph_assert(messenger
->auth_client
);
1484 ceph::buffer::list auth_bl
;
1485 vector
<uint32_t> preferred_modes
;
1487 if (connection
->peer_type
!= CEPH_ENTITY_TYPE_MON
||
1488 messenger
->get_myname().type() == CEPH_ENTITY_TYPE_MON
) {
1489 if (authorizer_more
.length()) {
1490 ldout(cct
,10) << __func__
<< " using augmented (challenge) auth payload"
1492 auth_bl
= authorizer_more
;
1494 auto am
= auth_meta
;
1495 authorizer_more
.clear();
1496 connection
->lock
.unlock();
1497 int r
= messenger
->auth_client
->get_auth_request(
1498 connection
, am
.get(),
1499 &am
->auth_method
, &preferred_modes
, &auth_bl
);
1500 connection
->lock
.lock();
1504 if (state
!= CONNECTING_SEND_CONNECT_MSG
) {
1505 ldout(cct
, 1) << __func__
<< " state changed!" << dendl
;
1511 ceph_msg_connect connect
;
1512 connect
.features
= connection
->policy
.features_supported
;
1513 connect
.host_type
= messenger
->get_myname().type();
1514 connect
.global_seq
= global_seq
;
1515 connect
.connect_seq
= connect_seq
;
1516 connect
.protocol_version
=
1517 messenger
->get_proto_version(connection
->peer_type
, true);
1518 if (auth_bl
.length()) {
1519 ldout(cct
, 10) << __func__
1520 << " connect_msg.authorizer_len=" << auth_bl
.length()
1521 << " protocol=" << auth_meta
->auth_method
<< dendl
;
1522 connect
.authorizer_protocol
= auth_meta
->auth_method
;
1523 connect
.authorizer_len
= auth_bl
.length();
1525 connect
.authorizer_protocol
= 0;
1526 connect
.authorizer_len
= 0;
1530 if (connection
->policy
.lossy
) {
1532 CEPH_MSG_CONNECT_LOSSY
; // this is fyi, actually, server decides!
1535 ceph::buffer::list bl
;
1536 bl
.append((char *)&connect
, sizeof(connect
));
1537 if (auth_bl
.length()) {
1538 bl
.append(auth_bl
.c_str(), auth_bl
.length());
1541 ldout(cct
, 10) << __func__
<< " connect sending gseq=" << global_seq
1542 << " cseq=" << connect_seq
1543 << " proto=" << connect
.protocol_version
<< dendl
;
1545 return WRITE(bl
, handle_connect_message_write
);
1548 CtPtr
ProtocolV1::handle_connect_message_write(int r
) {
1549 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1552 ldout(cct
, 2) << __func__
<< " connect couldn't send reply "
1553 << cpp_strerror(r
) << dendl
;
1557 ldout(cct
, 20) << __func__
1558 << " connect wrote (self +) cseq, waiting for reply" << dendl
;
1560 return wait_connect_reply();
1563 CtPtr
ProtocolV1::wait_connect_reply() {
1564 ldout(cct
, 20) << __func__
<< dendl
;
1566 // FIPS zeroization audit 20191115: this memset is not security related.
1567 memset(&connect_reply
, 0, sizeof(connect_reply
));
1568 return READ(sizeof(connect_reply
), handle_connect_reply_1
);
1571 CtPtr
ProtocolV1::handle_connect_reply_1(char *buffer
, int r
) {
1572 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1575 ldout(cct
, 1) << __func__
<< " read connect reply failed" << dendl
;
1579 connect_reply
= *((ceph_msg_connect_reply
*)buffer
);
1581 ldout(cct
, 20) << __func__
<< " connect got reply tag "
1582 << (int)connect_reply
.tag
<< " connect_seq "
1583 << connect_reply
.connect_seq
<< " global_seq "
1584 << connect_reply
.global_seq
<< " proto "
1585 << connect_reply
.protocol_version
<< " flags "
1586 << (int)connect_reply
.flags
<< " features "
1587 << connect_reply
.features
<< dendl
;
1589 if (connect_reply
.authorizer_len
) {
1590 return wait_connect_reply_auth();
1593 return handle_connect_reply_2();
1596 CtPtr
ProtocolV1::wait_connect_reply_auth() {
1597 ldout(cct
, 20) << __func__
<< dendl
;
1599 ldout(cct
, 10) << __func__
1600 << " reply.authorizer_len=" << connect_reply
.authorizer_len
1603 ceph_assert(connect_reply
.authorizer_len
< 4096);
1605 return READ(connect_reply
.authorizer_len
, handle_connect_reply_auth
);
1608 CtPtr
ProtocolV1::handle_connect_reply_auth(char *buffer
, int r
) {
1609 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1612 ldout(cct
, 1) << __func__
<< " read connect reply authorizer failed"
1617 ceph::buffer::list authorizer_reply
;
1618 authorizer_reply
.append(buffer
, connect_reply
.authorizer_len
);
1620 if (connection
->peer_type
!= CEPH_ENTITY_TYPE_MON
||
1621 messenger
->get_myname().type() == CEPH_ENTITY_TYPE_MON
) {
1622 auto am
= auth_meta
;
1623 bool more
= (connect_reply
.tag
== CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
);
1624 ceph::buffer::list auth_retry_bl
;
1626 connection
->lock
.unlock();
1628 r
= messenger
->auth_client
->handle_auth_reply_more(
1629 connection
, am
.get(), authorizer_reply
, &auth_retry_bl
);
1631 // these aren't used for v1
1634 r
= messenger
->auth_client
->handle_auth_done(
1635 connection
, am
.get(),
1636 0 /* global id */, 0 /* con mode */,
1638 &skey
, &con_secret
);
1640 connection
->lock
.lock();
1641 if (state
!= CONNECTING_SEND_CONNECT_MSG
) {
1642 ldout(cct
, 1) << __func__
<< " state changed" << dendl
;
1648 if (more
&& r
== 0) {
1649 authorizer_more
= auth_retry_bl
;
1650 return CONTINUE(send_connect_message
);
1654 return handle_connect_reply_2();
1657 CtPtr
ProtocolV1::handle_connect_reply_2() {
1658 ldout(cct
, 20) << __func__
<< dendl
;
1660 if (connect_reply
.tag
== CEPH_MSGR_TAG_FEATURES
) {
1661 ldout(cct
, 0) << __func__
<< " connect protocol feature mismatch, my "
1662 << std::hex
<< connection
->policy
.features_supported
1663 << " < peer " << connect_reply
.features
<< " missing "
1664 << (connect_reply
.features
&
1665 ~connection
->policy
.features_supported
)
1666 << std::dec
<< dendl
;
1670 if (connect_reply
.tag
== CEPH_MSGR_TAG_BADPROTOVER
) {
1671 ldout(cct
, 0) << __func__
<< " connect protocol version mismatch, my "
1672 << messenger
->get_proto_version(connection
->peer_type
, true)
1673 << " != " << connect_reply
.protocol_version
<< dendl
;
1677 if (connect_reply
.tag
== CEPH_MSGR_TAG_BADAUTHORIZER
) {
1678 ldout(cct
, 0) << __func__
<< " connect got BADAUTHORIZER" << dendl
;
1679 authorizer_more
.clear();
1683 if (connect_reply
.tag
== CEPH_MSGR_TAG_RESETSESSION
) {
1684 ldout(cct
, 0) << __func__
<< " connect got RESETSESSION" << dendl
;
1688 // see session_reset
1689 connection
->outgoing_bl
.clear();
1691 return CONTINUE(send_connect_message
);
1694 if (connect_reply
.tag
== CEPH_MSGR_TAG_RETRY_GLOBAL
) {
1695 global_seq
= messenger
->get_global_seq(connect_reply
.global_seq
);
1696 ldout(cct
, 5) << __func__
<< " connect got RETRY_GLOBAL "
1697 << connect_reply
.global_seq
<< " chose new " << global_seq
1699 return CONTINUE(send_connect_message
);
1702 if (connect_reply
.tag
== CEPH_MSGR_TAG_RETRY_SESSION
) {
1703 ceph_assert(connect_reply
.connect_seq
> connect_seq
);
1704 ldout(cct
, 5) << __func__
<< " connect got RETRY_SESSION " << connect_seq
1705 << " -> " << connect_reply
.connect_seq
<< dendl
;
1706 connect_seq
= connect_reply
.connect_seq
;
1707 return CONTINUE(send_connect_message
);
1710 if (connect_reply
.tag
== CEPH_MSGR_TAG_WAIT
) {
1711 ldout(cct
, 1) << __func__
<< " connect got WAIT (connection race)" << dendl
;
1716 uint64_t feat_missing
;
1718 connection
->policy
.features_required
& ~(uint64_t)connect_reply
.features
;
1720 ldout(cct
, 1) << __func__
<< " missing required features " << std::hex
1721 << feat_missing
<< std::dec
<< dendl
;
1725 if (connect_reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1728 << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq"
1731 return wait_ack_seq();
1734 if (connect_reply
.tag
== CEPH_MSGR_TAG_READY
) {
1735 ldout(cct
, 10) << __func__
<< " got CEPH_MSGR_TAG_READY " << dendl
;
1738 return client_ready();
1741 CtPtr
ProtocolV1::wait_ack_seq() {
1742 ldout(cct
, 20) << __func__
<< dendl
;
1744 return READ(sizeof(uint64_t), handle_ack_seq
);
1747 CtPtr
ProtocolV1::handle_ack_seq(char *buffer
, int r
) {
1748 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1751 ldout(cct
, 1) << __func__
<< " read connect ack seq failed" << dendl
;
1755 uint64_t newly_acked_seq
= 0;
1757 newly_acked_seq
= *((uint64_t *)buffer
);
1758 ldout(cct
, 2) << __func__
<< " got newly_acked_seq " << newly_acked_seq
1759 << " vs out_seq " << out_seq
<< dendl
;
1760 out_seq
= discard_requeued_up_to(out_seq
, newly_acked_seq
);
1762 ceph::buffer::list bl
;
1763 uint64_t s
= in_seq
;
1764 bl
.append((char *)&s
, sizeof(s
));
1766 return WRITE(bl
, handle_in_seq_write
);
1769 CtPtr
ProtocolV1::handle_in_seq_write(int r
) {
1770 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1773 ldout(cct
, 10) << __func__
<< " failed to send in_seq " << dendl
;
1777 ldout(cct
, 10) << __func__
<< " send in_seq done " << dendl
;
1779 return client_ready();
1782 CtPtr
ProtocolV1::client_ready() {
1783 ldout(cct
, 20) << __func__
<< dendl
;
1786 peer_global_seq
= connect_reply
.global_seq
;
1787 connection
->policy
.lossy
= connect_reply
.flags
& CEPH_MSG_CONNECT_LOSSY
;
1791 ceph_assert(connect_seq
== connect_reply
.connect_seq
);
1792 backoff
= utime_t();
1793 connection
->set_features((uint64_t)connect_reply
.features
&
1794 (uint64_t)connection
->policy
.features_supported
);
1795 ldout(cct
, 10) << __func__
<< " connect success " << connect_seq
1796 << ", lossy = " << connection
->policy
.lossy
<< ", features "
1797 << connection
->get_features() << dendl
;
1799 // If we have an authorizer, get a new AuthSessionHandler to deal with
1800 // ongoing security of the connection. PLR
1801 if (auth_meta
->authorizer
) {
1802 ldout(cct
, 10) << __func__
<< " setting up session_security with auth "
1803 << auth_meta
->authorizer
.get() << dendl
;
1804 session_security
.reset(get_auth_session_handler(
1805 cct
, auth_meta
->authorizer
->protocol
,
1806 auth_meta
->session_key
,
1807 connection
->get_features()));
1809 // We have no authorizer, so we shouldn't be applying security to messages
1810 // in this AsyncConnection. PLR
1811 ldout(cct
, 10) << __func__
<< " no authorizer, clearing session_security"
1813 session_security
.reset();
1816 if (connection
->delay_state
) {
1817 ceph_assert(connection
->delay_state
->ready());
1819 connection
->dispatch_queue
->queue_connect(connection
);
1820 messenger
->ms_deliver_handle_fast_connect(connection
);
1826 * Server Protocol V1
1829 CtPtr
ProtocolV1::send_server_banner() {
1830 ldout(cct
, 20) << __func__
<< dendl
;
1833 ceph::buffer::list bl
;
1835 bl
.append(CEPH_BANNER
, strlen(CEPH_BANNER
));
1837 // as a server, we should have a legacy addr if we accepted this connection.
1838 auto legacy
= messenger
->get_myaddrs().legacy_addr();
1839 encode(legacy
, bl
, 0); // legacy
1840 connection
->port
= legacy
.get_port();
1841 encode(connection
->target_addr
, bl
, 0); // legacy
1843 ldout(cct
, 1) << __func__
<< " sd=" << connection
->cs
.fd()
1844 << " legacy " << legacy
1845 << " socket_addr " << connection
->socket_addr
1846 << " target_addr " << connection
->target_addr
1849 return WRITE(bl
, handle_server_banner_write
);
1852 CtPtr
ProtocolV1::handle_server_banner_write(int r
) {
1853 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1856 ldout(cct
, 1) << " write server banner failed" << dendl
;
1859 ldout(cct
, 10) << __func__
<< " write banner and addr done: "
1860 << connection
->get_peer_addr() << dendl
;
1862 return wait_client_banner();
1865 CtPtr
ProtocolV1::wait_client_banner() {
1866 ldout(cct
, 20) << __func__
<< dendl
;
1868 return READ(strlen(CEPH_BANNER
) + sizeof(ceph_entity_addr
),
1869 handle_client_banner
);
1872 CtPtr
ProtocolV1::handle_client_banner(char *buffer
, int r
) {
1873 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1876 ldout(cct
, 1) << __func__
<< " read peer banner and addr failed" << dendl
;
1880 if (memcmp(buffer
, CEPH_BANNER
, strlen(CEPH_BANNER
))) {
1881 ldout(cct
, 1) << __func__
<< " accept peer sent bad banner '" << buffer
1882 << "' (should be '" << CEPH_BANNER
<< "')" << dendl
;
1886 ceph::buffer::list addr_bl
;
1887 entity_addr_t peer_addr
;
1889 addr_bl
.append(buffer
+ strlen(CEPH_BANNER
), sizeof(ceph_entity_addr
));
1891 auto ti
= addr_bl
.cbegin();
1892 decode(peer_addr
, ti
);
1893 } catch (const ceph::buffer::error
&e
) {
1894 lderr(cct
) << __func__
<< " decode peer_addr failed " << dendl
;
1898 ldout(cct
, 10) << __func__
<< " accept peer addr is " << peer_addr
<< dendl
;
1899 if (peer_addr
.is_blank_ip()) {
1900 // peer apparently doesn't know what ip they have; figure it out for them.
1901 int port
= peer_addr
.get_port();
1902 peer_addr
.set_sockaddr(connection
->target_addr
.get_sockaddr());
1903 peer_addr
.set_port(port
);
1905 ldout(cct
, 0) << __func__
<< " accept peer addr is really " << peer_addr
1906 << " (socket is " << connection
->target_addr
<< ")" << dendl
;
1908 connection
->set_peer_addr(peer_addr
); // so that connection_state gets set up
1909 connection
->target_addr
= peer_addr
;
1911 return CONTINUE(wait_connect_message
);
1914 CtPtr
ProtocolV1::wait_connect_message() {
1915 ldout(cct
, 20) << __func__
<< dendl
;
1917 // FIPS zeroization audit 20191115: this memset is not security related.
1918 memset(&connect_msg
, 0, sizeof(connect_msg
));
1919 return READ(sizeof(connect_msg
), handle_connect_message_1
);
1922 CtPtr
ProtocolV1::handle_connect_message_1(char *buffer
, int r
) {
1923 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1926 ldout(cct
, 1) << __func__
<< " read connect msg failed" << dendl
;
1930 connect_msg
= *((ceph_msg_connect
*)buffer
);
1932 state
= ACCEPTING_WAIT_CONNECT_MSG_AUTH
;
1934 if (connect_msg
.authorizer_len
) {
1935 return wait_connect_message_auth();
1938 return handle_connect_message_2();
1941 CtPtr
ProtocolV1::wait_connect_message_auth() {
1942 ldout(cct
, 20) << __func__
<< dendl
;
1943 authorizer_buf
.clear();
1944 authorizer_buf
.push_back(ceph::buffer::create(connect_msg
.authorizer_len
));
1945 return READB(connect_msg
.authorizer_len
, authorizer_buf
.c_str(),
1946 handle_connect_message_auth
);
1949 CtPtr
ProtocolV1::handle_connect_message_auth(char *buffer
, int r
) {
1950 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1953 ldout(cct
, 1) << __func__
<< " read connect authorizer failed" << dendl
;
1957 return handle_connect_message_2();
1960 CtPtr
ProtocolV1::handle_connect_message_2() {
1961 ldout(cct
, 20) << __func__
<< dendl
;
1963 ldout(cct
, 20) << __func__
<< " accept got peer connect_seq "
1964 << connect_msg
.connect_seq
<< " global_seq "
1965 << connect_msg
.global_seq
<< dendl
;
1967 connection
->set_peer_type(connect_msg
.host_type
);
1968 connection
->policy
= messenger
->get_policy(connect_msg
.host_type
);
1970 ldout(cct
, 10) << __func__
<< " accept of host_type " << connect_msg
.host_type
1971 << ", policy.lossy=" << connection
->policy
.lossy
1972 << " policy.server=" << connection
->policy
.server
1973 << " policy.standby=" << connection
->policy
.standby
1974 << " policy.resetcheck=" << connection
->policy
.resetcheck
1975 << " features 0x" << std::hex
<< (uint64_t)connect_msg
.features
1979 ceph_msg_connect_reply reply
;
1980 ceph::buffer::list authorizer_reply
;
1982 // FIPS zeroization audit 20191115: this memset is not security related.
1983 memset(&reply
, 0, sizeof(reply
));
1984 reply
.protocol_version
=
1985 messenger
->get_proto_version(connection
->peer_type
, false);
1988 ldout(cct
, 10) << __func__
<< " accept my proto " << reply
.protocol_version
1989 << ", their proto " << connect_msg
.protocol_version
<< dendl
;
1991 if (connect_msg
.protocol_version
!= reply
.protocol_version
) {
1992 return send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER
, reply
,
1996 // require signatures for cephx?
1997 if (connect_msg
.authorizer_protocol
== CEPH_AUTH_CEPHX
) {
1998 if (connection
->peer_type
== CEPH_ENTITY_TYPE_OSD
||
1999 connection
->peer_type
== CEPH_ENTITY_TYPE_MDS
||
2000 connection
->peer_type
== CEPH_ENTITY_TYPE_MGR
) {
2001 if (cct
->_conf
->cephx_require_signatures
||
2002 cct
->_conf
->cephx_cluster_require_signatures
) {
2005 << " using cephx, requiring MSG_AUTH feature bit for cluster"
2007 connection
->policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
2009 if (cct
->_conf
->cephx_require_version
>= 2 ||
2010 cct
->_conf
->cephx_cluster_require_version
>= 2) {
2013 << " using cephx, requiring cephx v2 feature bit for cluster"
2015 connection
->policy
.features_required
|= CEPH_FEATUREMASK_CEPHX_V2
;
2018 if (cct
->_conf
->cephx_require_signatures
||
2019 cct
->_conf
->cephx_service_require_signatures
) {
2022 << " using cephx, requiring MSG_AUTH feature bit for service"
2024 connection
->policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
2026 if (cct
->_conf
->cephx_require_version
>= 2 ||
2027 cct
->_conf
->cephx_service_require_version
>= 2) {
2030 << " using cephx, requiring cephx v2 feature bit for service"
2032 connection
->policy
.features_required
|= CEPH_FEATUREMASK_CEPHX_V2
;
2037 uint64_t feat_missing
=
2038 connection
->policy
.features_required
& ~(uint64_t)connect_msg
.features
;
2040 ldout(cct
, 1) << __func__
<< " peer missing required features " << std::hex
2041 << feat_missing
<< std::dec
<< dendl
;
2042 return send_connect_message_reply(CEPH_MSGR_TAG_FEATURES
, reply
,
2046 ceph::buffer::list auth_bl_copy
= authorizer_buf
;
2047 auto am
= auth_meta
;
2048 am
->auth_method
= connect_msg
.authorizer_protocol
;
2049 if (!HAVE_FEATURE((uint64_t)connect_msg
.features
, CEPHX_V2
)) {
2050 // peer doesn't support it and we won't get here if we require it
2051 am
->skip_authorizer_challenge
= true;
2053 connection
->lock
.unlock();
2054 ldout(cct
,10) << __func__
<< " authorizor_protocol "
2055 << connect_msg
.authorizer_protocol
2056 << " len " << auth_bl_copy
.length()
2058 bool more
= (bool)auth_meta
->authorizer_challenge
;
2059 int r
= messenger
->auth_server
->handle_auth_request(
2067 connection
->lock
.lock();
2068 if (state
!= ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
2069 ldout(cct
, 1) << __func__
<< " state changed" << dendl
;
2072 ldout(cct
, 0) << __func__
<< ": got bad authorizer, auth_reply_len="
2073 << authorizer_reply
.length() << dendl
;
2074 session_security
.reset();
2075 return send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER
, reply
,
2079 connection
->lock
.lock();
2080 if (state
!= ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
2081 ldout(cct
, 1) << __func__
<< " state changed" << dendl
;
2084 ldout(cct
, 10) << __func__
<< ": challenging authorizer" << dendl
;
2085 ceph_assert(authorizer_reply
.length());
2086 return send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
,
2087 reply
, authorizer_reply
);
2090 // We've verified the authorizer for this AsyncConnection, so set up the
2091 // session security structure. PLR
2092 ldout(cct
, 10) << __func__
<< " accept setting up session_security." << dendl
;
2094 if (connection
->policy
.server
&&
2095 connection
->policy
.lossy
&&
2096 !connection
->policy
.register_lossy_clients
) {
2097 // incoming lossy client, no need to register this connection
2099 ldout(cct
, 10) << __func__
<< " accept new session" << dendl
;
2100 connection
->lock
.lock();
2101 return open(reply
, authorizer_reply
);
2104 AsyncConnectionRef existing
= messenger
->lookup_conn(*connection
->peer_addrs
);
2106 connection
->inject_delay();
2108 connection
->lock
.lock();
2109 if (state
!= ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
2110 ldout(cct
, 1) << __func__
<< " state changed" << dendl
;
2114 if (existing
== connection
) {
2117 if (existing
&& existing
->protocol
->proto_type
!= 1) {
2118 ldout(cct
,1) << __func__
<< " existing " << existing
<< " proto "
2119 << existing
->protocol
.get() << " version is "
2120 << existing
->protocol
->proto_type
<< ", marking down" << dendl
;
2121 existing
->mark_down();
2126 // There is no possible that existing connection will acquire this
2127 // connection's lock
2128 existing
->lock
.lock(); // skip lockdep check (we are locking a second
2129 // AsyncConnection here)
2131 ldout(cct
,10) << __func__
<< " existing=" << existing
<< " exproto="
2132 << existing
->protocol
.get() << dendl
;
2133 ProtocolV1
*exproto
= dynamic_cast<ProtocolV1
*>(existing
->protocol
.get());
2134 ceph_assert(exproto
);
2135 ceph_assert(exproto
->proto_type
== 1);
2137 if (exproto
->state
== CLOSED
) {
2138 ldout(cct
, 1) << __func__
<< " existing " << existing
2139 << " already closed." << dendl
;
2140 existing
->lock
.unlock();
2143 return open(reply
, authorizer_reply
);
2146 if (exproto
->replacing
) {
2147 ldout(cct
, 1) << __func__
2148 << " existing racing replace happened while replacing."
2149 << " existing_state="
2150 << connection
->get_state_name(existing
->state
) << dendl
;
2151 reply
.global_seq
= exproto
->peer_global_seq
;
2152 existing
->lock
.unlock();
2153 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL
, reply
,
2157 if (connect_msg
.global_seq
< exproto
->peer_global_seq
) {
2158 ldout(cct
, 10) << __func__
<< " accept existing " << existing
<< ".gseq "
2159 << exproto
->peer_global_seq
<< " > "
2160 << connect_msg
.global_seq
<< ", RETRY_GLOBAL" << dendl
;
2161 reply
.global_seq
= exproto
->peer_global_seq
; // so we can send it below..
2162 existing
->lock
.unlock();
2163 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL
, reply
,
2166 ldout(cct
, 10) << __func__
<< " accept existing " << existing
<< ".gseq "
2167 << exproto
->peer_global_seq
2168 << " <= " << connect_msg
.global_seq
<< ", looks ok"
2172 if (existing
->policy
.lossy
) {
2175 << " accept replacing existing (lossy) channel (new one lossy="
2176 << connection
->policy
.lossy
<< ")" << dendl
;
2177 exproto
->session_reset();
2178 return replace(existing
, reply
, authorizer_reply
);
2181 ldout(cct
, 1) << __func__
<< " accept connect_seq "
2182 << connect_msg
.connect_seq
2183 << " vs existing csq=" << exproto
->connect_seq
2184 << " existing_state="
2185 << connection
->get_state_name(existing
->state
) << dendl
;
2187 if (connect_msg
.connect_seq
== 0 && exproto
->connect_seq
> 0) {
2190 << " accept peer reset, then tried to connect to us, replacing"
2192 // this is a hard reset from peer
2193 is_reset_from_peer
= true;
2194 if (connection
->policy
.resetcheck
) {
2195 exproto
->session_reset(); // this resets out_queue, msg_ and
2198 return replace(existing
, reply
, authorizer_reply
);
2201 if (connect_msg
.connect_seq
< exproto
->connect_seq
) {
2202 // old attempt, or we sent READY but they didn't get it.
2203 ldout(cct
, 10) << __func__
<< " accept existing " << existing
<< ".cseq "
2204 << exproto
->connect_seq
<< " > " << connect_msg
.connect_seq
2205 << ", RETRY_SESSION" << dendl
;
2206 reply
.connect_seq
= exproto
->connect_seq
+ 1;
2207 existing
->lock
.unlock();
2208 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION
, reply
,
2212 if (connect_msg
.connect_seq
== exproto
->connect_seq
) {
2213 // if the existing connection successfully opened, and/or
2214 // subsequently went to standby, then the peer should bump
2215 // their connect_seq and retry: this is not a connection race
2216 // we need to resolve here.
2217 if (exproto
->state
== OPENED
|| exproto
->state
== STANDBY
) {
2218 ldout(cct
, 10) << __func__
<< " accept connection race, existing "
2219 << existing
<< ".cseq " << exproto
->connect_seq
2220 << " == " << connect_msg
.connect_seq
2221 << ", OPEN|STANDBY, RETRY_SESSION " << dendl
;
2222 // if connect_seq both zero, dont stuck into dead lock. it's ok to
2224 if (connection
->policy
.resetcheck
&& exproto
->connect_seq
== 0) {
2225 return replace(existing
, reply
, authorizer_reply
);
2228 reply
.connect_seq
= exproto
->connect_seq
+ 1;
2229 existing
->lock
.unlock();
2230 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION
, reply
,
2235 if (connection
->peer_addrs
->legacy_addr() < messenger
->get_myaddr_legacy() ||
2236 existing
->policy
.server
) {
2238 ldout(cct
, 10) << __func__
<< " accept connection race, existing "
2239 << existing
<< ".cseq " << exproto
->connect_seq
2240 << " == " << connect_msg
.connect_seq
2241 << ", or we are server, replacing my attempt" << dendl
;
2242 return replace(existing
, reply
, authorizer_reply
);
2244 // our existing outgoing wins
2245 ldout(messenger
->cct
, 10)
2246 << __func__
<< " accept connection race, existing " << existing
2247 << ".cseq " << exproto
->connect_seq
2248 << " == " << connect_msg
.connect_seq
<< ", sending WAIT" << dendl
;
2249 ceph_assert(connection
->peer_addrs
->legacy_addr() >
2250 messenger
->get_myaddr_legacy());
2251 existing
->lock
.unlock();
2252 // make sure we follow through with opening the existing
2253 // connection (if it isn't yet open) since we know the peer
2254 // has something to send to us.
2255 existing
->send_keepalive();
2256 return send_connect_message_reply(CEPH_MSGR_TAG_WAIT
, reply
,
2261 ceph_assert(connect_msg
.connect_seq
> exproto
->connect_seq
);
2262 ceph_assert(connect_msg
.global_seq
>= exproto
->peer_global_seq
);
2263 if (connection
->policy
.resetcheck
&& // RESETSESSION only used by servers;
2264 // peers do not reset each other
2265 exproto
->connect_seq
== 0) {
2266 ldout(cct
, 0) << __func__
<< " accept we reset (peer sent cseq "
2267 << connect_msg
.connect_seq
<< ", " << existing
2268 << ".cseq = " << exproto
->connect_seq
2269 << "), sending RESETSESSION " << dendl
;
2270 existing
->lock
.unlock();
2271 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION
, reply
,
2276 ldout(cct
, 10) << __func__
<< " accept peer sent cseq "
2277 << connect_msg
.connect_seq
<< " > " << exproto
->connect_seq
2279 return replace(existing
, reply
, authorizer_reply
);
2281 else if (!replacing
&& connect_msg
.connect_seq
> 0) {
2282 // we reset, and they are opening a new session
2283 ldout(cct
, 0) << __func__
<< " accept we reset (peer sent cseq "
2284 << connect_msg
.connect_seq
<< "), sending RESETSESSION"
2286 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION
, reply
,
2290 ldout(cct
, 10) << __func__
<< " accept new session" << dendl
;
2292 return open(reply
, authorizer_reply
);
2296 CtPtr
ProtocolV1::send_connect_message_reply(char tag
,
2297 ceph_msg_connect_reply
&reply
,
2298 ceph::buffer::list
&authorizer_reply
) {
2299 ldout(cct
, 20) << __func__
<< dendl
;
2300 ceph::buffer::list reply_bl
;
2303 ((uint64_t)connect_msg
.features
& connection
->policy
.features_supported
) |
2304 connection
->policy
.features_required
;
2305 reply
.authorizer_len
= authorizer_reply
.length();
2306 reply_bl
.append((char *)&reply
, sizeof(reply
));
2308 ldout(cct
, 10) << __func__
<< " reply features 0x" << std::hex
2309 << reply
.features
<< " = (policy sup 0x"
2310 << connection
->policy
.features_supported
2311 << " & connect 0x" << (uint64_t)connect_msg
.features
2312 << ") | policy req 0x"
2313 << connection
->policy
.features_required
2316 if (reply
.authorizer_len
) {
2317 reply_bl
.append(authorizer_reply
.c_str(), authorizer_reply
.length());
2318 authorizer_reply
.clear();
2321 return WRITE(reply_bl
, handle_connect_message_reply_write
);
2324 CtPtr
ProtocolV1::handle_connect_message_reply_write(int r
) {
2325 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
2328 ldout(cct
, 1) << " write connect message reply failed" << dendl
;
2329 connection
->inject_delay();
2333 return CONTINUE(wait_connect_message
);
2336 CtPtr
ProtocolV1::replace(const AsyncConnectionRef
& existing
,
2337 ceph_msg_connect_reply
&reply
,
2338 ceph::buffer::list
&authorizer_reply
) {
2339 ldout(cct
, 10) << __func__
<< " accept replacing " << existing
<< dendl
;
2341 connection
->inject_delay();
2342 if (existing
->policy
.lossy
) {
2343 // disconnect from the Connection
2344 ldout(cct
, 1) << __func__
<< " replacing on lossy channel, failing existing"
2346 existing
->protocol
->stop();
2347 existing
->dispatch_queue
->queue_reset(existing
.get());
2349 ceph_assert(can_write
== WriteStatus::NOWRITE
);
2350 existing
->write_lock
.lock();
2352 ProtocolV1
*exproto
= dynamic_cast<ProtocolV1
*>(existing
->protocol
.get());
2354 // reset the in_seq if this is a hard reset from peer,
2355 // otherwise we respect our original connection's value
2356 if (is_reset_from_peer
) {
2357 exproto
->is_reset_from_peer
= true;
2360 connection
->center
->delete_file_event(connection
->cs
.fd(),
2361 EVENT_READABLE
| EVENT_WRITABLE
);
2363 if (existing
->delay_state
) {
2364 existing
->delay_state
->flush();
2365 ceph_assert(!connection
->delay_state
);
2367 exproto
->reset_recv_state();
2369 exproto
->connect_msg
.features
= connect_msg
.features
;
2371 auto temp_cs
= std::move(connection
->cs
);
2372 EventCenter
*new_center
= connection
->center
;
2373 Worker
*new_worker
= connection
->worker
;
2374 // avoid _stop shutdown replacing socket
2375 // queue a reset on the new connection, which we're dumping for the old
2378 connection
->dispatch_queue
->queue_reset(connection
);
2379 ldout(messenger
->cct
, 1)
2380 << __func__
<< " stop myself to swap existing" << dendl
;
2381 exproto
->can_write
= WriteStatus::REPLACING
;
2382 exproto
->replacing
= true;
2383 exproto
->write_in_progress
= false;
2384 existing
->state_offset
= 0;
2385 // avoid previous thread modify event
2386 exproto
->state
= NONE
;
2387 existing
->state
= AsyncConnection::STATE_NONE
;
2388 // Discard existing prefetch buffer in `recv_buf`
2389 existing
->recv_start
= existing
->recv_end
= 0;
2390 // there shouldn't exist any buffer
2391 ceph_assert(connection
->recv_start
== connection
->recv_end
);
2393 auto deactivate_existing
= std::bind(
2394 [existing
, new_worker
, new_center
, exproto
, reply
,
2395 authorizer_reply
](ConnectedSocket
&cs
) mutable {
2396 // we need to delete time event in original thread
2398 std::lock_guard
<std::mutex
> l(existing
->lock
);
2399 existing
->write_lock
.lock();
2400 exproto
->requeue_sent();
2401 existing
->outgoing_bl
.clear();
2402 existing
->open_write
= false;
2403 existing
->write_lock
.unlock();
2404 if (exproto
->state
== NONE
) {
2405 existing
->shutdown_socket();
2406 existing
->cs
= std::move(cs
);
2407 existing
->worker
->references
--;
2408 new_worker
->references
++;
2409 existing
->logger
= new_worker
->get_perf_counter();
2410 existing
->worker
= new_worker
;
2411 existing
->center
= new_center
;
2412 if (existing
->delay_state
)
2413 existing
->delay_state
->set_center(new_center
);
2414 } else if (exproto
->state
== CLOSED
) {
2415 auto back_to_close
=
2416 std::bind([](ConnectedSocket
&cs
) mutable { cs
.close(); },
2418 new_center
->submit_to(new_center
->get_id(),
2419 std::move(back_to_close
), true);
2426 // Before changing existing->center, it may already exists some
2427 // events in existing->center's queue. Then if we mark down
2428 // `existing`, it will execute in another thread and clean up
2429 // connection. Previous event will result in segment fault
2430 auto transfer_existing
= [existing
, exproto
, reply
,
2431 authorizer_reply
]() mutable {
2432 std::lock_guard
<std::mutex
> l(existing
->lock
);
2433 if (exproto
->state
== CLOSED
) return;
2434 ceph_assert(exproto
->state
== NONE
);
2436 // we have called shutdown_socket above
2437 ceph_assert(existing
->last_tick_id
== 0);
2438 // restart timer since we are going to re-build connection
2439 existing
->last_connect_started
= ceph::coarse_mono_clock::now();
2440 existing
->last_tick_id
= existing
->center
->create_time_event(
2441 existing
->connect_timeout_us
, existing
->tick_handler
);
2442 existing
->state
= AsyncConnection::STATE_CONNECTION_ESTABLISHED
;
2443 exproto
->state
= ACCEPTING
;
2445 existing
->center
->create_file_event(
2446 existing
->cs
.fd(), EVENT_READABLE
, existing
->read_handler
);
2447 reply
.global_seq
= exproto
->peer_global_seq
;
2448 exproto
->run_continuation(exproto
->send_connect_message_reply(
2449 CEPH_MSGR_TAG_RETRY_GLOBAL
, reply
, authorizer_reply
));
2451 if (existing
->center
->in_thread())
2452 transfer_existing();
2454 existing
->center
->submit_to(existing
->center
->get_id(),
2455 std::move(transfer_existing
), true);
2457 std::move(temp_cs
));
2459 existing
->center
->submit_to(existing
->center
->get_id(),
2460 std::move(deactivate_existing
), true);
2461 existing
->write_lock
.unlock();
2462 existing
->lock
.unlock();
2465 existing
->lock
.unlock();
2467 return open(reply
, authorizer_reply
);
2470 CtPtr
ProtocolV1::open(ceph_msg_connect_reply
&reply
,
2471 ceph::buffer::list
&authorizer_reply
) {
2472 ldout(cct
, 20) << __func__
<< dendl
;
2474 connect_seq
= connect_msg
.connect_seq
+ 1;
2475 peer_global_seq
= connect_msg
.global_seq
;
2476 ldout(cct
, 10) << __func__
<< " accept success, connect_seq = " << connect_seq
2477 << " in_seq=" << in_seq
<< ", sending READY" << dendl
;
2479 // if it is a hard reset from peer, we don't need a round-trip to negotiate
2481 if ((connect_msg
.features
& CEPH_FEATURE_RECONNECT_SEQ
) &&
2482 !is_reset_from_peer
) {
2483 reply
.tag
= CEPH_MSGR_TAG_SEQ
;
2484 wait_for_seq
= true;
2486 reply
.tag
= CEPH_MSGR_TAG_READY
;
2487 wait_for_seq
= false;
2488 out_seq
= discard_requeued_up_to(out_seq
, 0);
2489 is_reset_from_peer
= false;
2494 reply
.features
= connection
->policy
.features_supported
;
2495 reply
.global_seq
= messenger
->get_global_seq();
2496 reply
.connect_seq
= connect_seq
;
2498 reply
.authorizer_len
= authorizer_reply
.length();
2499 if (connection
->policy
.lossy
) {
2500 reply
.flags
= reply
.flags
| CEPH_MSG_CONNECT_LOSSY
;
2503 connection
->set_features((uint64_t)reply
.features
&
2504 (uint64_t)connect_msg
.features
);
2505 ldout(cct
, 10) << __func__
<< " accept features "
2506 << connection
->get_features()
2507 << " authorizer_protocol "
2508 << connect_msg
.authorizer_protocol
<< dendl
;
2510 session_security
.reset(
2511 get_auth_session_handler(cct
, auth_meta
->auth_method
,
2512 auth_meta
->session_key
,
2513 connection
->get_features()));
2515 ceph::buffer::list reply_bl
;
2516 reply_bl
.append((char *)&reply
, sizeof(reply
));
2518 if (reply
.authorizer_len
) {
2519 reply_bl
.append(authorizer_reply
.c_str(), authorizer_reply
.length());
2522 if (reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
2523 uint64_t s
= in_seq
;
2524 reply_bl
.append((char *)&s
, sizeof(s
));
2527 connection
->lock
.unlock();
2528 // Because "replacing" will prevent other connections preempt this addr,
2529 // it's safe that here we don't acquire Connection's lock
2530 ssize_t r
= messenger
->accept_conn(connection
);
2532 connection
->inject_delay();
2534 connection
->lock
.lock();
2537 ldout(cct
, 1) << __func__
<< " existing race replacing process for addr = "
2538 << connection
->peer_addrs
->legacy_addr()
2539 << " just fail later one(this)" << dendl
;
2540 ldout(cct
, 10) << "accept fault after register" << dendl
;
2541 connection
->inject_delay();
2544 if (state
!= ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
2545 ldout(cct
, 1) << __func__
2546 << " state changed while accept_conn, it must be mark_down"
2548 ceph_assert(state
== CLOSED
|| state
== NONE
);
2549 ldout(cct
, 10) << "accept fault after register" << dendl
;
2550 messenger
->unregister_conn(connection
);
2551 connection
->inject_delay();
2555 return WRITE(reply_bl
, handle_ready_connect_message_reply_write
);
2558 CtPtr
ProtocolV1::handle_ready_connect_message_reply_write(int r
) {
2559 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
2562 ldout(cct
, 1) << __func__
<< " write ready connect message reply failed"
2568 connection
->dispatch_queue
->queue_accept(connection
);
2569 messenger
->ms_deliver_handle_fast_accept(connection
);
2572 state
= ACCEPTING_HANDLED_CONNECT_MSG
;
2578 return server_ready();
2581 CtPtr
ProtocolV1::wait_seq() {
2582 ldout(cct
, 20) << __func__
<< dendl
;
2584 return READ(sizeof(uint64_t), handle_seq
);
2587 CtPtr
ProtocolV1::handle_seq(char *buffer
, int r
) {
2588 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
2591 ldout(cct
, 1) << __func__
<< " read ack seq failed" << dendl
;
2595 uint64_t newly_acked_seq
= *(uint64_t *)buffer
;
2596 ldout(cct
, 2) << __func__
<< " accept get newly_acked_seq " << newly_acked_seq
2598 out_seq
= discard_requeued_up_to(out_seq
, newly_acked_seq
);
2600 return server_ready();
2603 CtPtr
ProtocolV1::server_ready() {
2604 ldout(cct
, 20) << __func__
<< " session_security is "
2608 ldout(cct
, 20) << __func__
<< " accept done" << dendl
;
2609 // FIPS zeroization audit 20191115: this memset is not security related.
2610 memset(&connect_msg
, 0, sizeof(connect_msg
));
2612 if (connection
->delay_state
) {
2613 ceph_assert(connection
->delay_state
->ready());