1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "ProtocolV2.h"
7 #include "AsyncMessenger.h"
9 #include "common/EventTrace.h"
10 #include "common/ceph_crypto.h"
11 #include "common/errno.h"
12 #include "include/random.h"
13 #include "auth/AuthClient.h"
14 #include "auth/AuthServer.h"
16 #define dout_subsys ceph_subsys_ms
18 #define dout_prefix _conn_prefix(_dout)
19 std::ostream
&ProtocolV2::_conn_prefix(std::ostream
*_dout
) {
20 return *_dout
<< "--2- " << messenger
->get_myaddrs() << " >> "
21 << *connection
->peer_addrs
<< " conn(" << connection
<< " "
23 << " " << ceph_con_mode_name(auth_meta
->con_mode
)
24 << " :" << connection
->port
25 << " s=" << get_state_name(state
) << " pgs=" << peer_global_seq
26 << " cs=" << connect_seq
<< " l=" << connection
->policy
.lossy
27 << " rev1=" << HAVE_MSGR2_FEATURE(peer_supported_features
,
29 << " rx=" << session_stream_handlers
.rx
.get()
30 << " tx=" << session_stream_handlers
.tx
.get()
34 using namespace ceph::msgr::v2
;
36 using CtPtr
= Ct
<ProtocolV2
> *;
37 using CtRef
= Ct
<ProtocolV2
> &;
39 void ProtocolV2::run_continuation(CtPtr pcontinuation
) {
41 run_continuation(*pcontinuation
);
45 void ProtocolV2::run_continuation(CtRef continuation
) {
47 CONTINUATION_RUN(continuation
)
48 } catch (const ceph::buffer::error
&e
) {
49 lderr(cct
) << __func__
<< " failed decoding of frame header: " << e
.what()
52 } catch (const ceph::crypto::onwire::MsgAuthError
&e
) {
53 lderr(cct
) << __func__
<< " " << e
.what() << dendl
;
55 } catch (const DecryptionError
&) {
56 lderr(cct
) << __func__
<< " failed to decrypt frame payload" << dendl
;
60 #define WRITE(B, D, C) write(D, CONTINUATION(C), B)
62 #define READ(L, C) read(CONTINUATION(C), ceph::buffer::ptr_node::create(ceph::buffer::create(L)))
64 #define READ_RXBUF(B, C) read(CONTINUATION(C), B)
66 #ifdef UNIT_TESTS_BUILT
68 #define INTERCEPT(S) { \
69 if(connection->interceptor) { \
70 auto a = connection->interceptor->intercept(connection, (S)); \
71 if (a == Interceptor::ACTION::FAIL) { \
73 } else if (a == Interceptor::ACTION::STOP) { \
75 connection->dispatch_queue->queue_reset(connection); \
83 ProtocolV2::ProtocolV2(AsyncConnection
*connection
)
84 : Protocol(2, connection
),
86 peer_supported_features(0),
96 bannerExchangeCallback(nullptr),
97 tx_frame_asm(&session_stream_handlers
, false),
98 rx_frame_asm(&session_stream_handlers
, false),
99 next_tag(static_cast<Tag
>(0)),
103 ProtocolV2::~ProtocolV2() {
106 void ProtocolV2::connect() {
107 ldout(cct
, 1) << __func__
<< dendl
;
108 state
= START_CONNECT
;
109 pre_auth
.enabled
= true;
112 void ProtocolV2::accept() {
113 ldout(cct
, 1) << __func__
<< dendl
;
114 state
= START_ACCEPT
;
117 bool ProtocolV2::is_connected() { return can_write
; }
120 * Tears down the message queues, and removes them from the
121 * DispatchQueue Must hold write_lock prior to calling.
123 void ProtocolV2::discard_out_queue() {
124 ldout(cct
, 10) << __func__
<< " started" << dendl
;
126 for (auto p
= sent
.begin(); p
!= sent
.end(); ++p
) {
127 ldout(cct
, 20) << __func__
<< " discard " << *p
<< dendl
;
131 for (auto& [ prio
, entries
] : out_queue
) {
132 static_cast<void>(prio
);
133 for (auto& entry
: entries
) {
134 ldout(cct
, 20) << __func__
<< " discard " << *entry
.m
<< dendl
;
139 write_in_progress
= false;
142 void ProtocolV2::reset_session() {
143 ldout(cct
, 1) << __func__
<< dendl
;
145 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
146 if (connection
->delay_state
) {
147 connection
->delay_state
->discard();
150 connection
->dispatch_queue
->discard_queue(connection
->conn_id
);
152 connection
->outgoing_bl
.clear();
154 connection
->dispatch_queue
->queue_remote_reset(connection
);
167 void ProtocolV2::stop() {
168 ldout(cct
, 1) << __func__
<< dendl
;
169 if (state
== CLOSED
) {
173 if (connection
->delay_state
) connection
->delay_state
->flush();
175 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
186 void ProtocolV2::fault() { _fault(); }
188 void ProtocolV2::requeue_sent() {
189 write_in_progress
= false;
194 auto& rq
= out_queue
[CEPH_MSG_PRIO_HIGHEST
];
195 out_seq
-= sent
.size();
196 while (!sent
.empty()) {
197 Message
*m
= sent
.back();
199 ldout(cct
, 5) << __func__
<< " requeueing message m=" << m
200 << " seq=" << m
->get_seq() << " type=" << m
->get_type() << " "
203 rq
.emplace_front(out_queue_entry_t
{false, m
});
207 uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq
, uint64_t seq
) {
208 ldout(cct
, 10) << __func__
<< " " << seq
<< dendl
;
209 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
210 if (out_queue
.count(CEPH_MSG_PRIO_HIGHEST
) == 0) {
213 auto& rq
= out_queue
[CEPH_MSG_PRIO_HIGHEST
];
214 uint64_t count
= out_seq
;
215 while (!rq
.empty()) {
216 Message
* const m
= rq
.front().m
;
217 if (m
->get_seq() == 0 || m
->get_seq() > seq
) break;
218 ldout(cct
, 5) << __func__
<< " discarding message m=" << m
219 << " seq=" << m
->get_seq() << " ack_seq=" << seq
<< " "
225 if (rq
.empty()) out_queue
.erase(CEPH_MSG_PRIO_HIGHEST
);
229 void ProtocolV2::reset_security() {
230 ldout(cct
, 5) << __func__
<< dendl
;
232 auth_meta
.reset(new AuthConnectionMeta
);
233 session_stream_handlers
.rx
.reset(nullptr);
234 session_stream_handlers
.tx
.reset(nullptr);
235 pre_auth
.rxbuf
.clear();
236 pre_auth
.txbuf
.clear();
239 // it's expected the `write_lock` is held while calling this method.
240 void ProtocolV2::reset_recv_state() {
241 ldout(cct
, 5) << __func__
<< dendl
;
243 if (!connection
->center
->in_thread()) {
244 // execute in the same thread that uses the rx/tx handlers. We need
245 // to do the warp because holding `write_lock` is not enough as
246 // `write_event()` unlocks it just before calling `write_message()`.
247 // `submit_to()` here is NOT blocking.
248 connection
->center
->submit_to(connection
->center
->get_id(), [this] {
249 ldout(cct
, 5) << "reset_recv_state (warped) reseting crypto handlers"
251 // Possibly unnecessary. See the comment in `deactivate_existing`.
252 std::lock_guard
<std::mutex
> l(connection
->lock
);
253 std::lock_guard
<std::mutex
> wl(connection
->write_lock
);
255 }, /* always_async = */true);
260 // clean read and write callbacks
261 connection
->pendingReadLen
.reset();
262 connection
->writeCallback
.reset();
264 next_tag
= static_cast<Tag
>(0);
269 size_t ProtocolV2::get_current_msg_size() const {
270 ceph_assert(rx_frame_asm
.get_num_segments() > 0);
272 // we don't include SegmentIndex::Msg::HEADER.
273 for (size_t i
= 1; i
< rx_frame_asm
.get_num_segments(); i
++) {
274 sum
+= rx_frame_asm
.get_segment_logical_len(i
);
279 void ProtocolV2::reset_throttle() {
280 if (state
> THROTTLE_MESSAGE
&& state
<= THROTTLE_DONE
&&
281 connection
->policy
.throttler_messages
) {
282 ldout(cct
, 10) << __func__
<< " releasing " << 1
283 << " message to policy throttler "
284 << connection
->policy
.throttler_messages
->get_current()
285 << "/" << connection
->policy
.throttler_messages
->get_max()
287 connection
->policy
.throttler_messages
->put();
289 if (state
> THROTTLE_BYTES
&& state
<= THROTTLE_DONE
) {
290 if (connection
->policy
.throttler_bytes
) {
291 const size_t cur_msg_size
= get_current_msg_size();
292 ldout(cct
, 10) << __func__
<< " releasing " << cur_msg_size
293 << " bytes to policy throttler "
294 << connection
->policy
.throttler_bytes
->get_current() << "/"
295 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
296 connection
->policy
.throttler_bytes
->put(cur_msg_size
);
299 if (state
> THROTTLE_DISPATCH_QUEUE
&& state
<= THROTTLE_DONE
) {
300 const size_t cur_msg_size
= get_current_msg_size();
302 << __func__
<< " releasing " << cur_msg_size
303 << " bytes to dispatch_queue throttler "
304 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
305 << connection
->dispatch_queue
->dispatch_throttler
.get_max() << dendl
;
306 connection
->dispatch_queue
->dispatch_throttle_release(cur_msg_size
);
310 CtPtr
ProtocolV2::_fault() {
311 ldout(cct
, 10) << __func__
<< dendl
;
313 if (state
== CLOSED
|| state
== NONE
) {
314 ldout(cct
, 10) << __func__
<< " connection is already closed" << dendl
;
318 if (connection
->policy
.lossy
&&
319 !(state
>= START_CONNECT
&& state
<= SESSION_RECONNECTING
)) {
320 ldout(cct
, 2) << __func__
<< " on lossy channel, failing" << dendl
;
322 connection
->dispatch_queue
->queue_reset(connection
);
326 connection
->write_lock
.lock();
329 // requeue sent items
332 if (out_queue
.empty() && state
>= START_ACCEPT
&&
333 state
<= SESSION_ACCEPTING
&& !replacing
) {
334 ldout(cct
, 2) << __func__
<< " with nothing to send and in the half "
335 << " accept state just closed" << dendl
;
336 connection
->write_lock
.unlock();
338 connection
->dispatch_queue
->queue_reset(connection
);
346 reconnecting
= false;
348 if (connection
->policy
.standby
&& out_queue
.empty() && !keepalive
&&
350 ldout(cct
, 1) << __func__
<< " with nothing to send, going to standby"
353 connection
->write_lock
.unlock();
356 if (connection
->policy
.server
) {
357 ldout(cct
, 1) << __func__
<< " server, going to standby, even though i have stuff queued" << dendl
;
359 connection
->write_lock
.unlock();
363 connection
->write_lock
.unlock();
365 if (!(state
>= START_CONNECT
&& state
<= SESSION_RECONNECTING
) &&
367 state
!= SESSION_ACCEPTING
/* due to connection race */) {
368 // policy maybe empty when state is in accept
369 if (connection
->policy
.server
) {
370 ldout(cct
, 1) << __func__
<< " server, going to standby" << dendl
;
373 ldout(cct
, 1) << __func__
<< " initiating reconnect" << dendl
;
375 global_seq
= messenger
->get_global_seq();
376 state
= START_CONNECT
;
377 pre_auth
.enabled
= true;
378 connection
->state
= AsyncConnection::STATE_CONNECTING
;
381 connection
->center
->dispatch_event_external(connection
->read_handler
);
384 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
385 } else if (backoff
== utime_t()) {
386 backoff
.set_from_double(cct
->_conf
->ms_initial_backoff
);
389 if (backoff
> cct
->_conf
->ms_max_backoff
)
390 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
397 global_seq
= messenger
->get_global_seq();
398 state
= START_CONNECT
;
399 pre_auth
.enabled
= true;
400 connection
->state
= AsyncConnection::STATE_CONNECTING
;
401 ldout(cct
, 1) << __func__
<< " waiting " << backoff
<< dendl
;
403 connection
->register_time_events
.insert(
404 connection
->center
->create_time_event(backoff
.to_nsec() / 1000,
405 connection
->wakeup_handler
));
410 void ProtocolV2::prepare_send_message(uint64_t features
,
412 ldout(cct
, 20) << __func__
<< " m=" << *m
<< dendl
;
414 // associate message with Connection (for benefit of encode_payload)
415 ldout(cct
, 20) << __func__
<< (m
->empty_payload() ? " encoding features " : " half-reencoding features ")
416 << features
<< " " << m
<< " " << *m
<< dendl
;
418 // encode and copy out of *m
419 m
->encode(features
, 0);
422 void ProtocolV2::send_message(Message
*m
) {
423 uint64_t f
= connection
->get_features();
425 // TODO: Currently not all messages supports reencode like MOSDMap, so here
426 // only let fast dispatch support messages prepare message
427 const bool can_fast_prepare
= messenger
->ms_can_fast_dispatch(m
);
428 if (can_fast_prepare
) {
429 prepare_send_message(f
, m
);
432 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
433 bool is_prepared
= can_fast_prepare
;
434 // "features" changes will change the payload encoding
435 if (can_fast_prepare
&& (!can_write
|| connection
->get_features() != f
)) {
436 // ensure the correctness of message encoding
439 ldout(cct
, 10) << __func__
<< " clear encoded buffer previous " << f
440 << " != " << connection
->get_features() << dendl
;
442 if (state
== CLOSED
) {
443 ldout(cct
, 10) << __func__
<< " connection closed."
444 << " Drop message " << m
<< dendl
;
447 ldout(cct
, 5) << __func__
<< " enqueueing message m=" << m
448 << " type=" << m
->get_type() << " " << *m
<< dendl
;
449 m
->queue_start
= ceph::mono_clock::now();
450 m
->trace
.event("async enqueueing message");
451 out_queue
[m
->get_priority()].emplace_back(
452 out_queue_entry_t
{is_prepared
, m
});
453 ldout(cct
, 15) << __func__
<< " inline write is denied, reschedule m=" << m
455 if (((!replacing
&& can_write
) || state
== STANDBY
) && !write_in_progress
) {
456 write_in_progress
= true;
457 connection
->center
->dispatch_event_external(connection
->write_handler
);
462 void ProtocolV2::send_keepalive() {
463 ldout(cct
, 10) << __func__
<< dendl
;
464 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
465 if (state
!= CLOSED
) {
467 connection
->center
->dispatch_event_external(connection
->write_handler
);
471 void ProtocolV2::read_event() {
472 ldout(cct
, 20) << __func__
<< dendl
;
476 run_continuation(CONTINUATION(start_client_banner_exchange
));
479 run_continuation(CONTINUATION(start_server_banner_exchange
));
482 run_continuation(CONTINUATION(read_frame
));
484 case THROTTLE_MESSAGE
:
485 run_continuation(CONTINUATION(throttle_message
));
488 run_continuation(CONTINUATION(throttle_bytes
));
490 case THROTTLE_DISPATCH_QUEUE
:
491 run_continuation(CONTINUATION(throttle_dispatch_queue
));
498 ProtocolV2::out_queue_entry_t
ProtocolV2::_get_next_outgoing() {
499 out_queue_entry_t out_entry
;
501 if (!out_queue
.empty()) {
502 auto it
= out_queue
.rbegin();
503 auto& entries
= it
->second
;
504 ceph_assert(!entries
.empty());
505 out_entry
= entries
.front();
507 if (entries
.empty()) {
508 out_queue
.erase(it
->first
);
514 ssize_t
ProtocolV2::write_message(Message
*m
, bool more
) {
516 ceph_assert(connection
->center
->in_thread());
517 m
->set_seq(++out_seq
);
519 connection
->lock
.lock();
520 uint64_t ack_seq
= in_seq
;
522 connection
->lock
.unlock();
524 ceph_msg_header
&header
= m
->get_header();
525 ceph_msg_footer
&footer
= m
->get_footer();
527 ceph_msg_header2 header2
{header
.seq
, header
.tid
,
528 header
.type
, header
.priority
,
530 init_le32(0), header
.data_off
,
532 footer
.flags
, header
.compat_version
,
535 auto message
= MessageFrame::Encode(
540 if (!append_frame(message
)) {
545 ldout(cct
, 5) << __func__
<< " sending message m=" << m
546 << " seq=" << m
->get_seq() << " " << *m
<< dendl
;
548 m
->trace
.event("async writing message");
549 ldout(cct
, 20) << __func__
<< " sending m=" << m
<< " seq=" << m
->get_seq()
550 << " src=" << entity_name_t(messenger
->get_myname())
551 << " off=" << header2
.data_off
553 ssize_t total_send_size
= connection
->outgoing_bl
.length();
554 ssize_t rc
= connection
->_try_send(more
);
556 ldout(cct
, 1) << __func__
<< " error sending " << m
<< ", "
557 << cpp_strerror(rc
) << dendl
;
559 connection
->logger
->inc(
560 l_msgr_send_bytes
, total_send_size
- connection
->outgoing_bl
.length());
561 ldout(cct
, 10) << __func__
<< " sending " << m
562 << (rc
? " continuely." : " done.") << dendl
;
565 #if defined(WITH_EVENTTRACE)
566 if (m
->get_type() == CEPH_MSG_OSD_OP
)
567 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OP_END", false);
568 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
569 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OPREPLY_END", false);
577 bool ProtocolV2::append_frame(F
& frame
) {
580 bl
= frame
.get_buffer(tx_frame_asm
);
581 } catch (ceph::crypto::onwire::TxHandlerError
&e
) {
582 ldout(cct
, 1) << __func__
<< " " << e
.what() << dendl
;
586 ldout(cct
, 25) << __func__
<< " assembled frame " << bl
.length()
587 << " bytes " << tx_frame_asm
<< dendl
;
588 connection
->outgoing_bl
.append(bl
);
592 void ProtocolV2::handle_message_ack(uint64_t seq
) {
593 if (connection
->policy
.lossy
) { // lossy connections don't keep sent messages
597 ldout(cct
, 15) << __func__
<< " seq=" << seq
<< dendl
;
600 static const int max_pending
= 128;
602 Message
*pending
[max_pending
];
603 auto now
= ceph::mono_clock::now();
604 connection
->write_lock
.lock();
605 while (!sent
.empty() && sent
.front()->get_seq() <= seq
&& i
< max_pending
) {
606 Message
*m
= sent
.front();
609 ldout(cct
, 10) << __func__
<< " got ack seq " << seq
610 << " >= " << m
->get_seq() << " on " << m
<< " " << *m
613 connection
->write_lock
.unlock();
614 connection
->logger
->tinc(l_msgr_handle_ack_lat
, ceph::mono_clock::now() - now
);
615 for (int k
= 0; k
< i
; k
++) {
620 void ProtocolV2::write_event() {
621 ldout(cct
, 10) << __func__
<< dendl
;
624 connection
->write_lock
.lock();
627 ldout(cct
, 10) << __func__
<< " appending keepalive" << dendl
;
628 auto keepalive_frame
= KeepAliveFrame::Encode();
629 if (!append_frame(keepalive_frame
)) {
630 connection
->write_lock
.unlock();
631 connection
->lock
.lock();
633 connection
->lock
.unlock();
639 auto start
= ceph::mono_clock::now();
642 const auto out_entry
= _get_next_outgoing();
647 if (!connection
->policy
.lossy
) {
649 sent
.push_back(out_entry
.m
);
652 more
= !out_queue
.empty();
653 connection
->write_lock
.unlock();
655 // send_message or requeue messages may not encode message
656 if (!out_entry
.is_prepared
) {
657 prepare_send_message(connection
->get_features(), out_entry
.m
);
660 if (out_entry
.m
->queue_start
!= ceph::mono_time()) {
661 connection
->logger
->tinc(l_msgr_send_messages_queue_lat
,
662 ceph::mono_clock::now() -
663 out_entry
.m
->queue_start
);
666 r
= write_message(out_entry
.m
, more
);
668 connection
->write_lock
.lock();
672 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
675 // Outbound message in-progress, thread will be re-awoken
676 // when the outbound socket is writeable again
680 write_in_progress
= false;
682 // if r > 0 mean data still lefted, so no need _try_send.
684 uint64_t left
= ack_left
;
686 ldout(cct
, 10) << __func__
<< " try send msg ack, acked " << left
687 << " messages" << dendl
;
688 auto ack_frame
= AckFrame::Encode(in_seq
);
689 if (append_frame(ack_frame
)) {
692 r
= connection
->_try_send(left
);
696 } else if (is_queued()) {
697 r
= connection
->_try_send();
700 connection
->write_lock
.unlock();
702 connection
->logger
->tinc(l_msgr_running_send_time
,
703 ceph::mono_clock::now() - start
);
705 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
706 connection
->lock
.lock();
708 connection
->lock
.unlock();
712 write_in_progress
= false;
713 connection
->write_lock
.unlock();
714 connection
->lock
.lock();
715 connection
->write_lock
.lock();
716 if (state
== STANDBY
&& !connection
->policy
.server
&& is_queued()) {
717 ldout(cct
, 10) << __func__
<< " policy.server is false" << dendl
;
718 if (server_cookie
) { // only increment connect_seq if there is a session
721 connection
->_connect();
722 } else if (connection
->cs
&& state
!= NONE
&& state
!= CLOSED
&&
723 state
!= START_CONNECT
) {
724 r
= connection
->_try_send();
726 ldout(cct
, 1) << __func__
<< " send outcoming bl failed" << dendl
;
727 connection
->write_lock
.unlock();
729 connection
->lock
.unlock();
733 connection
->write_lock
.unlock();
734 connection
->lock
.unlock();
738 bool ProtocolV2::is_queued() {
739 return !out_queue
.empty() || connection
->is_queued();
742 CtPtr
ProtocolV2::read(CONTINUATION_RXBPTR_TYPE
<ProtocolV2
> &next
,
743 rx_buffer_t
&&buffer
) {
744 const auto len
= buffer
->length();
745 const auto buf
= buffer
->c_str();
746 next
.node
= std::move(buffer
);
747 ssize_t r
= connection
->read(len
, buf
,
748 [&next
, this](char *buffer
, int r
) {
749 if (unlikely(pre_auth
.enabled
) && r
>= 0) {
750 pre_auth
.rxbuf
.append(*next
.node
);
751 ceph_assert(!cct
->_conf
->ms_die_on_bug
||
752 pre_auth
.rxbuf
.length() < 20000000);
755 run_continuation(next
);
758 // error or done synchronously
759 if (unlikely(pre_auth
.enabled
) && r
== 0) {
760 pre_auth
.rxbuf
.append(*next
.node
);
761 ceph_assert(!cct
->_conf
->ms_die_on_bug
||
762 pre_auth
.rxbuf
.length() < 20000000);
772 CtPtr
ProtocolV2::write(const std::string
&desc
,
773 CONTINUATION_TYPE
<ProtocolV2
> &next
,
777 bl
= frame
.get_buffer(tx_frame_asm
);
778 } catch (ceph::crypto::onwire::TxHandlerError
&e
) {
779 ldout(cct
, 1) << __func__
<< " " << e
.what() << dendl
;
783 ldout(cct
, 25) << __func__
<< " assembled frame " << bl
.length()
784 << " bytes " << tx_frame_asm
<< dendl
;
785 return write(desc
, next
, bl
);
788 CtPtr
ProtocolV2::write(const std::string
&desc
,
789 CONTINUATION_TYPE
<ProtocolV2
> &next
,
790 ceph::bufferlist
&buffer
) {
791 if (unlikely(pre_auth
.enabled
)) {
792 pre_auth
.txbuf
.append(buffer
);
793 ceph_assert(!cct
->_conf
->ms_die_on_bug
||
794 pre_auth
.txbuf
.length() < 20000000);
798 connection
->write(buffer
, [&next
, desc
, this](int r
) {
800 ldout(cct
, 1) << __func__
<< " " << desc
<< " write failed r=" << r
801 << " (" << cpp_strerror(r
) << ")" << dendl
;
802 connection
->inject_delay();
805 run_continuation(next
);
809 ldout(cct
, 1) << __func__
<< " " << desc
<< " write failed r=" << r
810 << " (" << cpp_strerror(r
) << ")" << dendl
;
820 CtPtr
ProtocolV2::_banner_exchange(CtRef callback
) {
821 ldout(cct
, 20) << __func__
<< dendl
;
822 bannerExchangeCallback
= &callback
;
824 ceph::bufferlist banner_payload
;
826 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES
, banner_payload
, 0);
827 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES
, banner_payload
, 0);
830 bl
.append(CEPH_BANNER_V2_PREFIX
, strlen(CEPH_BANNER_V2_PREFIX
));
831 encode((uint16_t)banner_payload
.length(), bl
, 0);
832 bl
.claim_append(banner_payload
);
834 INTERCEPT(state
== BANNER_CONNECTING
? 3 : 4);
836 return WRITE(bl
, "banner", _wait_for_peer_banner
);
839 CtPtr
ProtocolV2::_wait_for_peer_banner() {
840 unsigned banner_len
= strlen(CEPH_BANNER_V2_PREFIX
) + sizeof(ceph_le16
);
841 return READ(banner_len
, _handle_peer_banner
);
844 CtPtr
ProtocolV2::_handle_peer_banner(rx_buffer_t
&&buffer
, int r
) {
845 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
848 ldout(cct
, 1) << __func__
<< " read peer banner failed r=" << r
<< " ("
849 << cpp_strerror(r
) << ")" << dendl
;
853 unsigned banner_prefix_len
= strlen(CEPH_BANNER_V2_PREFIX
);
855 if (memcmp(buffer
->c_str(), CEPH_BANNER_V2_PREFIX
, banner_prefix_len
)) {
856 if (memcmp(buffer
->c_str(), CEPH_BANNER
, strlen(CEPH_BANNER
)) == 0) {
857 lderr(cct
) << __func__
<< " peer " << *connection
->peer_addrs
858 << " is using msgr V1 protocol" << dendl
;
861 ldout(cct
, 1) << __func__
<< " accept peer sent bad banner" << dendl
;
865 uint16_t payload_len
;
867 buffer
->set_offset(banner_prefix_len
);
868 buffer
->set_length(sizeof(ceph_le16
));
869 bl
.push_back(std::move(buffer
));
870 auto ti
= bl
.cbegin();
873 decode(payload_len
, ti
);
874 } catch (const ceph::buffer::error
&e
) {
875 lderr(cct
) << __func__
<< " decode banner payload len failed " << dendl
;
879 INTERCEPT(state
== BANNER_CONNECTING
? 5 : 6);
881 return READ(payload_len
, _handle_peer_banner_payload
);
884 CtPtr
ProtocolV2::_handle_peer_banner_payload(rx_buffer_t
&&buffer
, int r
) {
885 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
888 ldout(cct
, 1) << __func__
<< " read peer banner payload failed r=" << r
889 << " (" << cpp_strerror(r
) << ")" << dendl
;
893 uint64_t peer_supported_features
;
894 uint64_t peer_required_features
;
898 bl
.push_back(std::move(buffer
));
899 auto ti
= bl
.cbegin();
901 decode(peer_supported_features
, ti
);
902 decode(peer_required_features
, ti
);
903 } catch (const ceph::buffer::error
&e
) {
904 lderr(cct
) << __func__
<< " decode banner payload failed " << dendl
;
908 ldout(cct
, 1) << __func__
<< " supported=" << std::hex
909 << peer_supported_features
<< " required=" << std::hex
910 << peer_required_features
<< std::dec
<< dendl
;
912 // Check feature bit compatibility
914 uint64_t supported_features
= CEPH_MSGR2_SUPPORTED_FEATURES
;
915 uint64_t required_features
= CEPH_MSGR2_REQUIRED_FEATURES
;
917 if ((required_features
& peer_supported_features
) != required_features
) {
918 ldout(cct
, 1) << __func__
<< " peer does not support all required features"
919 << " required=" << std::hex
<< required_features
920 << " supported=" << std::hex
<< peer_supported_features
921 << std::dec
<< dendl
;
923 connection
->dispatch_queue
->queue_reset(connection
);
926 if ((supported_features
& peer_required_features
) != peer_required_features
) {
927 ldout(cct
, 1) << __func__
<< " we do not support all peer required features"
928 << " required=" << std::hex
<< peer_required_features
929 << " supported=" << supported_features
<< std::dec
<< dendl
;
931 connection
->dispatch_queue
->queue_reset(connection
);
935 this->peer_supported_features
= peer_supported_features
;
936 if (peer_required_features
== 0) {
937 this->connection_features
= msgr2_required
;
940 // if the peer supports msgr2.1, switch to it
941 bool is_rev1
= HAVE_MSGR2_FEATURE(peer_supported_features
, REVISION_1
);
942 tx_frame_asm
.set_is_rev1(is_rev1
);
943 rx_frame_asm
.set_is_rev1(is_rev1
);
945 if (state
== BANNER_CONNECTING
) {
946 state
= HELLO_CONNECTING
;
949 ceph_assert(state
== BANNER_ACCEPTING
);
950 state
= HELLO_ACCEPTING
;
953 auto hello
= HelloFrame::Encode(messenger
->get_mytype(),
954 connection
->target_addr
);
956 INTERCEPT(state
== HELLO_CONNECTING
? 7 : 8);
958 return WRITE(hello
, "hello frame", read_frame
);
961 CtPtr
ProtocolV2::handle_hello(ceph::bufferlist
&payload
)
963 ldout(cct
, 20) << __func__
964 << " payload.length()=" << payload
.length() << dendl
;
966 if (state
!= HELLO_CONNECTING
&& state
!= HELLO_ACCEPTING
) {
967 lderr(cct
) << __func__
<< " not in hello exchange state!" << dendl
;
971 auto hello
= HelloFrame::Decode(payload
);
973 ldout(cct
, 5) << __func__
<< " received hello:"
974 << " peer_type=" << (int)hello
.entity_type()
975 << " peer_addr_for_me=" << hello
.peer_addr() << dendl
;
978 socklen_t len
= sizeof(ss
);
979 getsockname(connection
->cs
.fd(), (sockaddr
*)&ss
, &len
);
980 ldout(cct
, 5) << __func__
<< " getsockname says I am " << (sockaddr
*)&ss
981 << " when talking to " << connection
->target_addr
<< dendl
;
983 if (connection
->get_peer_type() == -1) {
984 connection
->set_peer_type(hello
.entity_type());
986 ceph_assert(state
== HELLO_ACCEPTING
);
987 connection
->policy
= messenger
->get_policy(hello
.entity_type());
988 ldout(cct
, 10) << __func__
<< " accept of host_type "
989 << (int)hello
.entity_type()
990 << ", policy.lossy=" << connection
->policy
.lossy
991 << " policy.server=" << connection
->policy
.server
992 << " policy.standby=" << connection
->policy
.standby
993 << " policy.resetcheck=" << connection
->policy
.resetcheck
996 ceph_assert(state
== HELLO_CONNECTING
);
997 if (connection
->get_peer_type() != hello
.entity_type()) {
998 ldout(cct
, 1) << __func__
<< " connection peer type does not match what"
999 << " peer advertises " << connection
->get_peer_type()
1000 << " != " << (int)hello
.entity_type() << dendl
;
1002 connection
->dispatch_queue
->queue_reset(connection
);
1007 if (messenger
->get_myaddrs().empty() ||
1008 messenger
->get_myaddrs().front().is_blank_ip()) {
1010 if (cct
->_conf
->ms_learn_addr_from_peer
) {
1011 ldout(cct
, 1) << __func__
<< " peer " << connection
->target_addr
1012 << " says I am " << hello
.peer_addr() << " (socket says "
1013 << (sockaddr
*)&ss
<< ")" << dendl
;
1014 a
= hello
.peer_addr();
1016 ldout(cct
, 1) << __func__
<< " socket to " << connection
->target_addr
1017 << " says I am " << (sockaddr
*)&ss
1018 << " (peer says " << hello
.peer_addr() << ")" << dendl
;
1019 a
.set_sockaddr((sockaddr
*)&ss
);
1021 a
.set_type(entity_addr_t::TYPE_MSGR2
); // anything but NONE; learned_addr ignores this
1023 connection
->lock
.unlock();
1024 messenger
->learned_addr(a
);
1025 if (cct
->_conf
->ms_inject_internal_delays
&&
1026 cct
->_conf
->ms_inject_socket_failures
) {
1027 if (rand() % cct
->_conf
->ms_inject_socket_failures
== 0) {
1028 ldout(cct
, 10) << __func__
<< " sleep for "
1029 << cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1031 t
.set_from_double(cct
->_conf
->ms_inject_internal_delays
);
1035 connection
->lock
.lock();
1036 if (state
!= HELLO_CONNECTING
) {
1037 ldout(cct
, 1) << __func__
1038 << " state changed while learned_addr, mark_down or "
1039 << " replacing must be happened just now" << dendl
;
1047 callback
= bannerExchangeCallback
;
1048 bannerExchangeCallback
= nullptr;
1049 ceph_assert(callback
);
1053 CtPtr
ProtocolV2::read_frame() {
1054 if (state
== CLOSED
) {
1058 ldout(cct
, 20) << __func__
<< dendl
;
1059 rx_preamble
.clear();
1060 rx_epilogue
.clear();
1061 rx_segments_data
.clear();
1063 return READ(rx_frame_asm
.get_preamble_onwire_len(),
1064 handle_read_frame_preamble_main
);
1067 CtPtr
ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t
&&buffer
, int r
) {
1068 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1071 ldout(cct
, 1) << __func__
<< " read frame preamble failed r=" << r
1072 << " (" << cpp_strerror(r
) << ")" << dendl
;
1076 rx_preamble
.push_back(std::move(buffer
));
1078 ldout(cct
, 30) << __func__
<< " preamble\n";
1079 rx_preamble
.hexdump(*_dout
);
1083 next_tag
= rx_frame_asm
.disassemble_preamble(rx_preamble
);
1084 } catch (FrameError
& e
) {
1085 ldout(cct
, 1) << __func__
<< " " << e
.what() << dendl
;
1087 } catch (ceph::crypto::onwire::MsgAuthError
&) {
1088 ldout(cct
, 1) << __func__
<< "bad auth tag" << dendl
;
1092 ldout(cct
, 25) << __func__
<< " disassembled preamble " << rx_frame_asm
1095 if (session_stream_handlers
.rx
) {
1096 ldout(cct
, 30) << __func__
<< " preamble after decrypt\n";
1097 rx_preamble
.hexdump(*_dout
);
1101 // does it need throttle?
1102 if (next_tag
== Tag::MESSAGE
) {
1103 if (state
!= READY
) {
1104 lderr(cct
) << __func__
<< " not in ready state!" << dendl
;
1107 state
= THROTTLE_MESSAGE
;
1108 return CONTINUE(throttle_message
);
1110 return read_frame_segment();
1114 CtPtr
ProtocolV2::handle_read_frame_dispatch() {
1115 ldout(cct
, 10) << __func__
1116 << " tag=" << static_cast<uint32_t>(next_tag
) << dendl
;
1120 case Tag::AUTH_REQUEST
:
1121 case Tag::AUTH_BAD_METHOD
:
1122 case Tag::AUTH_REPLY_MORE
:
1123 case Tag::AUTH_REQUEST_MORE
:
1124 case Tag::AUTH_DONE
:
1125 case Tag::AUTH_SIGNATURE
:
1126 case Tag::CLIENT_IDENT
:
1127 case Tag::SERVER_IDENT
:
1128 case Tag::IDENT_MISSING_FEATURES
:
1129 case Tag::SESSION_RECONNECT
:
1130 case Tag::SESSION_RESET
:
1131 case Tag::SESSION_RETRY
:
1132 case Tag::SESSION_RETRY_GLOBAL
:
1133 case Tag::SESSION_RECONNECT_OK
:
1134 case Tag::KEEPALIVE2
:
1135 case Tag::KEEPALIVE2_ACK
:
1138 return handle_frame_payload();
1140 return handle_message();
1142 lderr(cct
) << __func__
1143 << " received unknown tag=" << static_cast<uint32_t>(next_tag
)
1152 CtPtr
ProtocolV2::read_frame_segment() {
1153 size_t seg_idx
= rx_segments_data
.size();
1154 ldout(cct
, 20) << __func__
<< " seg_idx=" << seg_idx
<< dendl
;
1155 rx_segments_data
.emplace_back();
1157 uint32_t onwire_len
= rx_frame_asm
.get_segment_onwire_len(seg_idx
);
1158 if (onwire_len
== 0) {
1159 return _handle_read_frame_segment();
1162 rx_buffer_t rx_buffer
;
1163 uint16_t align
= rx_frame_asm
.get_segment_align(seg_idx
);
1165 rx_buffer
= ceph::buffer::ptr_node::create(ceph::buffer::create_aligned(
1166 onwire_len
, align
));
1167 } catch (std::bad_alloc
&) {
1168 // Catching because of potential issues with satisfying alignment.
1169 ldout(cct
, 1) << __func__
<< " can't allocate aligned rx_buffer"
1170 << " len=" << onwire_len
1171 << " align=" << align
1176 return READ_RXBUF(std::move(rx_buffer
), handle_read_frame_segment
);
1179 CtPtr
ProtocolV2::handle_read_frame_segment(rx_buffer_t
&&rx_buffer
, int r
) {
1180 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1183 ldout(cct
, 1) << __func__
<< " read frame segment failed r=" << r
<< " ("
1184 << cpp_strerror(r
) << ")" << dendl
;
1188 rx_segments_data
.back().push_back(std::move(rx_buffer
));
1189 return _handle_read_frame_segment();
1192 CtPtr
ProtocolV2::_handle_read_frame_segment() {
1193 if (rx_segments_data
.size() == rx_frame_asm
.get_num_segments()) {
1194 // OK, all segments planned to read are read. Can go with epilogue.
1195 uint32_t epilogue_onwire_len
= rx_frame_asm
.get_epilogue_onwire_len();
1196 if (epilogue_onwire_len
== 0) {
1197 return _handle_read_frame_epilogue_main();
1199 return READ(epilogue_onwire_len
, handle_read_frame_epilogue_main
);
1201 // TODO: for makeshift only. This will be more generic and throttled
1202 return read_frame_segment();
1205 CtPtr
ProtocolV2::handle_frame_payload() {
1206 ceph_assert(!rx_segments_data
.empty());
1207 auto& payload
= rx_segments_data
.back();
1209 ldout(cct
, 30) << __func__
<< "\n";
1210 payload
.hexdump(*_dout
);
1215 return handle_hello(payload
);
1216 case Tag::AUTH_REQUEST
:
1217 return handle_auth_request(payload
);
1218 case Tag::AUTH_BAD_METHOD
:
1219 return handle_auth_bad_method(payload
);
1220 case Tag::AUTH_REPLY_MORE
:
1221 return handle_auth_reply_more(payload
);
1222 case Tag::AUTH_REQUEST_MORE
:
1223 return handle_auth_request_more(payload
);
1224 case Tag::AUTH_DONE
:
1225 return handle_auth_done(payload
);
1226 case Tag::AUTH_SIGNATURE
:
1227 return handle_auth_signature(payload
);
1228 case Tag::CLIENT_IDENT
:
1229 return handle_client_ident(payload
);
1230 case Tag::SERVER_IDENT
:
1231 return handle_server_ident(payload
);
1232 case Tag::IDENT_MISSING_FEATURES
:
1233 return handle_ident_missing_features(payload
);
1234 case Tag::SESSION_RECONNECT
:
1235 return handle_reconnect(payload
);
1236 case Tag::SESSION_RESET
:
1237 return handle_session_reset(payload
);
1238 case Tag::SESSION_RETRY
:
1239 return handle_session_retry(payload
);
1240 case Tag::SESSION_RETRY_GLOBAL
:
1241 return handle_session_retry_global(payload
);
1242 case Tag::SESSION_RECONNECT_OK
:
1243 return handle_reconnect_ok(payload
);
1244 case Tag::KEEPALIVE2
:
1245 return handle_keepalive2(payload
);
1246 case Tag::KEEPALIVE2_ACK
:
1247 return handle_keepalive2_ack(payload
);
1249 return handle_message_ack(payload
);
1251 return handle_wait(payload
);
1258 CtPtr
ProtocolV2::ready() {
1259 ldout(cct
, 25) << __func__
<< dendl
;
1261 reconnecting
= false;
1264 // make sure no pending tick timer
1265 if (connection
->last_tick_id
) {
1266 connection
->center
->delete_time_event(connection
->last_tick_id
);
1268 connection
->last_tick_id
= connection
->center
->create_time_event(
1269 connection
->inactive_timeout_us
, connection
->tick_handler
);
1272 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
1274 if (!out_queue
.empty()) {
1275 connection
->center
->dispatch_event_external(connection
->write_handler
);
1279 connection
->maybe_start_delay_thread();
1282 ldout(cct
, 1) << __func__
<< " entity=" << peer_name
<< " client_cookie="
1283 << std::hex
<< client_cookie
<< " server_cookie="
1284 << server_cookie
<< std::dec
<< " in_seq=" << in_seq
1285 << " out_seq=" << out_seq
<< dendl
;
1289 return CONTINUE(read_frame
);
1292 CtPtr
ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t
&&buffer
, int r
)
1294 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1297 ldout(cct
, 1) << __func__
<< " read frame epilogue failed r=" << r
1298 << " (" << cpp_strerror(r
) << ")" << dendl
;
1302 rx_epilogue
.push_back(std::move(buffer
));
1303 return _handle_read_frame_epilogue_main();
1306 CtPtr
ProtocolV2::_handle_read_frame_epilogue_main() {
1309 rx_frame_asm
.disassemble_first_segment(rx_preamble
, rx_segments_data
[0]);
1310 aborted
= !rx_frame_asm
.disassemble_remaining_segments(
1311 rx_segments_data
.data(), rx_epilogue
);
1312 } catch (FrameError
& e
) {
1313 ldout(cct
, 1) << __func__
<< " " << e
.what() << dendl
;
1315 } catch (ceph::crypto::onwire::MsgAuthError
&) {
1316 ldout(cct
, 1) << __func__
<< "bad auth tag" << dendl
;
1320 // we do have a mechanism that allows transmitter to start sending message
1321 // and abort after putting entire data field on wire. This will be used by
1322 // the kernel client to avoid unnecessary buffering.
1326 return CONTINUE(read_frame
);
1328 return handle_read_frame_dispatch();
1331 CtPtr
ProtocolV2::handle_message() {
1332 ldout(cct
, 20) << __func__
<< dendl
;
1333 ceph_assert(state
== THROTTLE_DONE
);
1335 #if defined(WITH_EVENTTRACE)
1336 utime_t ltt_recv_stamp
= ceph_clock_now();
1338 recv_stamp
= ceph_clock_now();
1340 const size_t cur_msg_size
= get_current_msg_size();
1341 auto msg_frame
= MessageFrame::Decode(rx_segments_data
);
1343 // XXX: paranoid copy just to avoid oops
1344 ceph_msg_header2 current_header
= msg_frame
.header();
1346 ldout(cct
, 5) << __func__
1347 << " got " << msg_frame
.front_len()
1348 << " + " << msg_frame
.middle_len()
1349 << " + " << msg_frame
.data_len()
1351 << " envelope type=" << current_header
.type
1352 << " src " << peer_name
1353 << " off " << current_header
.data_off
1357 ceph_msg_header header
{current_header
.seq
,
1359 current_header
.type
,
1360 current_header
.priority
,
1361 current_header
.version
,
1362 init_le32(msg_frame
.front_len()),
1363 init_le32(msg_frame
.middle_len()),
1364 init_le32(msg_frame
.data_len()),
1365 current_header
.data_off
,
1367 current_header
.compat_version
,
1368 current_header
.reserved
,
1370 ceph_msg_footer footer
{init_le32(0), init_le32(0),
1371 init_le32(0), init_le64(0), current_header
.flags
};
1373 Message
*message
= decode_message(cct
, 0, header
, footer
,
1379 ldout(cct
, 1) << __func__
<< " decode message failed " << dendl
;
1382 state
= READ_MESSAGE_COMPLETE
;
1387 message
->set_byte_throttler(connection
->policy
.throttler_bytes
);
1388 message
->set_message_throttler(connection
->policy
.throttler_messages
);
1390 // store reservation size in message, so we don't get confused
1391 // by messages entering the dispatch queue through other paths.
1392 message
->set_dispatch_throttle_size(cur_msg_size
);
1394 message
->set_recv_stamp(recv_stamp
);
1395 message
->set_throttle_stamp(throttle_stamp
);
1396 message
->set_recv_complete_stamp(ceph_clock_now());
1398 // check received seq#. if it is old, drop the message.
1399 // note that incoming messages may skip ahead. this is convenient for the
1400 // client side queueing because messages can't be renumbered, but the (kernel)
1401 // client will occasionally pull a message out of the sent queue to send
1402 // elsewhere. in that case it doesn't matter if we "got" it or not.
1403 uint64_t cur_seq
= in_seq
;
1404 if (message
->get_seq() <= cur_seq
) {
1405 ldout(cct
, 0) << __func__
<< " got old message " << message
->get_seq()
1406 << " <= " << cur_seq
<< " " << message
<< " " << *message
1407 << ", discarding" << dendl
;
1409 if (connection
->has_feature(CEPH_FEATURE_RECONNECT_SEQ
) &&
1410 cct
->_conf
->ms_die_on_old_message
) {
1411 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1415 if (message
->get_seq() > cur_seq
+ 1) {
1416 ldout(cct
, 0) << __func__
<< " missed message? skipped from seq "
1417 << cur_seq
<< " to " << message
->get_seq() << dendl
;
1418 if (cct
->_conf
->ms_die_on_skipped_message
) {
1419 ceph_assert(0 == "skipped incoming seq");
1423 #if defined(WITH_EVENTTRACE)
1424 if (message
->get_type() == CEPH_MSG_OSD_OP
||
1425 message
->get_type() == CEPH_MSG_OSD_OPREPLY
) {
1426 utime_t ltt_processed_stamp
= ceph_clock_now();
1427 double usecs_elapsed
=
1428 (ltt_processed_stamp
.to_nsec() - ltt_recv_stamp
.to_nsec()) / 1000;
1430 if (message
->get_type() == CEPH_MSG_OSD_OP
)
1431 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OP",
1434 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OPREPLY",
1439 // note last received message.
1440 in_seq
= message
->get_seq();
1441 ldout(cct
, 5) << __func__
<< " received message m=" << message
1442 << " seq=" << message
->get_seq()
1443 << " from=" << message
->get_source() << " type=" << header
.type
1444 << " " << *message
<< dendl
;
1446 bool need_dispatch_writer
= false;
1447 if (!connection
->policy
.lossy
) {
1449 need_dispatch_writer
= true;
1454 ceph::mono_time fast_dispatch_time
;
1456 if (connection
->is_blackhole()) {
1457 ldout(cct
, 10) << __func__
<< " blackhole " << *message
<< dendl
;
1462 connection
->logger
->inc(l_msgr_recv_messages
);
1463 connection
->logger
->inc(l_msgr_recv_bytes
,
1464 rx_frame_asm
.get_frame_onwire_len());
1466 messenger
->ms_fast_preprocess(message
);
1467 fast_dispatch_time
= ceph::mono_clock::now();
1468 connection
->logger
->tinc(l_msgr_running_recv_time
,
1469 fast_dispatch_time
- connection
->recv_start_time
);
1470 if (connection
->delay_state
) {
1471 double delay_period
= 0;
1472 if (rand() % 10000 < cct
->_conf
->ms_inject_delay_probability
* 10000.0) {
1474 cct
->_conf
->ms_inject_delay_max
* (double)(rand() % 10000) / 10000.0;
1475 ldout(cct
, 1) << "queue_received will delay after "
1476 << (ceph_clock_now() + delay_period
) << " on " << message
1477 << " " << *message
<< dendl
;
1479 connection
->delay_state
->queue(delay_period
, message
);
1480 } else if (messenger
->ms_can_fast_dispatch(message
)) {
1481 connection
->lock
.unlock();
1482 connection
->dispatch_queue
->fast_dispatch(message
);
1483 connection
->recv_start_time
= ceph::mono_clock::now();
1484 connection
->logger
->tinc(l_msgr_running_fast_dispatch_time
,
1485 connection
->recv_start_time
- fast_dispatch_time
);
1486 connection
->lock
.lock();
1487 // we might have been reused by another connection
1488 // let's check if that is the case
1489 if (state
!= READY
) {
1490 // yes, that was the case, let's do nothing
1494 connection
->dispatch_queue
->enqueue(message
, message
->get_priority(),
1495 connection
->conn_id
);
1498 handle_message_ack(current_header
.ack_seq
);
1501 if (need_dispatch_writer
&& connection
->is_connected()) {
1502 connection
->center
->dispatch_event_external(connection
->write_handler
);
1505 return CONTINUE(read_frame
);
1509 CtPtr
ProtocolV2::throttle_message() {
1510 ldout(cct
, 20) << __func__
<< dendl
;
1512 if (connection
->policy
.throttler_messages
) {
1513 ldout(cct
, 10) << __func__
<< " wants " << 1
1514 << " message from policy throttler "
1515 << connection
->policy
.throttler_messages
->get_current()
1516 << "/" << connection
->policy
.throttler_messages
->get_max()
1518 if (!connection
->policy
.throttler_messages
->get_or_fail()) {
1519 ldout(cct
, 10) << __func__
<< " wants 1 message from policy throttle "
1520 << connection
->policy
.throttler_messages
->get_current()
1521 << "/" << connection
->policy
.throttler_messages
->get_max()
1522 << " failed, just wait." << dendl
;
1523 // following thread pool deal with th full message queue isn't a
1524 // short time, so we can wait a ms.
1525 if (connection
->register_time_events
.empty()) {
1526 connection
->register_time_events
.insert(
1527 connection
->center
->create_time_event(1000,
1528 connection
->wakeup_handler
));
1534 state
= THROTTLE_BYTES
;
1535 return CONTINUE(throttle_bytes
);
1538 CtPtr
ProtocolV2::throttle_bytes() {
1539 ldout(cct
, 20) << __func__
<< dendl
;
1541 const size_t cur_msg_size
= get_current_msg_size();
1543 if (connection
->policy
.throttler_bytes
) {
1544 ldout(cct
, 10) << __func__
<< " wants " << cur_msg_size
1545 << " bytes from policy throttler "
1546 << connection
->policy
.throttler_bytes
->get_current() << "/"
1547 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
1548 if (!connection
->policy
.throttler_bytes
->get_or_fail(cur_msg_size
)) {
1549 ldout(cct
, 10) << __func__
<< " wants " << cur_msg_size
1550 << " bytes from policy throttler "
1551 << connection
->policy
.throttler_bytes
->get_current()
1552 << "/" << connection
->policy
.throttler_bytes
->get_max()
1553 << " failed, just wait." << dendl
;
1554 // following thread pool deal with th full message queue isn't a
1555 // short time, so we can wait a ms.
1556 if (connection
->register_time_events
.empty()) {
1557 connection
->register_time_events
.insert(
1558 connection
->center
->create_time_event(
1559 1000, connection
->wakeup_handler
));
1566 state
= THROTTLE_DISPATCH_QUEUE
;
1567 return CONTINUE(throttle_dispatch_queue
);
1570 CtPtr
ProtocolV2::throttle_dispatch_queue() {
1571 ldout(cct
, 20) << __func__
<< dendl
;
1573 const size_t cur_msg_size
= get_current_msg_size();
1575 if (!connection
->dispatch_queue
->dispatch_throttler
.get_or_fail(
1578 << __func__
<< " wants " << cur_msg_size
1579 << " bytes from dispatch throttle "
1580 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
1581 << connection
->dispatch_queue
->dispatch_throttler
.get_max()
1582 << " failed, just wait." << dendl
;
1583 // following thread pool deal with th full message queue isn't a
1584 // short time, so we can wait a ms.
1585 if (connection
->register_time_events
.empty()) {
1586 connection
->register_time_events
.insert(
1587 connection
->center
->create_time_event(1000,
1588 connection
->wakeup_handler
));
1594 throttle_stamp
= ceph_clock_now();
1595 state
= THROTTLE_DONE
;
1597 return read_frame_segment();
1600 CtPtr
ProtocolV2::handle_keepalive2(ceph::bufferlist
&payload
)
1602 ldout(cct
, 20) << __func__
1603 << " payload.length()=" << payload
.length() << dendl
;
1605 if (state
!= READY
) {
1606 lderr(cct
) << __func__
<< " not in ready state!" << dendl
;
1610 auto keepalive_frame
= KeepAliveFrame::Decode(payload
);
1612 ldout(cct
, 30) << __func__
<< " got KEEPALIVE2 tag ..." << dendl
;
1614 connection
->write_lock
.lock();
1615 auto keepalive_ack_frame
= KeepAliveFrameAck::Encode(keepalive_frame
.timestamp());
1616 if (!append_frame(keepalive_ack_frame
)) {
1617 connection
->write_lock
.unlock();
1620 connection
->write_lock
.unlock();
1622 ldout(cct
, 20) << __func__
<< " got KEEPALIVE2 "
1623 << keepalive_frame
.timestamp() << dendl
;
1624 connection
->set_last_keepalive(ceph_clock_now());
1626 if (is_connected()) {
1627 connection
->center
->dispatch_event_external(connection
->write_handler
);
1630 return CONTINUE(read_frame
);
1633 CtPtr
ProtocolV2::handle_keepalive2_ack(ceph::bufferlist
&payload
)
1635 ldout(cct
, 20) << __func__
1636 << " payload.length()=" << payload
.length() << dendl
;
1638 if (state
!= READY
) {
1639 lderr(cct
) << __func__
<< " not in ready state!" << dendl
;
1643 auto keepalive_ack_frame
= KeepAliveFrameAck::Decode(payload
);
1644 connection
->set_last_keepalive_ack(keepalive_ack_frame
.timestamp());
1645 ldout(cct
, 20) << __func__
<< " got KEEPALIVE_ACK" << dendl
;
1647 return CONTINUE(read_frame
);
1650 CtPtr
ProtocolV2::handle_message_ack(ceph::bufferlist
&payload
)
1652 ldout(cct
, 20) << __func__
1653 << " payload.length()=" << payload
.length() << dendl
;
1655 if (state
!= READY
) {
1656 lderr(cct
) << __func__
<< " not in ready state!" << dendl
;
1660 auto ack
= AckFrame::Decode(payload
);
1661 handle_message_ack(ack
.seq());
1662 return CONTINUE(read_frame
);
1665 /* Client Protocol Methods */
1667 CtPtr
ProtocolV2::start_client_banner_exchange() {
1668 ldout(cct
, 20) << __func__
<< dendl
;
1672 state
= BANNER_CONNECTING
;
1674 global_seq
= messenger
->get_global_seq();
1676 return _banner_exchange(CONTINUATION(post_client_banner_exchange
));
1679 CtPtr
ProtocolV2::post_client_banner_exchange() {
1680 ldout(cct
, 20) << __func__
<< dendl
;
1682 state
= AUTH_CONNECTING
;
1684 return send_auth_request();
1687 CtPtr
ProtocolV2::send_auth_request(std::vector
<uint32_t> &allowed_methods
) {
1688 ceph_assert(messenger
->auth_client
);
1689 ldout(cct
, 20) << __func__
<< " peer_type " << (int)connection
->peer_type
1690 << " auth_client " << messenger
->auth_client
<< dendl
;
1692 ceph::bufferlist bl
;
1693 std::vector
<uint32_t> preferred_modes
;
1694 auto am
= auth_meta
;
1695 connection
->lock
.unlock();
1696 int r
= messenger
->auth_client
->get_auth_request(
1697 connection
, am
.get(),
1698 &am
->auth_method
, &preferred_modes
, &bl
);
1699 connection
->lock
.lock();
1700 if (state
!= AUTH_CONNECTING
) {
1701 ldout(cct
, 1) << __func__
<< " state changed!" << dendl
;
1705 ldout(cct
, 0) << __func__
<< " get_initial_auth_request returned " << r
1708 connection
->dispatch_queue
->queue_reset(connection
);
1714 auto frame
= AuthRequestFrame::Encode(auth_meta
->auth_method
, preferred_modes
,
1716 return WRITE(frame
, "auth request", read_frame
);
1719 CtPtr
ProtocolV2::handle_auth_bad_method(ceph::bufferlist
&payload
) {
1720 ldout(cct
, 20) << __func__
1721 << " payload.length()=" << payload
.length() << dendl
;
1723 if (state
!= AUTH_CONNECTING
) {
1724 lderr(cct
) << __func__
<< " not in auth connect state!" << dendl
;
1728 auto bad_method
= AuthBadMethodFrame::Decode(payload
);
1729 ldout(cct
, 1) << __func__
<< " method=" << bad_method
.method()
1730 << " result " << cpp_strerror(bad_method
.result())
1731 << ", allowed methods=" << bad_method
.allowed_methods()
1732 << ", allowed modes=" << bad_method
.allowed_modes()
1734 ceph_assert(messenger
->auth_client
);
1735 auto am
= auth_meta
;
1736 connection
->lock
.unlock();
1737 int r
= messenger
->auth_client
->handle_auth_bad_method(
1740 bad_method
.method(), bad_method
.result(),
1741 bad_method
.allowed_methods(),
1742 bad_method
.allowed_modes());
1743 connection
->lock
.lock();
1744 if (state
!= AUTH_CONNECTING
|| r
< 0) {
1747 return send_auth_request(bad_method
.allowed_methods());
1750 CtPtr
ProtocolV2::handle_auth_reply_more(ceph::bufferlist
&payload
)
1752 ldout(cct
, 20) << __func__
1753 << " payload.length()=" << payload
.length() << dendl
;
1755 if (state
!= AUTH_CONNECTING
) {
1756 lderr(cct
) << __func__
<< " not in auth connect state!" << dendl
;
1760 auto auth_more
= AuthReplyMoreFrame::Decode(payload
);
1761 ldout(cct
, 5) << __func__
1762 << " auth reply more len=" << auth_more
.auth_payload().length()
1764 ceph_assert(messenger
->auth_client
);
1765 ceph::bufferlist reply
;
1766 auto am
= auth_meta
;
1767 connection
->lock
.unlock();
1768 int r
= messenger
->auth_client
->handle_auth_reply_more(
1769 connection
, am
.get(), auth_more
.auth_payload(), &reply
);
1770 connection
->lock
.lock();
1771 if (state
!= AUTH_CONNECTING
) {
1772 ldout(cct
, 1) << __func__
<< " state changed!" << dendl
;
1776 lderr(cct
) << __func__
<< " auth_client handle_auth_reply_more returned "
1780 auto more_reply
= AuthRequestMoreFrame::Encode(reply
);
1781 return WRITE(more_reply
, "auth request more", read_frame
);
1784 CtPtr
ProtocolV2::handle_auth_done(ceph::bufferlist
&payload
)
1786 ldout(cct
, 20) << __func__
1787 << " payload.length()=" << payload
.length() << dendl
;
1789 if (state
!= AUTH_CONNECTING
) {
1790 lderr(cct
) << __func__
<< " not in auth connect state!" << dendl
;
1794 auto auth_done
= AuthDoneFrame::Decode(payload
);
1796 ceph_assert(messenger
->auth_client
);
1797 auto am
= auth_meta
;
1798 connection
->lock
.unlock();
1799 int r
= messenger
->auth_client
->handle_auth_done(
1802 auth_done
.global_id(),
1803 auth_done
.con_mode(),
1804 auth_done
.auth_payload(),
1806 &am
->connection_secret
);
1807 connection
->lock
.lock();
1808 if (state
!= AUTH_CONNECTING
) {
1809 ldout(cct
, 1) << __func__
<< " state changed!" << dendl
;
1815 auth_meta
->con_mode
= auth_done
.con_mode();
1816 bool is_rev1
= HAVE_MSGR2_FEATURE(peer_supported_features
, REVISION_1
);
1817 session_stream_handlers
= ceph::crypto::onwire::rxtx_t::create_handler_pair(
1818 cct
, *auth_meta
, /*new_nonce_format=*/is_rev1
, /*crossed=*/false);
1820 state
= AUTH_CONNECTING_SIGN
;
1822 const auto sig
= auth_meta
->session_key
.empty() ? sha256_digest_t() :
1823 auth_meta
->session_key
.hmac_sha256(cct
, pre_auth
.rxbuf
);
1824 auto sig_frame
= AuthSignatureFrame::Encode(sig
);
1825 pre_auth
.enabled
= false;
1826 pre_auth
.rxbuf
.clear();
1827 return WRITE(sig_frame
, "auth signature", read_frame
);
1830 CtPtr
ProtocolV2::finish_client_auth() {
1831 if (!server_cookie
) {
1832 ceph_assert(connect_seq
== 0);
1833 state
= SESSION_CONNECTING
;
1834 return send_client_ident();
1835 } else { // reconnecting to previous session
1836 state
= SESSION_RECONNECTING
;
1837 ceph_assert(connect_seq
> 0);
1838 return send_reconnect();
1842 CtPtr
ProtocolV2::send_client_ident() {
1843 ldout(cct
, 20) << __func__
<< dendl
;
1845 if (!connection
->policy
.lossy
&& !client_cookie
) {
1846 client_cookie
= ceph::util::generate_random_number
<uint64_t>(1, -1ll);
1850 if (connection
->policy
.lossy
) {
1851 flags
|= CEPH_MSG_CONNECT_LOSSY
;
1854 auto client_ident
= ClientIdentFrame::Encode(
1855 messenger
->get_myaddrs(),
1856 connection
->target_addr
,
1857 messenger
->get_myname().num(),
1859 connection
->policy
.features_supported
,
1860 connection
->policy
.features_required
| msgr2_required
,
1864 ldout(cct
, 5) << __func__
<< " sending identification: "
1865 << "addrs=" << messenger
->get_myaddrs()
1866 << " target=" << connection
->target_addr
1867 << " gid=" << messenger
->get_myname().num()
1868 << " global_seq=" << global_seq
1869 << " features_supported=" << std::hex
1870 << connection
->policy
.features_supported
1871 << " features_required="
1872 << (connection
->policy
.features_required
| msgr2_required
)
1873 << " flags=" << flags
1874 << " cookie=" << client_cookie
<< std::dec
<< dendl
;
1878 return WRITE(client_ident
, "client ident", read_frame
);
1881 CtPtr
ProtocolV2::send_reconnect() {
1882 ldout(cct
, 20) << __func__
<< dendl
;
1884 auto reconnect
= ReconnectFrame::Encode(messenger
->get_myaddrs(),
1891 ldout(cct
, 5) << __func__
<< " reconnect to session: client_cookie="
1892 << std::hex
<< client_cookie
<< " server_cookie="
1893 << server_cookie
<< std::dec
1894 << " gs=" << global_seq
<< " cs=" << connect_seq
1895 << " ms=" << in_seq
<< dendl
;
1899 return WRITE(reconnect
, "reconnect", read_frame
);
1902 CtPtr
ProtocolV2::handle_ident_missing_features(ceph::bufferlist
&payload
)
1904 ldout(cct
, 20) << __func__
1905 << " payload.length()=" << payload
.length() << dendl
;
1907 if (state
!= SESSION_CONNECTING
) {
1908 lderr(cct
) << __func__
<< " not in session connect state!" << dendl
;
1912 auto ident_missing
=
1913 IdentMissingFeaturesFrame::Decode(payload
);
1914 lderr(cct
) << __func__
1915 << " client does not support all server features: " << std::hex
1916 << ident_missing
.features() << std::dec
<< dendl
;
1921 CtPtr
ProtocolV2::handle_session_reset(ceph::bufferlist
&payload
)
1923 ldout(cct
, 20) << __func__
1924 << " payload.length()=" << payload
.length() << dendl
;
1926 if (state
!= SESSION_RECONNECTING
) {
1927 lderr(cct
) << __func__
<< " not in session reconnect state!" << dendl
;
1931 auto reset
= ResetFrame::Decode(payload
);
1933 ldout(cct
, 1) << __func__
<< " received session reset full=" << reset
.full()
1943 state
= SESSION_CONNECTING
;
1944 return send_client_ident();
1947 CtPtr
ProtocolV2::handle_session_retry(ceph::bufferlist
&payload
)
1949 ldout(cct
, 20) << __func__
1950 << " payload.length()=" << payload
.length() << dendl
;
1952 if (state
!= SESSION_RECONNECTING
) {
1953 lderr(cct
) << __func__
<< " not in session reconnect state!" << dendl
;
1957 auto retry
= RetryFrame::Decode(payload
);
1958 connect_seq
= retry
.connect_seq() + 1;
1960 ldout(cct
, 1) << __func__
1961 << " received session retry connect_seq=" << retry
.connect_seq()
1962 << ", inc to cs=" << connect_seq
<< dendl
;
1964 return send_reconnect();
1967 CtPtr
ProtocolV2::handle_session_retry_global(ceph::bufferlist
&payload
)
1969 ldout(cct
, 20) << __func__
1970 << " payload.length()=" << payload
.length() << dendl
;
1972 if (state
!= SESSION_RECONNECTING
) {
1973 lderr(cct
) << __func__
<< " not in session reconnect state!" << dendl
;
1977 auto retry
= RetryGlobalFrame::Decode(payload
);
1978 global_seq
= messenger
->get_global_seq(retry
.global_seq());
1980 ldout(cct
, 1) << __func__
<< " received session retry global global_seq="
1981 << retry
.global_seq() << ", choose new gs=" << global_seq
1984 return send_reconnect();
1987 CtPtr
ProtocolV2::handle_wait(ceph::bufferlist
&payload
) {
1988 ldout(cct
, 20) << __func__
1989 << " received WAIT (connection race)"
1990 << " payload.length()=" << payload
.length()
1993 if (state
!= SESSION_CONNECTING
&& state
!= SESSION_RECONNECTING
) {
1994 lderr(cct
) << __func__
<< " not in session (re)connect state!" << dendl
;
1999 WaitFrame::Decode(payload
);
2003 CtPtr
ProtocolV2::handle_reconnect_ok(ceph::bufferlist
&payload
)
2005 ldout(cct
, 20) << __func__
2006 << " payload.length()=" << payload
.length() << dendl
;
2008 if (state
!= SESSION_RECONNECTING
) {
2009 lderr(cct
) << __func__
<< " not in session reconnect state!" << dendl
;
2013 auto reconnect_ok
= ReconnectOkFrame::Decode(payload
);
2014 ldout(cct
, 5) << __func__
2015 << " reconnect accepted: sms=" << reconnect_ok
.msg_seq()
2018 out_seq
= discard_requeued_up_to(out_seq
, reconnect_ok
.msg_seq());
2020 backoff
= utime_t();
2021 ldout(cct
, 10) << __func__
<< " reconnect success " << connect_seq
2022 << ", lossy = " << connection
->policy
.lossy
<< ", features "
2023 << connection
->get_features() << dendl
;
2025 if (connection
->delay_state
) {
2026 ceph_assert(connection
->delay_state
->ready());
2029 connection
->dispatch_queue
->queue_connect(connection
);
2030 messenger
->ms_deliver_handle_fast_connect(connection
);
2035 CtPtr
ProtocolV2::handle_server_ident(ceph::bufferlist
&payload
)
2037 ldout(cct
, 20) << __func__
2038 << " payload.length()=" << payload
.length() << dendl
;
2040 if (state
!= SESSION_CONNECTING
) {
2041 lderr(cct
) << __func__
<< " not in session connect state!" << dendl
;
2045 auto server_ident
= ServerIdentFrame::Decode(payload
);
2046 ldout(cct
, 5) << __func__
<< " received server identification:"
2047 << " addrs=" << server_ident
.addrs()
2048 << " gid=" << server_ident
.gid()
2049 << " global_seq=" << server_ident
.global_seq()
2050 << " features_supported=" << std::hex
2051 << server_ident
.supported_features()
2052 << " features_required=" << server_ident
.required_features()
2053 << " flags=" << server_ident
.flags()
2054 << " cookie=" << server_ident
.cookie() << std::dec
<< dendl
;
2056 // is this who we intended to talk to?
2057 // be a bit forgiving here, since we may be connecting based on addresses parsed out
2058 // of mon_host or something.
2059 if (!server_ident
.addrs().contains(connection
->target_addr
)) {
2060 ldout(cct
,1) << __func__
<< " peer identifies as " << server_ident
.addrs()
2061 << ", does not include " << connection
->target_addr
<< dendl
;
2065 server_cookie
= server_ident
.cookie();
2067 connection
->set_peer_addrs(server_ident
.addrs());
2068 peer_name
= entity_name_t(connection
->get_peer_type(), server_ident
.gid());
2069 connection
->set_features(server_ident
.supported_features() &
2070 connection
->policy
.features_supported
);
2071 peer_global_seq
= server_ident
.global_seq();
2073 connection
->policy
.lossy
= server_ident
.flags() & CEPH_MSG_CONNECT_LOSSY
;
2075 backoff
= utime_t();
2076 ldout(cct
, 10) << __func__
<< " connect success " << connect_seq
2077 << ", lossy = " << connection
->policy
.lossy
<< ", features "
2078 << connection
->get_features() << dendl
;
2080 if (connection
->delay_state
) {
2081 ceph_assert(connection
->delay_state
->ready());
2084 connection
->dispatch_queue
->queue_connect(connection
);
2085 messenger
->ms_deliver_handle_fast_connect(connection
);
2090 /* Server Protocol Methods */
2092 CtPtr
ProtocolV2::start_server_banner_exchange() {
2093 ldout(cct
, 20) << __func__
<< dendl
;
2097 state
= BANNER_ACCEPTING
;
2099 return _banner_exchange(CONTINUATION(post_server_banner_exchange
));
2102 CtPtr
ProtocolV2::post_server_banner_exchange() {
2103 ldout(cct
, 20) << __func__
<< dendl
;
2105 state
= AUTH_ACCEPTING
;
2107 return CONTINUE(read_frame
);
2110 CtPtr
ProtocolV2::handle_auth_request(ceph::bufferlist
&payload
) {
2111 ldout(cct
, 20) << __func__
<< " payload.length()=" << payload
.length()
2114 if (state
!= AUTH_ACCEPTING
) {
2115 lderr(cct
) << __func__
<< " not in auth accept state!" << dendl
;
2119 auto request
= AuthRequestFrame::Decode(payload
);
2120 ldout(cct
, 10) << __func__
<< " AuthRequest(method=" << request
.method()
2121 << ", preferred_modes=" << request
.preferred_modes()
2122 << ", payload_len=" << request
.auth_payload().length() << ")"
2124 auth_meta
->auth_method
= request
.method();
2125 auth_meta
->con_mode
= messenger
->auth_server
->pick_con_mode(
2126 connection
->get_peer_type(), auth_meta
->auth_method
,
2127 request
.preferred_modes());
2128 if (auth_meta
->con_mode
== CEPH_CON_MODE_UNKNOWN
) {
2129 return _auth_bad_method(-EOPNOTSUPP
);
2131 return _handle_auth_request(request
.auth_payload(), false);
2134 CtPtr
ProtocolV2::_auth_bad_method(int r
)
2137 std::vector
<uint32_t> allowed_methods
;
2138 std::vector
<uint32_t> allowed_modes
;
2139 messenger
->auth_server
->get_supported_auth_methods(
2140 connection
->get_peer_type(), &allowed_methods
, &allowed_modes
);
2141 ldout(cct
, 1) << __func__
<< " auth_method " << auth_meta
->auth_method
2142 << " r " << cpp_strerror(r
)
2143 << ", allowed_methods " << allowed_methods
2144 << ", allowed_modes " << allowed_modes
2146 auto bad_method
= AuthBadMethodFrame::Encode(auth_meta
->auth_method
, r
,
2147 allowed_methods
, allowed_modes
);
2148 return WRITE(bad_method
, "bad auth method", read_frame
);
2151 CtPtr
ProtocolV2::_handle_auth_request(ceph::bufferlist
& auth_payload
, bool more
)
2153 if (!messenger
->auth_server
) {
2156 ceph::bufferlist reply
;
2157 auto am
= auth_meta
;
2158 connection
->lock
.unlock();
2159 int r
= messenger
->auth_server
->handle_auth_request(
2160 connection
, am
.get(),
2161 more
, am
->auth_method
, auth_payload
,
2163 connection
->lock
.lock();
2164 if (state
!= AUTH_ACCEPTING
&& state
!= AUTH_ACCEPTING_MORE
) {
2165 ldout(cct
, 1) << __func__
2166 << " state changed while accept, it must be mark_down"
2168 ceph_assert(state
== CLOSED
);
2173 state
= AUTH_ACCEPTING_SIGN
;
2175 auto auth_done
= AuthDoneFrame::Encode(connection
->peer_global_id
,
2176 auth_meta
->con_mode
,
2178 return WRITE(auth_done
, "auth done", finish_auth
);
2179 } else if (r
== 0) {
2180 state
= AUTH_ACCEPTING_MORE
;
2182 auto more
= AuthReplyMoreFrame::Encode(reply
);
2183 return WRITE(more
, "auth reply more", read_frame
);
2184 } else if (r
== -EBUSY
) {
2185 // kick the client and maybe they'll come back later
2188 return _auth_bad_method(r
);
2192 CtPtr
ProtocolV2::finish_auth()
2194 ceph_assert(auth_meta
);
2195 // TODO: having a possibility to check whether we're server or client could
2196 // allow reusing finish_auth().
2197 bool is_rev1
= HAVE_MSGR2_FEATURE(peer_supported_features
, REVISION_1
);
2198 session_stream_handlers
= ceph::crypto::onwire::rxtx_t::create_handler_pair(
2199 cct
, *auth_meta
, /*new_nonce_format=*/is_rev1
, /*crossed=*/true);
2201 const auto sig
= auth_meta
->session_key
.empty() ? sha256_digest_t() :
2202 auth_meta
->session_key
.hmac_sha256(cct
, pre_auth
.rxbuf
);
2203 auto sig_frame
= AuthSignatureFrame::Encode(sig
);
2204 pre_auth
.enabled
= false;
2205 pre_auth
.rxbuf
.clear();
2206 return WRITE(sig_frame
, "auth signature", read_frame
);
2209 CtPtr
ProtocolV2::handle_auth_request_more(ceph::bufferlist
&payload
)
2211 ldout(cct
, 20) << __func__
2212 << " payload.length()=" << payload
.length() << dendl
;
2214 if (state
!= AUTH_ACCEPTING_MORE
) {
2215 lderr(cct
) << __func__
<< " not in auth accept more state!" << dendl
;
2219 auto auth_more
= AuthRequestMoreFrame::Decode(payload
);
2220 return _handle_auth_request(auth_more
.auth_payload(), true);
2223 CtPtr
ProtocolV2::handle_auth_signature(ceph::bufferlist
&payload
)
2225 ldout(cct
, 20) << __func__
2226 << " payload.length()=" << payload
.length() << dendl
;
2228 if (state
!= AUTH_ACCEPTING_SIGN
&& state
!= AUTH_CONNECTING_SIGN
) {
2229 lderr(cct
) << __func__
2230 << " pre-auth verification signature seen in wrong state!"
2235 auto sig_frame
= AuthSignatureFrame::Decode(payload
);
2237 const auto actual_tx_sig
= auth_meta
->session_key
.empty() ?
2238 sha256_digest_t() : auth_meta
->session_key
.hmac_sha256(cct
, pre_auth
.txbuf
);
2239 if (sig_frame
.signature() != actual_tx_sig
) {
2240 ldout(cct
, 2) << __func__
<< " pre-auth signature mismatch"
2241 << " actual_tx_sig=" << actual_tx_sig
2242 << " sig_frame.signature()=" << sig_frame
.signature()
2246 ldout(cct
, 20) << __func__
<< " pre-auth signature success"
2247 << " sig_frame.signature()=" << sig_frame
.signature()
2249 pre_auth
.txbuf
.clear();
2252 if (state
== AUTH_ACCEPTING_SIGN
) {
2253 // server had sent AuthDone and client responded with correct pre-auth
2254 // signature. we can start accepting new sessions/reconnects.
2255 state
= SESSION_ACCEPTING
;
2256 return CONTINUE(read_frame
);
2257 } else if (state
== AUTH_CONNECTING_SIGN
) {
2258 // this happened at client side
2259 return finish_client_auth();
2261 ceph_abort("state corruption");
2265 CtPtr
ProtocolV2::handle_client_ident(ceph::bufferlist
&payload
)
2267 ldout(cct
, 20) << __func__
2268 << " payload.length()=" << payload
.length() << dendl
;
2270 if (state
!= SESSION_ACCEPTING
) {
2271 lderr(cct
) << __func__
<< " not in session accept state!" << dendl
;
2275 auto client_ident
= ClientIdentFrame::Decode(payload
);
2277 ldout(cct
, 5) << __func__
<< " received client identification:"
2278 << " addrs=" << client_ident
.addrs()
2279 << " target=" << client_ident
.target_addr()
2280 << " gid=" << client_ident
.gid()
2281 << " global_seq=" << client_ident
.global_seq()
2282 << " features_supported=" << std::hex
2283 << client_ident
.supported_features()
2284 << " features_required=" << client_ident
.required_features()
2285 << " flags=" << client_ident
.flags()
2286 << " cookie=" << client_ident
.cookie() << std::dec
<< dendl
;
2288 if (client_ident
.addrs().empty() ||
2289 client_ident
.addrs().front() == entity_addr_t()) {
2290 ldout(cct
,5) << __func__
<< " oops, client_ident.addrs() is empty" << dendl
;
2291 return _fault(); // a v2 peer should never do this
2293 if (!messenger
->get_myaddrs().contains(client_ident
.target_addr())) {
2294 ldout(cct
,5) << __func__
<< " peer is trying to reach "
2295 << client_ident
.target_addr()
2296 << " which is not us (" << messenger
->get_myaddrs() << ")"
2301 connection
->set_peer_addrs(client_ident
.addrs());
2302 connection
->target_addr
= connection
->_infer_target_addr(client_ident
.addrs());
2304 peer_name
= entity_name_t(connection
->get_peer_type(), client_ident
.gid());
2305 connection
->set_peer_id(client_ident
.gid());
2307 client_cookie
= client_ident
.cookie();
2309 uint64_t feat_missing
=
2310 (connection
->policy
.features_required
| msgr2_required
) &
2311 ~(uint64_t)client_ident
.supported_features();
2313 ldout(cct
, 1) << __func__
<< " peer missing required features " << std::hex
2314 << feat_missing
<< std::dec
<< dendl
;
2315 auto ident_missing_features
=
2316 IdentMissingFeaturesFrame::Encode(feat_missing
);
2318 return WRITE(ident_missing_features
, "ident missing features", read_frame
);
2321 connection_features
=
2322 client_ident
.supported_features() & connection
->policy
.features_supported
;
2324 peer_global_seq
= client_ident
.global_seq();
2326 if (connection
->policy
.server
&&
2327 connection
->policy
.lossy
&&
2328 !connection
->policy
.register_lossy_clients
) {
2329 // incoming lossy client, no need to register this connection
2331 // Looks good so far, let's check if there is already an existing connection
2333 connection
->lock
.unlock();
2334 AsyncConnectionRef existing
= messenger
->lookup_conn(
2335 *connection
->peer_addrs
);
2338 existing
->protocol
->proto_type
!= 2) {
2339 ldout(cct
,1) << __func__
<< " existing " << existing
<< " proto "
2340 << existing
->protocol
.get() << " version is "
2341 << existing
->protocol
->proto_type
<< ", marking down"
2343 existing
->mark_down();
2347 connection
->inject_delay();
2349 connection
->lock
.lock();
2350 if (state
!= SESSION_ACCEPTING
) {
2351 ldout(cct
, 1) << __func__
2352 << " state changed while accept, it must be mark_down"
2354 ceph_assert(state
== CLOSED
);
2359 return handle_existing_connection(existing
);
2363 // if everything is OK reply with server identification
2364 return send_server_ident();
2367 CtPtr
ProtocolV2::handle_reconnect(ceph::bufferlist
&payload
)
2369 ldout(cct
, 20) << __func__
2370 << " payload.length()=" << payload
.length() << dendl
;
2372 if (state
!= SESSION_ACCEPTING
) {
2373 lderr(cct
) << __func__
<< " not in session accept state!" << dendl
;
2377 auto reconnect
= ReconnectFrame::Decode(payload
);
2379 ldout(cct
, 5) << __func__
2380 << " received reconnect:"
2381 << " client_cookie=" << std::hex
<< reconnect
.client_cookie()
2382 << " server_cookie=" << reconnect
.server_cookie() << std::dec
2383 << " gs=" << reconnect
.global_seq()
2384 << " cs=" << reconnect
.connect_seq()
2385 << " ms=" << reconnect
.msg_seq()
2388 // Should we check if one of the ident.addrs match connection->target_addr
2389 // as we do in ProtocolV1?
2390 connection
->set_peer_addrs(reconnect
.addrs());
2391 connection
->target_addr
= connection
->_infer_target_addr(reconnect
.addrs());
2392 peer_global_seq
= reconnect
.global_seq();
2394 connection
->lock
.unlock();
2395 AsyncConnectionRef existing
= messenger
->lookup_conn(*connection
->peer_addrs
);
2398 existing
->protocol
->proto_type
!= 2) {
2399 ldout(cct
,1) << __func__
<< " existing " << existing
<< " proto "
2400 << existing
->protocol
.get() << " version is "
2401 << existing
->protocol
->proto_type
<< ", marking down" << dendl
;
2402 existing
->mark_down();
2406 connection
->inject_delay();
2408 connection
->lock
.lock();
2409 if (state
!= SESSION_ACCEPTING
) {
2410 ldout(cct
, 1) << __func__
2411 << " state changed while accept, it must be mark_down"
2413 ceph_assert(state
== CLOSED
);
2418 // there is no existing connection therefore cannot reconnect to previous
2420 ldout(cct
, 0) << __func__
2421 << " no existing connection exists, reseting client" << dendl
;
2422 auto reset
= ResetFrame::Encode(true);
2423 return WRITE(reset
, "session reset", read_frame
);
2426 std::lock_guard
<std::mutex
> l(existing
->lock
);
2428 ProtocolV2
*exproto
= dynamic_cast<ProtocolV2
*>(existing
->protocol
.get());
2430 ldout(cct
, 1) << __func__
<< " existing=" << existing
<< dendl
;
2434 if (exproto
->state
== CLOSED
) {
2435 ldout(cct
, 5) << __func__
<< " existing " << existing
2436 << " already closed. Reseting client" << dendl
;
2437 auto reset
= ResetFrame::Encode(true);
2438 return WRITE(reset
, "session reset", read_frame
);
2441 if (exproto
->replacing
) {
2442 ldout(cct
, 1) << __func__
2443 << " existing racing replace happened while replacing."
2444 << " existing=" << existing
<< dendl
;
2445 auto retry
= RetryGlobalFrame::Encode(exproto
->peer_global_seq
);
2446 return WRITE(retry
, "session retry", read_frame
);
2449 if (exproto
->client_cookie
!= reconnect
.client_cookie()) {
2450 ldout(cct
, 1) << __func__
<< " existing=" << existing
2451 << " client cookie mismatch, I must have reseted:"
2452 << " cc=" << std::hex
<< exproto
->client_cookie
2453 << " rcc=" << reconnect
.client_cookie()
2454 << ", reseting client." << std::dec
2456 auto reset
= ResetFrame::Encode(connection
->policy
.resetcheck
);
2457 return WRITE(reset
, "session reset", read_frame
);
2458 } else if (exproto
->server_cookie
== 0) {
2459 // this happens when:
2460 // - a connects to b
2461 // - a sends client_ident
2462 // - b gets client_ident, sends server_ident and sets cookie X
2463 // - connection fault
2464 // - b reconnects to a with cookie X, connect_seq=1
2465 // - a has cookie==0
2466 ldout(cct
, 1) << __func__
<< " I was a client and didn't received the"
2467 << " server_ident. Asking peer to resume session"
2468 << " establishment" << dendl
;
2469 auto reset
= ResetFrame::Encode(false);
2470 return WRITE(reset
, "session reset", read_frame
);
2473 if (exproto
->peer_global_seq
> reconnect
.global_seq()) {
2474 ldout(cct
, 5) << __func__
2475 << " stale global_seq: sgs=" << exproto
->peer_global_seq
2476 << " cgs=" << reconnect
.global_seq()
2477 << ", ask client to retry global" << dendl
;
2478 auto retry
= RetryGlobalFrame::Encode(exproto
->peer_global_seq
);
2482 return WRITE(retry
, "session retry", read_frame
);
2485 if (exproto
->connect_seq
> reconnect
.connect_seq()) {
2486 ldout(cct
, 5) << __func__
2487 << " stale connect_seq scs=" << exproto
->connect_seq
2488 << " ccs=" << reconnect
.connect_seq()
2489 << " , ask client to retry" << dendl
;
2490 auto retry
= RetryFrame::Encode(exproto
->connect_seq
);
2491 return WRITE(retry
, "session retry", read_frame
);
2494 if (exproto
->connect_seq
== reconnect
.connect_seq()) {
2495 // reconnect race: both peers are sending reconnect messages
2496 if (existing
->peer_addrs
->msgr2_addr() >
2497 messenger
->get_myaddrs().msgr2_addr() &&
2498 !existing
->policy
.server
) {
2499 // the existing connection wins
2502 << " reconnect race detected, this connection loses to existing="
2503 << existing
<< dendl
;
2505 auto wait
= WaitFrame::Encode();
2506 return WRITE(wait
, "wait", read_frame
);
2508 // this connection wins
2509 ldout(cct
, 1) << __func__
2510 << " reconnect race detected, replacing existing="
2511 << existing
<< " socket by this connection's socket"
2516 ldout(cct
, 1) << __func__
<< " reconnect to existing=" << existing
<< dendl
;
2518 reconnecting
= true;
2520 // everything looks good
2521 exproto
->connect_seq
= reconnect
.connect_seq();
2522 exproto
->message_seq
= reconnect
.msg_seq();
2524 return reuse_connection(existing
, exproto
);
2527 CtPtr
ProtocolV2::handle_existing_connection(const AsyncConnectionRef
& existing
) {
2528 ldout(cct
, 20) << __func__
<< " existing=" << existing
<< dendl
;
2530 std::lock_guard
<std::mutex
> l(existing
->lock
);
2532 ProtocolV2
*exproto
= dynamic_cast<ProtocolV2
*>(existing
->protocol
.get());
2534 ldout(cct
, 1) << __func__
<< " existing=" << existing
<< dendl
;
2538 if (exproto
->state
== CLOSED
) {
2539 ldout(cct
, 1) << __func__
<< " existing " << existing
<< " already closed."
2541 return send_server_ident();
2544 if (exproto
->replacing
) {
2545 ldout(cct
, 1) << __func__
2546 << " existing racing replace happened while replacing."
2547 << " existing=" << existing
<< dendl
;
2548 auto wait
= WaitFrame::Encode();
2549 return WRITE(wait
, "wait", read_frame
);
2552 if (exproto
->peer_global_seq
> peer_global_seq
) {
2553 ldout(cct
, 1) << __func__
<< " this is a stale connection, peer_global_seq="
2555 << " existing->peer_global_seq=" << exproto
->peer_global_seq
2556 << ", stopping this connection." << dendl
;
2558 connection
->dispatch_queue
->queue_reset(connection
);
2562 if (existing
->policy
.lossy
) {
2563 // existing connection can be thrown out in favor of this one
2565 << __func__
<< " existing=" << existing
2566 << " is a lossy channel. Stopping existing in favor of this connection"
2568 existing
->protocol
->stop();
2569 existing
->dispatch_queue
->queue_reset(existing
.get());
2570 return send_server_ident();
2573 if (exproto
->server_cookie
&& exproto
->client_cookie
&&
2574 exproto
->client_cookie
!= client_cookie
) {
2575 // Found previous session
2576 // peer has reseted and we're going to reuse the existing connection
2577 // by replacing the communication socket
2578 ldout(cct
, 1) << __func__
<< " found previous session existing=" << existing
2579 << ", peer must have reseted." << dendl
;
2580 if (connection
->policy
.resetcheck
) {
2581 exproto
->reset_session();
2583 return reuse_connection(existing
, exproto
);
2586 if (exproto
->client_cookie
== client_cookie
) {
2587 // session establishment interrupted between client_ident and server_ident,
2589 ldout(cct
, 1) << __func__
<< " found previous session existing=" << existing
2590 << ", continuing session establishment." << dendl
;
2591 return reuse_connection(existing
, exproto
);
2594 if (exproto
->state
== READY
|| exproto
->state
== STANDBY
) {
2595 ldout(cct
, 1) << __func__
<< " existing=" << existing
2596 << " is READY/STANDBY, lets reuse it" << dendl
;
2597 return reuse_connection(existing
, exproto
);
2600 // Looks like a connection race: server and client are both connecting to
2601 // each other at the same time.
2602 if (connection
->peer_addrs
->msgr2_addr() <
2603 messenger
->get_myaddrs().msgr2_addr() ||
2604 existing
->policy
.server
) {
2605 // this connection wins
2606 ldout(cct
, 1) << __func__
2607 << " connection race detected, replacing existing="
2608 << existing
<< " socket by this connection's socket" << dendl
;
2609 return reuse_connection(existing
, exproto
);
2611 // the existing connection wins
2614 << " connection race detected, this connection loses to existing="
2615 << existing
<< dendl
;
2616 ceph_assert(connection
->peer_addrs
->msgr2_addr() >
2617 messenger
->get_myaddrs().msgr2_addr());
2619 // make sure we follow through with opening the existing
2620 // connection (if it isn't yet open) since we know the peer
2621 // has something to send to us.
2622 existing
->send_keepalive();
2623 auto wait
= WaitFrame::Encode();
2624 return WRITE(wait
, "wait", read_frame
);
2628 CtPtr
ProtocolV2::reuse_connection(const AsyncConnectionRef
& existing
,
2629 ProtocolV2
*exproto
) {
2630 ldout(cct
, 20) << __func__
<< " existing=" << existing
2631 << " reconnect=" << reconnecting
<< dendl
;
2633 connection
->inject_delay();
2635 std::lock_guard
<std::mutex
> l(existing
->write_lock
);
2637 connection
->center
->delete_file_event(connection
->cs
.fd(),
2638 EVENT_READABLE
| EVENT_WRITABLE
);
2640 if (existing
->delay_state
) {
2641 existing
->delay_state
->flush();
2642 ceph_assert(!connection
->delay_state
);
2644 exproto
->reset_recv_state();
2645 exproto
->pre_auth
.enabled
= false;
2647 if (!reconnecting
) {
2648 exproto
->peer_supported_features
= peer_supported_features
;
2649 exproto
->tx_frame_asm
.set_is_rev1(tx_frame_asm
.get_is_rev1());
2650 exproto
->rx_frame_asm
.set_is_rev1(rx_frame_asm
.get_is_rev1());
2652 exproto
->client_cookie
= client_cookie
;
2653 exproto
->peer_name
= peer_name
;
2654 exproto
->connection_features
= connection_features
;
2655 existing
->set_features(connection_features
);
2657 exproto
->peer_global_seq
= peer_global_seq
;
2659 ceph_assert(connection
->center
->in_thread());
2660 auto temp_cs
= std::move(connection
->cs
);
2661 EventCenter
*new_center
= connection
->center
;
2662 Worker
*new_worker
= connection
->worker
;
2663 // we can steal the session_stream_handlers under the assumption
2664 // this happens in the event center's thread as there should be
2665 // no user outside its boundaries (simlarly to e.g. outgoing_bl).
2666 auto temp_stream_handlers
= std::move(session_stream_handlers
);
2667 exproto
->auth_meta
= auth_meta
;
2669 ldout(messenger
->cct
, 5) << __func__
<< " stop myself to swap existing"
2672 // avoid _stop shutdown replacing socket
2673 // queue a reset on the new connection, which we're dumping for the old
2676 connection
->dispatch_queue
->queue_reset(connection
);
2678 exproto
->can_write
= false;
2679 exproto
->write_in_progress
= false;
2680 exproto
->reconnecting
= reconnecting
;
2681 exproto
->replacing
= true;
2682 existing
->state_offset
= 0;
2683 // avoid previous thread modify event
2684 exproto
->state
= NONE
;
2685 existing
->state
= AsyncConnection::STATE_NONE
;
2686 // Discard existing prefetch buffer in `recv_buf`
2687 existing
->recv_start
= existing
->recv_end
= 0;
2688 // there shouldn't exist any buffer
2689 ceph_assert(connection
->recv_start
== connection
->recv_end
);
2691 auto deactivate_existing
= std::bind(
2696 temp_stream_handlers
=std::move(temp_stream_handlers
)
2697 ](ConnectedSocket
&cs
) mutable {
2698 // we need to delete time event in original thread
2700 std::lock_guard
<std::mutex
> l(existing
->lock
);
2701 existing
->write_lock
.lock();
2702 exproto
->requeue_sent();
2703 // XXX: do we really need the locking for `outgoing_bl`? There is
2704 // a comment just above its definition saying "lockfree, only used
2705 // in own thread". I'm following lockfull schema just in the case.
2706 // From performance point of view it should be fine – this happens
2707 // far away from hot paths.
2708 existing
->outgoing_bl
.clear();
2709 existing
->open_write
= false;
2710 exproto
->session_stream_handlers
= std::move(temp_stream_handlers
);
2711 existing
->write_lock
.unlock();
2712 if (exproto
->state
== NONE
) {
2713 existing
->shutdown_socket();
2714 existing
->cs
= std::move(cs
);
2715 existing
->worker
->references
--;
2716 new_worker
->references
++;
2717 existing
->logger
= new_worker
->get_perf_counter();
2718 existing
->worker
= new_worker
;
2719 existing
->center
= new_center
;
2720 if (existing
->delay_state
)
2721 existing
->delay_state
->set_center(new_center
);
2722 } else if (exproto
->state
== CLOSED
) {
2723 auto back_to_close
= std::bind(
2724 [](ConnectedSocket
&cs
) mutable { cs
.close(); }, std::move(cs
));
2725 new_center
->submit_to(new_center
->get_id(),
2726 std::move(back_to_close
), true);
2733 // Before changing existing->center, it may already exists some
2734 // events in existing->center's queue. Then if we mark down
2735 // `existing`, it will execute in another thread and clean up
2736 // connection. Previous event will result in segment fault
2737 auto transfer_existing
= [existing
, exproto
]() mutable {
2738 std::lock_guard
<std::mutex
> l(existing
->lock
);
2739 if (exproto
->state
== CLOSED
) return;
2740 ceph_assert(exproto
->state
== NONE
);
2742 exproto
->state
= SESSION_ACCEPTING
;
2743 // we have called shutdown_socket above
2744 ceph_assert(existing
->last_tick_id
== 0);
2745 // restart timer since we are going to re-build connection
2746 existing
->last_connect_started
= ceph::coarse_mono_clock::now();
2747 existing
->last_tick_id
= existing
->center
->create_time_event(
2748 existing
->connect_timeout_us
, existing
->tick_handler
);
2749 existing
->state
= AsyncConnection::STATE_CONNECTION_ESTABLISHED
;
2750 existing
->center
->create_file_event(existing
->cs
.fd(), EVENT_READABLE
,
2751 existing
->read_handler
);
2752 if (!exproto
->reconnecting
) {
2753 exproto
->run_continuation(exproto
->send_server_ident());
2755 exproto
->run_continuation(exproto
->send_reconnect_ok());
2758 if (existing
->center
->in_thread())
2759 transfer_existing();
2761 existing
->center
->submit_to(existing
->center
->get_id(),
2762 std::move(transfer_existing
), true);
2764 std::move(temp_cs
));
2766 existing
->center
->submit_to(existing
->center
->get_id(),
2767 std::move(deactivate_existing
), true);
2771 CtPtr
ProtocolV2::send_server_ident() {
2772 ldout(cct
, 20) << __func__
<< dendl
;
2774 // this is required for the case when this connection is being replaced
2775 out_seq
= discard_requeued_up_to(out_seq
, 0);
2778 if (!connection
->policy
.lossy
) {
2779 server_cookie
= ceph::util::generate_random_number
<uint64_t>(1, -1ll);
2783 if (connection
->policy
.lossy
) {
2784 flags
= flags
| CEPH_MSG_CONNECT_LOSSY
;
2787 uint64_t gs
= messenger
->get_global_seq();
2788 auto server_ident
= ServerIdentFrame::Encode(
2789 messenger
->get_myaddrs(),
2790 messenger
->get_myname().num(),
2792 connection
->policy
.features_supported
,
2793 connection
->policy
.features_required
| msgr2_required
,
2797 ldout(cct
, 5) << __func__
<< " sending identification:"
2798 << " addrs=" << messenger
->get_myaddrs()
2799 << " gid=" << messenger
->get_myname().num()
2800 << " global_seq=" << gs
<< " features_supported=" << std::hex
2801 << connection
->policy
.features_supported
2802 << " features_required="
2803 << (connection
->policy
.features_required
| msgr2_required
)
2804 << " flags=" << flags
2805 << " cookie=" << server_cookie
<< std::dec
<< dendl
;
2807 connection
->lock
.unlock();
2808 // Because "replacing" will prevent other connections preempt this addr,
2809 // it's safe that here we don't acquire Connection's lock
2810 ssize_t r
= messenger
->accept_conn(connection
);
2812 connection
->inject_delay();
2814 connection
->lock
.lock();
2817 ldout(cct
, 1) << __func__
<< " existing race replacing process for addr = "
2818 << connection
->peer_addrs
->msgr2_addr()
2819 << " just fail later one(this)" << dendl
;
2820 connection
->inject_delay();
2823 if (state
!= SESSION_ACCEPTING
) {
2824 ldout(cct
, 1) << __func__
2825 << " state changed while accept_conn, it must be mark_down"
2827 ceph_assert(state
== CLOSED
|| state
== NONE
);
2828 messenger
->unregister_conn(connection
);
2829 connection
->inject_delay();
2833 connection
->set_features(connection_features
);
2836 connection
->dispatch_queue
->queue_accept(connection
);
2837 messenger
->ms_deliver_handle_fast_accept(connection
);
2841 return WRITE(server_ident
, "server ident", server_ready
);
2844 CtPtr
ProtocolV2::server_ready() {
2845 ldout(cct
, 20) << __func__
<< dendl
;
2847 if (connection
->delay_state
) {
2848 ceph_assert(connection
->delay_state
->ready());
2854 CtPtr
ProtocolV2::send_reconnect_ok() {
2855 ldout(cct
, 20) << __func__
<< dendl
;
2857 out_seq
= discard_requeued_up_to(out_seq
, message_seq
);
2859 uint64_t ms
= in_seq
;
2860 auto reconnect_ok
= ReconnectOkFrame::Encode(ms
);
2862 ldout(cct
, 5) << __func__
<< " sending reconnect_ok: msg_seq=" << ms
<< dendl
;
2864 connection
->lock
.unlock();
2865 // Because "replacing" will prevent other connections preempt this addr,
2866 // it's safe that here we don't acquire Connection's lock
2867 ssize_t r
= messenger
->accept_conn(connection
);
2869 connection
->inject_delay();
2871 connection
->lock
.lock();
2874 ldout(cct
, 1) << __func__
<< " existing race replacing process for addr = "
2875 << connection
->peer_addrs
->msgr2_addr()
2876 << " just fail later one(this)" << dendl
;
2877 connection
->inject_delay();
2880 if (state
!= SESSION_ACCEPTING
) {
2881 ldout(cct
, 1) << __func__
2882 << " state changed while accept_conn, it must be mark_down"
2884 ceph_assert(state
== CLOSED
|| state
== NONE
);
2885 messenger
->unregister_conn(connection
);
2886 connection
->inject_delay();
2891 connection
->dispatch_queue
->queue_accept(connection
);
2892 messenger
->ms_deliver_handle_fast_accept(connection
);
2896 return WRITE(reconnect_ok
, "reconnect ok", server_ready
);