1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "ProtocolV2.h"
7 #include "AsyncMessenger.h"
9 #include "common/EventTrace.h"
10 #include "common/ceph_crypto.h"
11 #include "common/errno.h"
12 #include "include/random.h"
13 #include "auth/AuthClient.h"
14 #include "auth/AuthServer.h"
16 #define dout_subsys ceph_subsys_ms
18 #define dout_prefix _conn_prefix(_dout)
// Writes the per-connection log prefix onto *_dout and returns the stream
// (dout_prefix is defined as a call to this function). The visible fields,
// in order: local addrs, peer addrs, the connection pointer, the auth
// connection-mode name, port, protocol state name, peer_global_seq,
// connect_seq, the lossy policy flag and the raw rx/tx stream-handler
// pointers.
// NOTE(review): the end of the stream expression is not visible in this
// excerpt.
19 ostream
&ProtocolV2::_conn_prefix(std::ostream
*_dout
) {
20 return *_dout
<< "--2- " << messenger
->get_myaddrs() << " >> "
21 << *connection
->peer_addrs
<< " conn(" << connection
<< " "
23 << " " << ceph_con_mode_name(auth_meta
->con_mode
)
24 << " :" << connection
->port
25 << " s=" << get_state_name(state
) << " pgs=" << peer_global_seq
26 << " cs=" << connect_seq
<< " l=" << connection
->policy
.lossy
27 << " rx=" << session_stream_handlers
.rx
.get()
28 << " tx=" << session_stream_handlers
.tx
.get()
32 using namespace ceph::msgr::v2
;
34 using CtPtr
= Ct
<ProtocolV2
> *;
35 using CtRef
= Ct
<ProtocolV2
> &;
// Pointer overload: dereferences pcontinuation and forwards to the
// reference overload.
// NOTE(review): no null check is visible in this excerpt -- confirm one
// guards the dereference.
37 void ProtocolV2::run_continuation(CtPtr pcontinuation
) {
39 run_continuation(*pcontinuation
);
// Reference overload: drives the continuation chain via CONTINUATION_RUN.
// Three failure types are caught and logged with lderr: buffer::error
// (frame decode failure), ceph::crypto::onwire::MsgAuthError and
// DecryptionError (payload could not be decrypted).
// NOTE(review): what each handler does after logging is not visible here.
43 void ProtocolV2::run_continuation(CtRef continuation
) {
45 CONTINUATION_RUN(continuation
)
46 } catch (const buffer::error
&e
) {
47 lderr(cct
) << __func__
<< " failed decoding of frame header: " << e
50 } catch (const ceph::crypto::onwire::MsgAuthError
&e
) {
51 lderr(cct
) << __func__
<< " " << e
.what() << dendl
;
53 } catch (const DecryptionError
&) {
54 lderr(cct
) << __func__
<< " failed to decrypt frame payload" << dendl
;
// WRITE(B, D, C): write buffer/frame B, described by the string D, then
// continue with continuation C. Note the argument order differs from the
// underlying write(desc, continuation, buffer) call.
58 #define WRITE(B, D, C) write(D, CONTINUATION(C), B)
// READ(L, C): allocate a fresh buffer of L bytes, read into it and continue
// with continuation C.
60 #define READ(L, C) read(CONTINUATION(C), buffer::ptr_node::create(buffer::create(L)))
// READ_RXBUF(B, C): like READ, but reads into the caller-supplied buffer B.
62 #define READ_RXBUF(B, C) read(CONTINUATION(C), B)
// INTERCEPT(S): test hook, defined under UNIT_TESTS_BUILT. If the connection
// has an interceptor it is consulted for step S and may return FAIL or STOP;
// the STOP branch queues a connection reset.
// NOTE(review): the remaining macro lines are not visible in this excerpt.
64 #ifdef UNIT_TESTS_BUILT
66 #define INTERCEPT(S) { \
67 if(connection->interceptor) { \
68 auto a = connection->interceptor->intercept(connection, (S)); \
69 if (a == Interceptor::ACTION::FAIL) { \
71 } else if (a == Interceptor::ACTION::STOP) { \
73 connection->dispatch_queue->queue_reset(connection); \
// Constructs the protocol object for `connection`, registering with the
// Protocol base as version 2. Visible initialisers: peer_required_features
// to 0, bannerExchangeCallback to nullptr and next_tag to Tag value 0.
// NOTE(review): only part of the member-initialiser list is visible here.
81 ProtocolV2::ProtocolV2(AsyncConnection
*connection
)
82 : Protocol(2, connection
),
84 peer_required_features(0),
94 bannerExchangeCallback(nullptr),
95 next_tag(static_cast<Tag
>(0)),
99 ProtocolV2::~ProtocolV2() {
102 void ProtocolV2::connect() {
103 ldout(cct
, 1) << __func__
<< dendl
;
104 state
= START_CONNECT
;
105 pre_auth
.enabled
= true;
108 void ProtocolV2::accept() {
109 ldout(cct
, 1) << __func__
<< dendl
;
110 state
= START_ACCEPT
;
113 bool ProtocolV2::is_connected() { return can_write
; }
/*
 * Tears down the message queues and removes them from the DispatchQueue.
 * The caller must hold write_lock before calling.
 */
// Walks every message in `sent` and every entry of every priority bucket in
// out_queue, logging each one as discarded.
// NOTE(review): the statements that actually release the messages and clear
// the containers are not visible in this excerpt.
119 void ProtocolV2::discard_out_queue() {
120 ldout(cct
, 10) << __func__
<< " started" << dendl
;
122 for (list
<Message
*>::iterator p
= sent
.begin(); p
!= sent
.end(); ++p
) {
123 ldout(cct
, 20) << __func__
<< " discard " << *p
<< dendl
;
127 for (auto& [ prio
, entries
] : out_queue
) {
// prio is unused; the cast only silences the unused-binding warning.
128 static_cast<void>(prio
);
129 for (auto& entry
: entries
) {
130 ldout(cct
, 20) << __func__
<< " discard " << *entry
.m
<< dendl
;
// Resets the session while holding connection->write_lock: discards any
// delayed deliveries (delay_state), drops this connection's entries from the
// dispatch queue, clears the pending outgoing bytes (outcoming_bl) and
// queues a remote-reset notification for the connection.
// NOTE(review): further statements of this function are not visible here.
137 void ProtocolV2::reset_session() {
138 ldout(cct
, 1) << __func__
<< dendl
;
140 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
141 if (connection
->delay_state
) {
142 connection
->delay_state
->discard();
145 connection
->dispatch_queue
->discard_queue(connection
->conn_id
);
147 connection
->outcoming_bl
.clear();
149 connection
->dispatch_queue
->queue_remote_reset(connection
);
// Stops the protocol. The visible code tests for the CLOSED state, flushes
// delay_state when one exists (before taking the lock), and then takes
// connection->write_lock for the remainder of the function.
// NOTE(review): the body of the CLOSED branch and the work done under the
// lock are not visible in this excerpt.
162 void ProtocolV2::stop() {
163 ldout(cct
, 1) << __func__
<< dendl
;
164 if (state
== CLOSED
) {
168 if (connection
->delay_state
) connection
->delay_state
->flush();
170 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
181 void ProtocolV2::fault() { _fault(); }
// Puts already-sent messages back at the front of the
// CEPH_MSG_PRIO_HIGHEST queue. out_seq is wound back by the number of
// messages in `sent`. Messages are taken from the back of `sent` and placed
// at the front of the queue, and each is re-queued with is_prepared == false.
// NOTE(review): the statement removing the message from `sent` is not
// visible in this excerpt.
183 void ProtocolV2::requeue_sent() {
188 auto& rq
= out_queue
[CEPH_MSG_PRIO_HIGHEST
];
189 out_seq
-= sent
.size();
190 while (!sent
.empty()) {
191 Message
*m
= sent
.back();
193 ldout(cct
, 5) << __func__
<< " requeueing message m=" << m
194 << " seq=" << m
->get_seq() << " type=" << m
->get_type() << " "
196 rq
.emplace_front(out_queue_entry_t
{false, m
});
// Under connection->write_lock, drops messages from the front of the
// CEPH_MSG_PRIO_HIGHEST queue that the peer has already acknowledged.
// Scanning stops at the first message whose seq is 0 or greater than `seq`.
// The bucket is erased once it becomes empty.
//
// out_seq: starting value for the local `count`.
// seq:     highest sequence number acknowledged by the peer.
// NOTE(review): the early-return body, the per-message discard statements
// and the return statement are not visible in this excerpt.
200 uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq
, uint64_t seq
) {
201 ldout(cct
, 10) << __func__
<< " " << seq
<< dendl
;
202 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
203 if (out_queue
.count(CEPH_MSG_PRIO_HIGHEST
) == 0) {
206 auto& rq
= out_queue
[CEPH_MSG_PRIO_HIGHEST
];
207 uint64_t count
= out_seq
;
208 while (!rq
.empty()) {
209 Message
* const m
= rq
.front().m
;
210 if (m
->get_seq() == 0 || m
->get_seq() > seq
) break;
211 ldout(cct
, 5) << __func__
<< " discarding message m=" << m
212 << " seq=" << m
->get_seq() << " ack_seq=" << seq
<< " "
218 if (rq
.empty()) out_queue
.erase(CEPH_MSG_PRIO_HIGHEST
);
// Resets receive-side state. Guarded by a state-range test (of which
// AUTH_CONNECTING..SESSION_RECONNECTING is the visible part), it replaces
// auth_meta with a fresh AuthConnectionMeta, drops both stream handlers and
// clears the pre_auth buffers. It then clears the connection's pending read
// and write callbacks and resets next_tag to Tag value 0.
// NOTE(review): the rest of the condition and the exact extent of the
// guarded block are not visible in this excerpt.
222 void ProtocolV2::reset_recv_state() {
223 if ((state
>= AUTH_CONNECTING
&& state
<= SESSION_RECONNECTING
) ||
225 auth_meta
.reset(new AuthConnectionMeta
);
226 session_stream_handlers
.tx
.reset(nullptr);
227 session_stream_handlers
.rx
.reset(nullptr);
228 pre_auth
.txbuf
.clear();
229 pre_auth
.rxbuf
.clear();
232 // clean read and write callbacks
233 connection
->pendingReadLen
.reset();
234 connection
->writeCallback
.reset();
236 next_tag
= static_cast<Tag
>(0);
// Adds up the declared lengths of the segments of the frame currently being
// received, starting at index 1 so that the header segment is excluded.
// Asserts that at least one segment descriptor is present.
// NOTE(review): the declaration of `sum` and the return statement are not
// visible in this excerpt. The index is a __u8, so this relies on
// rx_segments_desc holding fewer than 256 entries.
241 size_t ProtocolV2::get_current_msg_size() const {
242 ceph_assert(!rx_segments_desc
.empty());
244 // we don't include SegmentIndex::Msg::HEADER.
245 for (__u8 idx
= 1; idx
< rx_segments_desc
.size(); idx
++) {
246 sum
+= rx_segments_desc
[idx
].length
;
// Gives back throttle budget for a message whose receive is in progress.
// How much is returned depends on how far `state` had advanced, always
// bounded above by THROTTLE_DONE:
//   - past THROTTLE_MESSAGE:        one unit to policy.throttler_messages
//   - past THROTTLE_BYTES:          the message size to policy.throttler_bytes
//   - past THROTTLE_DISPATCH_QUEUE: the message size to the dispatch queue
//                                   throttler
// The two policy throttlers are only touched when they are configured.
251 void ProtocolV2::reset_throttle() {
252 if (state
> THROTTLE_MESSAGE
&& state
<= THROTTLE_DONE
&&
253 connection
->policy
.throttler_messages
) {
254 ldout(cct
, 10) << __func__
<< " releasing " << 1
255 << " message to policy throttler "
256 << connection
->policy
.throttler_messages
->get_current()
257 << "/" << connection
->policy
.throttler_messages
->get_max()
259 connection
->policy
.throttler_messages
->put();
261 if (state
> THROTTLE_BYTES
&& state
<= THROTTLE_DONE
) {
262 if (connection
->policy
.throttler_bytes
) {
263 const size_t cur_msg_size
= get_current_msg_size();
264 ldout(cct
, 10) << __func__
<< " releasing " << cur_msg_size
265 << " bytes to policy throttler "
266 << connection
->policy
.throttler_bytes
->get_current() << "/"
267 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
268 connection
->policy
.throttler_bytes
->put(cur_msg_size
);
271 if (state
> THROTTLE_DISPATCH_QUEUE
&& state
<= THROTTLE_DONE
) {
272 const size_t cur_msg_size
= get_current_msg_size();
274 << __func__
<< " releasing " << cur_msg_size
275 << " bytes to dispatch_queue throttler "
276 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
277 << connection
->dispatch_queue
->dispatch_throttler
.get_max() << dendl
;
278 connection
->dispatch_queue
->dispatch_throttle_release(cur_msg_size
);
// Central fault handler. The visible control flow is:
//   1. CLOSED or NONE state: logs that the connection is already closed.
//   2. Lossy policy and state outside START_CONNECT..SESSION_RECONNECTING:
//      logs the failure and queues a connection reset.
//   3. Otherwise takes write_lock, and
//      a. with nothing queued, in START_ACCEPT..SESSION_ACCEPTING and not
//         replacing: unlocks and queues a reset;
//      b. policy.standby with nothing to send and no keepalive pending, or
//         policy.server: logs "going to standby" and unlocks;
//      c. else unlocks and re-enters START_CONNECT with pre_auth enabled,
//         either by scheduling read_handler or by arming a timer for
//         `backoff`, which is capped at ms_max_backoff.
// NOTE(review): return statements, state assignments inside the branches and
// several conditions are not visible in this excerpt. write_lock is locked
// and unlocked by hand on each path, so any path added here must unlock it.
282 CtPtr
ProtocolV2::_fault() {
283 ldout(cct
, 10) << __func__
<< dendl
;
285 if (state
== CLOSED
|| state
== NONE
) {
286 ldout(cct
, 10) << __func__
<< " connection is already closed" << dendl
;
290 if (connection
->policy
.lossy
&&
291 !(state
>= START_CONNECT
&& state
<= SESSION_RECONNECTING
)) {
292 ldout(cct
, 2) << __func__
<< " on lossy channel, failing" << dendl
;
294 connection
->dispatch_queue
->queue_reset(connection
);
298 connection
->write_lock
.lock();
301 // requeue sent items
304 if (out_queue
.empty() && state
>= START_ACCEPT
&&
305 state
<= SESSION_ACCEPTING
&& !replacing
) {
306 ldout(cct
, 2) << __func__
<< " with nothing to send and in the half "
307 << " accept state just closed" << dendl
;
308 connection
->write_lock
.unlock();
310 connection
->dispatch_queue
->queue_reset(connection
);
318 reconnecting
= false;
320 if (connection
->policy
.standby
&& out_queue
.empty() && !keepalive
&&
322 ldout(cct
, 1) << __func__
<< " with nothing to send, going to standby"
325 connection
->write_lock
.unlock();
328 if (connection
->policy
.server
) {
329 ldout(cct
, 1) << __func__
<< " server, going to standby, even though i have stuff queued" << dendl
;
331 connection
->write_lock
.unlock();
335 connection
->write_lock
.unlock();
337 if (!(state
>= START_CONNECT
&& state
<= SESSION_RECONNECTING
) &&
339 state
!= SESSION_ACCEPTING
/* due to connection race */) {
340 // policy may be empty while the state is still in the accept range
341 if (connection
->policy
.server
) {
342 ldout(cct
, 1) << __func__
<< " server, going to standby" << dendl
;
345 ldout(cct
, 1) << __func__
<< " initiating reconnect" << dendl
;
347 global_seq
= messenger
->get_global_seq();
348 state
= START_CONNECT
;
349 pre_auth
.enabled
= true;
350 connection
->state
= AsyncConnection::STATE_CONNECTING
;
353 connection
->center
->dispatch_event_external(connection
->read_handler
);
356 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
357 } else if (backoff
== utime_t()) {
358 backoff
.set_from_double(cct
->_conf
->ms_initial_backoff
);
361 if (backoff
> cct
->_conf
->ms_max_backoff
)
362 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
369 global_seq
= messenger
->get_global_seq();
370 state
= START_CONNECT
;
371 pre_auth
.enabled
= true;
372 connection
->state
= AsyncConnection::STATE_CONNECTING
;
373 ldout(cct
, 1) << __func__
<< " waiting " << backoff
<< dendl
;
// create_time_event is given the backoff converted from nanoseconds by / 1000.
375 connection
->register_time_events
.insert(
376 connection
->center
->create_time_event(backoff
.to_nsec() / 1000,
377 connection
->wakeup_handler
));
// Encodes message m for the given feature set. It logs whether this is a
// first encode (the payload is still empty) or a "half-reencode" of an
// already encoded message, then calls m->encode(features, 0).
// NOTE(review): the second parameter line and the statement belonging to
// the "associate message with Connection" comment are not visible here.
382 void ProtocolV2::prepare_send_message(uint64_t features
,
384 ldout(cct
, 20) << __func__
<< " m=" << *m
<< dendl
;
386 // associate message with Connection (for benefit of encode_payload)
387 if (m
->empty_payload()) {
388 ldout(cct
, 20) << __func__
<< " encoding features " << features
<< " " << m
389 << " " << *m
<< dendl
;
391 ldout(cct
, 20) << __func__
<< " half-reencoding features " << features
392 << " " << m
<< " " << *m
<< dendl
;
395 // encode and copy out of *m
396 m
->encode(features
, 0);
// Queues message m for transmission.
//
// If the messenger can fast-dispatch m, it is encoded up front, before
// write_lock is taken, using the connection features sampled at entry. Under
// write_lock that pre-encoding is reconsidered when writing is not currently
// possible or the connection features changed in the meantime. In the CLOSED
// state the message is logged as dropped. Otherwise it is appended to the
// out_queue bucket for its priority, and write_handler is scheduled when
// either (!replacing && can_write) or the state is STANDBY.
// NOTE(review): the statements that clear the stale encoding and release a
// dropped message are not visible in this excerpt.
399 void ProtocolV2::send_message(Message
*m
) {
400 uint64_t f
= connection
->get_features();
402 // TODO: Currently not all messages supports reencode like MOSDMap, so here
403 // only let fast dispatch support messages prepare message
404 const bool can_fast_prepare
= messenger
->ms_can_fast_dispatch(m
);
405 if (can_fast_prepare
) {
406 prepare_send_message(f
, m
);
409 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
410 bool is_prepared
= can_fast_prepare
;
411 // "features" changes will change the payload encoding
412 if (can_fast_prepare
&& (!can_write
|| connection
->get_features() != f
)) {
413 // ensure the correctness of message encoding
416 ldout(cct
, 10) << __func__
<< " clear encoded buffer previous " << f
417 << " != " << connection
->get_features() << dendl
;
419 if (state
== CLOSED
) {
420 ldout(cct
, 10) << __func__
<< " connection closed."
421 << " Drop message " << m
<< dendl
;
424 ldout(cct
, 5) << __func__
<< " enqueueing message m=" << m
425 << " type=" << m
->get_type() << " " << *m
<< dendl
;
426 m
->trace
.event("async enqueueing message");
427 out_queue
[m
->get_priority()].emplace_back(
428 out_queue_entry_t
{is_prepared
, m
});
429 ldout(cct
, 15) << __func__
<< " inline write is denied, reschedule m=" << m
431 if ((!replacing
&& can_write
) || state
== STANDBY
) {
432 connection
->center
->dispatch_event_external(connection
->write_handler
);
// Requests a keepalive: under write_lock, and unless the state is CLOSED,
// schedules the connection's write_handler on its event center.
// NOTE(review): one statement inside the branch is not visible in this
// excerpt.
437 void ProtocolV2::send_keepalive() {
438 ldout(cct
, 10) << __func__
<< dendl
;
439 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
440 if (state
!= CLOSED
) {
442 connection
->center
->dispatch_event_external(connection
->write_handler
);
// Read-side event entry point: starts the continuation that matches the
// current state. The visible targets are the client and server banner
// exchanges, read_frame, and the three throttle stages (message, bytes,
// dispatch queue).
// NOTE(review): the switch statement and several case labels are not
// visible in this excerpt.
446 void ProtocolV2::read_event() {
447 ldout(cct
, 20) << __func__
<< dendl
;
451 run_continuation(CONTINUATION(start_client_banner_exchange
));
454 run_continuation(CONTINUATION(start_server_banner_exchange
));
457 run_continuation(CONTINUATION(read_frame
));
459 case THROTTLE_MESSAGE
:
460 run_continuation(CONTINUATION(throttle_message
));
463 run_continuation(CONTINUATION(throttle_bytes
));
465 case THROTTLE_DISPATCH_QUEUE
:
466 run_continuation(CONTINUATION(throttle_dispatch_queue
));
// Picks the next entry to send: the front entry of the last (rbegin) bucket
// of out_queue. A bucket is asserted to be non-empty when picked and is
// erased from out_queue once it is drained. When out_queue is empty the
// default-constructed out_entry is left untouched.
// NOTE(review): the statement removing the entry from its bucket and the
// return statement are not visible in this excerpt.
473 ProtocolV2::out_queue_entry_t
ProtocolV2::_get_next_outgoing() {
474 out_queue_entry_t out_entry
;
476 if (!out_queue
.empty()) {
477 auto it
= out_queue
.rbegin();
478 auto& entries
= it
->second
;
479 ceph_assert(!entries
.empty());
480 out_entry
= entries
.front();
482 if (entries
.empty()) {
483 out_queue
.erase(it
->first
);
// Serialises message m into a MESSAGE frame and tries to send it.
//
// Must run on the connection's event-center thread (asserted). It assigns
// the next out_seq to m, samples in_seq under connection->lock for use as
// the ack value, builds a ceph_msg_header2 from the message header and
// footer, encodes the frame and appends it to outcoming_bl, then calls
// _try_send(more). On failure the error is logged; otherwise the number of
// bytes that left outcoming_bl is added to l_msgr_send_bytes.
//
// m:    message to send.
// more: forwarded to _try_send.
// NOTE(review): several statements (including the rc check and the return)
// are not visible in this excerpt.
489 ssize_t
ProtocolV2::write_message(Message
*m
, bool more
) {
491 ceph_assert(connection
->center
->in_thread());
492 m
->set_seq(++out_seq
);
494 connection
->lock
.lock();
495 uint64_t ack_seq
= in_seq
;
497 connection
->lock
.unlock();
499 ceph_msg_header
&header
= m
->get_header();
500 ceph_msg_footer
&footer
= m
->get_footer();
502 ceph_msg_header2 header2
{header
.seq
, header
.tid
,
503 header
.type
, header
.priority
,
507 footer
.flags
, header
.compat_version
,
510 auto message
= MessageFrame::Encode(
515 connection
->outcoming_bl
.append(message
.get_buffer(session_stream_handlers
));
517 ldout(cct
, 5) << __func__
<< " sending message m=" << m
518 << " seq=" << m
->get_seq() << " " << *m
<< dendl
;
520 m
->trace
.event("async writing message");
521 ldout(cct
, 20) << __func__
<< " sending m=" << m
<< " seq=" << m
->get_seq()
522 << " src=" << entity_name_t(messenger
->get_myname())
523 << " off=" << header2
.data_off
// Length sampled before the send attempt, so the byte counter below can
// record how much actually left outcoming_bl.
525 ssize_t total_send_size
= connection
->outcoming_bl
.length();
526 ssize_t rc
= connection
->_try_send(more
);
528 ldout(cct
, 1) << __func__
<< " error sending " << m
<< ", "
529 << cpp_strerror(rc
) << dendl
;
531 connection
->logger
->inc(
532 l_msgr_send_bytes
, total_send_size
- connection
->outcoming_bl
.length());
// NOTE(review): " continuely." in the log string below is a typo.
533 ldout(cct
, 10) << __func__
<< " sending " << m
534 << (rc
? " continuely." : " done.") << dendl
;
536 if (m
->get_type() == CEPH_MSG_OSD_OP
)
537 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OP_END", false);
538 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
539 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OPREPLY_END", false);
545 void ProtocolV2::append_keepalive() {
546 ldout(cct
, 10) << __func__
<< dendl
;
547 auto keepalive_frame
= KeepAliveFrame::Encode();
548 connection
->outcoming_bl
.append(keepalive_frame
.get_buffer(session_stream_handlers
));
551 void ProtocolV2::append_keepalive_ack(utime_t
×tamp
) {
552 auto keepalive_ack_frame
= KeepAliveFrameAck::Encode(timestamp
);
553 connection
->outcoming_bl
.append(keepalive_ack_frame
.get_buffer(session_stream_handlers
));
// Processes an acknowledgement up to sequence number `seq`.
//
// Lossy connections keep no sent messages, so they take a separate branch.
// Otherwise, under write_lock, messages at the front of `sent` whose seq is
// <= `seq` are collected, at most max_pending (128) per call. The collected
// messages are then processed in a second loop after the lock is released.
// NOTE(review): the lossy-branch body, the declaration of `i`, the
// statements moving a message from `sent` into `pending`, and the body of
// the final loop are not visible in this excerpt. If more than 128 messages
// are acknowledged at once, the remainder stay in `sent` after this call.
556 void ProtocolV2::handle_message_ack(uint64_t seq
) {
557 if (connection
->policy
.lossy
) { // lossy connections don't keep sent messages
561 ldout(cct
, 15) << __func__
<< " seq=" << seq
<< dendl
;
564 static const int max_pending
= 128;
566 Message
*pending
[max_pending
];
567 connection
->write_lock
.lock();
568 while (!sent
.empty() && sent
.front()->get_seq() <= seq
&& i
< max_pending
) {
569 Message
*m
= sent
.front();
572 ldout(cct
, 10) << __func__
<< " got ack seq " << seq
573 << " >= " << m
->get_seq() << " on " << m
<< " " << *m
576 connection
->write_lock
.unlock();
577 for (int k
= 0; k
< i
; k
++) {
// Write-side event handler. The visible structure is:
//   - Take write_lock and pull entries with _get_next_outgoing(). On
//     non-lossy connections each message is recorded in `sent`. The lock is
//     released around encoding (only when the entry is not already
//     prepared) and write_message(), then taken again.
//   - Once the queue is drained: if acks are outstanding (ack_left), append
//     an AckFrame for in_seq and call _try_send; else, if bytes are still
//     queued, call _try_send.
//   - Record the elapsed time in l_msgr_running_send_time.
//   - In the other branch, connection->lock is taken before write_lock
//     (write_lock is dropped first to get that order). A STANDBY non-server
//     connection with queued data calls _connect(); otherwise, if a socket
//     exists and the state is not NONE, CLOSED or START_CONNECT, any
//     leftover bytes are flushed with _try_send().
// NOTE(review): the branch conditions, loop headers and failure handling
// are mostly not visible in this excerpt. Both locks are managed by hand.
582 void ProtocolV2::write_event() {
583 ldout(cct
, 10) << __func__
<< dendl
;
586 connection
->write_lock
.lock();
593 auto start
= ceph::mono_clock::now();
596 const auto out_entry
= _get_next_outgoing();
601 if (!connection
->policy
.lossy
) {
603 sent
.push_back(out_entry
.m
);
606 more
= !out_queue
.empty();
607 connection
->write_lock
.unlock();
609 // send_message or requeue messages may not encode message
610 if (!out_entry
.is_prepared
) {
611 prepare_send_message(connection
->get_features(), out_entry
.m
);
614 r
= write_message(out_entry
.m
, more
);
616 connection
->write_lock
.lock();
620 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
626 // r > 0 means data is still left to send, so _try_send is not needed.
628 uint64_t left
= ack_left
;
632 auto ack
= AckFrame::Encode(in_seq
);
633 connection
->outcoming_bl
.append(ack
.get_buffer(session_stream_handlers
));
634 ldout(cct
, 10) << __func__
<< " try send msg ack, acked " << left
635 << " messages" << dendl
;
638 r
= connection
->_try_send(left
);
639 } else if (is_queued()) {
640 r
= connection
->_try_send();
643 connection
->write_lock
.unlock();
645 connection
->logger
->tinc(l_msgr_running_send_time
,
646 ceph::mono_clock::now() - start
);
648 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
649 connection
->lock
.lock();
651 connection
->lock
.unlock();
655 connection
->write_lock
.unlock();
656 connection
->lock
.lock();
657 connection
->write_lock
.lock();
658 if (state
== STANDBY
&& !connection
->policy
.server
&& is_queued()) {
659 ldout(cct
, 10) << __func__
<< " policy.server is false" << dendl
;
660 if (server_cookie
) { // only increment connect_seq if there is a session
663 connection
->_connect();
664 } else if (connection
->cs
&& state
!= NONE
&& state
!= CLOSED
&&
665 state
!= START_CONNECT
) {
666 r
= connection
->_try_send();
668 ldout(cct
, 1) << __func__
<< " send outcoming bl failed" << dendl
;
669 connection
->write_lock
.unlock();
671 connection
->lock
.unlock();
675 connection
->write_lock
.unlock();
676 connection
->lock
.unlock();
680 bool ProtocolV2::is_queued() {
681 return !out_queue
.empty() || connection
->is_queued();
// Maps a segment's logical size to the number of bytes it occupies on the
// wire. When an rx stream handler is installed this is
// segment_onwire_size(logical_size).
// NOTE(review): the branch taken without an rx handler is not visible in
// this excerpt.
684 uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size
) const {
685 if (session_stream_handlers
.rx
) {
686 return segment_onwire_size(logical_size
);
// Returns the number of bytes to read for the frame epilogue. With an rx
// stream handler installed this is FRAME_SECURE_EPILOGUE_SIZE plus whatever
// extra bytes the handler needs at the end of the frame; the other visible
// return value is FRAME_PLAIN_EPILOGUE_SIZE.
692 uint32_t ProtocolV2::get_epilogue_size() const {
693 // In secure mode size of epilogue is flexible and depends on particular
694 // cipher implementation. See the comment for epilogue_secure_block_t or
695 // epilogue_plain_block_t.
696 if (session_stream_handlers
.rx
) {
697 return FRAME_SECURE_EPILOGUE_SIZE
+ \
698 session_stream_handlers
.rx
->get_extra_size_at_final();
700 return FRAME_PLAIN_EPILOGUE_SIZE
;
// Reads buffer->length() bytes into `buffer` and then resumes `next`.
//
// Ownership of the buffer is moved into next.node before the read starts.
// The raw pointer and length are sampled first, so they stay valid after
// the move. While pre_auth is enabled, every successfully read buffer is
// also appended to pre_auth.rxbuf; with ms_die_on_bug set, that buffer is
// asserted to stay below 1000000 bytes. The same bookkeeping appears in the
// completion callback and in the path marked "error or done synchronously".
// NOTE(review): the callback captures `next` by reference, so the
// continuation object must outlive the read -- confirm with the
// continuation declarations. The return statements are not visible here.
704 CtPtr
ProtocolV2::read(CONTINUATION_RXBPTR_TYPE
<ProtocolV2
> &next
,
705 rx_buffer_t
&&buffer
) {
706 const auto len
= buffer
->length();
707 const auto buf
= buffer
->c_str();
708 next
.node
= std::move(buffer
);
709 ssize_t r
= connection
->read(len
, buf
,
710 [&next
, this](char *buffer
, int r
) {
711 if (unlikely(pre_auth
.enabled
) && r
>= 0) {
712 pre_auth
.rxbuf
.append(*next
.node
);
713 ceph_assert(!cct
->_conf
->ms_die_on_bug
||
714 pre_auth
.rxbuf
.length() < 1000000);
717 run_continuation(next
);
720 // error or done synchronously
721 if (unlikely(pre_auth
.enabled
) && r
>= 0) {
722 pre_auth
.rxbuf
.append(*next
.node
);
723 ceph_assert(!cct
->_conf
->ms_die_on_bug
||
724 pre_auth
.rxbuf
.length() < 1000000);
// Frame overload of write(): serialises `frame` through the session's
// stream handlers into a bufferlist and forwards to the bufferlist overload
// with the same description and continuation.
// NOTE(review): the parameter line declaring `frame` is not visible in this
// excerpt.
734 CtPtr
ProtocolV2::write(const std::string
&desc
,
735 CONTINUATION_TYPE
<ProtocolV2
> &next
,
737 ceph::bufferlist bl
= frame
.get_buffer(session_stream_handlers
);
738 return write(desc
, next
, bl
);
// Bufferlist overload of write(): sends `buffer` on the connection and then
// resumes `next`.
//
// While pre_auth is enabled the outgoing bytes are also appended to
// pre_auth.txbuf; with ms_die_on_bug set, that buffer is asserted to stay
// below 1000000 bytes. Write failures are logged with `desc` and the error
// string, both in the completion callback and in the synchronous path.
//
// desc: short label used in failure log messages.
// NOTE(review): the callback captures `next` by reference and `desc` by
// value, so the continuation object must outlive the write. The r < 0
// checks and return statements are not visible in this excerpt.
741 CtPtr
ProtocolV2::write(const std::string
&desc
,
742 CONTINUATION_TYPE
<ProtocolV2
> &next
,
743 bufferlist
&buffer
) {
744 if (unlikely(pre_auth
.enabled
)) {
745 pre_auth
.txbuf
.append(buffer
);
746 ceph_assert(!cct
->_conf
->ms_die_on_bug
||
747 pre_auth
.txbuf
.length() < 1000000);
751 connection
->write(buffer
, [&next
, desc
, this](int r
) {
753 ldout(cct
, 1) << __func__
<< " " << desc
<< " write failed r=" << r
754 << " (" << cpp_strerror(r
) << ")" << dendl
;
755 connection
->inject_delay();
758 run_continuation(next
);
762 ldout(cct
, 1) << __func__
<< " " << desc
<< " write failed r=" << r
763 << " (" << cpp_strerror(r
) << ")" << dendl
;
// Starts the banner exchange and remembers `callback` in
// bannerExchangeCallback for later use.
//
// The banner written is: CEPH_BANNER_V2_PREFIX, a 16-bit payload length,
// then the payload -- our supported and required msgr2 feature masks, each
// encoded as a uint64_t. After the write the flow continues in
// _wait_for_peer_banner.
// NOTE(review): only the address of `callback` is stored, so the referenced
// continuation must outlive the exchange. The declaration of `bl` is not
// visible in this excerpt.
773 CtPtr
ProtocolV2::_banner_exchange(CtRef callback
) {
774 ldout(cct
, 20) << __func__
<< dendl
;
775 bannerExchangeCallback
= &callback
;
777 bufferlist banner_payload
;
778 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES
, banner_payload
, 0);
779 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES
, banner_payload
, 0);
782 bl
.append(CEPH_BANNER_V2_PREFIX
, strlen(CEPH_BANNER_V2_PREFIX
));
783 encode((uint16_t)banner_payload
.length(), bl
, 0);
784 bl
.claim_append(banner_payload
);
// Test hook: step 3 while in BANNER_CONNECTING, step 4 otherwise.
786 INTERCEPT(state
== BANNER_CONNECTING
? 3 : 4);
788 return WRITE(bl
, "banner", _wait_for_peer_banner
);
791 CtPtr
ProtocolV2::_wait_for_peer_banner() {
792 unsigned banner_len
= strlen(CEPH_BANNER_V2_PREFIX
) + sizeof(__le16
);
793 return READ(banner_len
, _handle_peer_banner
);
// Handles the fixed-size start of the peer's banner.
//
// The prefix is compared against CEPH_BANNER_V2_PREFIX. On mismatch it is
// also compared against the V1 banner, to log specifically that the peer
// speaks msgr V1; any other mismatch is logged as a bad banner. On a match
// the 16-bit payload length that follows the prefix is decoded, and a read
// of that many bytes is issued with _handle_peer_banner_payload as the
// continuation.
//
// buffer: the bytes read by _wait_for_peer_banner.
// r:      result of that read; a failure is logged with its error string.
// NOTE(review): the r < 0 check, the returns on the failure paths and the
// declaration of `bl` are not visible in this excerpt.
796 CtPtr
ProtocolV2::_handle_peer_banner(rx_buffer_t
&&buffer
, int r
) {
797 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
800 ldout(cct
, 1) << __func__
<< " read peer banner failed r=" << r
<< " ("
801 << cpp_strerror(r
) << ")" << dendl
;
805 unsigned banner_prefix_len
= strlen(CEPH_BANNER_V2_PREFIX
);
807 if (memcmp(buffer
->c_str(), CEPH_BANNER_V2_PREFIX
, banner_prefix_len
)) {
808 if (memcmp(buffer
->c_str(), CEPH_BANNER
, strlen(CEPH_BANNER
)) == 0) {
809 lderr(cct
) << __func__
<< " peer " << *connection
->peer_addrs
810 << " is using msgr V1 protocol" << dendl
;
813 ldout(cct
, 1) << __func__
<< " accept peer sent bad banner" << dendl
;
817 uint16_t payload_len
;
// Narrow the buffer to the 16-bit length field that follows the prefix.
819 buffer
->set_offset(banner_prefix_len
);
820 buffer
->set_length(sizeof(__le16
));
821 bl
.push_back(std::move(buffer
));
822 auto ti
= bl
.cbegin();
824 decode(payload_len
, ti
);
825 } catch (const buffer::error
&e
) {
826 lderr(cct
) << __func__
<< " decode banner payload len failed " << dendl
;
// Test hook: step 5 while in BANNER_CONNECTING, step 6 otherwise.
830 INTERCEPT(state
== BANNER_CONNECTING
? 5 : 6);
832 return READ(payload_len
, _handle_peer_banner_payload
);
// Handles the variable-length banner payload: decodes the peer's supported
// and required feature masks and checks them against ours in both
// directions.
//
// If the peer lacks a feature we require, or requires a feature we lack,
// the mismatch is logged and a connection reset is queued. Otherwise the
// peer's required mask is stored in this->peer_required_features, the state
// advances from BANNER_CONNECTING to HELLO_CONNECTING or from
// BANNER_ACCEPTING to HELLO_ACCEPTING, and a hello frame carrying our entity
// type and connection->target_addr is written. Reading then continues in
// read_frame.
//
// buffer: the payload bytes read by _handle_peer_banner.
// r:      result of that read; a failure is logged with its error string.
// NOTE(review): the r < 0 check, the returns on the failure paths and the
// declaration of `bl` are not visible in this excerpt.
835 CtPtr
ProtocolV2::_handle_peer_banner_payload(rx_buffer_t
&&buffer
, int r
) {
836 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
839 ldout(cct
, 1) << __func__
<< " read peer banner payload failed r=" << r
840 << " (" << cpp_strerror(r
) << ")" << dendl
;
844 uint64_t peer_supported_features
;
// NOTE(review): this local shadows the member of the same name; the member
// is only reached through this-> further down.
845 uint64_t peer_required_features
;
848 bl
.push_back(std::move(buffer
));
849 auto ti
= bl
.cbegin();
851 decode(peer_supported_features
, ti
);
852 decode(peer_required_features
, ti
);
853 } catch (const buffer::error
&e
) {
854 lderr(cct
) << __func__
<< " decode banner payload failed " << dendl
;
858 ldout(cct
, 1) << __func__
<< " supported=" << std::hex
859 << peer_supported_features
<< " required=" << std::hex
860 << peer_required_features
<< std::dec
<< dendl
;
862 // Check feature bit compatibility
864 uint64_t supported_features
= CEPH_MSGR2_SUPPORTED_FEATURES
;
865 uint64_t required_features
= CEPH_MSGR2_REQUIRED_FEATURES
;
867 if ((required_features
& peer_supported_features
) != required_features
) {
868 ldout(cct
, 1) << __func__
<< " peer does not support all required features"
869 << " required=" << std::hex
<< required_features
870 << " supported=" << std::hex
<< peer_supported_features
871 << std::dec
<< dendl
;
873 connection
->dispatch_queue
->queue_reset(connection
);
876 if ((supported_features
& peer_required_features
) != peer_required_features
) {
877 ldout(cct
, 1) << __func__
<< " we do not support all peer required features"
878 << " required=" << std::hex
<< peer_required_features
879 << " supported=" << supported_features
<< std::dec
<< dendl
;
881 connection
->dispatch_queue
->queue_reset(connection
);
885 this->peer_required_features
= peer_required_features
;
886 if (this->peer_required_features
== 0) {
887 this->connection_features
= msgr2_required
;
890 // at this point we can change how the client protocol behaves based on
891 // this->peer_required_features
893 if (state
== BANNER_CONNECTING
) {
894 state
= HELLO_CONNECTING
;
897 ceph_assert(state
== BANNER_ACCEPTING
);
898 state
= HELLO_ACCEPTING
;
901 auto hello
= HelloFrame::Encode(messenger
->get_mytype(),
902 connection
->target_addr
);
// Test hook: step 7 while in HELLO_CONNECTING, step 8 otherwise.
904 INTERCEPT(state
== HELLO_CONNECTING
? 7 : 8);
906 return WRITE(hello
, "hello frame", read_frame
);
// Handles the peer's HELLO frame. It is only valid in HELLO_CONNECTING or
// HELLO_ACCEPTING; any other state is logged as an error.
//
// If the connection has no peer type yet (-1), the advertised entity type is
// adopted and the messenger policy for that type is installed; this path is
// asserted to be the accepting side. Otherwise (asserted to be the
// connecting side) the advertised type must match the expected one; a
// mismatch is logged and a reset is queued. Finally the callback saved by
// _banner_exchange is taken out of bannerExchangeCallback (which is
// cleared) and asserted to be non-null.
//
// payload: encoded HelloFrame.
// NOTE(review): return statements, the declaration of `callback` and other
// statements are not visible in this excerpt.
909 CtPtr
ProtocolV2::handle_hello(ceph::bufferlist
&payload
)
911 ldout(cct
, 20) << __func__
912 << " payload.length()=" << payload
.length() << dendl
;
914 if (state
!= HELLO_CONNECTING
&& state
!= HELLO_ACCEPTING
) {
915 lderr(cct
) << __func__
<< " not in hello exchange state!" << dendl
;
919 auto hello
= HelloFrame::Decode(payload
);
921 ldout(cct
, 5) << __func__
<< " received hello:"
922 << " peer_type=" << (int)hello
.entity_type()
923 << " peer_addr_for_me=" << hello
.peer_addr() << dendl
;
925 if (connection
->get_peer_type() == -1) {
926 connection
->set_peer_type(hello
.entity_type());
928 ceph_assert(state
== HELLO_ACCEPTING
);
929 connection
->policy
= messenger
->get_policy(hello
.entity_type());
930 ldout(cct
, 10) << __func__
<< " accept of host_type "
931 << (int)hello
.entity_type()
932 << ", policy.lossy=" << connection
->policy
.lossy
933 << " policy.server=" << connection
->policy
.server
934 << " policy.standby=" << connection
->policy
.standby
935 << " policy.resetcheck=" << connection
->policy
.resetcheck
938 ceph_assert(state
== HELLO_CONNECTING
);
939 if (connection
->get_peer_type() != hello
.entity_type()) {
940 ldout(cct
, 1) << __func__
<< " connection peer type does not match what"
941 << " peer advertises " << connection
->get_peer_type()
942 << " != " << (int)hello
.entity_type() << dendl
;
944 connection
->dispatch_queue
->queue_reset(connection
);
950 callback
= bannerExchangeCallback
;
951 bannerExchangeCallback
= nullptr;
952 ceph_assert(callback
);
// Starts reading the next frame: issues a read of FRAME_PREAMBLE_SIZE bytes
// with handle_read_frame_preamble_main as the continuation. The CLOSED state
// is tested first.
// NOTE(review): the body of the CLOSED branch is not visible in this
// excerpt.
956 CtPtr
ProtocolV2::read_frame() {
957 if (state
== CLOSED
) {
961 ldout(cct
, 20) << __func__
<< dendl
;
962 return READ(FRAME_PREAMBLE_SIZE
, handle_read_frame_preamble_main
);
// Handles a freshly read frame preamble.
//
// Steps visible here: optionally decrypt the preamble (when an rx stream
// handler is installed); verify its CRC, computed over everything except
// the crc field itself; validate num_segments against 1..MAX_NUM_SEGMENTS;
// record the tag in next_tag; and rebuild rx_segments_desc from the
// preamble's segment table. A MESSAGE tag is only accepted in the READY
// state and moves the state machine into THROTTLE_MESSAGE; every other tag
// goes straight to read_frame_segment().
//
// buffer: the preamble bytes.
// r:      result of the read; a failure is logged with its error string.
// NOTE(review): the r < 0 check and the returns on failure paths are not
// visible in this excerpt.
965 CtPtr
ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t
&&buffer
, int r
) {
966 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
969 ldout(cct
, 1) << __func__
<< " read frame length and tag failed r=" << r
970 << " (" << cpp_strerror(r
) << ")" << dendl
;
974 ceph::bufferlist preamble
;
975 preamble
.push_back(std::move(buffer
));
977 ldout(cct
, 30) << __func__
<< " preamble\n";
978 preamble
.hexdump(*_dout
);
981 if (session_stream_handlers
.rx
) {
// NOTE(review): this assert repeats the condition of the enclosing if.
982 ceph_assert(session_stream_handlers
.rx
);
984 session_stream_handlers
.rx
->reset_rx_handler();
985 preamble
= session_stream_handlers
.rx
->authenticated_decrypt_update(
986 std::move(preamble
), segment_t::DEFAULT_ALIGNMENT
);
// NOTE(review): "premable" in the log string below is a typo.
988 ldout(cct
, 10) << __func__
<< " got encrypted preamble."
989 << " after decrypt premable.length()=" << preamble
.length()
992 ldout(cct
, 30) << __func__
<< " preamble after decrypt\n";
993 preamble
.hexdump(*_dout
);
998 // I expect ceph_le32 will make the endian conversion for me. Passing
999 // everything through ::Decode is unnecessary.
1000 const auto& main_preamble
= \
1001 reinterpret_cast<preamble_block_t
&>(*preamble
.c_str());
1003 // verify preamble's CRC before any further processing
1004 const auto rx_crc
= ceph_crc32c(0,
1005 reinterpret_cast<const unsigned char*>(&main_preamble
),
1006 sizeof(main_preamble
) - sizeof(main_preamble
.crc
));
1007 if (rx_crc
!= main_preamble
.crc
) {
1008 ldout(cct
, 10) << __func__
<< " crc mismatch for main preamble"
1009 << " rx_crc=" << rx_crc
1010 << " tx_crc=" << main_preamble
.crc
<< dendl
;
1014 // currently we do support between 1 and MAX_NUM_SEGMENTS segments
1015 if (main_preamble
.num_segments
< 1 ||
1016 main_preamble
.num_segments
> MAX_NUM_SEGMENTS
) {
// NOTE(review): the value logged here is num_segments, but it is printed
// after a " tx_crc=" label -- apparently copied from the CRC log above.
1017 ldout(cct
, 10) << __func__
<< " unsupported num_segments="
1018 << " tx_crc=" << main_preamble
.num_segments
<< dendl
;
1022 next_tag
= static_cast<Tag
>(main_preamble
.tag
);
1024 rx_segments_desc
.clear();
1025 rx_segments_data
.clear();
// NOTE(review): this repeats the > MAX_NUM_SEGMENTS test made a few lines
// above; if that earlier branch leaves the function, this one cannot be
// taken.
1027 if (main_preamble
.num_segments
> MAX_NUM_SEGMENTS
) {
1028 ldout(cct
, 30) << __func__
1029 << " num_segments=" << main_preamble
.num_segments
1030 << " is too much" << dendl
;
1033 for (std::uint8_t idx
= 0; idx
< main_preamble
.num_segments
; idx
++) {
1034 ldout(cct
, 10) << __func__
<< " got new segment:"
1035 << " len=" << main_preamble
.segments
[idx
].length
1036 << " align=" << main_preamble
.segments
[idx
].alignment
1038 rx_segments_desc
.emplace_back(main_preamble
.segments
[idx
]);
1042 // does it need throttle?
1043 if (next_tag
== Tag::MESSAGE
) {
1044 if (state
!= READY
) {
1045 lderr(cct
) << __func__
<< " not in ready state!" << dendl
;
1048 state
= THROTTLE_MESSAGE
;
1049 return CONTINUE(throttle_message
);
1051 return read_frame_segment();
// Routes a completely received frame according to next_tag. The control
// tags listed below share the handle_frame_payload() path; another path
// calls handle_message(); anything else is logged as an unknown tag.
// NOTE(review): the switch statement and several case/default labels are
// not visible in this excerpt.
1055 CtPtr
ProtocolV2::handle_read_frame_dispatch() {
1056 ldout(cct
, 10) << __func__
1057 << " tag=" << static_cast<uint32_t>(next_tag
) << dendl
;
1061 case Tag::AUTH_REQUEST
:
1062 case Tag::AUTH_BAD_METHOD
:
1063 case Tag::AUTH_REPLY_MORE
:
1064 case Tag::AUTH_REQUEST_MORE
:
1065 case Tag::AUTH_DONE
:
1066 case Tag::AUTH_SIGNATURE
:
1067 case Tag::CLIENT_IDENT
:
1068 case Tag::SERVER_IDENT
:
1069 case Tag::IDENT_MISSING_FEATURES
:
1070 case Tag::SESSION_RECONNECT
:
1071 case Tag::SESSION_RESET
:
1072 case Tag::SESSION_RETRY
:
1073 case Tag::SESSION_RETRY_GLOBAL
:
1074 case Tag::SESSION_RECONNECT_OK
:
1075 case Tag::KEEPALIVE2
:
1076 case Tag::KEEPALIVE2_ACK
:
1079 return handle_frame_payload();
1081 return handle_message();
1083 lderr(cct
) << __func__
1084 << " received unknown tag=" << static_cast<uint32_t>(next_tag
)
// Reads the next segment of the current frame. The segment to read is the
// one whose index equals the number of segments already stored in
// rx_segments_data. A buffer of the segment's on-wire size is allocated with
// the alignment requested in its descriptor; std::bad_alloc from that
// allocation is caught and logged. The read continues in
// handle_read_frame_segment.
// NOTE(review): the `try {` line and what the bad_alloc handler returns are
// not visible in this excerpt.
1093 CtPtr
ProtocolV2::read_frame_segment() {
1094 ldout(cct
, 20) << __func__
<< dendl
;
1095 ceph_assert(!rx_segments_desc
.empty());
1097 // description of current segment to read
1098 const auto& cur_rx_desc
= rx_segments_desc
.at(rx_segments_data
.size());
1099 rx_buffer_t rx_buffer
;
1101 rx_buffer
= buffer::ptr_node::create(buffer::create_aligned(
1102 get_onwire_size(cur_rx_desc
.length
), cur_rx_desc
.alignment
));
1103 } catch (std::bad_alloc
&) {
1104 // Catching because of potential issues with satisfying alignment.
1105 ldout(cct
, 20) << __func__
<< " can't allocate aligned rx_buffer "
1106 << " len=" << get_onwire_size(cur_rx_desc
.length
)
1107 << " align=" << cur_rx_desc
.alignment
1112 return READ_RXBUF(std::move(rx_buffer
), handle_read_frame_segment
);
// Stores a freshly read segment as the last element of rx_segments_data.
//
// When an rx stream handler is installed and the segment is non-empty, the
// segment is decrypted and then trimmed to the logical length given by its
// descriptor, which drops the padding. Once as many segments have been read
// as the preamble described, the epilogue is read next
// (handle_read_frame_epilogue_main); otherwise the next segment is read.
//
// rx_buffer: the segment bytes.
// r:         result of the read; a failure is logged with its error string.
// NOTE(review): the r < 0 check and its return are not visible in this
// excerpt.
1115 CtPtr
ProtocolV2::handle_read_frame_segment(rx_buffer_t
&&rx_buffer
, int r
) {
1116 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1119 ldout(cct
, 1) << __func__
<< " read frame segment failed r=" << r
<< " ("
1120 << cpp_strerror(r
) << ")" << dendl
;
1124 rx_segments_data
.emplace_back();
1125 rx_segments_data
.back().push_back(std::move(rx_buffer
));
1127 // decrypt incoming data
1128 // FIXME: if (auth_meta->is_mode_secure()) {
1129 if (session_stream_handlers
.rx
) {
// NOTE(review): this assert repeats the condition of the enclosing if.
1130 ceph_assert(session_stream_handlers
.rx
);
1132 auto& new_seg
= rx_segments_data
.back();
1133 if (new_seg
.length()) {
1134 auto padded
= session_stream_handlers
.rx
->authenticated_decrypt_update(
1135 std::move(new_seg
), segment_t::DEFAULT_ALIGNMENT
);
1136 const auto idx
= rx_segments_data
.size() - 1;
// Move only the first `length` bytes of the decrypted data back into new_seg.
1138 padded
.splice(0, rx_segments_desc
[idx
].length
, &new_seg
);
1140 ldout(cct
, 20) << __func__
1141 << " unpadded new_seg.length()=" << new_seg
.length()
1146 if (rx_segments_desc
.size() == rx_segments_data
.size()) {
1147 // OK, all segments planned to read are read. Can go with epilogue.
1148 return READ(get_epilogue_size(), handle_read_frame_epilogue_main
);
1150 // TODO: for makeshift only. This will be more generic and throttled
1151 return read_frame_segment();
// Hands the payload of a control frame -- the last segment in
// rx_segments_data, asserted to exist -- to the handler that matches the
// frame's tag. Each visible case returns that handler's continuation.
// NOTE(review): the switch statement and the case labels for the hello,
// message-ack and wait handlers are not visible in this excerpt.
1155 CtPtr
ProtocolV2::handle_frame_payload() {
1156 ceph_assert(!rx_segments_data
.empty());
1157 auto& payload
= rx_segments_data
.back();
1159 ldout(cct
, 30) << __func__
<< "\n";
1160 payload
.hexdump(*_dout
);
1165 return handle_hello(payload
);
1166 case Tag::AUTH_REQUEST
:
1167 return handle_auth_request(payload
);
1168 case Tag::AUTH_BAD_METHOD
:
1169 return handle_auth_bad_method(payload
);
1170 case Tag::AUTH_REPLY_MORE
:
1171 return handle_auth_reply_more(payload
);
1172 case Tag::AUTH_REQUEST_MORE
:
1173 return handle_auth_request_more(payload
);
1174 case Tag::AUTH_DONE
:
1175 return handle_auth_done(payload
);
1176 case Tag::AUTH_SIGNATURE
:
1177 return handle_auth_signature(payload
);
1178 case Tag::CLIENT_IDENT
:
1179 return handle_client_ident(payload
);
1180 case Tag::SERVER_IDENT
:
1181 return handle_server_ident(payload
);
1182 case Tag::IDENT_MISSING_FEATURES
:
1183 return handle_ident_missing_features(payload
);
1184 case Tag::SESSION_RECONNECT
:
1185 return handle_reconnect(payload
);
1186 case Tag::SESSION_RESET
:
1187 return handle_session_reset(payload
);
1188 case Tag::SESSION_RETRY
:
1189 return handle_session_retry(payload
);
1190 case Tag::SESSION_RETRY_GLOBAL
:
1191 return handle_session_retry_global(payload
);
1192 case Tag::SESSION_RECONNECT_OK
:
1193 return handle_reconnect_ok(payload
);
1194 case Tag::KEEPALIVE2
:
1195 return handle_keepalive2(payload
);
1196 case Tag::KEEPALIVE2_ACK
:
1197 return handle_keepalive2_ack(payload
);
1199 return handle_message_ack(payload
);
1201 return handle_wait(payload
);
1208 CtPtr
ProtocolV2::ready() {
1209 ldout(cct
, 25) << __func__
<< dendl
;
1211 reconnecting
= false;
1214 // make sure no pending tick timer
1215 if (connection
->last_tick_id
) {
1216 connection
->center
->delete_time_event(connection
->last_tick_id
);
1218 connection
->last_tick_id
= connection
->center
->create_time_event(
1219 connection
->inactive_timeout_us
, connection
->tick_handler
);
1222 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
1224 if (!out_queue
.empty()) {
1225 connection
->center
->dispatch_event_external(connection
->write_handler
);
1229 connection
->maybe_start_delay_thread();
1232 ldout(cct
, 1) << __func__
<< " entity=" << peer_name
<< " client_cookie="
1233 << std::hex
<< client_cookie
<< " server_cookie="
1234 << server_cookie
<< std::dec
<< " in_seq=" << in_seq
1235 << " out_seq=" << out_seq
<< dendl
;
1239 return CONTINUE(read_frame
);
1242 CtPtr
ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t
&&buffer
, int r
)
1244 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1247 ldout(cct
, 1) << __func__
<< " read data error " << dendl
;
1253 // FIXME: if (auth_meta->is_mode_secure()) {
1254 if (session_stream_handlers
.rx
) {
1255 ldout(cct
, 1) << __func__
<< " read frame epilogue bytes="
1256 << get_epilogue_size() << dendl
;
1258 // decrypt epilogue and authenticate entire frame.
1259 ceph::bufferlist epilogue_bl
;
1261 epilogue_bl
.push_back(std::move(buffer
));
1264 session_stream_handlers
.rx
->authenticated_decrypt_update_final(
1265 std::move(epilogue_bl
), segment_t::DEFAULT_ALIGNMENT
);
1266 } catch (ceph::crypto::onwire::MsgAuthError
&e
) {
1267 ldout(cct
, 5) << __func__
<< " message authentication failed: "
1268 << e
.what() << dendl
;
1273 reinterpret_cast<epilogue_plain_block_t
&>(*epilogue_bl
.c_str());
1274 late_flags
= epilogue
.late_flags
;
1276 auto& epilogue
= reinterpret_cast<epilogue_plain_block_t
&>(*buffer
->c_str());
1278 for (std::uint8_t idx
= 0; idx
< rx_segments_data
.size(); idx
++) {
1279 const __u32 expected_crc
= epilogue
.crc_values
[idx
];
1280 const __u32 calculated_crc
= rx_segments_data
[idx
].crc32c(-1);
1281 if (expected_crc
!= calculated_crc
) {
1282 ldout(cct
, 5) << __func__
<< " message integrity check failed: "
1283 << " expected_crc=" << expected_crc
1284 << " calculated_crc=" << calculated_crc
1288 ldout(cct
, 20) << __func__
<< " message integrity check success: "
1289 << " expected_crc=" << expected_crc
1290 << " calculated_crc=" << calculated_crc
1294 late_flags
= epilogue
.late_flags
;
1297 // we do have a mechanism that allows transmitter to start sending message
1298 // and abort after putting entire data field on wire. This will be used by
1299 // the kernel client to avoid unnecessary buffering.
1300 if (late_flags
& FRAME_FLAGS_LATEABRT
) {
1303 return CONTINUE(read_frame
);
1305 return handle_read_frame_dispatch();
1309 CtPtr
ProtocolV2::handle_message() {
1310 ldout(cct
, 20) << __func__
<< dendl
;
1311 ceph_assert(state
== THROTTLE_DONE
);
1313 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1314 ltt_recv_stamp
= ceph_clock_now();
1316 recv_stamp
= ceph_clock_now();
1318 // we need to get the size before std::moving segments data
1319 const size_t cur_msg_size
= get_current_msg_size();
1320 auto msg_frame
= MessageFrame::Decode(std::move(rx_segments_data
));
1322 // XXX: paranoid copy just to avoid oops
1323 ceph_msg_header2 current_header
= msg_frame
.header();
1325 ldout(cct
, 5) << __func__
1326 << " got " << msg_frame
.front_len()
1327 << " + " << msg_frame
.middle_len()
1328 << " + " << msg_frame
.data_len()
1330 << " envelope type=" << current_header
.type
1331 << " src " << peer_name
1332 << " off " << current_header
.data_off
1336 ceph_msg_header header
{current_header
.seq
,
1338 current_header
.type
,
1339 current_header
.priority
,
1340 current_header
.version
,
1341 msg_frame
.front_len(),
1342 msg_frame
.middle_len(),
1343 msg_frame
.data_len(),
1344 current_header
.data_off
,
1346 current_header
.compat_version
,
1347 current_header
.reserved
,
1349 ceph_msg_footer footer
{0, 0, 0, 0, current_header
.flags
};
1351 Message
*message
= decode_message(cct
, 0, header
, footer
,
1357 ldout(cct
, 1) << __func__
<< " decode message failed " << dendl
;
1360 state
= READ_MESSAGE_COMPLETE
;
1365 message
->set_byte_throttler(connection
->policy
.throttler_bytes
);
1366 message
->set_message_throttler(connection
->policy
.throttler_messages
);
1368 // store reservation size in message, so we don't get confused
1369 // by messages entering the dispatch queue through other paths.
1370 message
->set_dispatch_throttle_size(cur_msg_size
);
1372 message
->set_recv_stamp(recv_stamp
);
1373 message
->set_throttle_stamp(throttle_stamp
);
1374 message
->set_recv_complete_stamp(ceph_clock_now());
1376 // check received seq#. if it is old, drop the message.
1377 // note that incoming messages may skip ahead. this is convenient for the
1378 // client side queueing because messages can't be renumbered, but the (kernel)
1379 // client will occasionally pull a message out of the sent queue to send
1380 // elsewhere. in that case it doesn't matter if we "got" it or not.
1381 uint64_t cur_seq
= in_seq
;
1382 if (message
->get_seq() <= cur_seq
) {
1383 ldout(cct
, 0) << __func__
<< " got old message " << message
->get_seq()
1384 << " <= " << cur_seq
<< " " << message
<< " " << *message
1385 << ", discarding" << dendl
;
1387 if (connection
->has_feature(CEPH_FEATURE_RECONNECT_SEQ
) &&
1388 cct
->_conf
->ms_die_on_old_message
) {
1389 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1393 if (message
->get_seq() > cur_seq
+ 1) {
1394 ldout(cct
, 0) << __func__
<< " missed message? skipped from seq "
1395 << cur_seq
<< " to " << message
->get_seq() << dendl
;
1396 if (cct
->_conf
->ms_die_on_skipped_message
) {
1397 ceph_assert(0 == "skipped incoming seq");
1401 message
->set_connection(connection
);
1403 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1404 if (message
->get_type() == CEPH_MSG_OSD_OP
||
1405 message
->get_type() == CEPH_MSG_OSD_OPREPLY
) {
1406 utime_t ltt_processed_stamp
= ceph_clock_now();
1407 double usecs_elapsed
=
1408 (ltt_processed_stamp
.to_nsec() - ltt_recv_stamp
.to_nsec()) / 1000;
1410 if (message
->get_type() == CEPH_MSG_OSD_OP
)
1411 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OP",
1414 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OPREPLY",
1419 // note last received message.
1420 in_seq
= message
->get_seq();
1421 ldout(cct
, 5) << __func__
<< " received message m=" << message
1422 << " seq=" << message
->get_seq()
1423 << " from=" << message
->get_source() << " type=" << header
.type
1424 << " " << *message
<< dendl
;
1426 bool need_dispatch_writer
= false;
1427 if (!connection
->policy
.lossy
) {
1429 need_dispatch_writer
= true;
1434 connection
->logger
->inc(l_msgr_recv_messages
);
1435 connection
->logger
->inc(
1437 cur_msg_size
+ sizeof(ceph_msg_header
) + sizeof(ceph_msg_footer
));
1439 messenger
->ms_fast_preprocess(message
);
1440 auto fast_dispatch_time
= ceph::mono_clock::now();
1441 connection
->logger
->tinc(l_msgr_running_recv_time
,
1442 fast_dispatch_time
- connection
->recv_start_time
);
1443 if (connection
->delay_state
) {
1444 double delay_period
= 0;
1445 if (rand() % 10000 < cct
->_conf
->ms_inject_delay_probability
* 10000.0) {
1447 cct
->_conf
->ms_inject_delay_max
* (double)(rand() % 10000) / 10000.0;
1448 ldout(cct
, 1) << "queue_received will delay after "
1449 << (ceph_clock_now() + delay_period
) << " on " << message
1450 << " " << *message
<< dendl
;
1452 connection
->delay_state
->queue(delay_period
, message
);
1453 } else if (messenger
->ms_can_fast_dispatch(message
)) {
1454 connection
->lock
.unlock();
1455 connection
->dispatch_queue
->fast_dispatch(message
);
1456 connection
->recv_start_time
= ceph::mono_clock::now();
1457 connection
->logger
->tinc(l_msgr_running_fast_dispatch_time
,
1458 connection
->recv_start_time
- fast_dispatch_time
);
1459 connection
->lock
.lock();
1461 connection
->dispatch_queue
->enqueue(message
, message
->get_priority(),
1462 connection
->conn_id
);
1465 handle_message_ack(current_header
.ack_seq
);
1467 // we might have been reused by another connection
1468 // let's check if that is the case
1469 if (state
!= READY
) {
1470 // yes, that was the case, let's do nothing
1474 if (need_dispatch_writer
&& connection
->is_connected()) {
1475 connection
->center
->dispatch_event_external(connection
->write_handler
);
1478 return CONTINUE(read_frame
);
1482 CtPtr
ProtocolV2::throttle_message() {
1483 ldout(cct
, 20) << __func__
<< dendl
;
1485 if (connection
->policy
.throttler_messages
) {
1486 ldout(cct
, 10) << __func__
<< " wants " << 1
1487 << " message from policy throttler "
1488 << connection
->policy
.throttler_messages
->get_current()
1489 << "/" << connection
->policy
.throttler_messages
->get_max()
1491 if (!connection
->policy
.throttler_messages
->get_or_fail()) {
1492 ldout(cct
, 10) << __func__
<< " wants 1 message from policy throttle "
1493 << connection
->policy
.throttler_messages
->get_current()
1494 << "/" << connection
->policy
.throttler_messages
->get_max()
1495 << " failed, just wait." << dendl
;
1496 // following thread pool deal with th full message queue isn't a
1497 // short time, so we can wait a ms.
1498 if (connection
->register_time_events
.empty()) {
1499 connection
->register_time_events
.insert(
1500 connection
->center
->create_time_event(1000,
1501 connection
->wakeup_handler
));
1507 state
= THROTTLE_BYTES
;
1508 return CONTINUE(throttle_bytes
);
1511 CtPtr
ProtocolV2::throttle_bytes() {
1512 ldout(cct
, 20) << __func__
<< dendl
;
1514 const size_t cur_msg_size
= get_current_msg_size();
1516 if (connection
->policy
.throttler_bytes
) {
1517 ldout(cct
, 10) << __func__
<< " wants " << cur_msg_size
1518 << " bytes from policy throttler "
1519 << connection
->policy
.throttler_bytes
->get_current() << "/"
1520 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
1521 if (!connection
->policy
.throttler_bytes
->get_or_fail(cur_msg_size
)) {
1522 ldout(cct
, 10) << __func__
<< " wants " << cur_msg_size
1523 << " bytes from policy throttler "
1524 << connection
->policy
.throttler_bytes
->get_current()
1525 << "/" << connection
->policy
.throttler_bytes
->get_max()
1526 << " failed, just wait." << dendl
;
1527 // following thread pool deal with th full message queue isn't a
1528 // short time, so we can wait a ms.
1529 if (connection
->register_time_events
.empty()) {
1530 connection
->register_time_events
.insert(
1531 connection
->center
->create_time_event(
1532 1000, connection
->wakeup_handler
));
1539 state
= THROTTLE_DISPATCH_QUEUE
;
1540 return CONTINUE(throttle_dispatch_queue
);
1543 CtPtr
ProtocolV2::throttle_dispatch_queue() {
1544 ldout(cct
, 20) << __func__
<< dendl
;
1546 const size_t cur_msg_size
= get_current_msg_size();
1548 if (!connection
->dispatch_queue
->dispatch_throttler
.get_or_fail(
1551 << __func__
<< " wants " << cur_msg_size
1552 << " bytes from dispatch throttle "
1553 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
1554 << connection
->dispatch_queue
->dispatch_throttler
.get_max()
1555 << " failed, just wait." << dendl
;
1556 // following thread pool deal with th full message queue isn't a
1557 // short time, so we can wait a ms.
1558 if (connection
->register_time_events
.empty()) {
1559 connection
->register_time_events
.insert(
1560 connection
->center
->create_time_event(1000,
1561 connection
->wakeup_handler
));
1567 throttle_stamp
= ceph_clock_now();
1568 state
= THROTTLE_DONE
;
1570 return read_frame_segment();
1573 CtPtr
ProtocolV2::handle_keepalive2(ceph::bufferlist
&payload
)
1575 ldout(cct
, 20) << __func__
1576 << " payload.length()=" << payload
.length() << dendl
;
1578 if (state
!= READY
) {
1579 lderr(cct
) << __func__
<< " not in ready state!" << dendl
;
1583 auto keepalive_frame
= KeepAliveFrame::Decode(payload
);
1585 ldout(cct
, 30) << __func__
<< " got KEEPALIVE2 tag ..." << dendl
;
1587 connection
->write_lock
.lock();
1588 append_keepalive_ack(keepalive_frame
.timestamp());
1589 connection
->write_lock
.unlock();
1591 ldout(cct
, 20) << __func__
<< " got KEEPALIVE2 "
1592 << keepalive_frame
.timestamp() << dendl
;
1593 connection
->set_last_keepalive(ceph_clock_now());
1595 if (is_connected()) {
1596 connection
->center
->dispatch_event_external(connection
->write_handler
);
1599 return CONTINUE(read_frame
);
1602 CtPtr
ProtocolV2::handle_keepalive2_ack(ceph::bufferlist
&payload
)
1604 ldout(cct
, 20) << __func__
1605 << " payload.length()=" << payload
.length() << dendl
;
1607 if (state
!= READY
) {
1608 lderr(cct
) << __func__
<< " not in ready state!" << dendl
;
1612 auto keepalive_ack_frame
= KeepAliveFrameAck::Decode(payload
);
1613 connection
->set_last_keepalive_ack(keepalive_ack_frame
.timestamp());
1614 ldout(cct
, 20) << __func__
<< " got KEEPALIVE_ACK" << dendl
;
1616 return CONTINUE(read_frame
);
1619 CtPtr
ProtocolV2::handle_message_ack(ceph::bufferlist
&payload
)
1621 ldout(cct
, 20) << __func__
1622 << " payload.length()=" << payload
.length() << dendl
;
1624 if (state
!= READY
) {
1625 lderr(cct
) << __func__
<< " not in ready state!" << dendl
;
1629 auto ack
= AckFrame::Decode(payload
);
1630 handle_message_ack(ack
.seq());
1631 return CONTINUE(read_frame
);
1634 /* Client Protocol Methods */
1636 CtPtr
ProtocolV2::start_client_banner_exchange() {
1637 ldout(cct
, 20) << __func__
<< dendl
;
1641 state
= BANNER_CONNECTING
;
1643 global_seq
= messenger
->get_global_seq();
1645 return _banner_exchange(CONTINUATION(post_client_banner_exchange
));
1648 CtPtr
ProtocolV2::post_client_banner_exchange() {
1649 ldout(cct
, 20) << __func__
<< dendl
;
1651 state
= AUTH_CONNECTING
;
1653 return send_auth_request();
1656 CtPtr
ProtocolV2::send_auth_request(std::vector
<uint32_t> &allowed_methods
) {
1657 ldout(cct
, 20) << __func__
<< " peer_type " << (int)connection
->peer_type
1658 << " auth_client " << messenger
->auth_client
<< dendl
;
1659 ceph_assert(messenger
->auth_client
);
1662 vector
<uint32_t> preferred_modes
;
1663 auto am
= auth_meta
;
1664 connection
->lock
.unlock();
1665 int r
= messenger
->auth_client
->get_auth_request(
1666 connection
, am
.get(),
1667 &am
->auth_method
, &preferred_modes
, &bl
);
1668 connection
->lock
.lock();
1669 if (state
!= AUTH_CONNECTING
) {
1670 ldout(cct
, 1) << __func__
<< " state changed!" << dendl
;
1674 ldout(cct
, 0) << __func__
<< " get_initial_auth_request returned " << r
1677 connection
->dispatch_queue
->queue_reset(connection
);
1683 auto frame
= AuthRequestFrame::Encode(auth_meta
->auth_method
, preferred_modes
,
1685 return WRITE(frame
, "auth request", read_frame
);
1688 CtPtr
ProtocolV2::handle_auth_bad_method(ceph::bufferlist
&payload
) {
1689 ldout(cct
, 20) << __func__
1690 << " payload.length()=" << payload
.length() << dendl
;
1692 if (state
!= AUTH_CONNECTING
) {
1693 lderr(cct
) << __func__
<< " not in auth connect state!" << dendl
;
1697 auto bad_method
= AuthBadMethodFrame::Decode(payload
);
1698 ldout(cct
, 1) << __func__
<< " method=" << bad_method
.method()
1699 << " result " << cpp_strerror(bad_method
.result())
1700 << ", allowed methods=" << bad_method
.allowed_methods()
1701 << ", allowed modes=" << bad_method
.allowed_modes()
1703 ceph_assert(messenger
->auth_client
);
1704 auto am
= auth_meta
;
1705 connection
->lock
.unlock();
1706 int r
= messenger
->auth_client
->handle_auth_bad_method(
1709 bad_method
.method(), bad_method
.result(),
1710 bad_method
.allowed_methods(),
1711 bad_method
.allowed_modes());
1712 connection
->lock
.lock();
1713 if (state
!= AUTH_CONNECTING
|| r
< 0) {
1716 return send_auth_request(bad_method
.allowed_methods());
1719 CtPtr
ProtocolV2::handle_auth_reply_more(ceph::bufferlist
&payload
)
1721 ldout(cct
, 20) << __func__
1722 << " payload.length()=" << payload
.length() << dendl
;
1724 if (state
!= AUTH_CONNECTING
) {
1725 lderr(cct
) << __func__
<< " not in auth connect state!" << dendl
;
1729 auto auth_more
= AuthReplyMoreFrame::Decode(payload
);
1730 ldout(cct
, 5) << __func__
1731 << " auth reply more len=" << auth_more
.auth_payload().length()
1733 ceph_assert(messenger
->auth_client
);
1734 ceph::bufferlist reply
;
1735 auto am
= auth_meta
;
1736 connection
->lock
.unlock();
1737 int r
= messenger
->auth_client
->handle_auth_reply_more(
1738 connection
, am
.get(), auth_more
.auth_payload(), &reply
);
1739 connection
->lock
.lock();
1740 if (state
!= AUTH_CONNECTING
) {
1741 ldout(cct
, 1) << __func__
<< " state changed!" << dendl
;
1745 lderr(cct
) << __func__
<< " auth_client handle_auth_reply_more returned "
1749 auto more_reply
= AuthRequestMoreFrame::Encode(reply
);
1750 return WRITE(more_reply
, "auth request more", read_frame
);
1753 CtPtr
ProtocolV2::handle_auth_done(ceph::bufferlist
&payload
)
1755 ldout(cct
, 20) << __func__
1756 << " payload.length()=" << payload
.length() << dendl
;
1758 if (state
!= AUTH_CONNECTING
) {
1759 lderr(cct
) << __func__
<< " not in auth connect state!" << dendl
;
1763 auto auth_done
= AuthDoneFrame::Decode(payload
);
1765 ceph_assert(messenger
->auth_client
);
1766 auto am
= auth_meta
;
1767 connection
->lock
.unlock();
1768 int r
= messenger
->auth_client
->handle_auth_done(
1771 auth_done
.global_id(),
1772 auth_done
.con_mode(),
1773 auth_done
.auth_payload(),
1775 &am
->connection_secret
);
1776 connection
->lock
.lock();
1777 if (state
!= AUTH_CONNECTING
) {
1778 ldout(cct
, 1) << __func__
<< " state changed!" << dendl
;
1784 auth_meta
->con_mode
= auth_done
.con_mode();
1785 session_stream_handlers
= \
1786 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct
, *auth_meta
, false);
1788 state
= AUTH_CONNECTING_SIGN
;
1790 const auto sig
= auth_meta
->session_key
.empty() ? sha256_digest_t() :
1791 auth_meta
->session_key
.hmac_sha256(cct
, pre_auth
.rxbuf
);
1792 auto sig_frame
= AuthSignatureFrame::Encode(sig
);
1793 pre_auth
.enabled
= false;
1794 pre_auth
.rxbuf
.clear();
1795 return WRITE(sig_frame
, "auth signature", read_frame
);
1798 CtPtr
ProtocolV2::finish_client_auth() {
1799 if (!server_cookie
) {
1800 ceph_assert(connect_seq
== 0);
1801 state
= SESSION_CONNECTING
;
1802 return send_client_ident();
1803 } else { // reconnecting to previous session
1804 state
= SESSION_RECONNECTING
;
1805 ceph_assert(connect_seq
> 0);
1806 return send_reconnect();
1810 CtPtr
ProtocolV2::send_client_ident() {
1811 ldout(cct
, 20) << __func__
<< dendl
;
1813 if (!connection
->policy
.lossy
&& !client_cookie
) {
1814 client_cookie
= ceph::util::generate_random_number
<uint64_t>(1, -1ll);
1818 if (connection
->policy
.lossy
) {
1819 flags
|= CEPH_MSG_CONNECT_LOSSY
;
1822 if (messenger
->get_myaddrs().empty() ||
1823 messenger
->get_myaddrs().front().is_blank_ip()) {
1824 sockaddr_storage ss
;
1825 socklen_t len
= sizeof(ss
);
1826 int r
= getsockname(connection
->cs
.socket_fd(), (sockaddr
*)&ss
, &len
);
1827 ceph_assert(r
== 0);
1828 ldout(cct
, 1) << __func__
<< " getsockname reveals I am " << (sockaddr
*)&ss
1829 << " when talking to " << connection
->target_addr
<< dendl
;
1831 a
.set_type(entity_addr_t::TYPE_MSGR2
); // anything but NONE; learned_addr ignores this
1832 a
.set_sockaddr((sockaddr
*)&ss
);
1834 connection
->lock
.unlock();
1835 messenger
->learned_addr(a
);
1836 if (cct
->_conf
->ms_inject_internal_delays
&&
1837 cct
->_conf
->ms_inject_socket_failures
) {
1838 if (rand() % cct
->_conf
->ms_inject_socket_failures
== 0) {
1839 ldout(cct
, 10) << __func__
<< " sleep for "
1840 << cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1842 t
.set_from_double(cct
->_conf
->ms_inject_internal_delays
);
1846 connection
->lock
.lock();
1847 if (state
!= SESSION_CONNECTING
) {
1848 ldout(cct
, 1) << __func__
1849 << " state changed while learned_addr, mark_down or "
1850 << " replacing must be happened just now" << dendl
;
1855 auto client_ident
= ClientIdentFrame::Encode(
1856 messenger
->get_myaddrs(),
1857 connection
->target_addr
,
1858 messenger
->get_myname().num(),
1860 connection
->policy
.features_supported
,
1861 connection
->policy
.features_required
| msgr2_required
,
1865 ldout(cct
, 5) << __func__
<< " sending identification: "
1866 << "addrs=" << messenger
->get_myaddrs()
1867 << " target=" << connection
->target_addr
1868 << " gid=" << messenger
->get_myname().num()
1869 << " global_seq=" << global_seq
1870 << " features_supported=" << std::hex
1871 << connection
->policy
.features_supported
1872 << " features_required="
1873 << (connection
->policy
.features_required
| msgr2_required
)
1874 << " flags=" << flags
1875 << " cookie=" << client_cookie
<< std::dec
<< dendl
;
1879 return WRITE(client_ident
, "client ident", read_frame
);
1882 CtPtr
ProtocolV2::send_reconnect() {
1883 ldout(cct
, 20) << __func__
<< dendl
;
1885 auto reconnect
= ReconnectFrame::Encode(messenger
->get_myaddrs(),
1892 ldout(cct
, 5) << __func__
<< " reconnect to session: client_cookie="
1893 << std::hex
<< client_cookie
<< " server_cookie="
1894 << server_cookie
<< std::dec
1895 << " gs=" << global_seq
<< " cs=" << connect_seq
1896 << " ms=" << in_seq
<< dendl
;
1900 return WRITE(reconnect
, "reconnect", read_frame
);
1903 CtPtr
ProtocolV2::handle_ident_missing_features(ceph::bufferlist
&payload
)
1905 ldout(cct
, 20) << __func__
1906 << " payload.length()=" << payload
.length() << dendl
;
1908 if (state
!= SESSION_CONNECTING
) {
1909 lderr(cct
) << __func__
<< " not in session connect state!" << dendl
;
1913 auto ident_missing
=
1914 IdentMissingFeaturesFrame::Decode(payload
);
1915 lderr(cct
) << __func__
1916 << " client does not support all server features: " << std::hex
1917 << ident_missing
.features() << std::dec
<< dendl
;
1922 CtPtr
ProtocolV2::handle_session_reset(ceph::bufferlist
&payload
)
1924 ldout(cct
, 20) << __func__
1925 << " payload.length()=" << payload
.length() << dendl
;
1927 if (state
!= SESSION_RECONNECTING
) {
1928 lderr(cct
) << __func__
<< " not in session reconnect state!" << dendl
;
1932 auto reset
= ResetFrame::Decode(payload
);
1934 ldout(cct
, 1) << __func__
<< " received session reset full=" << reset
.full()
1944 state
= SESSION_CONNECTING
;
1945 return send_client_ident();
1948 CtPtr
ProtocolV2::handle_session_retry(ceph::bufferlist
&payload
)
1950 ldout(cct
, 20) << __func__
1951 << " payload.length()=" << payload
.length() << dendl
;
1953 if (state
!= SESSION_RECONNECTING
) {
1954 lderr(cct
) << __func__
<< " not in session reconnect state!" << dendl
;
1958 auto retry
= RetryFrame::Decode(payload
);
1959 connect_seq
= retry
.connect_seq() + 1;
1961 ldout(cct
, 1) << __func__
1962 << " received session retry connect_seq=" << retry
.connect_seq()
1963 << ", inc to cs=" << connect_seq
<< dendl
;
1965 return send_reconnect();
1968 CtPtr
ProtocolV2::handle_session_retry_global(ceph::bufferlist
&payload
)
1970 ldout(cct
, 20) << __func__
1971 << " payload.length()=" << payload
.length() << dendl
;
1973 if (state
!= SESSION_RECONNECTING
) {
1974 lderr(cct
) << __func__
<< " not in session reconnect state!" << dendl
;
1978 auto retry
= RetryGlobalFrame::Decode(payload
);
1979 global_seq
= messenger
->get_global_seq(retry
.global_seq());
1981 ldout(cct
, 1) << __func__
<< " received session retry global global_seq="
1982 << retry
.global_seq() << ", choose new gs=" << global_seq
1985 return send_reconnect();
1988 CtPtr
ProtocolV2::handle_wait(ceph::bufferlist
&payload
) {
1989 ldout(cct
, 20) << __func__
1990 << " received WAIT (connection race)"
1991 << " payload.length()=" << payload
.length()
1994 if (state
!= SESSION_CONNECTING
&& state
!= SESSION_RECONNECTING
) {
1995 lderr(cct
) << __func__
<< " not in session (re)connect state!" << dendl
;
2000 WaitFrame::Decode(payload
);
2004 CtPtr
ProtocolV2::handle_reconnect_ok(ceph::bufferlist
&payload
)
2006 ldout(cct
, 20) << __func__
2007 << " payload.length()=" << payload
.length() << dendl
;
2009 if (state
!= SESSION_RECONNECTING
) {
2010 lderr(cct
) << __func__
<< " not in session reconnect state!" << dendl
;
2014 auto reconnect_ok
= ReconnectOkFrame::Decode(payload
);
2015 ldout(cct
, 5) << __func__
2016 << " reconnect accepted: sms=" << reconnect_ok
.msg_seq()
2019 out_seq
= discard_requeued_up_to(out_seq
, reconnect_ok
.msg_seq());
2021 backoff
= utime_t();
2022 ldout(cct
, 10) << __func__
<< " reconnect success " << connect_seq
2023 << ", lossy = " << connection
->policy
.lossy
<< ", features "
2024 << connection
->get_features() << dendl
;
2026 if (connection
->delay_state
) {
2027 ceph_assert(connection
->delay_state
->ready());
2030 connection
->dispatch_queue
->queue_connect(connection
);
2031 messenger
->ms_deliver_handle_fast_connect(connection
);
2036 CtPtr
ProtocolV2::handle_server_ident(ceph::bufferlist
&payload
)
2038 ldout(cct
, 20) << __func__
2039 << " payload.length()=" << payload
.length() << dendl
;
2041 if (state
!= SESSION_CONNECTING
) {
2042 lderr(cct
) << __func__
<< " not in session connect state!" << dendl
;
2046 auto server_ident
= ServerIdentFrame::Decode(payload
);
2047 ldout(cct
, 5) << __func__
<< " received server identification:"
2048 << " addrs=" << server_ident
.addrs()
2049 << " gid=" << server_ident
.gid()
2050 << " global_seq=" << server_ident
.global_seq()
2051 << " features_supported=" << std::hex
2052 << server_ident
.supported_features()
2053 << " features_required=" << server_ident
.required_features()
2054 << " flags=" << server_ident
.flags() << " cookie=" << std::dec
2055 << server_ident
.cookie() << dendl
;
2057 // is this who we intended to talk to?
2058 // be a bit forgiving here, since we may be connecting based on addresses parsed out
2059 // of mon_host or something.
2060 if (!server_ident
.addrs().contains(connection
->target_addr
)) {
2061 ldout(cct
,1) << __func__
<< " peer identifies as " << server_ident
.addrs()
2062 << ", does not include " << connection
->target_addr
<< dendl
;
2066 server_cookie
= server_ident
.cookie();
2068 connection
->set_peer_addrs(server_ident
.addrs());
2069 peer_name
= entity_name_t(connection
->get_peer_type(), server_ident
.gid());
2070 connection
->set_features(server_ident
.supported_features() &
2071 connection
->policy
.features_supported
);
2072 peer_global_seq
= server_ident
.global_seq();
2074 connection
->policy
.lossy
= server_ident
.flags() & CEPH_MSG_CONNECT_LOSSY
;
2076 backoff
= utime_t();
2077 ldout(cct
, 10) << __func__
<< " connect success " << connect_seq
2078 << ", lossy = " << connection
->policy
.lossy
<< ", features "
2079 << connection
->get_features() << dendl
;
2081 if (connection
->delay_state
) {
2082 ceph_assert(connection
->delay_state
->ready());
2085 connection
->dispatch_queue
->queue_connect(connection
);
2086 messenger
->ms_deliver_handle_fast_connect(connection
);
2091 /* Server Protocol Methods */
2093 CtPtr
ProtocolV2::start_server_banner_exchange() {
2094 ldout(cct
, 20) << __func__
<< dendl
;
2098 state
= BANNER_ACCEPTING
;
2100 return _banner_exchange(CONTINUATION(post_server_banner_exchange
));
2103 CtPtr
ProtocolV2::post_server_banner_exchange() {
2104 ldout(cct
, 20) << __func__
<< dendl
;
2106 state
= AUTH_ACCEPTING
;
2108 return CONTINUE(read_frame
);
2111 CtPtr
ProtocolV2::handle_auth_request(ceph::bufferlist
&payload
) {
2112 ldout(cct
, 20) << __func__
<< " payload.length()=" << payload
.length()
2115 if (state
!= AUTH_ACCEPTING
) {
2116 lderr(cct
) << __func__
<< " not in auth accept state!" << dendl
;
2120 auto request
= AuthRequestFrame::Decode(payload
);
2121 ldout(cct
, 10) << __func__
<< " AuthRequest(method=" << request
.method()
2122 << ", preferred_modes=" << request
.preferred_modes()
2123 << ", payload_len=" << request
.auth_payload().length() << ")"
2125 auth_meta
->auth_method
= request
.method();
2126 auth_meta
->con_mode
= messenger
->auth_server
->pick_con_mode(
2127 connection
->get_peer_type(), auth_meta
->auth_method
,
2128 request
.preferred_modes());
2129 if (auth_meta
->con_mode
== CEPH_CON_MODE_UNKNOWN
) {
2130 return _auth_bad_method(-EOPNOTSUPP
);
2132 return _handle_auth_request(request
.auth_payload(), false);
2135 CtPtr
ProtocolV2::_auth_bad_method(int r
)
2138 std::vector
<uint32_t> allowed_methods
;
2139 std::vector
<uint32_t> allowed_modes
;
2140 messenger
->auth_server
->get_supported_auth_methods(
2141 connection
->get_peer_type(), &allowed_methods
, &allowed_modes
);
2142 ldout(cct
, 1) << __func__
<< " auth_method " << auth_meta
->auth_method
2143 << " r " << cpp_strerror(r
)
2144 << ", allowed_methods " << allowed_methods
2145 << ", allowed_modes " << allowed_modes
2147 auto bad_method
= AuthBadMethodFrame::Encode(auth_meta
->auth_method
, r
,
2148 allowed_methods
, allowed_modes
);
2149 return WRITE(bad_method
, "bad auth method", read_frame
);
2152 CtPtr
ProtocolV2::_handle_auth_request(bufferlist
& auth_payload
, bool more
)
2154 if (!messenger
->auth_server
) {
2158 auto am
= auth_meta
;
2159 connection
->lock
.unlock();
2160 int r
= messenger
->auth_server
->handle_auth_request(
2161 connection
, am
.get(),
2162 more
, am
->auth_method
, auth_payload
,
2164 connection
->lock
.lock();
2165 if (state
!= AUTH_ACCEPTING
&& state
!= AUTH_ACCEPTING_MORE
) {
2166 ldout(cct
, 1) << __func__
2167 << " state changed while accept, it must be mark_down"
2169 ceph_assert(state
== CLOSED
);
2174 state
= AUTH_ACCEPTING_SIGN
;
2176 auto auth_done
= AuthDoneFrame::Encode(connection
->peer_global_id
,
2177 auth_meta
->con_mode
,
2179 return WRITE(auth_done
, "auth done", finish_auth
);
2180 } else if (r
== 0) {
2181 state
= AUTH_ACCEPTING_MORE
;
2183 auto more
= AuthReplyMoreFrame::Encode(reply
);
2184 return WRITE(more
, "auth reply more", read_frame
);
2185 } else if (r
== -EBUSY
) {
2186 // kick the client and maybe they'll come back later
2189 return _auth_bad_method(r
);
2193 CtPtr
ProtocolV2::finish_auth()
2195 ceph_assert(auth_meta
);
2196 // TODO: having a possibility to check whether we're server or client could
2197 // allow reusing finish_auth().
2198 session_stream_handlers
= \
2199 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct
, *auth_meta
, true);
2201 const auto sig
= auth_meta
->session_key
.empty() ? sha256_digest_t() :
2202 auth_meta
->session_key
.hmac_sha256(cct
, pre_auth
.rxbuf
);
2203 auto sig_frame
= AuthSignatureFrame::Encode(sig
);
2204 pre_auth
.enabled
= false;
2205 pre_auth
.rxbuf
.clear();
2206 return WRITE(sig_frame
, "auth signature", read_frame
);
2209 CtPtr
ProtocolV2::handle_auth_request_more(ceph::bufferlist
&payload
)
2211 ldout(cct
, 20) << __func__
2212 << " payload.length()=" << payload
.length() << dendl
;
2214 if (state
!= AUTH_ACCEPTING_MORE
) {
2215 lderr(cct
) << __func__
<< " not in auth accept more state!" << dendl
;
2219 auto auth_more
= AuthRequestMoreFrame::Decode(payload
);
2220 return _handle_auth_request(auth_more
.auth_payload(), true);
// Handles the pre-auth signature frame sent by the peer.
//
// The frame is expected only in AUTH_ACCEPTING_SIGN or AUTH_CONNECTING_SIGN.
// The expected signature is recomputed locally as the HMAC-SHA256 of
// pre_auth.txbuf under auth_meta->session_key (or a default-constructed
// digest when the session key is empty) and compared with the one carried by
// the frame.  Afterwards the state decides the next step:
//   - AUTH_ACCEPTING_SIGN: switch to SESSION_ACCEPTING and keep reading frames;
//   - AUTH_CONNECTING_SIGN: return finish_client_auth();
//   - anything else trips ceph_assert_always.
//
// payload: raw frame payload to decode.
// NOTE(review): the tails of the error branches (embedded lines 2231-2234,
// 2243-2245, 2248, 2250, 2260) are absent from this extract.
2223 CtPtr
ProtocolV2::handle_auth_signature(ceph::bufferlist
&payload
)
2225 ldout(cct
, 20) << __func__
2226 << " payload.length()=" << payload
.length() << dendl
;
// Guard: only two states may legitimately receive this frame.
2228 if (state
!= AUTH_ACCEPTING_SIGN
&& state
!= AUTH_CONNECTING_SIGN
) {
2229 lderr(cct
) << __func__
2230 << " pre-auth verification signature seen in wrong state!"
2235 auto sig_frame
= AuthSignatureFrame::Decode(payload
);
// Recompute the signature over what was recorded in pre_auth.txbuf; an empty
// session key maps to a default-constructed digest.
2237 const auto actual_tx_sig
= auth_meta
->session_key
.empty() ?
2238 sha256_digest_t() : auth_meta
->session_key
.hmac_sha256(cct
, pre_auth
.txbuf
);
// Compare the peer-supplied signature with the locally computed one.
2239 if (sig_frame
.signature() != actual_tx_sig
) {
2240 ldout(cct
, 2) << __func__
<< " pre-auth signature mismatch"
2241 << " actual_tx_sig=" << actual_tx_sig
2242 << " sig_frame.signature()=" << sig_frame
.signature()
2246 ldout(cct
, 20) << __func__
<< " pre-auth signature success"
2247 << " sig_frame.signature()=" << sig_frame
.signature()
// The recorded transmit bytes are dropped after the success log.
2249 pre_auth
.txbuf
.clear();
// Dispatch on the state this frame arrived in.
2252 if (state
== AUTH_ACCEPTING_SIGN
) {
2253 // server had sent AuthDone and client responded with correct pre-auth
2254 // signature. we can start accepting new sessions/reconnects.
2255 state
= SESSION_ACCEPTING
;
2256 return CONTINUE(read_frame
);
2257 } else if (state
== AUTH_CONNECTING_SIGN
) {
2258 // this happened at client side
2259 return finish_client_auth();
// The comparison of a string literal with nullptr is always false, so this
// assertion always fires; the literal only serves as the failure message.
2261 ceph_assert_always("state corruption" == nullptr);
// Handles a ClientIdent frame while accepting a new session.
//
// Visible flow:
//   1. require state == SESSION_ACCEPTING and decode the frame;
//   2. reject an empty / default-constructed first address, and a target
//      address that is not among messenger->get_myaddrs();
//   3. record the peer addresses, target_addr, peer_name, peer id and
//      client_cookie taken from the frame;
//   4. compute the required features the peer does not support and answer
//      with an IdentMissingFeatures frame in that case;
//   5. store connection_features and peer_global_seq;
//   6. with connection->lock released, look up an existing connection for the
//      peer addresses (marking it down when its protocol is not proto_type 2),
//      then re-take the lock and re-check the state;
//   7. continue with handle_existing_connection(existing) or
//      send_server_ident().
//
// payload: raw frame payload to decode.
// NOTE(review): several short lines (gaps in the embedded numbering, e.g.
// 2272-2274, 2297-2300, 2312, 2331-2332, 2338, 2347, 2349-2352) are absent
// from this extract, including some conditions and returns.
2265 CtPtr
ProtocolV2::handle_client_ident(ceph::bufferlist
&payload
)
2267 ldout(cct
, 20) << __func__
2268 << " payload.length()=" << payload
.length() << dendl
;
// Guard: client identification is only valid while accepting a session.
2270 if (state
!= SESSION_ACCEPTING
) {
2271 lderr(cct
) << __func__
<< " not in session accept state!" << dendl
;
2275 auto client_ident
= ClientIdentFrame::Decode(payload
);
// Numeric fields from features_supported onwards are printed in hex; the
// stream is switched back to decimal before the entry ends.
2277 ldout(cct
, 5) << __func__
<< " received client identification:"
2278 << " addrs=" << client_ident
.addrs()
2279 << " target=" << client_ident
.target_addr()
2280 << " gid=" << client_ident
.gid()
2281 << " global_seq=" << client_ident
.global_seq()
2282 << " features_supported=" << std::hex
2283 << client_ident
.supported_features()
2284 << " features_required=" << client_ident
.required_features()
2285 << " flags=" << client_ident
.flags()
2286 << " cookie=" << client_ident
.cookie() << std::dec
<< dendl
;
// The peer must announce at least one address and the first one must not be
// a default-constructed entity_addr_t.
2288 if (client_ident
.addrs().empty() ||
2289 client_ident
.addrs().front() == entity_addr_t()) {
2290 ldout(cct
,5) << __func__
<< " oops, client_ident.addrs() is empty" << dendl
;
2291 return _fault(); // a v2 peer should never do this
// The address the peer tried to reach must be one of our own addresses.
2293 if (!messenger
->get_myaddrs().contains(client_ident
.target_addr())) {
2294 ldout(cct
,5) << __func__
<< " peer is trying to reach "
2295 << client_ident
.target_addr()
2296 << " which is not us (" << messenger
->get_myaddrs() << ")"
// Adopt the identity announced by the peer.
2301 connection
->set_peer_addrs(client_ident
.addrs());
2302 connection
->target_addr
= connection
->_infer_target_addr(client_ident
.addrs());
2304 peer_name
= entity_name_t(connection
->get_peer_type(), client_ident
.gid());
2305 connection
->set_peer_id(client_ident
.gid());
2307 client_cookie
= client_ident
.cookie();
// Bits that we require (policy requirements plus msgr2_required) but that the
// peer does not list as supported.
2309 uint64_t feat_missing
=
2310 (connection
->policy
.features_required
| msgr2_required
) &
2311 ~(uint64_t)client_ident
.supported_features();
2313 ldout(cct
, 1) << __func__
<< " peer missing required features " << std::hex
2314 << feat_missing
<< std::dec
<< dendl
;
2315 auto ident_missing_features
=
2316 IdentMissingFeaturesFrame::Encode(feat_missing
);
2318 return WRITE(ident_missing_features
, "ident missing features", read_frame
);
// Features usable on this connection: what the peer supports, masked by what
// our policy supports.
2321 connection_features
=
2322 client_ident
.supported_features() & connection
->policy
.features_supported
;
2324 peer_global_seq
= client_ident
.global_seq();
2326 // Looks good so far, let's check if there is already an existing connection
// connection->lock is released around the messenger lookup and re-acquired
// below, which is why the state is checked again afterwards.
2329 connection
->lock
.unlock();
2330 AsyncConnectionRef existing
= messenger
->lookup_conn(*connection
->peer_addrs
);
// NOTE(review): the opening of this condition (embedded lines 2331-2332) is
// absent from this extract.
2333 existing
->protocol
->proto_type
!= 2) {
2334 ldout(cct
,1) << __func__
<< " existing " << existing
<< " proto "
2335 << existing
->protocol
.get() << " version is "
2336 << existing
->protocol
->proto_type
<< ", marking down" << dendl
;
2337 existing
->mark_down();
2341 connection
->inject_delay();
2343 connection
->lock
.lock();
// The state may have changed while the lock was not held.
2344 if (state
!= SESSION_ACCEPTING
) {
2345 ldout(cct
, 1) << __func__
2346 << " state changed while accept, it must be mark_down"
2348 ceph_assert(state
== CLOSED
);
// NOTE(review): the condition guarding this return (embedded lines 2350-2352)
// is absent from this extract.
2353 return handle_existing_connection(existing
);
2356 // if everything is OK reply with server identification
2357 return send_server_ident();
// Handles a Reconnect frame while accepting a session.
//
// Visible flow:
//   1. require state == SESSION_ACCEPTING and decode the frame;
//   2. take peer addresses, target_addr and peer_global_seq from the frame;
//   3. with connection->lock released, look up an existing connection for the
//      peer addresses (marking it down when its protocol is not proto_type 2),
//      then re-take the lock and re-check the state;
//   4. when there is nothing to reconnect to, answer with a Reset frame;
//   5. otherwise, holding existing->lock, compare the frame against the
//      existing ProtocolV2 and answer with Reset / RetryGlobal / Retry / Wait,
//      or mark this as a reconnect, copy connect_seq and msg_seq into the
//      existing protocol and return reuse_connection(existing, exproto).
//
// payload: raw frame payload to decode.
// NOTE(review): several short lines (gaps in the embedded numbering, e.g.
// 2367-2369, 2379, 2389-2390, 2396, 2405, 2407-2410, 2412, 2422, 2424-2425,
// 2448, 2472-2474, 2493-2494, 2500, 2505-2507) are absent from this extract,
// including some conditions and returns.
2360 CtPtr
ProtocolV2::handle_reconnect(ceph::bufferlist
&payload
)
2362 ldout(cct
, 20) << __func__
2363 << " payload.length()=" << payload
.length() << dendl
;
// Guard: reconnects are only valid while accepting a session.
2365 if (state
!= SESSION_ACCEPTING
) {
2366 lderr(cct
) << __func__
<< " not in session accept state!" << dendl
;
2370 auto reconnect
= ReconnectFrame::Decode(payload
);
// The two cookies are printed in hex, the sequence numbers in decimal.
2372 ldout(cct
, 5) << __func__
2373 << " received reconnect:"
2374 << " client_cookie=" << std::hex
<< reconnect
.client_cookie()
2375 << " server_cookie=" << reconnect
.server_cookie() << std::dec
2376 << " gs=" << reconnect
.global_seq()
2377 << " cs=" << reconnect
.connect_seq()
2378 << " ms=" << reconnect
.msg_seq()
2381 // Should we check if one of the ident.addrs match connection->target_addr
2382 // as we do in ProtocolV1?
2383 connection
->set_peer_addrs(reconnect
.addrs());
2384 connection
->target_addr
= connection
->_infer_target_addr(reconnect
.addrs());
2385 peer_global_seq
= reconnect
.global_seq();
// connection->lock is released around the messenger lookup and re-acquired
// below, which is why the state is checked again afterwards.
2387 connection
->lock
.unlock();
2388 AsyncConnectionRef existing
= messenger
->lookup_conn(*connection
->peer_addrs
);
// NOTE(review): the opening of this condition (embedded lines 2389-2390) is
// absent from this extract.
2391 existing
->protocol
->proto_type
!= 2) {
2392 ldout(cct
,1) << __func__
<< " existing " << existing
<< " proto "
2393 << existing
->protocol
.get() << " version is "
2394 << existing
->protocol
->proto_type
<< ", marking down" << dendl
;
2395 existing
->mark_down();
2399 connection
->inject_delay();
2401 connection
->lock
.lock();
// The state may have changed while the lock was not held.
2402 if (state
!= SESSION_ACCEPTING
) {
2403 ldout(cct
, 1) << __func__
2404 << " state changed while accept, it must be mark_down"
2406 ceph_assert(state
== CLOSED
);
// NOTE(review): the condition guarding the reset below (embedded line 2410)
// is absent from this extract.
2411 // there is no existing connection therefore cannot reconnect to previous
2413 ldout(cct
, 0) << __func__
2414 << " no existing connection exists, reseting client" << dendl
;
2415 auto reset
= ResetFrame::Encode(true);
2416 return WRITE(reset
, "session reset", read_frame
);
// Everything below inspects the existing connection while holding its lock.
2419 std::lock_guard
<std::mutex
> l(existing
->lock
);
2421 ProtocolV2
*exproto
= dynamic_cast<ProtocolV2
*>(existing
->protocol
.get());
2423 ldout(cct
, 1) << __func__
<< " existing=" << existing
<< dendl
;
// Existing connection is already closed: ask the client to reset.
2427 if (exproto
->state
== CLOSED
) {
2428 ldout(cct
, 5) << __func__
<< " existing " << existing
2429 << " already closed. Reseting client" << dendl
;
2430 auto reset
= ResetFrame::Encode(true);
2431 return WRITE(reset
, "session reset", read_frame
);
// Existing connection is in the middle of being replaced: ask the peer to
// retry with the existing global sequence.
2434 if (exproto
->replacing
) {
2435 ldout(cct
, 1) << __func__
2436 << " existing racing replace happened while replacing."
2437 << " existing=" << existing
<< dendl
;
2438 auto retry
= RetryGlobalFrame::Encode(exproto
->peer_global_seq
);
2439 return WRITE(retry
, "session retry", read_frame
);
// Cookie checks: a client cookie mismatch leads to a Reset whose argument is
// policy.resetcheck; a zero server cookie leads to Reset(false).
2442 if (exproto
->client_cookie
!= reconnect
.client_cookie()) {
2443 ldout(cct
, 1) << __func__
<< " existing=" << existing
2444 << " client cookie mismatch, I must have reseted:"
2445 << " cc=" << std::hex
<< exproto
->client_cookie
2446 << " rcc=" << reconnect
.client_cookie()
2447 << ", reseting client." << std::dec
2449 auto reset
= ResetFrame::Encode(connection
->policy
.resetcheck
);
2450 return WRITE(reset
, "session reset", read_frame
);
2451 } else if (exproto
->server_cookie
== 0) {
2452 // this happens when:
2453 // - a connects to b
2454 // - a sends client_ident
2455 // - b gets client_ident, sends server_ident and sets cookie X
2456 // - connection fault
2457 // - b reconnects to a with cookie X, connect_seq=1
2458 // - a has cookie==0
2459 ldout(cct
, 1) << __func__
<< " I was a client and didn't received the"
2460 << " server_ident. Asking peer to resume session"
2461 << " establishment" << dendl
;
2462 auto reset
= ResetFrame::Encode(false);
2463 return WRITE(reset
, "session reset", read_frame
);
// The peer's global sequence is older than the one already recorded.
2466 if (exproto
->peer_global_seq
> reconnect
.global_seq()) {
2467 ldout(cct
, 5) << __func__
2468 << " stale global_seq: sgs=" << exproto
->peer_global_seq
2469 << " cgs=" << reconnect
.global_seq()
2470 << ", ask client to retry global" << dendl
;
2471 auto retry
= RetryGlobalFrame::Encode(exproto
->peer_global_seq
);
2475 return WRITE(retry
, "session retry", read_frame
);
// The peer's connect sequence is older than the one already recorded.
2478 if (exproto
->connect_seq
> reconnect
.connect_seq()) {
2479 ldout(cct
, 5) << __func__
2480 << " stale connect_seq scs=" << exproto
->connect_seq
2481 << " ccs=" << reconnect
.connect_seq()
2482 << " , ask client to retry" << dendl
;
2483 auto retry
= RetryFrame::Encode(exproto
->connect_seq
);
2484 return WRITE(retry
, "session retry", read_frame
);
// Equal connect sequences: the tie is broken by comparing msgr2 addresses
// together with the existing connection's policy.server flag.
2487 if (exproto
->connect_seq
== reconnect
.connect_seq()) {
2488 // reconnect race: both peers are sending reconnect messages
2489 if (existing
->peer_addrs
->msgr2_addr() >
2490 messenger
->get_myaddrs().msgr2_addr() &&
2491 !existing
->policy
.server
) {
2492 // the existing connection wins
2495 << " reconnect race detected, this connection loses to existing="
2496 << existing
<< dendl
;
2498 auto wait
= WaitFrame::Encode();
2499 return WRITE(wait
, "wait", read_frame
);
2501 // this connection wins
2502 ldout(cct
, 1) << __func__
2503 << " reconnect race detected, replacing existing="
2504 << existing
<< " socket by this connection's socket"
2509 ldout(cct
, 1) << __func__
<< " reconnect to existing=" << existing
<< dendl
;
// Mark this hand-over as a reconnect; reuse_connection() reads this flag.
2511 reconnecting
= true;
2513 // everything looks good
2514 exproto
->connect_seq
= reconnect
.connect_seq();
2515 exproto
->message_seq
= reconnect
.msg_seq();
2517 return reuse_connection(existing
, exproto
);
// Decides what to do when a client identification arrives for a peer that
// already has a registered connection (`existing`).
//
// All checks run while holding existing->lock.  Visible outcomes:
//   - existing already CLOSED                -> send_server_ident();
//   - existing is being replaced             -> reply with a Wait frame;
//   - existing has a newer peer_global_seq   -> this connection is stale, a
//                                               reset is queued for it;
//   - existing is lossy                      -> stop existing, queue a reset
//                                               for it, send_server_ident();
//   - cookies show a previous or interrupted session, or existing is
//     READY/STANDBY                          -> reuse_connection();
//   - otherwise a connection race: the winner is picked by comparing msgr2
//     addresses together with existing->policy.server; the loser gets a Wait
//     frame after a keepalive is sent on `existing`.
//
// existing: the connection already registered for the peer.
// NOTE(review): several short lines (gaps in the embedded numbering, e.g.
// 2526, 2528-2529, 2533, 2547, 2550, 2552-2553, 2557, 2560, 2575, 2581,
// 2603, 2605-2606) are absent from this extract.
2520 CtPtr
ProtocolV2::handle_existing_connection(AsyncConnectionRef existing
) {
2521 ldout(cct
, 20) << __func__
<< " existing=" << existing
<< dendl
;
// Hold the existing connection's lock for the whole decision.
2523 std::lock_guard
<std::mutex
> l(existing
->lock
);
2525 ProtocolV2
*exproto
= dynamic_cast<ProtocolV2
*>(existing
->protocol
.get());
2527 ldout(cct
, 1) << __func__
<< " existing=" << existing
<< dendl
;
// Existing connection already closed: proceed as for a fresh session.
2531 if (exproto
->state
== CLOSED
) {
2532 ldout(cct
, 1) << __func__
<< " existing " << existing
<< " already closed."
2534 return send_server_ident();
// Existing connection is in the middle of being replaced: tell the peer to wait.
2537 if (exproto
->replacing
) {
2538 ldout(cct
, 1) << __func__
2539 << " existing racing replace happened while replacing."
2540 << " existing=" << existing
<< dendl
;
2541 auto wait
= WaitFrame::Encode();
2542 return WRITE(wait
, "wait", read_frame
);
// The existing connection has seen a newer global sequence from this peer.
2545 if (exproto
->peer_global_seq
> peer_global_seq
) {
2546 ldout(cct
, 1) << __func__
<< " this is a stale connection, peer_global_seq="
2548 << " existing->peer_global_seq=" << exproto
->peer_global_seq
2549 << ", stopping this connection." << dendl
;
2551 connection
->dispatch_queue
->queue_reset(connection
);
2555 if (existing
->policy
.lossy
) {
2556 // existing connection can be thrown out in favor of this one
2558 << __func__
<< " existing=" << existing
2559 << " is a lossy channel. Stopping existing in favor of this connection"
2561 existing
->protocol
->stop();
2562 existing
->dispatch_queue
->queue_reset(existing
.get());
2563 return send_server_ident();
// Both cookies set on the existing session, but the client cookie differs
// from the one just received.
2566 if (exproto
->server_cookie
&& exproto
->client_cookie
&&
2567 exproto
->client_cookie
!= client_cookie
) {
2568 // Found previous session
2569 // peer has reseted and we're going to reuse the existing connection
2570 // by replacing the communication socket
2571 ldout(cct
, 1) << __func__
<< " found previous session existing=" << existing
2572 << ", peer must have reseted." << dendl
;
// The existing session is reset only when the policy asks for reset checks.
2573 if (connection
->policy
.resetcheck
) {
2574 exproto
->reset_session();
2576 return reuse_connection(existing
, exproto
);
2579 if (exproto
->client_cookie
== client_cookie
) {
2580 // session establishment interrupted between client_ident and server_ident,
2582 ldout(cct
, 1) << __func__
<< " found previous session existing=" << existing
2583 << ", continuing session establishment." << dendl
;
2584 return reuse_connection(existing
, exproto
);
2587 if (exproto
->state
== READY
|| exproto
->state
== STANDBY
) {
2588 ldout(cct
, 1) << __func__
<< " existing=" << existing
2589 << " is READY/STANDBY, lets reuse it" << dendl
;
2590 return reuse_connection(existing
, exproto
);
2593 // Looks like a connection race: server and client are both connecting to
2594 // each other at the same time.
2595 if (connection
->peer_addrs
->msgr2_addr() <
2596 messenger
->get_myaddrs().msgr2_addr() ||
2597 existing
->policy
.server
) {
2598 // this connection wins
2599 ldout(cct
, 1) << __func__
2600 << " connection race detected, replacing existing="
2601 << existing
<< " socket by this connection's socket" << dendl
;
2602 return reuse_connection(existing
, exproto
);
2604 // the existing connection wins
2607 << " connection race detected, this connection loses to existing="
2608 << existing
<< dendl
;
// Losing the race implies the peer's msgr2 address compares greater than ours.
2609 ceph_assert(connection
->peer_addrs
->msgr2_addr() >
2610 messenger
->get_myaddrs().msgr2_addr());
2612 // make sure we follow through with opening the existing
2613 // connection (if it isn't yet open) since we know the peer
2614 // has something to send to us.
2615 existing
->send_keepalive();
2616 auto wait
= WaitFrame::Encode();
2617 return WRITE(wait
, "wait", read_frame
);
// Hands this connection's socket and negotiated session over to `existing`.
//
// Visible steps:
//   1. under existing->write_lock, remove this connection's file events and
//      reset the receive state of the existing protocol;
//   2. unless this is a reconnect, copy client_cookie, peer_name and
//      connection_features into the existing protocol / connection;
//   3. move connection->cs into a temporary and remember this connection's
//      center and worker;
//   4. queue a reset for this connection, then mark the existing protocol as
//      `replacing`, swap the session stream handlers, copy auth_meta and put
//      the existing protocol / connection into the NONE state;
//   5. build `deactivate_existing` (bound to the moved socket) and submit it
//      to existing->center.  It requeues sent messages, installs the new
//      socket, worker and center on `existing` (or closes the socket when the
//      existing protocol turned CLOSED) and then runs `transfer_existing`,
//      which re-arms `existing` in SESSION_ACCEPTING and sends either the
//      server ident or the reconnect-ok reply.
//
// existing: connection that takes over the socket.
// exproto:  the ProtocolV2 instance of `existing`.
// NOTE(review): several short lines (gaps in the embedded numbering, e.g.
// 2636, 2645, 2651, 2653, 2656-2657, 2677, 2699-2704, 2713, 2720, 2722-2723,
// 2726, 2729, 2731, 2734-2735) are absent from this extract, including the
// function's return statement.
2621 CtPtr
ProtocolV2::reuse_connection(AsyncConnectionRef existing
,
2622 ProtocolV2
*exproto
) {
2623 ldout(cct
, 20) << __func__
<< " existing=" << existing
2624 << " reconnect=" << reconnecting
<< dendl
;
2626 connection
->inject_delay();
// existing->write_lock is held until the end of the enclosing scope.
2628 std::lock_guard
<std::mutex
> l(existing
->write_lock
);
// Stop event delivery for this connection's socket before giving it away.
2630 connection
->center
->delete_file_event(connection
->cs
.fd(),
2631 EVENT_READABLE
| EVENT_WRITABLE
);
2633 if (existing
->delay_state
) {
2634 existing
->delay_state
->flush();
2635 ceph_assert(!connection
->delay_state
);
2637 exproto
->reset_recv_state();
2638 exproto
->pre_auth
.enabled
= false;
// A fresh session (not a reconnect) carries over the identity negotiated on
// this connection.
2640 if (!reconnecting
) {
2641 exproto
->client_cookie
= client_cookie
;
2642 exproto
->peer_name
= peer_name
;
2643 exproto
->connection_features
= connection_features
;
2644 existing
->set_features(connection_features
);
2646 exproto
->peer_global_seq
= peer_global_seq
;
// Take the socket out of this connection and remember where it was serviced.
2648 auto temp_cs
= std::move(connection
->cs
);
2649 EventCenter
*new_center
= connection
->center
;
2650 Worker
*new_worker
= connection
->worker
;
2652 ldout(messenger
->cct
, 5) << __func__
<< " stop myself to swap existing"
2654 // avoid _stop shutdown replacing socket
2655 // queue a reset on the new connection, which we're dumping for the old
2658 connection
->dispatch_queue
->queue_reset(connection
);
// Freeze the existing protocol while the swap is in flight.
2660 exproto
->can_write
= false;
2661 exproto
->reconnecting
= reconnecting
;
2662 exproto
->replacing
= true;
2663 std::swap(exproto
->session_stream_handlers
, session_stream_handlers
);
2664 exproto
->auth_meta
= auth_meta
;
2665 existing
->state_offset
= 0;
2666 // avoid previous thread modify event
2667 exproto
->state
= NONE
;
2668 existing
->state
= AsyncConnection::STATE_NONE
;
2669 // Discard existing prefetch buffer in `recv_buf`
2670 existing
->recv_start
= existing
->recv_end
= 0;
2671 // there shouldn't exist any buffer
2672 ceph_assert(connection
->recv_start
== connection
->recv_end
);
// Callable bound to the moved socket; it is submitted to existing->center at
// the end of this function.
2674 auto deactivate_existing
= std::bind(
2675 [existing
, new_worker
, new_center
, exproto
](ConnectedSocket
&cs
) mutable {
2676 // we need to delete time event in original thread
2678 std::lock_guard
<std::mutex
> l(existing
->lock
);
// write_lock is taken and released manually around the output-queue cleanup.
2679 existing
->write_lock
.lock();
2680 exproto
->requeue_sent();
2681 existing
->outcoming_bl
.clear();
2682 existing
->open_write
= false;
2683 existing
->write_lock
.unlock();
// State still NONE: install the new socket, worker and center on `existing`.
2684 if (exproto
->state
== NONE
) {
2685 existing
->shutdown_socket();
2686 existing
->cs
= std::move(cs
);
2687 existing
->worker
->references
--;
2688 new_worker
->references
++;
2689 existing
->logger
= new_worker
->get_perf_counter();
2690 existing
->worker
= new_worker
;
2691 existing
->center
= new_center
;
2692 if (existing
->delay_state
)
2693 existing
->delay_state
->set_center(new_center
);
// State became CLOSED in the meantime: the socket is closed via new_center.
2694 } else if (exproto
->state
== CLOSED
) {
2695 auto back_to_close
= std::bind(
2696 [](ConnectedSocket
&cs
) mutable { cs
.close(); }, std::move(cs
));
2697 new_center
->submit_to(new_center
->get_id(),
2698 std::move(back_to_close
), true);
2705 // Before changing existing->center, it may already exists some
2706 // events in existing->center's queue. Then if we mark down
2707 // `existing`, it will execute in another thread and clean up
2708 // connection. Previous event will result in segment fault
2709 auto transfer_existing
= [existing
, exproto
]() mutable {
2710 std::lock_guard
<std::mutex
> l(existing
->lock
);
2711 if (exproto
->state
== CLOSED
) return;
2712 ceph_assert(exproto
->state
== NONE
);
// Re-arm `existing` as an accepting session on its (new) socket.
2714 exproto
->state
= SESSION_ACCEPTING
;
2715 existing
->state
= AsyncConnection::STATE_CONNECTION_ESTABLISHED
;
2716 existing
->center
->create_file_event(existing
->cs
.fd(), EVENT_READABLE
,
2717 existing
->read_handler
);
// Reply on behalf of `existing`: server ident for a new session, reconnect-ok
// otherwise.
2718 if (!exproto
->reconnecting
) {
2719 exproto
->run_continuation(exproto
->send_server_ident());
2721 exproto
->run_continuation(exproto
->send_reconnect_ok());
// Run inline when already on the target center's thread, otherwise submit.
2724 if (existing
->center
->in_thread())
2725 transfer_existing();
2727 existing
->center
->submit_to(existing
->center
->get_id(),
2728 std::move(transfer_existing
), true);
2730 std::move(temp_cs
));
// Run the deactivation on the center that currently services `existing`.
2732 existing
->center
->submit_to(existing
->center
->get_id(),
2733 std::move(deactivate_existing
), true);
// Builds and sends the ServerIdent frame and registers the connection with
// the messenger as accepted.
//
// Visible steps:
//   1. drop requeued messages via discard_requeued_up_to(out_seq, 0);
//   2. for non-lossy policies generate a random non-zero server_cookie; for
//      lossy policies set CEPH_MSG_CONNECT_LOSSY in `flags`;
//   3. encode the frame from our addresses, gid, features and further
//      arguments;
//   4. with connection->lock released, call messenger->accept_conn(), then
//      re-take the lock and re-check the state;
//   5. set the connection features, queue the accept notifications and write
//      the frame, continuing in server_ready.
//
// NOTE(review): several short lines (gaps in the embedded numbering, e.g.
// 2742-2743, 2746-2748, 2751-2752, 2757, 2760-2762, 2771-2772, 2781-2782,
// 2787-2788, 2792, 2796-2798, 2804-2806) are absent from this extract,
// including the declaration of `flags`, some frame arguments and the bodies
// of the error branches.
2737 CtPtr
ProtocolV2::send_server_ident() {
2738 ldout(cct
, 20) << __func__
<< dendl
;
2740 // this is required for the case when this connection is being replaced
2741 out_seq
= discard_requeued_up_to(out_seq
, 0);
// Only non-lossy connections get a server cookie; the range starts at 1, so
// the generated cookie is never 0.
2744 if (!connection
->policy
.lossy
) {
2745 server_cookie
= ceph::util::generate_random_number
<uint64_t>(1, -1ll);
2749 if (connection
->policy
.lossy
) {
2750 flags
= flags
| CEPH_MSG_CONNECT_LOSSY
;
2753 uint64_t gs
= messenger
->get_global_seq();
2754 auto server_ident
= ServerIdentFrame::Encode(
2755 messenger
->get_myaddrs(),
2756 messenger
->get_myname().num(),
2758 connection
->policy
.features_supported
,
2759 connection
->policy
.features_required
| msgr2_required
,
// Feature masks are printed in hex; the stream switches back to decimal
// right before the cookie.
2763 ldout(cct
, 5) << __func__
<< " sending identification:"
2764 << " addrs=" << messenger
->get_myaddrs()
2765 << " gid=" << messenger
->get_myname().num()
2766 << " global_seq=" << gs
<< " features_supported=" << std::hex
2767 << connection
->policy
.features_supported
2768 << " features_required="
2769 << (connection
->policy
.features_required
| msgr2_required
)
2770 << " flags=" << flags
<< " cookie=" << std::dec
<< server_cookie
2773 connection
->lock
.unlock();
2774 // Because "replacing" will prevent other connections preempt this addr,
2775 // it's safe that here we don't acquire Connection's lock
2776 ssize_t r
= messenger
->accept_conn(connection
);
2778 connection
->inject_delay();
2780 connection
->lock
.lock();
// NOTE(review): the condition on `r` guarding this log (embedded lines
// 2781-2782) is absent from this extract.
2783 ldout(cct
, 1) << __func__
<< " existing race replacing process for addr = "
2784 << connection
->peer_addrs
->msgr2_addr()
2785 << " just fail later one(this)" << dendl
;
2786 connection
->inject_delay();
// The state may have changed while the lock was not held; in that case the
// registration made by accept_conn() is undone.
2789 if (state
!= SESSION_ACCEPTING
) {
2790 ldout(cct
, 1) << __func__
2791 << " state changed while accept_conn, it must be mark_down"
2793 ceph_assert(state
== CLOSED
|| state
== NONE
);
2794 messenger
->unregister_conn(connection
);
2795 connection
->inject_delay();
2799 connection
->set_features(connection_features
);
// Notify dispatchers that the connection has been accepted.
2802 connection
->dispatch_queue
->queue_accept(connection
);
2803 messenger
->ms_deliver_handle_fast_accept(connection
);
2807 return WRITE(server_ident
, "server ident", server_ready
);
// Continuation named by the WRITE calls in send_server_ident() and
// send_reconnect_ok(), i.e. the step that follows writing those frames.
//
// When the connection has a delay_state, asserts that delay_state->ready()
// returns true.
// NOTE(review): the remainder of the function (embedded lines 2815-2818),
// including its return statement, is absent from this extract.
2810 CtPtr
ProtocolV2::server_ready() {
2811 ldout(cct
, 20) << __func__
<< dendl
;
2813 if (connection
->delay_state
) {
2814 ceph_assert(connection
->delay_state
->ready());
// Builds and sends the ReconnectOk frame and registers the connection with
// the messenger as accepted.
//
// Visible steps:
//   1. drop requeued messages up to message_seq via discard_requeued_up_to();
//   2. encode the frame with our in_seq as the message sequence;
//   3. with connection->lock released, call messenger->accept_conn(), then
//      re-take the lock and re-check the state;
//   4. queue the accept notifications and write the frame, continuing in
//      server_ready.
//
// NOTE(review): several short lines (gaps in the embedded numbering, e.g.
// 2838-2839, 2844-2845, 2849, 2853-2856, 2859-2861) are absent from this
// extract, including the condition on `r` and the bodies of the error
// branches.
2820 CtPtr
ProtocolV2::send_reconnect_ok() {
2821 ldout(cct
, 20) << __func__
<< dendl
;
// message_seq is the value handle_reconnect() copied from the peer's
// reconnect frame.
2823 out_seq
= discard_requeued_up_to(out_seq
, message_seq
);
// Report our in_seq to the peer as the frame's message sequence.
2825 uint64_t ms
= in_seq
;
2826 auto reconnect_ok
= ReconnectOkFrame::Encode(ms
);
2828 ldout(cct
, 5) << __func__
<< " sending reconnect_ok: msg_seq=" << ms
<< dendl
;
2830 connection
->lock
.unlock();
2831 // Because "replacing" will prevent other connections preempt this addr,
2832 // it's safe that here we don't acquire Connection's lock
2833 ssize_t r
= messenger
->accept_conn(connection
);
2835 connection
->inject_delay();
2837 connection
->lock
.lock();
2840 ldout(cct
, 1) << __func__
<< " existing race replacing process for addr = "
2841 << connection
->peer_addrs
->msgr2_addr()
2842 << " just fail later one(this)" << dendl
;
2843 connection
->inject_delay();
// The state may have changed while the lock was not held; in that case the
// registration made by accept_conn() is undone.
2846 if (state
!= SESSION_ACCEPTING
) {
2847 ldout(cct
, 1) << __func__
2848 << " state changed while accept_conn, it must be mark_down"
2850 ceph_assert(state
== CLOSED
|| state
== NONE
);
2851 messenger
->unregister_conn(connection
);
2852 connection
->inject_delay();
// Notify dispatchers that the connection has been accepted.
2857 connection
->dispatch_queue
->queue_accept(connection
);
2858 messenger
->ms_deliver_handle_fast_accept(connection
);
2862 return WRITE(reconnect_ok
, "reconnect ok", server_ready
);