1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "ProtocolV1.h"
6 #include "common/errno.h"
8 #include "AsyncConnection.h"
9 #include "AsyncMessenger.h"
10 #include "common/EventTrace.h"
11 #include "include/random.h"
12 #include "auth/AuthClient.h"
13 #include "auth/AuthServer.h"
15 #define dout_subsys ceph_subsys_ms
17 #define dout_prefix _conn_prefix(_dout)
18 std::ostream
&ProtocolV1::_conn_prefix(std::ostream
*_dout
) {
19 return *_dout
<< "--1- " << messenger
->get_myaddrs() << " >> "
20 << *connection
->peer_addrs
22 << connection
<< " " << this
23 << " :" << connection
->port
<< " s=" << get_state_name(state
)
24 << " pgs=" << peer_global_seq
<< " cs=" << connect_seq
25 << " l=" << connection
->policy
.lossy
<< ").";
28 #define WRITE(B, C) write(CONTINUATION(C), B)
30 #define READ(L, C) read(CONTINUATION(C), L)
32 #define READB(L, B, C) read(CONTINUATION(C), L, B)
34 // Constant to limit starting sequence number to 2^31. Nothing special about
35 // it, just a big number. PLR
36 #define SEQ_MASK 0x7fffffff
38 const int ASYNC_COALESCE_THRESHOLD
= 256;
42 static void alloc_aligned_buffer(ceph::buffer::list
&data
, unsigned len
, unsigned off
) {
43 // create a buffer to read into that matches the data alignment
44 unsigned alloc_len
= 0;
47 if (off
& ~CEPH_PAGE_MASK
) {
49 alloc_len
+= CEPH_PAGE_SIZE
;
50 head
= std::min
<uint64_t>(CEPH_PAGE_SIZE
- (off
& ~CEPH_PAGE_MASK
), left
);
54 ceph::bufferptr
ptr(ceph::buffer::create_small_page_aligned(alloc_len
));
55 if (head
) ptr
.set_offset(CEPH_PAGE_SIZE
- head
);
56 data
.push_back(std::move(ptr
));
63 ProtocolV1::ProtocolV1(AsyncConnection
*connection
)
64 : Protocol(1, connection
),
66 can_write(WriteStatus::NOWRITE
),
73 is_reset_from_peer(false),
78 temp_buffer
= new char[4096];
81 ProtocolV1::~ProtocolV1() {
82 ceph_assert(out_q
.empty());
83 ceph_assert(sent
.empty());
88 void ProtocolV1::connect() {
89 this->state
= START_CONNECT
;
91 // reset connect state variables
92 authorizer_buf
.clear();
93 // FIPS zeroization audit 20191115: these memsets are not security related.
94 memset(&connect_msg
, 0, sizeof(connect_msg
));
95 memset(&connect_reply
, 0, sizeof(connect_reply
));
97 global_seq
= messenger
->get_global_seq();
100 void ProtocolV1::accept() { this->state
= START_ACCEPT
; }
102 bool ProtocolV1::is_connected() {
103 return can_write
.load() == WriteStatus::CANWRITE
;
106 void ProtocolV1::stop() {
107 ldout(cct
, 20) << __func__
<< dendl
;
108 if (state
== CLOSED
) {
112 if (connection
->delay_state
) connection
->delay_state
->flush();
114 ldout(cct
, 2) << __func__
<< dendl
;
115 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
122 can_write
= WriteStatus::CLOSED
;
126 void ProtocolV1::fault() {
127 ldout(cct
, 20) << __func__
<< dendl
;
129 if (state
== CLOSED
|| state
== NONE
) {
130 ldout(cct
, 10) << __func__
<< " connection is already closed" << dendl
;
134 if (connection
->policy
.lossy
&& state
!= START_CONNECT
&&
135 state
!= CONNECTING
) {
136 ldout(cct
, 1) << __func__
<< " on lossy channel, failing" << dendl
;
138 connection
->dispatch_queue
->queue_reset(connection
);
142 connection
->write_lock
.lock();
143 can_write
= WriteStatus::NOWRITE
;
144 is_reset_from_peer
= false;
146 // requeue sent items
149 if (!once_ready
&& out_q
.empty() && state
>= START_ACCEPT
&&
150 state
<= ACCEPTING_WAIT_CONNECT_MSG_AUTH
&& !replacing
) {
151 ldout(cct
, 10) << __func__
<< " with nothing to send and in the half "
152 << " accept state just closed" << dendl
;
153 connection
->write_lock
.unlock();
155 connection
->dispatch_queue
->queue_reset(connection
);
164 if (connection
->policy
.standby
&& out_q
.empty() && !keepalive
&&
166 ldout(cct
, 10) << __func__
<< " with nothing to send, going to standby"
169 connection
->write_lock
.unlock();
173 connection
->write_lock
.unlock();
175 if ((state
>= START_CONNECT
&& state
<= CONNECTING_SEND_CONNECT_MSG
) ||
179 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
180 } else if (backoff
== utime_t()) {
181 backoff
.set_from_double(cct
->_conf
->ms_initial_backoff
);
184 if (backoff
> cct
->_conf
->ms_max_backoff
)
185 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
188 global_seq
= messenger
->get_global_seq();
189 state
= START_CONNECT
;
190 connection
->state
= AsyncConnection::STATE_CONNECTING
;
191 ldout(cct
, 10) << __func__
<< " waiting " << backoff
<< dendl
;
193 connection
->register_time_events
.insert(
194 connection
->center
->create_time_event(backoff
.to_nsec() / 1000,
195 connection
->wakeup_handler
));
197 // policy maybe empty when state is in accept
198 if (connection
->policy
.server
) {
199 ldout(cct
, 0) << __func__
<< " server, going to standby" << dendl
;
202 ldout(cct
, 0) << __func__
<< " initiating reconnect" << dendl
;
204 global_seq
= messenger
->get_global_seq();
205 state
= START_CONNECT
;
206 connection
->state
= AsyncConnection::STATE_CONNECTING
;
209 connection
->center
->dispatch_event_external(connection
->read_handler
);
213 void ProtocolV1::send_message(Message
*m
) {
214 ceph::buffer::list bl
;
215 uint64_t f
= connection
->get_features();
217 // TODO: Currently not all messages supports reencode like MOSDMap, so here
218 // only let fast dispatch support messages prepare message
219 bool can_fast_prepare
= messenger
->ms_can_fast_dispatch(m
);
220 if (can_fast_prepare
) {
221 prepare_send_message(f
, m
, bl
);
224 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
225 // "features" changes will change the payload encoding
226 if (can_fast_prepare
&&
227 (can_write
== WriteStatus::NOWRITE
|| connection
->get_features() != f
)) {
228 // ensure the correctness of message encoding
231 ldout(cct
, 5) << __func__
<< " clear encoded buffer previous " << f
232 << " != " << connection
->get_features() << dendl
;
234 if (can_write
== WriteStatus::CLOSED
) {
235 ldout(cct
, 10) << __func__
<< " connection closed."
236 << " Drop message " << m
<< dendl
;
239 m
->queue_start
= ceph::mono_clock::now();
240 m
->trace
.event("async enqueueing message");
241 out_q
[m
->get_priority()].emplace_back(std::move(bl
), m
);
242 ldout(cct
, 15) << __func__
<< " inline write is denied, reschedule m=" << m
244 if (can_write
!= WriteStatus::REPLACING
&& !write_in_progress
) {
245 write_in_progress
= true;
246 connection
->center
->dispatch_event_external(connection
->write_handler
);
251 void ProtocolV1::prepare_send_message(uint64_t features
, Message
*m
,
252 ceph::buffer::list
&bl
) {
253 ldout(cct
, 20) << __func__
<< " m " << *m
<< dendl
;
255 // associate message with Connection (for benefit of encode_payload)
256 ldout(cct
, 20) << __func__
<< (m
->empty_payload() ? " encoding features " : " half-reencoding features ")
257 << features
<< " " << m
<< " " << *m
<< dendl
;
259 // encode and copy out of *m
260 // in write_message we update header.seq and need recalc crc
261 // so skip calc header in encode function.
262 m
->encode(features
, messenger
->crcflags
, true);
264 bl
.append(m
->get_payload());
265 bl
.append(m
->get_middle());
266 bl
.append(m
->get_data());
269 void ProtocolV1::send_keepalive() {
270 ldout(cct
, 10) << __func__
<< dendl
;
271 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
272 if (can_write
!= WriteStatus::CLOSED
) {
274 connection
->center
->dispatch_event_external(connection
->write_handler
);
278 void ProtocolV1::read_event() {
279 ldout(cct
, 20) << __func__
<< dendl
;
282 CONTINUATION_RUN(CONTINUATION(send_client_banner
));
285 CONTINUATION_RUN(CONTINUATION(send_server_banner
));
288 CONTINUATION_RUN(CONTINUATION(wait_message
));
290 case THROTTLE_MESSAGE
:
291 CONTINUATION_RUN(CONTINUATION(throttle_message
));
294 CONTINUATION_RUN(CONTINUATION(throttle_bytes
));
296 case THROTTLE_DISPATCH_QUEUE
:
297 CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue
));
304 void ProtocolV1::write_event() {
305 ldout(cct
, 10) << __func__
<< dendl
;
308 connection
->write_lock
.lock();
309 if (can_write
== WriteStatus::CANWRITE
) {
311 append_keepalive_or_ack();
315 auto start
= ceph::mono_clock::now();
318 if (connection
->is_queued()) {
319 if (r
= connection
->_try_send(); r
!= 0) {
320 // either fails to send or not all queued buffer is sent
325 ceph::buffer::list data
;
326 Message
*m
= _get_next_outgoing(&data
);
331 if (!connection
->policy
.lossy
) {
336 more
= !out_q
.empty();
337 connection
->write_lock
.unlock();
339 // send_message or requeue messages may not encode message
340 if (!data
.length()) {
341 prepare_send_message(connection
->get_features(), m
, data
);
344 if (m
->queue_start
!= ceph::mono_time()) {
345 connection
->logger
->tinc(l_msgr_send_messages_queue_lat
,
346 ceph::mono_clock::now() - m
->queue_start
);
349 r
= write_message(m
, data
, more
);
351 connection
->write_lock
.lock();
355 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
358 // Outbound message in-progress, thread will be re-awoken
359 // when the outbound socket is writeable again
362 } while (can_write
== WriteStatus::CANWRITE
);
363 write_in_progress
= false;
364 connection
->write_lock
.unlock();
366 // if r > 0 mean data still lefted, so no need _try_send.
368 uint64_t left
= ack_left
;
372 connection
->outgoing_bl
.append(CEPH_MSGR_TAG_ACK
);
373 connection
->outgoing_bl
.append((char *)&s
, sizeof(s
));
374 ldout(cct
, 10) << __func__
<< " try send msg ack, acked " << left
375 << " messages" << dendl
;
378 r
= connection
->_try_send(left
);
379 } else if (is_queued()) {
380 r
= connection
->_try_send();
384 connection
->logger
->tinc(l_msgr_running_send_time
,
385 ceph::mono_clock::now() - start
);
387 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
388 connection
->lock
.lock();
390 connection
->lock
.unlock();
394 write_in_progress
= false;
395 connection
->write_lock
.unlock();
396 connection
->lock
.lock();
397 connection
->write_lock
.lock();
398 if (state
== STANDBY
&& !connection
->policy
.server
&& is_queued()) {
399 ldout(cct
, 10) << __func__
<< " policy.server is false" << dendl
;
400 connection
->_connect();
401 } else if (connection
->cs
&& state
!= NONE
&& state
!= CLOSED
&&
402 state
!= START_CONNECT
) {
403 r
= connection
->_try_send();
405 ldout(cct
, 1) << __func__
<< " send outcoming bl failed" << dendl
;
406 connection
->write_lock
.unlock();
408 connection
->lock
.unlock();
412 connection
->write_lock
.unlock();
413 connection
->lock
.unlock();
417 bool ProtocolV1::is_queued() {
418 return !out_q
.empty() || connection
->is_queued();
421 void ProtocolV1::run_continuation(CtPtr pcontinuation
) {
423 CONTINUATION_RUN(*pcontinuation
);
427 CtPtr
ProtocolV1::read(CONTINUATION_RX_TYPE
<ProtocolV1
> &next
,
428 int len
, char *buffer
) {
430 buffer
= temp_buffer
;
432 ssize_t r
= connection
->read(len
, buffer
,
433 [&next
, this](char *buffer
, int r
) {
434 next
.setParams(buffer
, r
);
435 CONTINUATION_RUN(next
);
438 next
.setParams(buffer
, r
);
445 CtPtr
ProtocolV1::write(CONTINUATION_TX_TYPE
<ProtocolV1
> &next
,
446 ceph::buffer::list
&buffer
) {
447 ssize_t r
= connection
->write(buffer
, [&next
, this](int r
) {
449 CONTINUATION_RUN(next
);
459 CtPtr
ProtocolV1::ready() {
460 ldout(cct
, 25) << __func__
<< dendl
;
462 // make sure no pending tick timer
463 if (connection
->last_tick_id
) {
464 connection
->center
->delete_time_event(connection
->last_tick_id
);
466 connection
->last_tick_id
= connection
->center
->create_time_event(
467 connection
->inactive_timeout_us
, connection
->tick_handler
);
469 connection
->write_lock
.lock();
470 can_write
= WriteStatus::CANWRITE
;
472 connection
->center
->dispatch_event_external(connection
->write_handler
);
474 connection
->write_lock
.unlock();
475 connection
->maybe_start_delay_thread();
478 return wait_message();
481 CtPtr
ProtocolV1::wait_message() {
482 if (state
!= OPENED
) { // must have changed due to a replace
486 ldout(cct
, 20) << __func__
<< dendl
;
488 return READ(sizeof(char), handle_message
);
491 CtPtr
ProtocolV1::handle_message(char *buffer
, int r
) {
492 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
495 ldout(cct
, 1) << __func__
<< " read tag failed" << dendl
;
499 char tag
= buffer
[0];
500 ldout(cct
, 20) << __func__
<< " process tag " << (int)tag
<< dendl
;
502 if (tag
== CEPH_MSGR_TAG_KEEPALIVE
) {
503 ldout(cct
, 20) << __func__
<< " got KEEPALIVE" << dendl
;
504 connection
->set_last_keepalive(ceph_clock_now());
505 } else if (tag
== CEPH_MSGR_TAG_KEEPALIVE2
) {
506 return READ(sizeof(ceph_timespec
), handle_keepalive2
);
507 } else if (tag
== CEPH_MSGR_TAG_KEEPALIVE2_ACK
) {
508 return READ(sizeof(ceph_timespec
), handle_keepalive2_ack
);
509 } else if (tag
== CEPH_MSGR_TAG_ACK
) {
510 return READ(sizeof(ceph_le64
), handle_tag_ack
);
511 } else if (tag
== CEPH_MSGR_TAG_MSG
) {
512 recv_stamp
= ceph_clock_now();
513 ldout(cct
, 20) << __func__
<< " begin MSG" << dendl
;
514 return READ(sizeof(ceph_msg_header
), handle_message_header
);
515 } else if (tag
== CEPH_MSGR_TAG_CLOSE
) {
516 ldout(cct
, 20) << __func__
<< " got CLOSE" << dendl
;
519 ldout(cct
, 0) << __func__
<< " bad tag " << (int)tag
<< dendl
;
525 CtPtr
ProtocolV1::handle_keepalive2(char *buffer
, int r
) {
526 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
529 ldout(cct
, 1) << __func__
<< " read keeplive timespec failed" << dendl
;
533 ldout(cct
, 30) << __func__
<< " got KEEPALIVE2 tag ..." << dendl
;
536 t
= (ceph_timespec
*)buffer
;
537 utime_t kp_t
= utime_t(*t
);
538 connection
->write_lock
.lock();
539 append_keepalive_or_ack(true, &kp_t
);
540 connection
->write_lock
.unlock();
542 ldout(cct
, 20) << __func__
<< " got KEEPALIVE2 " << kp_t
<< dendl
;
543 connection
->set_last_keepalive(ceph_clock_now());
545 if (is_connected()) {
546 connection
->center
->dispatch_event_external(connection
->write_handler
);
549 return CONTINUE(wait_message
);
552 void ProtocolV1::append_keepalive_or_ack(bool ack
, utime_t
*tp
) {
553 ldout(cct
, 10) << __func__
<< dendl
;
556 struct ceph_timespec ts
;
557 tp
->encode_timeval(&ts
);
558 connection
->outgoing_bl
.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK
);
559 connection
->outgoing_bl
.append((char *)&ts
, sizeof(ts
));
560 } else if (connection
->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
561 struct ceph_timespec ts
;
562 utime_t t
= ceph_clock_now();
563 t
.encode_timeval(&ts
);
564 connection
->outgoing_bl
.append(CEPH_MSGR_TAG_KEEPALIVE2
);
565 connection
->outgoing_bl
.append((char *)&ts
, sizeof(ts
));
567 connection
->outgoing_bl
.append(CEPH_MSGR_TAG_KEEPALIVE
);
571 CtPtr
ProtocolV1::handle_keepalive2_ack(char *buffer
, int r
) {
572 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
575 ldout(cct
, 1) << __func__
<< " read keeplive timespec failed" << dendl
;
580 t
= (ceph_timespec
*)buffer
;
581 connection
->set_last_keepalive_ack(utime_t(*t
));
582 ldout(cct
, 20) << __func__
<< " got KEEPALIVE_ACK" << dendl
;
584 return CONTINUE(wait_message
);
587 CtPtr
ProtocolV1::handle_tag_ack(char *buffer
, int r
) {
588 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
591 ldout(cct
, 1) << __func__
<< " read ack seq failed" << dendl
;
596 seq
= *(ceph_le64
*)buffer
;
597 ldout(cct
, 20) << __func__
<< " got ACK" << dendl
;
599 ldout(cct
, 15) << __func__
<< " got ack seq " << seq
<< dendl
;
601 static const int max_pending
= 128;
603 auto now
= ceph::mono_clock::now();
604 Message
*pending
[max_pending
];
605 connection
->write_lock
.lock();
606 while (!sent
.empty() && sent
.front()->get_seq() <= seq
&& i
< max_pending
) {
607 Message
*m
= sent
.front();
610 ldout(cct
, 10) << __func__
<< " got ack seq " << seq
611 << " >= " << m
->get_seq() << " on " << m
<< " " << *m
614 connection
->write_lock
.unlock();
615 connection
->logger
->tinc(l_msgr_handle_ack_lat
, ceph::mono_clock::now() - now
);
616 for (int k
= 0; k
< i
; k
++) {
620 return CONTINUE(wait_message
);
623 CtPtr
ProtocolV1::handle_message_header(char *buffer
, int r
) {
624 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
627 ldout(cct
, 1) << __func__
<< " read message header failed" << dendl
;
631 ldout(cct
, 20) << __func__
<< " got MSG header" << dendl
;
633 current_header
= *((ceph_msg_header
*)buffer
);
635 ldout(cct
, 20) << __func__
<< " got envelope type=" << current_header
.type
<< " src "
636 << entity_name_t(current_header
.src
) << " front=" << current_header
.front_len
637 << " data=" << current_header
.data_len
<< " off " << current_header
.data_off
640 if (messenger
->crcflags
& MSG_CRC_HEADER
) {
641 __u32 header_crc
= 0;
642 header_crc
= ceph_crc32c(0, (unsigned char *)¤t_header
,
643 sizeof(current_header
) - sizeof(current_header
.crc
));
645 if (header_crc
!= current_header
.crc
) {
646 ldout(cct
, 0) << __func__
<< " got bad header crc " << header_crc
647 << " != " << current_header
.crc
<< dendl
;
658 state
= THROTTLE_MESSAGE
;
659 return CONTINUE(throttle_message
);
662 CtPtr
ProtocolV1::throttle_message() {
663 ldout(cct
, 20) << __func__
<< dendl
;
665 if (connection
->policy
.throttler_messages
) {
666 ldout(cct
, 10) << __func__
<< " wants " << 1
667 << " message from policy throttler "
668 << connection
->policy
.throttler_messages
->get_current()
669 << "/" << connection
->policy
.throttler_messages
->get_max()
671 if (!connection
->policy
.throttler_messages
->get_or_fail()) {
672 ldout(cct
, 1) << __func__
<< " wants 1 message from policy throttle "
673 << connection
->policy
.throttler_messages
->get_current()
674 << "/" << connection
->policy
.throttler_messages
->get_max()
675 << " failed, just wait." << dendl
;
676 // following thread pool deal with th full message queue isn't a
677 // short time, so we can wait a ms.
678 if (connection
->register_time_events
.empty()) {
679 connection
->register_time_events
.insert(
680 connection
->center
->create_time_event(1000,
681 connection
->wakeup_handler
));
687 state
= THROTTLE_BYTES
;
688 return CONTINUE(throttle_bytes
);
691 CtPtr
ProtocolV1::throttle_bytes() {
692 ldout(cct
, 20) << __func__
<< dendl
;
694 cur_msg_size
= current_header
.front_len
+ current_header
.middle_len
+
695 current_header
.data_len
;
697 if (connection
->policy
.throttler_bytes
) {
698 ldout(cct
, 10) << __func__
<< " wants " << cur_msg_size
699 << " bytes from policy throttler "
700 << connection
->policy
.throttler_bytes
->get_current() << "/"
701 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
702 if (!connection
->policy
.throttler_bytes
->get_or_fail(cur_msg_size
)) {
703 ldout(cct
, 1) << __func__
<< " wants " << cur_msg_size
704 << " bytes from policy throttler "
705 << connection
->policy
.throttler_bytes
->get_current()
706 << "/" << connection
->policy
.throttler_bytes
->get_max()
707 << " failed, just wait." << dendl
;
708 // following thread pool deal with th full message queue isn't a
709 // short time, so we can wait a ms.
710 if (connection
->register_time_events
.empty()) {
711 connection
->register_time_events
.insert(
712 connection
->center
->create_time_event(
713 1000, connection
->wakeup_handler
));
720 state
= THROTTLE_DISPATCH_QUEUE
;
721 return CONTINUE(throttle_dispatch_queue
);
724 CtPtr
ProtocolV1::throttle_dispatch_queue() {
725 ldout(cct
, 20) << __func__
<< dendl
;
728 if (!connection
->dispatch_queue
->dispatch_throttler
.get_or_fail(
731 << __func__
<< " wants " << cur_msg_size
732 << " bytes from dispatch throttle "
733 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
734 << connection
->dispatch_queue
->dispatch_throttler
.get_max()
735 << " failed, just wait." << dendl
;
736 // following thread pool deal with th full message queue isn't a
737 // short time, so we can wait a ms.
738 if (connection
->register_time_events
.empty()) {
739 connection
->register_time_events
.insert(
740 connection
->center
->create_time_event(1000,
741 connection
->wakeup_handler
));
747 throttle_stamp
= ceph_clock_now();
749 state
= READ_MESSAGE_FRONT
;
750 return read_message_front();
753 CtPtr
ProtocolV1::read_message_front() {
754 ldout(cct
, 20) << __func__
<< dendl
;
756 unsigned front_len
= current_header
.front_len
;
758 if (!front
.length()) {
759 front
.push_back(ceph::buffer::create(front_len
));
761 return READB(front_len
, front
.c_str(), handle_message_front
);
763 return read_message_middle();
766 CtPtr
ProtocolV1::handle_message_front(char *buffer
, int r
) {
767 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
770 ldout(cct
, 1) << __func__
<< " read message front failed" << dendl
;
774 ldout(cct
, 20) << __func__
<< " got front " << front
.length() << dendl
;
776 return read_message_middle();
779 CtPtr
ProtocolV1::read_message_middle() {
780 ldout(cct
, 20) << __func__
<< dendl
;
782 if (current_header
.middle_len
) {
783 if (!middle
.length()) {
784 middle
.push_back(ceph::buffer::create(current_header
.middle_len
));
786 return READB(current_header
.middle_len
, middle
.c_str(),
787 handle_message_middle
);
790 return read_message_data_prepare();
793 CtPtr
ProtocolV1::handle_message_middle(char *buffer
, int r
) {
794 ldout(cct
, 20) << __func__
<< " r" << r
<< dendl
;
797 ldout(cct
, 1) << __func__
<< " read message middle failed" << dendl
;
801 ldout(cct
, 20) << __func__
<< " got middle " << middle
.length() << dendl
;
803 return read_message_data_prepare();
806 CtPtr
ProtocolV1::read_message_data_prepare() {
807 ldout(cct
, 20) << __func__
<< dendl
;
809 unsigned data_len
= current_header
.data_len
;
810 unsigned data_off
= current_header
.data_off
;
815 // rx_buffers is broken by design... see
816 // http://tracker.ceph.com/issues/22480
817 map
<ceph_tid_t
, pair
<ceph::buffer::list
, int> >::iterator p
=
818 connection
->rx_buffers
.find(current_header
.tid
);
819 if (p
!= connection
->rx_buffers
.end()) {
820 ldout(cct
, 10) << __func__
<< " seleting rx buffer v " << p
->second
.second
821 << " at offset " << data_off
<< " len "
822 << p
->second
.first
.length() << dendl
;
823 data_buf
= p
->second
.first
;
824 // make sure it's big enough
825 if (data_buf
.length() < data_len
)
826 data_buf
.push_back(buffer::create(data_len
- data_buf
.length()));
827 data_blp
= data_buf
.begin();
829 ldout(cct
, 20) << __func__
<< " allocating new rx buffer at offset "
830 << data_off
<< dendl
;
831 alloc_aligned_buffer(data_buf
, data_len
, data_off
);
832 data_blp
= data_buf
.begin();
835 ldout(cct
, 20) << __func__
<< " allocating new rx buffer at offset "
836 << data_off
<< dendl
;
837 alloc_aligned_buffer(data_buf
, data_len
, data_off
);
838 data_blp
= data_buf
.begin();
844 return CONTINUE(read_message_data
);
847 CtPtr
ProtocolV1::read_message_data() {
848 ldout(cct
, 20) << __func__
<< " msg_left=" << msg_left
<< dendl
;
851 auto bp
= data_blp
.get_current_ptr();
852 unsigned read_len
= std::min(bp
.length(), msg_left
);
854 return READB(read_len
, bp
.c_str(), handle_message_data
);
857 return read_message_footer();
860 CtPtr
ProtocolV1::handle_message_data(char *buffer
, int r
) {
861 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
864 ldout(cct
, 1) << __func__
<< " read data error " << dendl
;
868 auto bp
= data_blp
.get_current_ptr();
869 unsigned read_len
= std::min(bp
.length(), msg_left
);
870 ceph_assert(read_len
<
871 static_cast<unsigned>(std::numeric_limits
<int>::max()));
872 data_blp
+= read_len
;
873 data
.append(bp
, 0, read_len
);
874 msg_left
-= read_len
;
876 return CONTINUE(read_message_data
);
879 CtPtr
ProtocolV1::read_message_footer() {
880 ldout(cct
, 20) << __func__
<< dendl
;
882 state
= READ_FOOTER_AND_DISPATCH
;
885 if (connection
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
886 len
= sizeof(ceph_msg_footer
);
888 len
= sizeof(ceph_msg_footer_old
);
891 return READ(len
, handle_message_footer
);
894 CtPtr
ProtocolV1::handle_message_footer(char *buffer
, int r
) {
895 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
898 ldout(cct
, 1) << __func__
<< " read footer data error " << dendl
;
902 ceph_msg_footer footer
;
903 ceph_msg_footer_old old_footer
;
905 if (connection
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
906 footer
= *((ceph_msg_footer
*)buffer
);
908 old_footer
= *((ceph_msg_footer_old
*)buffer
);
909 footer
.front_crc
= old_footer
.front_crc
;
910 footer
.middle_crc
= old_footer
.middle_crc
;
911 footer
.data_crc
= old_footer
.data_crc
;
913 footer
.flags
= old_footer
.flags
;
916 int aborted
= (footer
.flags
& CEPH_MSG_FOOTER_COMPLETE
) == 0;
917 ldout(cct
, 10) << __func__
<< " aborted = " << aborted
<< dendl
;
919 ldout(cct
, 0) << __func__
<< " got " << front
.length() << " + "
920 << middle
.length() << " + " << data
.length()
921 << " byte message.. ABORTED" << dendl
;
925 ldout(cct
, 20) << __func__
<< " got " << front
.length() << " + "
926 << middle
.length() << " + " << data
.length() << " byte message"
928 Message
*message
= decode_message(cct
, messenger
->crcflags
, current_header
,
929 footer
, front
, middle
, data
, connection
);
931 ldout(cct
, 1) << __func__
<< " decode message failed " << dendl
;
936 // Check the signature if one should be present. A zero return indicates
940 if (session_security
.get() == NULL
) {
941 ldout(cct
, 10) << __func__
<< " no session security set" << dendl
;
943 if (session_security
->check_message_signature(message
)) {
944 ldout(cct
, 0) << __func__
<< " Signature check failed" << dendl
;
949 message
->set_byte_throttler(connection
->policy
.throttler_bytes
);
950 message
->set_message_throttler(connection
->policy
.throttler_messages
);
952 // store reservation size in message, so we don't get confused
953 // by messages entering the dispatch queue through other paths.
954 message
->set_dispatch_throttle_size(cur_msg_size
);
956 message
->set_recv_stamp(recv_stamp
);
957 message
->set_throttle_stamp(throttle_stamp
);
958 message
->set_recv_complete_stamp(ceph_clock_now());
960 // check received seq#. if it is old, drop the message.
961 // note that incoming messages may skip ahead. this is convenient for the
962 // client side queueing because messages can't be renumbered, but the (kernel)
963 // client will occasionally pull a message out of the sent queue to send
964 // elsewhere. in that case it doesn't matter if we "got" it or not.
965 uint64_t cur_seq
= in_seq
;
966 if (message
->get_seq() <= cur_seq
) {
967 ldout(cct
, 0) << __func__
<< " got old message " << message
->get_seq()
968 << " <= " << cur_seq
<< " " << message
<< " " << *message
969 << ", discarding" << dendl
;
971 if (connection
->has_feature(CEPH_FEATURE_RECONNECT_SEQ
) &&
972 cct
->_conf
->ms_die_on_old_message
) {
973 ceph_assert(0 == "old msgs despite reconnect_seq feature");
977 if (message
->get_seq() > cur_seq
+ 1) {
978 ldout(cct
, 0) << __func__
<< " missed message? skipped from seq "
979 << cur_seq
<< " to " << message
->get_seq() << dendl
;
980 if (cct
->_conf
->ms_die_on_skipped_message
) {
981 ceph_assert(0 == "skipped incoming seq");
985 #if defined(WITH_EVENTTRACE)
986 if (message
->get_type() == CEPH_MSG_OSD_OP
||
987 message
->get_type() == CEPH_MSG_OSD_OPREPLY
) {
988 utime_t ltt_processed_stamp
= ceph_clock_now();
989 double usecs_elapsed
=
990 ((double)(ltt_processed_stamp
.to_nsec() - recv_stamp
.to_nsec())) / 1000;
992 if (message
->get_type() == CEPH_MSG_OSD_OP
)
993 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OP",
996 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OPREPLY",
1001 // note last received message.
1002 in_seq
= message
->get_seq();
1003 ldout(cct
, 5) << " rx " << message
->get_source() << " seq "
1004 << message
->get_seq() << " " << message
<< " " << *message
1007 bool need_dispatch_writer
= false;
1008 if (!connection
->policy
.lossy
) {
1010 need_dispatch_writer
= true;
1015 ceph::mono_time fast_dispatch_time
;
1017 if (connection
->is_blackhole()) {
1018 ldout(cct
, 10) << __func__
<< " blackhole " << *message
<< dendl
;
1023 connection
->logger
->inc(l_msgr_recv_messages
);
1024 connection
->logger
->inc(
1026 cur_msg_size
+ sizeof(ceph_msg_header
) + sizeof(ceph_msg_footer
));
1028 messenger
->ms_fast_preprocess(message
);
1029 fast_dispatch_time
= ceph::mono_clock::now();
1030 connection
->logger
->tinc(l_msgr_running_recv_time
,
1031 fast_dispatch_time
- connection
->recv_start_time
);
1032 if (connection
->delay_state
) {
1033 double delay_period
= 0;
1034 if (rand() % 10000 < cct
->_conf
->ms_inject_delay_probability
* 10000.0) {
1036 cct
->_conf
->ms_inject_delay_max
* (double)(rand() % 10000) / 10000.0;
1037 ldout(cct
, 1) << "queue_received will delay after "
1038 << (ceph_clock_now() + delay_period
) << " on " << message
1039 << " " << *message
<< dendl
;
1041 connection
->delay_state
->queue(delay_period
, message
);
1042 } else if (messenger
->ms_can_fast_dispatch(message
)) {
1043 connection
->lock
.unlock();
1044 connection
->dispatch_queue
->fast_dispatch(message
);
1045 connection
->recv_start_time
= ceph::mono_clock::now();
1046 connection
->logger
->tinc(l_msgr_running_fast_dispatch_time
,
1047 connection
->recv_start_time
- fast_dispatch_time
);
1048 connection
->lock
.lock();
1050 connection
->dispatch_queue
->enqueue(message
, message
->get_priority(),
1051 connection
->conn_id
);
1055 // clean up local buffer references
1061 if (need_dispatch_writer
&& connection
->is_connected()) {
1062 connection
->center
->dispatch_event_external(connection
->write_handler
);
1065 return CONTINUE(wait_message
);
1068 void ProtocolV1::session_reset() {
1069 ldout(cct
, 10) << __func__
<< " started" << dendl
;
1071 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
1072 if (connection
->delay_state
) {
1073 connection
->delay_state
->discard();
1076 connection
->dispatch_queue
->discard_queue(connection
->conn_id
);
1077 discard_out_queue();
1078 // note: we need to clear outgoing_bl here, but session_reset may be
1079 // called by other thread, so let caller clear this itself!
1080 // outgoing_bl.clear();
1082 connection
->dispatch_queue
->queue_remote_reset(connection
);
1084 randomize_out_seq();
1088 // it's safe to directly set 0, double locked
1091 can_write
= WriteStatus::NOWRITE
;
1094 void ProtocolV1::randomize_out_seq() {
1095 if (connection
->get_features() & CEPH_FEATURE_MSG_AUTH
) {
1096 // Set out_seq to a random value, so CRC won't be predictable.
1097 auto rand_seq
= ceph::util::generate_random_number
<uint64_t>(0, SEQ_MASK
);
1098 ldout(cct
, 10) << __func__
<< " randomize_out_seq " << rand_seq
<< dendl
;
1101 // previously, seq #'s always started at 0.
1106 ssize_t
ProtocolV1::write_message(Message
*m
, ceph::buffer::list
&bl
, bool more
) {
1108 ceph_assert(connection
->center
->in_thread());
1109 m
->set_seq(++out_seq
);
1111 if (messenger
->crcflags
& MSG_CRC_HEADER
) {
1112 m
->calc_header_crc();
1115 ceph_msg_header
&header
= m
->get_header();
1116 ceph_msg_footer
&footer
= m
->get_footer();
1118 // TODO: let sign_message could be reentry?
1119 // Now that we have all the crcs calculated, handle the
1120 // digital signature for the message, if the AsyncConnection has session
1121 // security set up. Some session security options do not
1122 // actually calculate and check the signature, but they should
1123 // handle the calls to sign_message and check_signature. PLR
1124 if (session_security
.get() == NULL
) {
1125 ldout(cct
, 20) << __func__
<< " no session security" << dendl
;
1127 if (session_security
->sign_message(m
)) {
1128 ldout(cct
, 20) << __func__
<< " failed to sign m=" << m
1129 << "): sig = " << footer
.sig
<< dendl
;
1131 ldout(cct
, 20) << __func__
<< " signed m=" << m
1132 << "): sig = " << footer
.sig
<< dendl
;
1136 connection
->outgoing_bl
.append(CEPH_MSGR_TAG_MSG
);
1137 connection
->outgoing_bl
.append((char *)&header
, sizeof(header
));
1139 ldout(cct
, 20) << __func__
<< " sending message type=" << header
.type
1140 << " src " << entity_name_t(header
.src
)
1141 << " front=" << header
.front_len
<< " data=" << header
.data_len
1142 << " off " << header
.data_off
<< dendl
;
1144 if ((bl
.length() <= ASYNC_COALESCE_THRESHOLD
) && (bl
.get_num_buffers() > 1)) {
1145 for (const auto &pb
: bl
.buffers()) {
1146 connection
->outgoing_bl
.append((char *)pb
.c_str(), pb
.length());
1149 connection
->outgoing_bl
.claim_append(bl
);
1152 // send footer; if receiver doesn't support signatures, use the old footer
1154 ceph_msg_footer_old old_footer
;
1155 if (connection
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
1156 connection
->outgoing_bl
.append((char *)&footer
, sizeof(footer
));
1158 if (messenger
->crcflags
& MSG_CRC_HEADER
) {
1159 old_footer
.front_crc
= footer
.front_crc
;
1160 old_footer
.middle_crc
= footer
.middle_crc
;
1162 old_footer
.front_crc
= old_footer
.middle_crc
= 0;
1164 old_footer
.data_crc
=
1165 messenger
->crcflags
& MSG_CRC_DATA
? footer
.data_crc
: 0;
1166 old_footer
.flags
= footer
.flags
;
1167 connection
->outgoing_bl
.append((char *)&old_footer
, sizeof(old_footer
));
1170 m
->trace
.event("async writing message");
1171 ldout(cct
, 20) << __func__
<< " sending " << m
->get_seq() << " " << m
1173 ssize_t total_send_size
= connection
->outgoing_bl
.length();
1174 ssize_t rc
= connection
->_try_send(more
);
1176 ldout(cct
, 1) << __func__
<< " error sending " << m
<< ", "
1177 << cpp_strerror(rc
) << dendl
;
1179 connection
->logger
->inc(
1180 l_msgr_send_bytes
, total_send_size
- connection
->outgoing_bl
.length());
1181 ldout(cct
, 10) << __func__
<< " sending " << m
1182 << (rc
? " continuely." : " done.") << dendl
;
1185 #if defined(WITH_EVENTTRACE)
1186 if (m
->get_type() == CEPH_MSG_OSD_OP
)
1187 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OP_END", false);
1188 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
1189 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OPREPLY_END", false);
1196 void ProtocolV1::requeue_sent() {
1197 write_in_progress
= false;
1202 list
<pair
<ceph::buffer::list
, Message
*> > &rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1203 out_seq
-= sent
.size();
1204 while (!sent
.empty()) {
1205 Message
*m
= sent
.back();
1207 ldout(cct
, 10) << __func__
<< " " << *m
<< " for resend "
1208 << " (" << m
->get_seq() << ")" << dendl
;
1210 rq
.push_front(make_pair(ceph::buffer::list(), m
));
1214 uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq
, uint64_t seq
) {
1215 ldout(cct
, 10) << __func__
<< " " << seq
<< dendl
;
1216 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
1217 if (out_q
.count(CEPH_MSG_PRIO_HIGHEST
) == 0) {
1220 list
<pair
<ceph::buffer::list
, Message
*> > &rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1221 uint64_t count
= out_seq
;
1222 while (!rq
.empty()) {
1223 pair
<ceph::buffer::list
, Message
*> p
= rq
.front();
1224 if (p
.second
->get_seq() == 0 || p
.second
->get_seq() > seq
) break;
1225 ldout(cct
, 10) << __func__
<< " " << *(p
.second
) << " for resend seq "
1226 << p
.second
->get_seq() << " <= " << seq
<< ", discarding"
1232 if (rq
.empty()) out_q
.erase(CEPH_MSG_PRIO_HIGHEST
);
1237 * Tears down the message queues, and removes them from the
1238 * DispatchQueue Must hold write_lock prior to calling.
1240 void ProtocolV1::discard_out_queue() {
1241 ldout(cct
, 10) << __func__
<< " started" << dendl
;
1243 for (list
<Message
*>::iterator p
= sent
.begin(); p
!= sent
.end(); ++p
) {
1244 ldout(cct
, 20) << __func__
<< " discard " << *p
<< dendl
;
1248 for (map
<int, list
<pair
<ceph::buffer::list
, Message
*> > >::iterator p
=
1250 p
!= out_q
.end(); ++p
) {
1251 for (list
<pair
<ceph::buffer::list
, Message
*> >::iterator r
= p
->second
.begin();
1252 r
!= p
->second
.end(); ++r
) {
1253 ldout(cct
, 20) << __func__
<< " discard " << r
->second
<< dendl
;
1258 write_in_progress
= false;
1261 void ProtocolV1::reset_security()
1263 ldout(cct
, 5) << __func__
<< dendl
;
1265 auth_meta
.reset(new AuthConnectionMeta
);
1266 authorizer_more
.clear();
1267 session_security
.reset();
1270 void ProtocolV1::reset_recv_state()
1272 ldout(cct
, 5) << __func__
<< dendl
;
1274 // execute in the same thread that uses the `session_security`.
1275 // We need to do the warp because holding `write_lock` is not
1276 // enough as `write_event()` releases it just before calling
1277 // `write_message()`. `submit_to()` here is NOT blocking.
1278 if (!connection
->center
->in_thread()) {
1279 connection
->center
->submit_to(connection
->center
->get_id(), [this] {
1280 ldout(cct
, 5) << "reset_recv_state (warped) reseting security handlers"
1282 // Possibly unnecessary. See the comment in `deactivate_existing`.
1283 std::lock_guard
<std::mutex
> l(connection
->lock
);
1284 std::lock_guard
<std::mutex
> wl(connection
->write_lock
);
1286 }, /* always_async = */true);
1291 // clean read and write callbacks
1292 connection
->pendingReadLen
.reset();
1293 connection
->writeCallback
.reset();
1295 if (state
> THROTTLE_MESSAGE
&& state
<= READ_FOOTER_AND_DISPATCH
&&
1296 connection
->policy
.throttler_messages
) {
1297 ldout(cct
, 10) << __func__
<< " releasing " << 1
1298 << " message to policy throttler "
1299 << connection
->policy
.throttler_messages
->get_current()
1300 << "/" << connection
->policy
.throttler_messages
->get_max()
1302 connection
->policy
.throttler_messages
->put();
1304 if (state
> THROTTLE_BYTES
&& state
<= READ_FOOTER_AND_DISPATCH
) {
1305 if (connection
->policy
.throttler_bytes
) {
1306 ldout(cct
, 10) << __func__
<< " releasing " << cur_msg_size
1307 << " bytes to policy throttler "
1308 << connection
->policy
.throttler_bytes
->get_current() << "/"
1309 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
1310 connection
->policy
.throttler_bytes
->put(cur_msg_size
);
1313 if (state
> THROTTLE_DISPATCH_QUEUE
&& state
<= READ_FOOTER_AND_DISPATCH
) {
1315 << __func__
<< " releasing " << cur_msg_size
1316 << " bytes to dispatch_queue throttler "
1317 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
1318 << connection
->dispatch_queue
->dispatch_throttler
.get_max() << dendl
;
1319 connection
->dispatch_queue
->dispatch_throttle_release(cur_msg_size
);
1323 Message
*ProtocolV1::_get_next_outgoing(ceph::buffer::list
*bl
) {
1325 if (!out_q
.empty()) {
1326 map
<int, list
<pair
<ceph::buffer::list
, Message
*> > >::reverse_iterator it
=
1328 ceph_assert(!it
->second
.empty());
1329 list
<pair
<ceph::buffer::list
, Message
*> >::iterator p
= it
->second
.begin();
1331 if (p
->first
.length() && bl
) {
1332 assert(bl
->length() == 0);
1335 it
->second
.erase(p
);
1336 if (it
->second
.empty()) out_q
.erase(it
->first
);
1342 * Client Protocol V1
1345 CtPtr
ProtocolV1::send_client_banner() {
1346 ldout(cct
, 20) << __func__
<< dendl
;
1349 ceph::buffer::list bl
;
1350 bl
.append(CEPH_BANNER
, strlen(CEPH_BANNER
));
1351 return WRITE(bl
, handle_client_banner_write
);
1354 CtPtr
ProtocolV1::handle_client_banner_write(int r
) {
1355 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1358 ldout(cct
, 1) << __func__
<< " write client banner failed" << dendl
;
1361 ldout(cct
, 10) << __func__
<< " connect write banner done: "
1362 << connection
->get_peer_addr() << dendl
;
1364 return wait_server_banner();
1367 CtPtr
ProtocolV1::wait_server_banner() {
1368 state
= CONNECTING_WAIT_BANNER_AND_IDENTIFY
;
1370 ldout(cct
, 20) << __func__
<< dendl
;
1372 ceph::buffer::list myaddrbl
;
1373 unsigned banner_len
= strlen(CEPH_BANNER
);
1374 unsigned need_len
= banner_len
+ sizeof(ceph_entity_addr
) * 2;
1375 return READ(need_len
, handle_server_banner_and_identify
);
1378 CtPtr
ProtocolV1::handle_server_banner_and_identify(char *buffer
, int r
) {
1379 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1382 ldout(cct
, 1) << __func__
<< " read banner and identify addresses failed"
1387 unsigned banner_len
= strlen(CEPH_BANNER
);
1388 if (memcmp(buffer
, CEPH_BANNER
, banner_len
)) {
1389 ldout(cct
, 0) << __func__
<< " connect protocol error (bad banner) on peer "
1390 << connection
->get_peer_addr() << dendl
;
1394 ceph::buffer::list bl
;
1395 entity_addr_t paddr
, peer_addr_for_me
;
1397 bl
.append(buffer
+ banner_len
, sizeof(ceph_entity_addr
) * 2);
1398 auto p
= bl
.cbegin();
1401 decode(peer_addr_for_me
, p
);
1402 } catch (const ceph::buffer::error
&e
) {
1403 lderr(cct
) << __func__
<< " decode peer addr failed " << dendl
;
1406 ldout(cct
, 20) << __func__
<< " connect read peer addr " << paddr
1407 << " on socket " << connection
->cs
.fd() << dendl
;
1409 entity_addr_t peer_addr
= connection
->peer_addrs
->legacy_addr();
1410 if (peer_addr
!= paddr
) {
1411 if (paddr
.is_blank_ip() && peer_addr
.get_port() == paddr
.get_port() &&
1412 peer_addr
.get_nonce() == paddr
.get_nonce()) {
1413 ldout(cct
, 0) << __func__
<< " connect claims to be " << paddr
<< " not "
1414 << peer_addr
<< " - presumably this is the same node!"
1417 ldout(cct
, 10) << __func__
<< " connect claims to be " << paddr
<< " not "
1418 << peer_addr
<< dendl
;
1423 ldout(cct
, 20) << __func__
<< " connect peer addr for me is "
1424 << peer_addr_for_me
<< dendl
;
1425 if (messenger
->get_myaddrs().empty() ||
1426 messenger
->get_myaddrs().front().is_blank_ip()) {
1427 sockaddr_storage ss
;
1428 socklen_t len
= sizeof(ss
);
1429 getsockname(connection
->cs
.fd(), (sockaddr
*)&ss
, &len
);
1431 if (cct
->_conf
->ms_learn_addr_from_peer
) {
1432 ldout(cct
, 1) << __func__
<< " peer " << connection
->target_addr
1433 << " says I am " << peer_addr_for_me
<< " (socket says "
1434 << (sockaddr
*)&ss
<< ")" << dendl
;
1435 a
= peer_addr_for_me
;
1437 ldout(cct
, 1) << __func__
<< " socket to " << connection
->target_addr
1438 << " says I am " << (sockaddr
*)&ss
1439 << " (peer says " << peer_addr_for_me
<< ")" << dendl
;
1440 a
.set_sockaddr((sockaddr
*)&ss
);
1442 a
.set_type(entity_addr_t::TYPE_LEGACY
); // anything but NONE; learned_addr ignores this
1444 connection
->lock
.unlock();
1445 messenger
->learned_addr(a
);
1446 if (cct
->_conf
->ms_inject_internal_delays
&&
1447 cct
->_conf
->ms_inject_socket_failures
) {
1448 if (rand() % cct
->_conf
->ms_inject_socket_failures
== 0) {
1449 ldout(cct
, 10) << __func__
<< " sleep for "
1450 << cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1452 t
.set_from_double(cct
->_conf
->ms_inject_internal_delays
);
1456 connection
->lock
.lock();
1457 if (state
!= CONNECTING_WAIT_BANNER_AND_IDENTIFY
) {
1458 ldout(cct
, 1) << __func__
1459 << " state changed while learned_addr, mark_down or "
1460 << " replacing must be happened just now" << dendl
;
1465 ceph::buffer::list myaddrbl
;
1466 encode(messenger
->get_myaddr_legacy(), myaddrbl
, 0); // legacy
1467 return WRITE(myaddrbl
, handle_my_addr_write
);
1470 CtPtr
ProtocolV1::handle_my_addr_write(int r
) {
1471 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1474 ldout(cct
, 2) << __func__
<< " connect couldn't write my addr, "
1475 << cpp_strerror(r
) << dendl
;
1478 ldout(cct
, 10) << __func__
<< " connect sent my addr "
1479 << messenger
->get_myaddr_legacy() << dendl
;
1481 return CONTINUE(send_connect_message
);
1484 CtPtr
ProtocolV1::send_connect_message()
1486 state
= CONNECTING_SEND_CONNECT_MSG
;
1488 ldout(cct
, 20) << __func__
<< dendl
;
1489 ceph_assert(messenger
->auth_client
);
1491 ceph::buffer::list auth_bl
;
1492 vector
<uint32_t> preferred_modes
;
1494 if (connection
->peer_type
!= CEPH_ENTITY_TYPE_MON
||
1495 messenger
->get_myname().type() == CEPH_ENTITY_TYPE_MON
) {
1496 if (authorizer_more
.length()) {
1497 ldout(cct
,10) << __func__
<< " using augmented (challenge) auth payload"
1499 auth_bl
= authorizer_more
;
1501 auto am
= auth_meta
;
1502 authorizer_more
.clear();
1503 connection
->lock
.unlock();
1504 int r
= messenger
->auth_client
->get_auth_request(
1505 connection
, am
.get(),
1506 &am
->auth_method
, &preferred_modes
, &auth_bl
);
1507 connection
->lock
.lock();
1511 if (state
!= CONNECTING_SEND_CONNECT_MSG
) {
1512 ldout(cct
, 1) << __func__
<< " state changed!" << dendl
;
1518 ceph_msg_connect connect
;
1519 connect
.features
= connection
->policy
.features_supported
;
1520 connect
.host_type
= messenger
->get_myname().type();
1521 connect
.global_seq
= global_seq
;
1522 connect
.connect_seq
= connect_seq
;
1523 connect
.protocol_version
=
1524 messenger
->get_proto_version(connection
->peer_type
, true);
1525 if (auth_bl
.length()) {
1526 ldout(cct
, 10) << __func__
1527 << " connect_msg.authorizer_len=" << auth_bl
.length()
1528 << " protocol=" << auth_meta
->auth_method
<< dendl
;
1529 connect
.authorizer_protocol
= auth_meta
->auth_method
;
1530 connect
.authorizer_len
= auth_bl
.length();
1532 connect
.authorizer_protocol
= 0;
1533 connect
.authorizer_len
= 0;
1537 if (connection
->policy
.lossy
) {
1539 CEPH_MSG_CONNECT_LOSSY
; // this is fyi, actually, server decides!
1542 ceph::buffer::list bl
;
1543 bl
.append((char *)&connect
, sizeof(connect
));
1544 if (auth_bl
.length()) {
1545 bl
.append(auth_bl
.c_str(), auth_bl
.length());
1548 ldout(cct
, 10) << __func__
<< " connect sending gseq=" << global_seq
1549 << " cseq=" << connect_seq
1550 << " proto=" << connect
.protocol_version
<< dendl
;
1552 return WRITE(bl
, handle_connect_message_write
);
1555 CtPtr
ProtocolV1::handle_connect_message_write(int r
) {
1556 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1559 ldout(cct
, 2) << __func__
<< " connect couldn't send reply "
1560 << cpp_strerror(r
) << dendl
;
1564 ldout(cct
, 20) << __func__
1565 << " connect wrote (self +) cseq, waiting for reply" << dendl
;
1567 return wait_connect_reply();
1570 CtPtr
ProtocolV1::wait_connect_reply() {
1571 ldout(cct
, 20) << __func__
<< dendl
;
1573 // FIPS zeroization audit 20191115: this memset is not security related.
1574 memset(&connect_reply
, 0, sizeof(connect_reply
));
1575 return READ(sizeof(connect_reply
), handle_connect_reply_1
);
1578 CtPtr
ProtocolV1::handle_connect_reply_1(char *buffer
, int r
) {
1579 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1582 ldout(cct
, 1) << __func__
<< " read connect reply failed" << dendl
;
1586 connect_reply
= *((ceph_msg_connect_reply
*)buffer
);
1588 ldout(cct
, 20) << __func__
<< " connect got reply tag "
1589 << (int)connect_reply
.tag
<< " connect_seq "
1590 << connect_reply
.connect_seq
<< " global_seq "
1591 << connect_reply
.global_seq
<< " proto "
1592 << connect_reply
.protocol_version
<< " flags "
1593 << (int)connect_reply
.flags
<< " features "
1594 << connect_reply
.features
<< dendl
;
1596 if (connect_reply
.authorizer_len
) {
1597 return wait_connect_reply_auth();
1600 return handle_connect_reply_2();
1603 CtPtr
ProtocolV1::wait_connect_reply_auth() {
1604 ldout(cct
, 20) << __func__
<< dendl
;
1606 ldout(cct
, 10) << __func__
1607 << " reply.authorizer_len=" << connect_reply
.authorizer_len
1610 ceph_assert(connect_reply
.authorizer_len
< 4096);
1612 return READ(connect_reply
.authorizer_len
, handle_connect_reply_auth
);
1615 CtPtr
ProtocolV1::handle_connect_reply_auth(char *buffer
, int r
) {
1616 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1619 ldout(cct
, 1) << __func__
<< " read connect reply authorizer failed"
1624 ceph::buffer::list authorizer_reply
;
1625 authorizer_reply
.append(buffer
, connect_reply
.authorizer_len
);
1627 if (connection
->peer_type
!= CEPH_ENTITY_TYPE_MON
||
1628 messenger
->get_myname().type() == CEPH_ENTITY_TYPE_MON
) {
1629 auto am
= auth_meta
;
1630 bool more
= (connect_reply
.tag
== CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
);
1631 ceph::buffer::list auth_retry_bl
;
1633 connection
->lock
.unlock();
1635 r
= messenger
->auth_client
->handle_auth_reply_more(
1636 connection
, am
.get(), authorizer_reply
, &auth_retry_bl
);
1638 // these aren't used for v1
1641 r
= messenger
->auth_client
->handle_auth_done(
1642 connection
, am
.get(),
1643 0 /* global id */, 0 /* con mode */,
1645 &skey
, &con_secret
);
1647 connection
->lock
.lock();
1648 if (state
!= CONNECTING_SEND_CONNECT_MSG
) {
1649 ldout(cct
, 1) << __func__
<< " state changed" << dendl
;
1655 if (more
&& r
== 0) {
1656 authorizer_more
= auth_retry_bl
;
1657 return CONTINUE(send_connect_message
);
1661 return handle_connect_reply_2();
1664 CtPtr
ProtocolV1::handle_connect_reply_2() {
1665 ldout(cct
, 20) << __func__
<< dendl
;
1667 if (connect_reply
.tag
== CEPH_MSGR_TAG_FEATURES
) {
1668 ldout(cct
, 0) << __func__
<< " connect protocol feature mismatch, my "
1669 << std::hex
<< connection
->policy
.features_supported
1670 << " < peer " << connect_reply
.features
<< " missing "
1671 << (connect_reply
.features
&
1672 ~connection
->policy
.features_supported
)
1673 << std::dec
<< dendl
;
1677 if (connect_reply
.tag
== CEPH_MSGR_TAG_BADPROTOVER
) {
1678 ldout(cct
, 0) << __func__
<< " connect protocol version mismatch, my "
1679 << messenger
->get_proto_version(connection
->peer_type
, true)
1680 << " != " << connect_reply
.protocol_version
<< dendl
;
1684 if (connect_reply
.tag
== CEPH_MSGR_TAG_BADAUTHORIZER
) {
1685 ldout(cct
, 0) << __func__
<< " connect got BADAUTHORIZER" << dendl
;
1686 authorizer_more
.clear();
1690 if (connect_reply
.tag
== CEPH_MSGR_TAG_RESETSESSION
) {
1691 ldout(cct
, 0) << __func__
<< " connect got RESETSESSION" << dendl
;
1695 // see session_reset
1696 connection
->outgoing_bl
.clear();
1698 return CONTINUE(send_connect_message
);
1701 if (connect_reply
.tag
== CEPH_MSGR_TAG_RETRY_GLOBAL
) {
1702 global_seq
= messenger
->get_global_seq(connect_reply
.global_seq
);
1703 ldout(cct
, 5) << __func__
<< " connect got RETRY_GLOBAL "
1704 << connect_reply
.global_seq
<< " chose new " << global_seq
1706 return CONTINUE(send_connect_message
);
1709 if (connect_reply
.tag
== CEPH_MSGR_TAG_RETRY_SESSION
) {
1710 ceph_assert(connect_reply
.connect_seq
> connect_seq
);
1711 ldout(cct
, 5) << __func__
<< " connect got RETRY_SESSION " << connect_seq
1712 << " -> " << connect_reply
.connect_seq
<< dendl
;
1713 connect_seq
= connect_reply
.connect_seq
;
1714 return CONTINUE(send_connect_message
);
1717 if (connect_reply
.tag
== CEPH_MSGR_TAG_WAIT
) {
1718 ldout(cct
, 1) << __func__
<< " connect got WAIT (connection race)" << dendl
;
1723 uint64_t feat_missing
;
1725 connection
->policy
.features_required
& ~(uint64_t)connect_reply
.features
;
1727 ldout(cct
, 1) << __func__
<< " missing required features " << std::hex
1728 << feat_missing
<< std::dec
<< dendl
;
1732 if (connect_reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1735 << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq"
1738 return wait_ack_seq();
1741 if (connect_reply
.tag
== CEPH_MSGR_TAG_READY
) {
1742 ldout(cct
, 10) << __func__
<< " got CEPH_MSGR_TAG_READY " << dendl
;
1745 return client_ready();
1748 CtPtr
ProtocolV1::wait_ack_seq() {
1749 ldout(cct
, 20) << __func__
<< dendl
;
1751 return READ(sizeof(uint64_t), handle_ack_seq
);
1754 CtPtr
ProtocolV1::handle_ack_seq(char *buffer
, int r
) {
1755 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1758 ldout(cct
, 1) << __func__
<< " read connect ack seq failed" << dendl
;
1762 uint64_t newly_acked_seq
= 0;
1764 newly_acked_seq
= *((uint64_t *)buffer
);
1765 ldout(cct
, 2) << __func__
<< " got newly_acked_seq " << newly_acked_seq
1766 << " vs out_seq " << out_seq
<< dendl
;
1767 out_seq
= discard_requeued_up_to(out_seq
, newly_acked_seq
);
1769 ceph::buffer::list bl
;
1770 uint64_t s
= in_seq
;
1771 bl
.append((char *)&s
, sizeof(s
));
1773 return WRITE(bl
, handle_in_seq_write
);
1776 CtPtr
ProtocolV1::handle_in_seq_write(int r
) {
1777 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1780 ldout(cct
, 10) << __func__
<< " failed to send in_seq " << dendl
;
1784 ldout(cct
, 10) << __func__
<< " send in_seq done " << dendl
;
1786 return client_ready();
1789 CtPtr
ProtocolV1::client_ready() {
1790 ldout(cct
, 20) << __func__
<< dendl
;
1793 peer_global_seq
= connect_reply
.global_seq
;
1794 connection
->policy
.lossy
= connect_reply
.flags
& CEPH_MSG_CONNECT_LOSSY
;
1798 ceph_assert(connect_seq
== connect_reply
.connect_seq
);
1799 backoff
= utime_t();
1800 connection
->set_features((uint64_t)connect_reply
.features
&
1801 (uint64_t)connection
->policy
.features_supported
);
1802 ldout(cct
, 10) << __func__
<< " connect success " << connect_seq
1803 << ", lossy = " << connection
->policy
.lossy
<< ", features "
1804 << connection
->get_features() << dendl
;
1806 // If we have an authorizer, get a new AuthSessionHandler to deal with
1807 // ongoing security of the connection. PLR
1808 if (auth_meta
->authorizer
) {
1809 ldout(cct
, 10) << __func__
<< " setting up session_security with auth "
1810 << auth_meta
->authorizer
.get() << dendl
;
1811 session_security
.reset(get_auth_session_handler(
1812 cct
, auth_meta
->authorizer
->protocol
,
1813 auth_meta
->session_key
,
1814 connection
->get_features()));
1816 // We have no authorizer, so we shouldn't be applying security to messages
1817 // in this AsyncConnection. PLR
1818 ldout(cct
, 10) << __func__
<< " no authorizer, clearing session_security"
1820 session_security
.reset();
1823 if (connection
->delay_state
) {
1824 ceph_assert(connection
->delay_state
->ready());
1826 connection
->dispatch_queue
->queue_connect(connection
);
1827 messenger
->ms_deliver_handle_fast_connect(connection
);
1833 * Server Protocol V1
1836 CtPtr
ProtocolV1::send_server_banner() {
1837 ldout(cct
, 20) << __func__
<< dendl
;
1840 ceph::buffer::list bl
;
1842 bl
.append(CEPH_BANNER
, strlen(CEPH_BANNER
));
1844 // as a server, we should have a legacy addr if we accepted this connection.
1845 auto legacy
= messenger
->get_myaddrs().legacy_addr();
1846 encode(legacy
, bl
, 0); // legacy
1847 connection
->port
= legacy
.get_port();
1848 encode(connection
->target_addr
, bl
, 0); // legacy
1850 ldout(cct
, 1) << __func__
<< " sd=" << connection
->cs
.fd()
1851 << " legacy " << legacy
1852 << " socket_addr " << connection
->socket_addr
1853 << " target_addr " << connection
->target_addr
1856 return WRITE(bl
, handle_server_banner_write
);
1859 CtPtr
ProtocolV1::handle_server_banner_write(int r
) {
1860 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1863 ldout(cct
, 1) << " write server banner failed" << dendl
;
1866 ldout(cct
, 10) << __func__
<< " write banner and addr done: "
1867 << connection
->get_peer_addr() << dendl
;
1869 return wait_client_banner();
1872 CtPtr
ProtocolV1::wait_client_banner() {
1873 ldout(cct
, 20) << __func__
<< dendl
;
1875 return READ(strlen(CEPH_BANNER
) + sizeof(ceph_entity_addr
),
1876 handle_client_banner
);
1879 CtPtr
ProtocolV1::handle_client_banner(char *buffer
, int r
) {
1880 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1883 ldout(cct
, 1) << __func__
<< " read peer banner and addr failed" << dendl
;
1887 if (memcmp(buffer
, CEPH_BANNER
, strlen(CEPH_BANNER
))) {
1888 ldout(cct
, 1) << __func__
<< " accept peer sent bad banner '" << buffer
1889 << "' (should be '" << CEPH_BANNER
<< "')" << dendl
;
1893 ceph::buffer::list addr_bl
;
1894 entity_addr_t peer_addr
;
1896 addr_bl
.append(buffer
+ strlen(CEPH_BANNER
), sizeof(ceph_entity_addr
));
1898 auto ti
= addr_bl
.cbegin();
1899 decode(peer_addr
, ti
);
1900 } catch (const ceph::buffer::error
&e
) {
1901 lderr(cct
) << __func__
<< " decode peer_addr failed " << dendl
;
1905 ldout(cct
, 10) << __func__
<< " accept peer addr is " << peer_addr
<< dendl
;
1906 if (peer_addr
.is_blank_ip()) {
1907 // peer apparently doesn't know what ip they have; figure it out for them.
1908 int port
= peer_addr
.get_port();
1909 peer_addr
.set_sockaddr(connection
->target_addr
.get_sockaddr());
1910 peer_addr
.set_port(port
);
1912 ldout(cct
, 0) << __func__
<< " accept peer addr is really " << peer_addr
1913 << " (socket is " << connection
->target_addr
<< ")" << dendl
;
1915 connection
->set_peer_addr(peer_addr
); // so that connection_state gets set up
1916 connection
->target_addr
= peer_addr
;
1918 return CONTINUE(wait_connect_message
);
1921 CtPtr
ProtocolV1::wait_connect_message() {
1922 ldout(cct
, 20) << __func__
<< dendl
;
1924 // FIPS zeroization audit 20191115: this memset is not security related.
1925 memset(&connect_msg
, 0, sizeof(connect_msg
));
1926 return READ(sizeof(connect_msg
), handle_connect_message_1
);
1929 CtPtr
ProtocolV1::handle_connect_message_1(char *buffer
, int r
) {
1930 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1933 ldout(cct
, 1) << __func__
<< " read connect msg failed" << dendl
;
1937 connect_msg
= *((ceph_msg_connect
*)buffer
);
1939 state
= ACCEPTING_WAIT_CONNECT_MSG_AUTH
;
1941 if (connect_msg
.authorizer_len
) {
1942 return wait_connect_message_auth();
1945 return handle_connect_message_2();
1948 CtPtr
ProtocolV1::wait_connect_message_auth() {
1949 ldout(cct
, 20) << __func__
<< dendl
;
1950 authorizer_buf
.clear();
1951 authorizer_buf
.push_back(ceph::buffer::create(connect_msg
.authorizer_len
));
1952 return READB(connect_msg
.authorizer_len
, authorizer_buf
.c_str(),
1953 handle_connect_message_auth
);
1956 CtPtr
ProtocolV1::handle_connect_message_auth(char *buffer
, int r
) {
1957 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1960 ldout(cct
, 1) << __func__
<< " read connect authorizer failed" << dendl
;
1964 return handle_connect_message_2();
1967 CtPtr
ProtocolV1::handle_connect_message_2() {
1968 ldout(cct
, 20) << __func__
<< dendl
;
1970 ldout(cct
, 20) << __func__
<< " accept got peer connect_seq "
1971 << connect_msg
.connect_seq
<< " global_seq "
1972 << connect_msg
.global_seq
<< dendl
;
1974 connection
->set_peer_type(connect_msg
.host_type
);
1975 connection
->policy
= messenger
->get_policy(connect_msg
.host_type
);
1977 ldout(cct
, 10) << __func__
<< " accept of host_type " << connect_msg
.host_type
1978 << ", policy.lossy=" << connection
->policy
.lossy
1979 << " policy.server=" << connection
->policy
.server
1980 << " policy.standby=" << connection
->policy
.standby
1981 << " policy.resetcheck=" << connection
->policy
.resetcheck
1982 << " features 0x" << std::hex
<< (uint64_t)connect_msg
.features
1986 ceph_msg_connect_reply reply
;
1987 ceph::buffer::list authorizer_reply
;
1989 // FIPS zeroization audit 20191115: this memset is not security related.
1990 memset(&reply
, 0, sizeof(reply
));
1991 reply
.protocol_version
=
1992 messenger
->get_proto_version(connection
->peer_type
, false);
1995 ldout(cct
, 10) << __func__
<< " accept my proto " << reply
.protocol_version
1996 << ", their proto " << connect_msg
.protocol_version
<< dendl
;
1998 if (connect_msg
.protocol_version
!= reply
.protocol_version
) {
1999 return send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER
, reply
,
2003 // require signatures for cephx?
2004 if (connect_msg
.authorizer_protocol
== CEPH_AUTH_CEPHX
) {
2005 if (connection
->peer_type
== CEPH_ENTITY_TYPE_OSD
||
2006 connection
->peer_type
== CEPH_ENTITY_TYPE_MDS
||
2007 connection
->peer_type
== CEPH_ENTITY_TYPE_MGR
) {
2008 if (cct
->_conf
->cephx_require_signatures
||
2009 cct
->_conf
->cephx_cluster_require_signatures
) {
2012 << " using cephx, requiring MSG_AUTH feature bit for cluster"
2014 connection
->policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
2016 if (cct
->_conf
->cephx_require_version
>= 2 ||
2017 cct
->_conf
->cephx_cluster_require_version
>= 2) {
2020 << " using cephx, requiring cephx v2 feature bit for cluster"
2022 connection
->policy
.features_required
|= CEPH_FEATUREMASK_CEPHX_V2
;
2025 if (cct
->_conf
->cephx_require_signatures
||
2026 cct
->_conf
->cephx_service_require_signatures
) {
2029 << " using cephx, requiring MSG_AUTH feature bit for service"
2031 connection
->policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
2033 if (cct
->_conf
->cephx_require_version
>= 2 ||
2034 cct
->_conf
->cephx_service_require_version
>= 2) {
2037 << " using cephx, requiring cephx v2 feature bit for service"
2039 connection
->policy
.features_required
|= CEPH_FEATUREMASK_CEPHX_V2
;
2044 uint64_t feat_missing
=
2045 connection
->policy
.features_required
& ~(uint64_t)connect_msg
.features
;
2047 ldout(cct
, 1) << __func__
<< " peer missing required features " << std::hex
2048 << feat_missing
<< std::dec
<< dendl
;
2049 return send_connect_message_reply(CEPH_MSGR_TAG_FEATURES
, reply
,
2053 ceph::buffer::list auth_bl_copy
= authorizer_buf
;
2054 auto am
= auth_meta
;
2055 am
->auth_method
= connect_msg
.authorizer_protocol
;
2056 if (!HAVE_FEATURE((uint64_t)connect_msg
.features
, CEPHX_V2
)) {
2057 // peer doesn't support it and we won't get here if we require it
2058 am
->skip_authorizer_challenge
= true;
2060 connection
->lock
.unlock();
2061 ldout(cct
,10) << __func__
<< " authorizor_protocol "
2062 << connect_msg
.authorizer_protocol
2063 << " len " << auth_bl_copy
.length()
2065 bool more
= (bool)auth_meta
->authorizer_challenge
;
2066 int r
= messenger
->auth_server
->handle_auth_request(
2074 connection
->lock
.lock();
2075 if (state
!= ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
2076 ldout(cct
, 1) << __func__
<< " state changed" << dendl
;
2079 ldout(cct
, 0) << __func__
<< ": got bad authorizer, auth_reply_len="
2080 << authorizer_reply
.length() << dendl
;
2081 session_security
.reset();
2082 return send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER
, reply
,
2086 connection
->lock
.lock();
2087 if (state
!= ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
2088 ldout(cct
, 1) << __func__
<< " state changed" << dendl
;
2091 ldout(cct
, 10) << __func__
<< ": challenging authorizer" << dendl
;
2092 ceph_assert(authorizer_reply
.length());
2093 return send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
,
2094 reply
, authorizer_reply
);
2097 // We've verified the authorizer for this AsyncConnection, so set up the
2098 // session security structure. PLR
2099 ldout(cct
, 10) << __func__
<< " accept setting up session_security." << dendl
;
2101 if (connection
->policy
.server
&&
2102 connection
->policy
.lossy
&&
2103 !connection
->policy
.register_lossy_clients
) {
2104 // incoming lossy client, no need to register this connection
2106 ldout(cct
, 10) << __func__
<< " accept new session" << dendl
;
2107 connection
->lock
.lock();
2108 return open(reply
, authorizer_reply
);
2111 AsyncConnectionRef existing
= messenger
->lookup_conn(*connection
->peer_addrs
);
2113 connection
->inject_delay();
2115 connection
->lock
.lock();
2116 if (state
!= ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
2117 ldout(cct
, 1) << __func__
<< " state changed" << dendl
;
2121 if (existing
== connection
) {
2124 if (existing
&& existing
->protocol
->proto_type
!= 1) {
2125 ldout(cct
,1) << __func__
<< " existing " << existing
<< " proto "
2126 << existing
->protocol
.get() << " version is "
2127 << existing
->protocol
->proto_type
<< ", marking down" << dendl
;
2128 existing
->mark_down();
2133 // There is no possible that existing connection will acquire this
2134 // connection's lock
2135 existing
->lock
.lock(); // skip lockdep check (we are locking a second
2136 // AsyncConnection here)
2138 ldout(cct
,10) << __func__
<< " existing=" << existing
<< " exproto="
2139 << existing
->protocol
.get() << dendl
;
2140 ProtocolV1
*exproto
= dynamic_cast<ProtocolV1
*>(existing
->protocol
.get());
2141 ceph_assert(exproto
);
2142 ceph_assert(exproto
->proto_type
== 1);
2144 if (exproto
->state
== CLOSED
) {
2145 ldout(cct
, 1) << __func__
<< " existing " << existing
2146 << " already closed." << dendl
;
2147 existing
->lock
.unlock();
2150 return open(reply
, authorizer_reply
);
2153 if (exproto
->replacing
) {
2154 ldout(cct
, 1) << __func__
2155 << " existing racing replace happened while replacing."
2156 << " existing_state="
2157 << connection
->get_state_name(existing
->state
) << dendl
;
2158 reply
.global_seq
= exproto
->peer_global_seq
;
2159 existing
->lock
.unlock();
2160 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL
, reply
,
2164 if (connect_msg
.global_seq
< exproto
->peer_global_seq
) {
2165 ldout(cct
, 10) << __func__
<< " accept existing " << existing
<< ".gseq "
2166 << exproto
->peer_global_seq
<< " > "
2167 << connect_msg
.global_seq
<< ", RETRY_GLOBAL" << dendl
;
2168 reply
.global_seq
= exproto
->peer_global_seq
; // so we can send it below..
2169 existing
->lock
.unlock();
2170 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL
, reply
,
2173 ldout(cct
, 10) << __func__
<< " accept existing " << existing
<< ".gseq "
2174 << exproto
->peer_global_seq
2175 << " <= " << connect_msg
.global_seq
<< ", looks ok"
2179 if (existing
->policy
.lossy
) {
2182 << " accept replacing existing (lossy) channel (new one lossy="
2183 << connection
->policy
.lossy
<< ")" << dendl
;
2184 exproto
->session_reset();
2185 return replace(existing
, reply
, authorizer_reply
);
2188 ldout(cct
, 1) << __func__
<< " accept connect_seq "
2189 << connect_msg
.connect_seq
2190 << " vs existing csq=" << exproto
->connect_seq
2191 << " existing_state="
2192 << connection
->get_state_name(existing
->state
) << dendl
;
2194 if (connect_msg
.connect_seq
== 0 && exproto
->connect_seq
> 0) {
2197 << " accept peer reset, then tried to connect to us, replacing"
2199 // this is a hard reset from peer
2200 is_reset_from_peer
= true;
2201 if (connection
->policy
.resetcheck
) {
2202 exproto
->session_reset(); // this resets out_queue, msg_ and
2205 return replace(existing
, reply
, authorizer_reply
);
2208 if (connect_msg
.connect_seq
< exproto
->connect_seq
) {
2209 // old attempt, or we sent READY but they didn't get it.
2210 ldout(cct
, 10) << __func__
<< " accept existing " << existing
<< ".cseq "
2211 << exproto
->connect_seq
<< " > " << connect_msg
.connect_seq
2212 << ", RETRY_SESSION" << dendl
;
2213 reply
.connect_seq
= exproto
->connect_seq
+ 1;
2214 existing
->lock
.unlock();
2215 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION
, reply
,
2219 if (connect_msg
.connect_seq
== exproto
->connect_seq
) {
2220 // if the existing connection successfully opened, and/or
2221 // subsequently went to standby, then the peer should bump
2222 // their connect_seq and retry: this is not a connection race
2223 // we need to resolve here.
2224 if (exproto
->state
== OPENED
|| exproto
->state
== STANDBY
) {
2225 ldout(cct
, 10) << __func__
<< " accept connection race, existing "
2226 << existing
<< ".cseq " << exproto
->connect_seq
2227 << " == " << connect_msg
.connect_seq
2228 << ", OPEN|STANDBY, RETRY_SESSION " << dendl
;
2229 // if connect_seq both zero, dont stuck into dead lock. it's ok to
2231 if (connection
->policy
.resetcheck
&& exproto
->connect_seq
== 0) {
2232 return replace(existing
, reply
, authorizer_reply
);
2235 reply
.connect_seq
= exproto
->connect_seq
+ 1;
2236 existing
->lock
.unlock();
2237 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION
, reply
,
2242 if (connection
->peer_addrs
->legacy_addr() < messenger
->get_myaddr_legacy() ||
2243 existing
->policy
.server
) {
2245 ldout(cct
, 10) << __func__
<< " accept connection race, existing "
2246 << existing
<< ".cseq " << exproto
->connect_seq
2247 << " == " << connect_msg
.connect_seq
2248 << ", or we are server, replacing my attempt" << dendl
;
2249 return replace(existing
, reply
, authorizer_reply
);
2251 // our existing outgoing wins
2252 ldout(messenger
->cct
, 10)
2253 << __func__
<< " accept connection race, existing " << existing
2254 << ".cseq " << exproto
->connect_seq
2255 << " == " << connect_msg
.connect_seq
<< ", sending WAIT" << dendl
;
2256 ceph_assert(connection
->peer_addrs
->legacy_addr() >
2257 messenger
->get_myaddr_legacy());
2258 existing
->lock
.unlock();
2259 // make sure we follow through with opening the existing
2260 // connection (if it isn't yet open) since we know the peer
2261 // has something to send to us.
2262 existing
->send_keepalive();
2263 return send_connect_message_reply(CEPH_MSGR_TAG_WAIT
, reply
,
2268 ceph_assert(connect_msg
.connect_seq
> exproto
->connect_seq
);
2269 ceph_assert(connect_msg
.global_seq
>= exproto
->peer_global_seq
);
2270 if (connection
->policy
.resetcheck
&& // RESETSESSION only used by servers;
2271 // peers do not reset each other
2272 exproto
->connect_seq
== 0) {
2273 ldout(cct
, 0) << __func__
<< " accept we reset (peer sent cseq "
2274 << connect_msg
.connect_seq
<< ", " << existing
2275 << ".cseq = " << exproto
->connect_seq
2276 << "), sending RESETSESSION " << dendl
;
2277 existing
->lock
.unlock();
2278 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION
, reply
,
2283 ldout(cct
, 10) << __func__
<< " accept peer sent cseq "
2284 << connect_msg
.connect_seq
<< " > " << exproto
->connect_seq
2286 return replace(existing
, reply
, authorizer_reply
);
2288 else if (!replacing
&& connect_msg
.connect_seq
> 0) {
2289 // we reset, and they are opening a new session
2290 ldout(cct
, 0) << __func__
<< " accept we reset (peer sent cseq "
2291 << connect_msg
.connect_seq
<< "), sending RESETSESSION"
2293 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION
, reply
,
2297 ldout(cct
, 10) << __func__
<< " accept new session" << dendl
;
2299 return open(reply
, authorizer_reply
);
// Serialize a ceph_msg_connect_reply carrying `tag` (RETRY_GLOBAL /
// RETRY_SESSION / RESETSESSION / WAIT, chosen by the caller) plus any
// authorizer payload, and queue it for write; continues at
// handle_connect_message_reply_write once the write completes.
// NOTE(review): this listing is an extraction with physical lines dropped
// (embedded line numbers jump 2307->2310, 2313->2315, 2320->2323,
// 2325->2328); the assignments of reply.tag / the head of the
// reply.features expression are not visible here — confirm against the
// complete source.
2303 CtPtr
ProtocolV1::send_connect_message_reply(char tag
,
2304 ceph_msg_connect_reply
&reply
,
2305 ceph::buffer::list
&authorizer_reply
) {
2306 ldout(cct
, 20) << __func__
<< dendl
;
2307 ceph::buffer::list reply_bl
;
// reply.features = peer-advertised features masked by what our policy
// supports, OR'd with the features our policy requires (head of the
// statement lies in a dropped line).
2310 ((uint64_t)connect_msg
.features
& connection
->policy
.features_supported
) |
2311 connection
->policy
.features_required
;
// Record the authorizer payload length and append the raw reply struct.
2312 reply
.authorizer_len
= authorizer_reply
.length();
2313 reply_bl
.append((char *)&reply
, sizeof(reply
));
// Debug trace of the feature-negotiation arithmetic above.
2315 ldout(cct
, 10) << __func__
<< " reply features 0x" << std::hex
2316 << reply
.features
<< " = (policy sup 0x"
2317 << connection
->policy
.features_supported
2318 << " & connect 0x" << (uint64_t)connect_msg
.features
2319 << ") | policy req 0x"
2320 << connection
->policy
.features_required
// Append the authorizer bytes after the fixed-size reply, then release them.
2323 if (reply
.authorizer_len
) {
2324 reply_bl
.append(authorizer_reply
.c_str(), authorizer_reply
.length());
2325 authorizer_reply
.clear();
// Queue the buffer; the continuation fires when the socket write finishes.
2328 return WRITE(reply_bl
, handle_connect_message_reply_write
);
// Continuation after writing a connect-message reply: on write failure
// (the error branch's `if` line is dropped from this extraction — the
// visible ldout/inject_delay presumably sit inside it; confirm upstream)
// log and delay for test injection; otherwise loop back to wait for the
// peer's next connect message.
2331 CtPtr
ProtocolV1::handle_connect_message_reply_write(int r
) {
2332 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
2335 ldout(cct
, 1) << " write connect message reply failed" << dendl
;
2336 connection
->inject_delay();
// Re-enter the connect-message wait loop for the peer's retry.
2340 return CONTINUE(wait_connect_message
);
// Replace the racing `existing` connection with this newly-accepted one:
// this connection's freshly-accepted socket is handed over to `existing`,
// and this connection is torn down. The handoff must hop threads because
// `existing` may be owned by a different EventCenter/worker — hence the
// deactivate_existing / transfer_existing closures submitted below.
// Precondition (from the caller's flow): existing->lock is already held.
// NOTE(review): several physical lines are missing from this extraction
// (embedded numbers jump e.g. 2346->2348, 2364->2367, 2420->2421,
// 2459->2461, 2464->2466); closing braces / else keywords of some branches
// are not visible — do not treat this listing as complete.
2343 CtPtr
ProtocolV1::replace(const AsyncConnectionRef
& existing
,
2344 ceph_msg_connect_reply
&reply
,
2345 ceph::buffer::list
&authorizer_reply
) {
2346 ldout(cct
, 10) << __func__
<< " accept replacing " << existing
<< dendl
;
2348 connection
->inject_delay();
// Lossy existing channel: just kill it and proceed; no state to preserve.
2349 if (existing
->policy
.lossy
) {
2350 // disconnect from the Connection
2351 ldout(cct
, 1) << __func__
<< " replacing on lossy channel, failing existing"
2353 existing
->protocol
->stop();
2354 existing
->dispatch_queue
->queue_reset(existing
.get());
// We must not have started writing on this (accepting) connection yet.
2356 ceph_assert(can_write
== WriteStatus::NOWRITE
);
2357 existing
->write_lock
.lock();
2359 ProtocolV1
*exproto
= dynamic_cast<ProtocolV1
*>(existing
->protocol
.get());
2361 // reset the in_seq if this is a hard reset from peer,
2362 // otherwise we respect our original connection's value
2363 if (is_reset_from_peer
) {
2364 exproto
->is_reset_from_peer
= true;
// Stop polling our socket: its fd is about to be moved into `existing`.
2367 connection
->center
->delete_file_event(connection
->cs
.fd(),
2368 EVENT_READABLE
| EVENT_WRITABLE
);
// Flush any delayed-delivery state on the survivor before the swap.
2370 if (existing
->delay_state
) {
2371 existing
->delay_state
->flush();
2372 ceph_assert(!connection
->delay_state
);
2374 exproto
->reset_recv_state();
// Carry the freshly-negotiated feature bits over to the surviving protocol.
2376 exproto
->connect_msg
.features
= connect_msg
.features
;
// Detach our socket and remember our event center/worker: `existing` will
// adopt all three inside deactivate_existing.
2378 auto temp_cs
= std::move(connection
->cs
);
2379 EventCenter
*new_center
= connection
->center
;
2380 Worker
*new_worker
= connection
->worker
;
2381 // avoid _stop shutdown replacing socket
2382 // queue a reset on the new connection, which we're dumping for the old
2385 connection
->dispatch_queue
->queue_reset(connection
);
2386 ldout(messenger
->cct
, 1)
2387 << __func__
<< " stop myself to swap existing" << dendl
;
// Park the survivor: writes blocked, replacing flag raised, state NONE so
// its previous event-loop thread stops touching it during the handoff.
2388 exproto
->can_write
= WriteStatus::REPLACING
;
2389 exproto
->replacing
= true;
2390 exproto
->write_in_progress
= false;
2391 existing
->state_offset
= 0;
2392 // avoid previous thread modify event
2393 exproto
->state
= NONE
;
2394 existing
->state
= AsyncConnection::STATE_NONE
;
2395 // Discard existing prefetch buffer in `recv_buf`
2396 existing
->recv_start
= existing
->recv_end
= 0;
2397 // there shouldn't exist any buffer
2398 ceph_assert(connection
->recv_start
== connection
->recv_end
);
// Runs on the survivor's ORIGINAL event-center thread: requeue unacked
// messages, then (if still NONE) adopt our socket/worker/center, or (if
// meanwhile CLOSED) bounce the socket back to its owning center to close.
2400 auto deactivate_existing
= std::bind(
2401 [existing
, new_worker
, new_center
, exproto
, reply
,
2402 authorizer_reply
](ConnectedSocket
&cs
) mutable {
2403 // we need to delete time event in original thread
2405 std::lock_guard
<std::mutex
> l(existing
->lock
);
2406 existing
->write_lock
.lock();
2407 exproto
->requeue_sent();
2408 existing
->outgoing_bl
.clear();
2409 existing
->open_write
= false;
2410 existing
->write_lock
.unlock();
2411 if (exproto
->state
== NONE
) {
2412 existing
->shutdown_socket();
2413 existing
->cs
= std::move(cs
);
2414 existing
->worker
->references
--;
2415 new_worker
->references
++;
2416 existing
->logger
= new_worker
->get_perf_counter();
2417 existing
->worker
= new_worker
;
2418 existing
->center
= new_center
;
2419 if (existing
->delay_state
)
2420 existing
->delay_state
->set_center(new_center
);
2421 } else if (exproto
->state
== CLOSED
) {
2422 auto back_to_close
=
2423 std::bind([](ConnectedSocket
&cs
) mutable { cs
.close(); },
2425 new_center
->submit_to(new_center
->get_id(),
2426 std::move(back_to_close
), true);
2433 // Before changing existing->center, it may already exists some
2434 // events in existing->center's queue. Then if we mark down
2435 // `existing`, it will execute in another thread and clean up
2436 // connection. Previous event will result in segment fault
// Second hop: once the survivor is on the new center, restart its connect
// timer, re-arm its read event, and push a RETRY_GLOBAL reply so the peer
// re-handshakes on the swapped socket.
2437 auto transfer_existing
= [existing
, exproto
, reply
,
2438 authorizer_reply
]() mutable {
2439 std::lock_guard
<std::mutex
> l(existing
->lock
);
2440 if (exproto
->state
== CLOSED
) return;
2441 ceph_assert(exproto
->state
== NONE
);
2443 // we have called shutdown_socket above
2444 ceph_assert(existing
->last_tick_id
== 0);
2445 // restart timer since we are going to re-build connection
2446 existing
->last_connect_started
= ceph::coarse_mono_clock::now();
2447 existing
->last_tick_id
= existing
->center
->create_time_event(
2448 existing
->connect_timeout_us
, existing
->tick_handler
);
2449 existing
->state
= AsyncConnection::STATE_CONNECTION_ESTABLISHED
;
2450 exproto
->state
= ACCEPTING
;
2452 existing
->center
->create_file_event(
2453 existing
->cs
.fd(), EVENT_READABLE
, existing
->read_handler
);
2454 reply
.global_seq
= exproto
->peer_global_seq
;
2455 exproto
->run_continuation(exproto
->send_connect_message_reply(
2456 CEPH_MSGR_TAG_RETRY_GLOBAL
, reply
, authorizer_reply
));
// Run transfer_existing inline if we are already on the survivor's center
// thread, otherwise submit it there (the `else` line is dropped from this
// extraction).
2458 if (existing
->center
->in_thread())
2459 transfer_existing();
2461 existing
->center
->submit_to(existing
->center
->get_id(),
2462 std::move(transfer_existing
), true);
2464 std::move(temp_cs
));
// Kick off the first hop on the survivor's (old) center thread.
2466 existing
->center
->submit_to(existing
->center
->get_id(),
2467 std::move(deactivate_existing
), true);
2468 existing
->write_lock
.unlock();
2469 existing
->lock
.unlock();
// Lossy fallthrough path (after the early branch above): drop the lock
// and open this connection normally.
2472 existing
->lock
.unlock();
2474 return open(reply
, authorizer_reply
);
// Finish accepting: bump our connect_seq past the peer's, fill in the
// READY/SEQ reply, install the auth session handler, register the
// connection with the messenger (racing registrations fail this side),
// and queue the reply for write.
// NOTE(review): this extraction drops physical lines (embedded numbers
// jump e.g. 2484->2486, 2491->2493, 2496->2501, 2537->2539, 2548->2551,
// 2558->2562), including the else keywords / error-branch `if (r < 0)`
// heads around the visible statements — confirm against the full source.
2477 CtPtr
ProtocolV1::open(ceph_msg_connect_reply
&reply
,
2478 ceph::buffer::list
&authorizer_reply
) {
2479 ldout(cct
, 20) << __func__
<< dendl
;
// Our session sequence leapfrogs the peer's; remember their global seq.
2481 connect_seq
= connect_msg
.connect_seq
+ 1;
2482 peer_global_seq
= connect_msg
.global_seq
;
2483 ldout(cct
, 10) << __func__
<< " accept success, connect_seq = " << connect_seq
2484 << " in_seq=" << in_seq
<< ", sending READY" << dendl
;
2486 // if it is a hard reset from peer, we don't need a round-trip to negotiate
2488 if ((connect_msg
.features
& CEPH_FEATURE_RECONNECT_SEQ
) &&
2489 !is_reset_from_peer
) {
// Peer supports seq exchange: reply SEQ and expect their acked seq next.
2490 reply
.tag
= CEPH_MSGR_TAG_SEQ
;
2491 wait_for_seq
= true;
// (else branch — keyword dropped from extraction) plain READY, discard
// everything previously requeued and clear the hard-reset marker.
2493 reply
.tag
= CEPH_MSGR_TAG_READY
;
2494 wait_for_seq
= false;
2495 out_seq
= discard_requeued_up_to(out_seq
, 0);
2496 is_reset_from_peer
= false;
// Advertise our supported features and current sequence numbers.
2501 reply
.features
= connection
->policy
.features_supported
;
2502 reply
.global_seq
= messenger
->get_global_seq();
2503 reply
.connect_seq
= connect_seq
;
2505 reply
.authorizer_len
= authorizer_reply
.length();
2506 if (connection
->policy
.lossy
) {
2507 reply
.flags
= reply
.flags
| CEPH_MSG_CONNECT_LOSSY
;
// Effective feature set = intersection of what we offer and they sent.
2510 connection
->set_features((uint64_t)reply
.features
&
2511 (uint64_t)connect_msg
.features
);
2512 ldout(cct
, 10) << __func__
<< " accept features "
2513 << connection
->get_features()
2514 << " authorizer_protocol "
2515 << connect_msg
.authorizer_protocol
<< dendl
;
// Install per-session crypto/signing based on the negotiated auth method.
2517 session_security
.reset(
2518 get_auth_session_handler(cct
, auth_meta
->auth_method
,
2519 auth_meta
->session_key
,
2520 connection
->get_features()));
// Build the wire reply: struct, optional authorizer bytes, and — for the
// SEQ tag — our current incoming sequence number.
2522 ceph::buffer::list reply_bl
;
2523 reply_bl
.append((char *)&reply
, sizeof(reply
));
2525 if (reply
.authorizer_len
) {
2526 reply_bl
.append(authorizer_reply
.c_str(), authorizer_reply
.length());
2529 if (reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
2530 uint64_t s
= in_seq
;
2531 reply_bl
.append((char *)&s
, sizeof(s
));
2534 connection
->lock
.unlock();
2535 // Because "replacing" will prevent other connections preempt this addr,
2536 // it's safe that here we don't acquire Connection's lock
2537 ssize_t r
= messenger
->accept_conn(connection
);
2539 connection
->inject_delay();
2541 connection
->lock
.lock();
// accept_conn failure path (its `if` head is dropped): another connection
// won the registration race; fault this one.
2544 ldout(cct
, 1) << __func__
<< " existing race replacing process for addr = "
2545 << connection
->peer_addrs
->legacy_addr()
2546 << " just fail later one(this)" << dendl
;
2547 ldout(cct
, 10) << "accept fault after register" << dendl
;
2548 connection
->inject_delay();
// While the lock was dropped, mark_down may have changed our state;
// if so, unregister and fault.
2551 if (state
!= ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
2552 ldout(cct
, 1) << __func__
2553 << " state changed while accept_conn, it must be mark_down"
2555 ceph_assert(state
== CLOSED
|| state
== NONE
);
2556 ldout(cct
, 10) << "accept fault after register" << dendl
;
2557 messenger
->unregister_conn(connection
);
2558 connection
->inject_delay();
// Queue the reply; continue at handle_ready_connect_message_reply_write.
2562 return WRITE(reply_bl
, handle_ready_connect_message_reply_write
);
// Continuation after writing the READY/SEQ connect reply: on success,
// deliver accept notifications and either wait for the peer's acked seq
// or go straight to ready. The error branch's `if (r < 0)` head and the
// wait_for_seq branch structure are dropped from this extraction —
// confirm upstream.
2565 CtPtr
ProtocolV1::handle_ready_connect_message_reply_write(int r
) {
2566 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
2569 ldout(cct
, 1) << __func__
<< " write ready connect message reply failed"
// Notify dispatch and fast-accept hooks that the connection is accepted.
2575 connection
->dispatch_queue
->queue_accept(connection
);
2576 messenger
->ms_deliver_handle_fast_accept(connection
);
2579 state
= ACCEPTING_HANDLED_CONNECT_MSG
;
2585 return server_ready();
// Server side of the TAG_SEQ exchange: read the peer's 64-bit acked
// sequence number; handle_seq processes it.
2588 CtPtr
ProtocolV1::wait_seq() {
2589 ldout(cct
, 20) << __func__
<< dendl
;
2591 return READ(sizeof(uint64_t), handle_seq
);
// Process the peer's acked sequence number read by wait_seq: discard
// every requeued outgoing message the peer has already received, then
// proceed to server_ready. The read-failure branch's `if (r < 0)` head is
// dropped from this extraction — the visible ldout presumably sits inside
// it; confirm upstream.
2594 CtPtr
ProtocolV1::handle_seq(char *buffer
, int r
) {
2595 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
2598 ldout(cct
, 1) << __func__
<< " read ack seq failed" << dendl
;
// Interpret the 8 raw bytes as the peer's newly acked sequence number.
2602 uint64_t newly_acked_seq
= *(uint64_t *)buffer
;
2603 ldout(cct
, 2) << __func__
<< " accept get newly_acked_seq " << newly_acked_seq
2605 out_seq
= discard_requeued_up_to(out_seq
, newly_acked_seq
);
2607 return server_ready();
2610 CtPtr
ProtocolV1::server_ready() {
2611 ldout(cct
, 20) << __func__
<< " session_security is "
2615 ldout(cct
, 20) << __func__
<< " accept done" << dendl
;
2616 // FIPS zeroization audit 20191115: this memset is not security related.
2617 memset(&connect_msg
, 0, sizeof(connect_msg
));
2619 if (connection
->delay_state
) {
2620 ceph_assert(connection
->delay_state
->ready());