1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "ProtocolV2.h"
7 #include "AsyncMessenger.h"
9 #include "common/EventTrace.h"
10 #include "common/ceph_crypto.h"
11 #include "common/errno.h"
12 #include "include/random.h"
13 #include "auth/AuthClient.h"
14 #include "auth/AuthServer.h"
16 #define dout_subsys ceph_subsys_ms
18 #define dout_prefix _conn_prefix(_dout)
19 ostream
&ProtocolV2::_conn_prefix(std::ostream
*_dout
) {
20 return *_dout
<< "--2- " << messenger
->get_myaddrs() << " >> "
21 << *connection
->peer_addrs
<< " conn(" << connection
<< " "
23 << " " << ceph_con_mode_name(auth_meta
->con_mode
)
24 << " :" << connection
->port
25 << " s=" << get_state_name(state
) << " pgs=" << peer_global_seq
26 << " cs=" << connect_seq
<< " l=" << connection
->policy
.lossy
27 << " rx=" << session_stream_handlers
.rx
.get()
28 << " tx=" << session_stream_handlers
.tx
.get()
32 using namespace ceph::msgr::v2
;
34 using CtPtr
= Ct
<ProtocolV2
> *;
35 using CtRef
= Ct
<ProtocolV2
> &;
37 void ProtocolV2::run_continuation(CtPtr pcontinuation
) {
39 run_continuation(*pcontinuation
);
43 void ProtocolV2::run_continuation(CtRef continuation
) {
45 CONTINUATION_RUN(continuation
)
46 } catch (const buffer::error
&e
) {
47 lderr(cct
) << __func__
<< " failed decoding of frame header: " << e
50 } catch (const ceph::crypto::onwire::MsgAuthError
&e
) {
51 lderr(cct
) << __func__
<< " " << e
.what() << dendl
;
53 } catch (const DecryptionError
&) {
54 lderr(cct
) << __func__
<< " failed to decrypt frame payload" << dendl
;
// WRITE(buf, desc, cont): pass `buf` to write() with `desc` as the label used
// in its failure log, and resume with the continuation for member `cont`.
#define WRITE(buf, desc, cont) write(desc, CONTINUATION(cont), buf)
// READ(len, cont): read `len` bytes into a freshly created buffer node and
// resume with the continuation for member `cont`.
#define READ(len, cont) \
  read(CONTINUATION(cont), buffer::ptr_node::create(buffer::create(len)))
// READ_RXBUF(rxbuf, cont): read into the caller-supplied rx buffer `rxbuf`
// (instead of allocating one) and resume with the continuation for `cont`.
#define READ_RXBUF(rxbuf, cont) read(CONTINUATION(cont), rxbuf)
64 #ifdef UNIT_TESTS_BUILT
66 #define INTERCEPT(S) { \
67 if(connection->interceptor) { \
68 auto a = connection->interceptor->intercept(connection, (S)); \
69 if (a == Interceptor::ACTION::FAIL) { \
71 } else if (a == Interceptor::ACTION::STOP) { \
73 connection->dispatch_queue->queue_reset(connection); \
81 ProtocolV2::ProtocolV2(AsyncConnection
*connection
)
82 : Protocol(2, connection
),
84 peer_required_features(0),
94 bannerExchangeCallback(nullptr),
95 next_tag(static_cast<Tag
>(0)),
99 ProtocolV2::~ProtocolV2() {
102 void ProtocolV2::connect() {
103 ldout(cct
, 1) << __func__
<< dendl
;
104 state
= START_CONNECT
;
105 pre_auth
.enabled
= true;
108 void ProtocolV2::accept() {
109 ldout(cct
, 1) << __func__
<< dendl
;
110 state
= START_ACCEPT
;
113 bool ProtocolV2::is_connected() { return can_write
; }
 * Tears down the message queues and removes them from the
 * DispatchQueue. Must hold write_lock prior to calling.
119 void ProtocolV2::discard_out_queue() {
120 ldout(cct
, 10) << __func__
<< " started" << dendl
;
122 for (list
<Message
*>::iterator p
= sent
.begin(); p
!= sent
.end(); ++p
) {
123 ldout(cct
, 20) << __func__
<< " discard " << *p
<< dendl
;
127 for (auto& [ prio
, entries
] : out_queue
) {
128 static_cast<void>(prio
);
129 for (auto& entry
: entries
) {
130 ldout(cct
, 20) << __func__
<< " discard " << *entry
.m
<< dendl
;
135 write_in_progress
= false;
138 void ProtocolV2::reset_session() {
139 ldout(cct
, 1) << __func__
<< dendl
;
141 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
142 if (connection
->delay_state
) {
143 connection
->delay_state
->discard();
146 connection
->dispatch_queue
->discard_queue(connection
->conn_id
);
148 connection
->outgoing_bl
.clear();
150 connection
->dispatch_queue
->queue_remote_reset(connection
);
163 void ProtocolV2::stop() {
164 ldout(cct
, 1) << __func__
<< dendl
;
165 if (state
== CLOSED
) {
169 if (connection
->delay_state
) connection
->delay_state
->flush();
171 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
182 void ProtocolV2::fault() { _fault(); }
184 void ProtocolV2::requeue_sent() {
185 write_in_progress
= false;
190 auto& rq
= out_queue
[CEPH_MSG_PRIO_HIGHEST
];
191 out_seq
-= sent
.size();
192 while (!sent
.empty()) {
193 Message
*m
= sent
.back();
195 ldout(cct
, 5) << __func__
<< " requeueing message m=" << m
196 << " seq=" << m
->get_seq() << " type=" << m
->get_type() << " "
199 rq
.emplace_front(out_queue_entry_t
{false, m
});
203 uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq
, uint64_t seq
) {
204 ldout(cct
, 10) << __func__
<< " " << seq
<< dendl
;
205 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
206 if (out_queue
.count(CEPH_MSG_PRIO_HIGHEST
) == 0) {
209 auto& rq
= out_queue
[CEPH_MSG_PRIO_HIGHEST
];
210 uint64_t count
= out_seq
;
211 while (!rq
.empty()) {
212 Message
* const m
= rq
.front().m
;
213 if (m
->get_seq() == 0 || m
->get_seq() > seq
) break;
214 ldout(cct
, 5) << __func__
<< " discarding message m=" << m
215 << " seq=" << m
->get_seq() << " ack_seq=" << seq
<< " "
221 if (rq
.empty()) out_queue
.erase(CEPH_MSG_PRIO_HIGHEST
);
225 void ProtocolV2::reset_security() {
226 ldout(cct
, 5) << __func__
<< dendl
;
228 auth_meta
.reset(new AuthConnectionMeta
);
229 session_stream_handlers
.rx
.reset(nullptr);
230 session_stream_handlers
.tx
.reset(nullptr);
231 pre_auth
.rxbuf
.clear();
232 pre_auth
.txbuf
.clear();
// It is expected that `write_lock` is held while calling this method.
236 void ProtocolV2::reset_recv_state() {
237 ldout(cct
, 5) << __func__
<< dendl
;
239 if (!connection
->center
->in_thread()) {
240 // execute in the same thread that uses the rx/tx handlers. We need
241 // to do the warp because holding `write_lock` is not enough as
242 // `write_event()` unlocks it just before calling `write_message()`.
243 // `submit_to()` here is NOT blocking.
244 connection
->center
->submit_to(connection
->center
->get_id(), [this] {
245 ldout(cct
, 5) << "reset_recv_state (warped) reseting crypto handlers"
247 // Possibly unnecessary. See the comment in `deactivate_existing`.
248 std::lock_guard
<std::mutex
> l(connection
->lock
);
249 std::lock_guard
<std::mutex
> wl(connection
->write_lock
);
251 }, /* always_async = */true);
256 // clean read and write callbacks
257 connection
->pendingReadLen
.reset();
258 connection
->writeCallback
.reset();
260 next_tag
= static_cast<Tag
>(0);
265 size_t ProtocolV2::get_current_msg_size() const {
266 ceph_assert(!rx_segments_desc
.empty());
268 // we don't include SegmentIndex::Msg::HEADER.
269 for (__u8 idx
= 1; idx
< rx_segments_desc
.size(); idx
++) {
270 sum
+= rx_segments_desc
[idx
].length
;
275 void ProtocolV2::reset_throttle() {
276 if (state
> THROTTLE_MESSAGE
&& state
<= THROTTLE_DONE
&&
277 connection
->policy
.throttler_messages
) {
278 ldout(cct
, 10) << __func__
<< " releasing " << 1
279 << " message to policy throttler "
280 << connection
->policy
.throttler_messages
->get_current()
281 << "/" << connection
->policy
.throttler_messages
->get_max()
283 connection
->policy
.throttler_messages
->put();
285 if (state
> THROTTLE_BYTES
&& state
<= THROTTLE_DONE
) {
286 if (connection
->policy
.throttler_bytes
) {
287 const size_t cur_msg_size
= get_current_msg_size();
288 ldout(cct
, 10) << __func__
<< " releasing " << cur_msg_size
289 << " bytes to policy throttler "
290 << connection
->policy
.throttler_bytes
->get_current() << "/"
291 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
292 connection
->policy
.throttler_bytes
->put(cur_msg_size
);
295 if (state
> THROTTLE_DISPATCH_QUEUE
&& state
<= THROTTLE_DONE
) {
296 const size_t cur_msg_size
= get_current_msg_size();
298 << __func__
<< " releasing " << cur_msg_size
299 << " bytes to dispatch_queue throttler "
300 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
301 << connection
->dispatch_queue
->dispatch_throttler
.get_max() << dendl
;
302 connection
->dispatch_queue
->dispatch_throttle_release(cur_msg_size
);
306 CtPtr
ProtocolV2::_fault() {
307 ldout(cct
, 10) << __func__
<< dendl
;
309 if (state
== CLOSED
|| state
== NONE
) {
310 ldout(cct
, 10) << __func__
<< " connection is already closed" << dendl
;
314 if (connection
->policy
.lossy
&&
315 !(state
>= START_CONNECT
&& state
<= SESSION_RECONNECTING
)) {
316 ldout(cct
, 2) << __func__
<< " on lossy channel, failing" << dendl
;
318 connection
->dispatch_queue
->queue_reset(connection
);
322 connection
->write_lock
.lock();
325 // requeue sent items
328 if (out_queue
.empty() && state
>= START_ACCEPT
&&
329 state
<= SESSION_ACCEPTING
&& !replacing
) {
330 ldout(cct
, 2) << __func__
<< " with nothing to send and in the half "
331 << " accept state just closed" << dendl
;
332 connection
->write_lock
.unlock();
334 connection
->dispatch_queue
->queue_reset(connection
);
342 reconnecting
= false;
344 if (connection
->policy
.standby
&& out_queue
.empty() && !keepalive
&&
346 ldout(cct
, 1) << __func__
<< " with nothing to send, going to standby"
349 connection
->write_lock
.unlock();
352 if (connection
->policy
.server
) {
353 ldout(cct
, 1) << __func__
<< " server, going to standby, even though i have stuff queued" << dendl
;
355 connection
->write_lock
.unlock();
359 connection
->write_lock
.unlock();
361 if (!(state
>= START_CONNECT
&& state
<= SESSION_RECONNECTING
) &&
363 state
!= SESSION_ACCEPTING
/* due to connection race */) {
364 // policy maybe empty when state is in accept
365 if (connection
->policy
.server
) {
366 ldout(cct
, 1) << __func__
<< " server, going to standby" << dendl
;
369 ldout(cct
, 1) << __func__
<< " initiating reconnect" << dendl
;
371 global_seq
= messenger
->get_global_seq();
372 state
= START_CONNECT
;
373 pre_auth
.enabled
= true;
374 connection
->state
= AsyncConnection::STATE_CONNECTING
;
377 connection
->center
->dispatch_event_external(connection
->read_handler
);
380 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
381 } else if (backoff
== utime_t()) {
382 backoff
.set_from_double(cct
->_conf
->ms_initial_backoff
);
385 if (backoff
> cct
->_conf
->ms_max_backoff
)
386 backoff
.set_from_double(cct
->_conf
->ms_max_backoff
);
393 global_seq
= messenger
->get_global_seq();
394 state
= START_CONNECT
;
395 pre_auth
.enabled
= true;
396 connection
->state
= AsyncConnection::STATE_CONNECTING
;
397 ldout(cct
, 1) << __func__
<< " waiting " << backoff
<< dendl
;
399 connection
->register_time_events
.insert(
400 connection
->center
->create_time_event(backoff
.to_nsec() / 1000,
401 connection
->wakeup_handler
));
406 void ProtocolV2::prepare_send_message(uint64_t features
,
408 ldout(cct
, 20) << __func__
<< " m=" << *m
<< dendl
;
410 // associate message with Connection (for benefit of encode_payload)
411 ldout(cct
, 20) << __func__
<< (m
->empty_payload() ? " encoding features " : " half-reencoding features ")
412 << features
<< " " << m
<< " " << *m
<< dendl
;
414 // encode and copy out of *m
415 m
->encode(features
, 0);
418 void ProtocolV2::send_message(Message
*m
) {
419 uint64_t f
= connection
->get_features();
421 // TODO: Currently not all messages supports reencode like MOSDMap, so here
422 // only let fast dispatch support messages prepare message
423 const bool can_fast_prepare
= messenger
->ms_can_fast_dispatch(m
);
424 if (can_fast_prepare
) {
425 prepare_send_message(f
, m
);
428 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
429 bool is_prepared
= can_fast_prepare
;
430 // "features" changes will change the payload encoding
431 if (can_fast_prepare
&& (!can_write
|| connection
->get_features() != f
)) {
432 // ensure the correctness of message encoding
435 ldout(cct
, 10) << __func__
<< " clear encoded buffer previous " << f
436 << " != " << connection
->get_features() << dendl
;
438 if (state
== CLOSED
) {
439 ldout(cct
, 10) << __func__
<< " connection closed."
440 << " Drop message " << m
<< dendl
;
443 ldout(cct
, 5) << __func__
<< " enqueueing message m=" << m
444 << " type=" << m
->get_type() << " " << *m
<< dendl
;
445 m
->queue_start
= ceph::mono_clock::now();
446 m
->trace
.event("async enqueueing message");
447 out_queue
[m
->get_priority()].emplace_back(
448 out_queue_entry_t
{is_prepared
, m
});
449 ldout(cct
, 15) << __func__
<< " inline write is denied, reschedule m=" << m
451 if (((!replacing
&& can_write
) || state
== STANDBY
) && !write_in_progress
) {
452 write_in_progress
= true;
453 connection
->center
->dispatch_event_external(connection
->write_handler
);
458 void ProtocolV2::send_keepalive() {
459 ldout(cct
, 10) << __func__
<< dendl
;
460 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
461 if (state
!= CLOSED
) {
463 connection
->center
->dispatch_event_external(connection
->write_handler
);
467 void ProtocolV2::read_event() {
468 ldout(cct
, 20) << __func__
<< dendl
;
472 run_continuation(CONTINUATION(start_client_banner_exchange
));
475 run_continuation(CONTINUATION(start_server_banner_exchange
));
478 run_continuation(CONTINUATION(read_frame
));
480 case THROTTLE_MESSAGE
:
481 run_continuation(CONTINUATION(throttle_message
));
484 run_continuation(CONTINUATION(throttle_bytes
));
486 case THROTTLE_DISPATCH_QUEUE
:
487 run_continuation(CONTINUATION(throttle_dispatch_queue
));
494 ProtocolV2::out_queue_entry_t
ProtocolV2::_get_next_outgoing() {
495 out_queue_entry_t out_entry
;
497 if (!out_queue
.empty()) {
498 auto it
= out_queue
.rbegin();
499 auto& entries
= it
->second
;
500 ceph_assert(!entries
.empty());
501 out_entry
= entries
.front();
503 if (entries
.empty()) {
504 out_queue
.erase(it
->first
);
510 ssize_t
ProtocolV2::write_message(Message
*m
, bool more
) {
512 ceph_assert(connection
->center
->in_thread());
513 m
->set_seq(++out_seq
);
515 connection
->lock
.lock();
516 uint64_t ack_seq
= in_seq
;
518 connection
->lock
.unlock();
520 ceph_msg_header
&header
= m
->get_header();
521 ceph_msg_footer
&footer
= m
->get_footer();
523 ceph_msg_header2 header2
{header
.seq
, header
.tid
,
524 header
.type
, header
.priority
,
526 init_le32(0), header
.data_off
,
528 footer
.flags
, header
.compat_version
,
531 auto message
= MessageFrame::Encode(
536 if (!append_frame(message
)) {
541 ldout(cct
, 5) << __func__
<< " sending message m=" << m
542 << " seq=" << m
->get_seq() << " " << *m
<< dendl
;
544 m
->trace
.event("async writing message");
545 ldout(cct
, 20) << __func__
<< " sending m=" << m
<< " seq=" << m
->get_seq()
546 << " src=" << entity_name_t(messenger
->get_myname())
547 << " off=" << header2
.data_off
549 ssize_t total_send_size
= connection
->outgoing_bl
.length();
550 ssize_t rc
= connection
->_try_send(more
);
552 ldout(cct
, 1) << __func__
<< " error sending " << m
<< ", "
553 << cpp_strerror(rc
) << dendl
;
555 connection
->logger
->inc(
556 l_msgr_send_bytes
, total_send_size
- connection
->outgoing_bl
.length());
557 ldout(cct
, 10) << __func__
<< " sending " << m
558 << (rc
? " continuely." : " done.") << dendl
;
561 #if defined(WITH_EVENTTRACE)
562 if (m
->get_type() == CEPH_MSG_OSD_OP
)
563 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OP_END", false);
564 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
565 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OPREPLY_END", false);
573 bool ProtocolV2::append_frame(F
& frame
) {
576 bl
= frame
.get_buffer(session_stream_handlers
);
577 } catch (ceph::crypto::onwire::TxHandlerError
&e
) {
578 ldout(cct
, 1) << __func__
<< " " << e
.what() << dendl
;
581 connection
->outgoing_bl
.append(bl
);
585 void ProtocolV2::handle_message_ack(uint64_t seq
) {
586 if (connection
->policy
.lossy
) { // lossy connections don't keep sent messages
590 ldout(cct
, 15) << __func__
<< " seq=" << seq
<< dendl
;
593 static const int max_pending
= 128;
595 Message
*pending
[max_pending
];
596 auto now
= ceph::mono_clock::now();
597 connection
->write_lock
.lock();
598 while (!sent
.empty() && sent
.front()->get_seq() <= seq
&& i
< max_pending
) {
599 Message
*m
= sent
.front();
602 ldout(cct
, 10) << __func__
<< " got ack seq " << seq
603 << " >= " << m
->get_seq() << " on " << m
<< " " << *m
606 connection
->write_lock
.unlock();
607 connection
->logger
->tinc(l_msgr_handle_ack_lat
, ceph::mono_clock::now() - now
);
608 for (int k
= 0; k
< i
; k
++) {
613 void ProtocolV2::write_event() {
614 ldout(cct
, 10) << __func__
<< dendl
;
617 connection
->write_lock
.lock();
620 ldout(cct
, 10) << __func__
<< " appending keepalive" << dendl
;
621 auto keepalive_frame
= KeepAliveFrame::Encode();
622 if (!append_frame(keepalive_frame
)) {
623 connection
->write_lock
.unlock();
624 connection
->lock
.lock();
626 connection
->lock
.unlock();
632 auto start
= ceph::mono_clock::now();
635 const auto out_entry
= _get_next_outgoing();
640 if (!connection
->policy
.lossy
) {
642 sent
.push_back(out_entry
.m
);
645 more
= !out_queue
.empty();
646 connection
->write_lock
.unlock();
648 // send_message or requeue messages may not encode message
649 if (!out_entry
.is_prepared
) {
650 prepare_send_message(connection
->get_features(), out_entry
.m
);
653 if (out_entry
.m
->queue_start
!= ceph::mono_time()) {
654 connection
->logger
->tinc(l_msgr_send_messages_queue_lat
,
655 ceph::mono_clock::now() -
656 out_entry
.m
->queue_start
);
659 r
= write_message(out_entry
.m
, more
);
661 connection
->write_lock
.lock();
665 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
668 // Outbound message in-progress, thread will be re-awoken
669 // when the outbound socket is writeable again
673 write_in_progress
= false;
675 // if r > 0 mean data still lefted, so no need _try_send.
677 uint64_t left
= ack_left
;
679 ldout(cct
, 10) << __func__
<< " try send msg ack, acked " << left
680 << " messages" << dendl
;
681 auto ack_frame
= AckFrame::Encode(in_seq
);
682 if (append_frame(ack_frame
)) {
685 r
= connection
->_try_send(left
);
689 } else if (is_queued()) {
690 r
= connection
->_try_send();
693 connection
->write_lock
.unlock();
695 connection
->logger
->tinc(l_msgr_running_send_time
,
696 ceph::mono_clock::now() - start
);
698 ldout(cct
, 1) << __func__
<< " send msg failed" << dendl
;
699 connection
->lock
.lock();
701 connection
->lock
.unlock();
705 write_in_progress
= false;
706 connection
->write_lock
.unlock();
707 connection
->lock
.lock();
708 connection
->write_lock
.lock();
709 if (state
== STANDBY
&& !connection
->policy
.server
&& is_queued()) {
710 ldout(cct
, 10) << __func__
<< " policy.server is false" << dendl
;
711 if (server_cookie
) { // only increment connect_seq if there is a session
714 connection
->_connect();
715 } else if (connection
->cs
&& state
!= NONE
&& state
!= CLOSED
&&
716 state
!= START_CONNECT
) {
717 r
= connection
->_try_send();
719 ldout(cct
, 1) << __func__
<< " send outcoming bl failed" << dendl
;
720 connection
->write_lock
.unlock();
722 connection
->lock
.unlock();
726 connection
->write_lock
.unlock();
727 connection
->lock
.unlock();
731 bool ProtocolV2::is_queued() {
732 return !out_queue
.empty() || connection
->is_queued();
735 uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size
) const {
736 if (session_stream_handlers
.rx
) {
737 return segment_onwire_size(logical_size
);
743 uint32_t ProtocolV2::get_epilogue_size() const {
744 // In secure mode size of epilogue is flexible and depends on particular
745 // cipher implementation. See the comment for epilogue_secure_block_t or
746 // epilogue_plain_block_t.
747 if (session_stream_handlers
.rx
) {
748 return FRAME_SECURE_EPILOGUE_SIZE
+ \
749 session_stream_handlers
.rx
->get_extra_size_at_final();
751 return FRAME_PLAIN_EPILOGUE_SIZE
;
755 CtPtr
ProtocolV2::read(CONTINUATION_RXBPTR_TYPE
<ProtocolV2
> &next
,
756 rx_buffer_t
&&buffer
) {
757 const auto len
= buffer
->length();
758 const auto buf
= buffer
->c_str();
759 next
.node
= std::move(buffer
);
760 ssize_t r
= connection
->read(len
, buf
,
761 [&next
, this](char *buffer
, int r
) {
762 if (unlikely(pre_auth
.enabled
) && r
>= 0) {
763 pre_auth
.rxbuf
.append(*next
.node
);
764 ceph_assert(!cct
->_conf
->ms_die_on_bug
||
765 pre_auth
.rxbuf
.length() < 1000000);
768 run_continuation(next
);
771 // error or done synchronously
772 if (unlikely(pre_auth
.enabled
) && r
>= 0) {
773 pre_auth
.rxbuf
.append(*next
.node
);
774 ceph_assert(!cct
->_conf
->ms_die_on_bug
||
775 pre_auth
.rxbuf
.length() < 1000000);
785 CtPtr
ProtocolV2::write(const std::string
&desc
,
786 CONTINUATION_TYPE
<ProtocolV2
> &next
,
790 bl
= frame
.get_buffer(session_stream_handlers
);
791 } catch (ceph::crypto::onwire::TxHandlerError
&e
) {
792 ldout(cct
, 1) << __func__
<< " " << e
.what() << dendl
;
795 return write(desc
, next
, bl
);
798 CtPtr
ProtocolV2::write(const std::string
&desc
,
799 CONTINUATION_TYPE
<ProtocolV2
> &next
,
800 bufferlist
&buffer
) {
801 if (unlikely(pre_auth
.enabled
)) {
802 pre_auth
.txbuf
.append(buffer
);
803 ceph_assert(!cct
->_conf
->ms_die_on_bug
||
804 pre_auth
.txbuf
.length() < 1000000);
808 connection
->write(buffer
, [&next
, desc
, this](int r
) {
810 ldout(cct
, 1) << __func__
<< " " << desc
<< " write failed r=" << r
811 << " (" << cpp_strerror(r
) << ")" << dendl
;
812 connection
->inject_delay();
815 run_continuation(next
);
819 ldout(cct
, 1) << __func__
<< " " << desc
<< " write failed r=" << r
820 << " (" << cpp_strerror(r
) << ")" << dendl
;
830 CtPtr
ProtocolV2::_banner_exchange(CtRef callback
) {
831 ldout(cct
, 20) << __func__
<< dendl
;
832 bannerExchangeCallback
= &callback
;
834 bufferlist banner_payload
;
835 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES
, banner_payload
, 0);
836 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES
, banner_payload
, 0);
839 bl
.append(CEPH_BANNER_V2_PREFIX
, strlen(CEPH_BANNER_V2_PREFIX
));
840 encode((uint16_t)banner_payload
.length(), bl
, 0);
841 bl
.claim_append(banner_payload
);
843 INTERCEPT(state
== BANNER_CONNECTING
? 3 : 4);
845 return WRITE(bl
, "banner", _wait_for_peer_banner
);
848 CtPtr
ProtocolV2::_wait_for_peer_banner() {
849 unsigned banner_len
= strlen(CEPH_BANNER_V2_PREFIX
) + sizeof(ceph_le16
);
850 return READ(banner_len
, _handle_peer_banner
);
853 CtPtr
ProtocolV2::_handle_peer_banner(rx_buffer_t
&&buffer
, int r
) {
854 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
857 ldout(cct
, 1) << __func__
<< " read peer banner failed r=" << r
<< " ("
858 << cpp_strerror(r
) << ")" << dendl
;
862 unsigned banner_prefix_len
= strlen(CEPH_BANNER_V2_PREFIX
);
864 if (memcmp(buffer
->c_str(), CEPH_BANNER_V2_PREFIX
, banner_prefix_len
)) {
865 if (memcmp(buffer
->c_str(), CEPH_BANNER
, strlen(CEPH_BANNER
)) == 0) {
866 lderr(cct
) << __func__
<< " peer " << *connection
->peer_addrs
867 << " is using msgr V1 protocol" << dendl
;
870 ldout(cct
, 1) << __func__
<< " accept peer sent bad banner" << dendl
;
874 uint16_t payload_len
;
876 buffer
->set_offset(banner_prefix_len
);
877 buffer
->set_length(sizeof(ceph_le16
));
878 bl
.push_back(std::move(buffer
));
879 auto ti
= bl
.cbegin();
881 decode(payload_len
, ti
);
882 } catch (const buffer::error
&e
) {
883 lderr(cct
) << __func__
<< " decode banner payload len failed " << dendl
;
887 INTERCEPT(state
== BANNER_CONNECTING
? 5 : 6);
889 return READ(payload_len
, _handle_peer_banner_payload
);
892 CtPtr
ProtocolV2::_handle_peer_banner_payload(rx_buffer_t
&&buffer
, int r
) {
893 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
896 ldout(cct
, 1) << __func__
<< " read peer banner payload failed r=" << r
897 << " (" << cpp_strerror(r
) << ")" << dendl
;
901 uint64_t peer_supported_features
;
902 uint64_t peer_required_features
;
905 bl
.push_back(std::move(buffer
));
906 auto ti
= bl
.cbegin();
908 decode(peer_supported_features
, ti
);
909 decode(peer_required_features
, ti
);
910 } catch (const buffer::error
&e
) {
911 lderr(cct
) << __func__
<< " decode banner payload failed " << dendl
;
915 ldout(cct
, 1) << __func__
<< " supported=" << std::hex
916 << peer_supported_features
<< " required=" << std::hex
917 << peer_required_features
<< std::dec
<< dendl
;
919 // Check feature bit compatibility
921 uint64_t supported_features
= CEPH_MSGR2_SUPPORTED_FEATURES
;
922 uint64_t required_features
= CEPH_MSGR2_REQUIRED_FEATURES
;
924 if ((required_features
& peer_supported_features
) != required_features
) {
925 ldout(cct
, 1) << __func__
<< " peer does not support all required features"
926 << " required=" << std::hex
<< required_features
927 << " supported=" << std::hex
<< peer_supported_features
928 << std::dec
<< dendl
;
930 connection
->dispatch_queue
->queue_reset(connection
);
933 if ((supported_features
& peer_required_features
) != peer_required_features
) {
934 ldout(cct
, 1) << __func__
<< " we do not support all peer required features"
935 << " required=" << std::hex
<< peer_required_features
936 << " supported=" << supported_features
<< std::dec
<< dendl
;
938 connection
->dispatch_queue
->queue_reset(connection
);
942 this->peer_required_features
= peer_required_features
;
943 if (this->peer_required_features
== 0) {
944 this->connection_features
= msgr2_required
;
947 // at this point we can change how the client protocol behaves based on
948 // this->peer_required_features
950 if (state
== BANNER_CONNECTING
) {
951 state
= HELLO_CONNECTING
;
954 ceph_assert(state
== BANNER_ACCEPTING
);
955 state
= HELLO_ACCEPTING
;
958 auto hello
= HelloFrame::Encode(messenger
->get_mytype(),
959 connection
->target_addr
);
961 INTERCEPT(state
== HELLO_CONNECTING
? 7 : 8);
963 return WRITE(hello
, "hello frame", read_frame
);
966 CtPtr
ProtocolV2::handle_hello(ceph::bufferlist
&payload
)
968 ldout(cct
, 20) << __func__
969 << " payload.length()=" << payload
.length() << dendl
;
971 if (state
!= HELLO_CONNECTING
&& state
!= HELLO_ACCEPTING
) {
972 lderr(cct
) << __func__
<< " not in hello exchange state!" << dendl
;
976 auto hello
= HelloFrame::Decode(payload
);
978 ldout(cct
, 5) << __func__
<< " received hello:"
979 << " peer_type=" << (int)hello
.entity_type()
980 << " peer_addr_for_me=" << hello
.peer_addr() << dendl
;
983 socklen_t len
= sizeof(ss
);
984 getsockname(connection
->cs
.fd(), (sockaddr
*)&ss
, &len
);
985 ldout(cct
, 5) << __func__
<< " getsockname says I am " << (sockaddr
*)&ss
986 << " when talking to " << connection
->target_addr
<< dendl
;
988 if (connection
->get_peer_type() == -1) {
989 connection
->set_peer_type(hello
.entity_type());
991 ceph_assert(state
== HELLO_ACCEPTING
);
992 connection
->policy
= messenger
->get_policy(hello
.entity_type());
993 ldout(cct
, 10) << __func__
<< " accept of host_type "
994 << (int)hello
.entity_type()
995 << ", policy.lossy=" << connection
->policy
.lossy
996 << " policy.server=" << connection
->policy
.server
997 << " policy.standby=" << connection
->policy
.standby
998 << " policy.resetcheck=" << connection
->policy
.resetcheck
1001 ceph_assert(state
== HELLO_CONNECTING
);
1002 if (connection
->get_peer_type() != hello
.entity_type()) {
1003 ldout(cct
, 1) << __func__
<< " connection peer type does not match what"
1004 << " peer advertises " << connection
->get_peer_type()
1005 << " != " << (int)hello
.entity_type() << dendl
;
1007 connection
->dispatch_queue
->queue_reset(connection
);
1012 if (messenger
->get_myaddrs().empty() ||
1013 messenger
->get_myaddrs().front().is_blank_ip()) {
1015 if (cct
->_conf
->ms_learn_addr_from_peer
) {
1016 ldout(cct
, 1) << __func__
<< " peer " << connection
->target_addr
1017 << " says I am " << hello
.peer_addr() << " (socket says "
1018 << (sockaddr
*)&ss
<< ")" << dendl
;
1019 a
= hello
.peer_addr();
1021 ldout(cct
, 1) << __func__
<< " socket to " << connection
->target_addr
1022 << " says I am " << (sockaddr
*)&ss
1023 << " (peer says " << hello
.peer_addr() << ")" << dendl
;
1024 a
.set_sockaddr((sockaddr
*)&ss
);
1026 a
.set_type(entity_addr_t::TYPE_MSGR2
); // anything but NONE; learned_addr ignores this
1028 connection
->lock
.unlock();
1029 messenger
->learned_addr(a
);
1030 if (cct
->_conf
->ms_inject_internal_delays
&&
1031 cct
->_conf
->ms_inject_socket_failures
) {
1032 if (rand() % cct
->_conf
->ms_inject_socket_failures
== 0) {
1033 ldout(cct
, 10) << __func__
<< " sleep for "
1034 << cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1036 t
.set_from_double(cct
->_conf
->ms_inject_internal_delays
);
1040 connection
->lock
.lock();
1041 if (state
!= HELLO_CONNECTING
) {
1042 ldout(cct
, 1) << __func__
1043 << " state changed while learned_addr, mark_down or "
1044 << " replacing must be happened just now" << dendl
;
1052 callback
= bannerExchangeCallback
;
1053 bannerExchangeCallback
= nullptr;
1054 ceph_assert(callback
);
1058 CtPtr
ProtocolV2::read_frame() {
1059 if (state
== CLOSED
) {
1063 ldout(cct
, 20) << __func__
<< dendl
;
1064 return READ(FRAME_PREAMBLE_SIZE
, handle_read_frame_preamble_main
);
1067 CtPtr
ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t
&&buffer
, int r
) {
1068 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1071 ldout(cct
, 1) << __func__
<< " read frame length and tag failed r=" << r
1072 << " (" << cpp_strerror(r
) << ")" << dendl
;
1076 ceph::bufferlist preamble
;
1077 preamble
.push_back(std::move(buffer
));
1079 ldout(cct
, 30) << __func__
<< " preamble\n";
1080 preamble
.hexdump(*_dout
);
1083 if (session_stream_handlers
.rx
) {
1084 ceph_assert(session_stream_handlers
.rx
);
1086 session_stream_handlers
.rx
->reset_rx_handler();
1087 preamble
= session_stream_handlers
.rx
->authenticated_decrypt_update(
1088 std::move(preamble
), segment_t::DEFAULT_ALIGNMENT
);
1090 ldout(cct
, 10) << __func__
<< " got encrypted preamble."
1091 << " after decrypt premable.length()=" << preamble
.length()
1094 ldout(cct
, 30) << __func__
<< " preamble after decrypt\n";
1095 preamble
.hexdump(*_dout
);
1100 // I expect ceph_le32 will make the endian conversion for me. Passing
1101 // everything through ::Decode is unnecessary.
1102 const auto& main_preamble
= \
1103 reinterpret_cast<preamble_block_t
&>(*preamble
.c_str());
1105 // verify preamble's CRC before any further processing
1106 const auto rx_crc
= ceph_crc32c(0,
1107 reinterpret_cast<const unsigned char*>(&main_preamble
),
1108 sizeof(main_preamble
) - sizeof(main_preamble
.crc
));
1109 if (rx_crc
!= main_preamble
.crc
) {
1110 ldout(cct
, 10) << __func__
<< " crc mismatch for main preamble"
1111 << " rx_crc=" << rx_crc
1112 << " tx_crc=" << main_preamble
.crc
<< dendl
;
1116 // currently we do support between 1 and MAX_NUM_SEGMENTS segments
1117 if (main_preamble
.num_segments
< 1 ||
1118 main_preamble
.num_segments
> MAX_NUM_SEGMENTS
) {
1119 ldout(cct
, 10) << __func__
<< " unsupported num_segments="
1120 << " tx_crc=" << main_preamble
.num_segments
<< dendl
;
1124 next_tag
= static_cast<Tag
>(main_preamble
.tag
);
1126 rx_segments_desc
.clear();
1127 rx_segments_data
.clear();
1129 if (main_preamble
.num_segments
> MAX_NUM_SEGMENTS
) {
1130 ldout(cct
, 30) << __func__
1131 << " num_segments=" << main_preamble
.num_segments
1132 << " is too much" << dendl
;
1135 for (std::uint8_t idx
= 0; idx
< main_preamble
.num_segments
; idx
++) {
1136 ldout(cct
, 10) << __func__
<< " got new segment:"
1137 << " len=" << main_preamble
.segments
[idx
].length
1138 << " align=" << main_preamble
.segments
[idx
].alignment
1140 rx_segments_desc
.emplace_back(main_preamble
.segments
[idx
]);
1144 // does it need throttle?
1145 if (next_tag
== Tag::MESSAGE
) {
1146 if (state
!= READY
) {
1147 lderr(cct
) << __func__
<< " not in ready state!" << dendl
;
1150 state
= THROTTLE_MESSAGE
;
1151 return CONTINUE(throttle_message
);
1153 return read_frame_segment();
1157 CtPtr
ProtocolV2::handle_read_frame_dispatch() {
1158 ldout(cct
, 10) << __func__
1159 << " tag=" << static_cast<uint32_t>(next_tag
) << dendl
;
1163 case Tag::AUTH_REQUEST
:
1164 case Tag::AUTH_BAD_METHOD
:
1165 case Tag::AUTH_REPLY_MORE
:
1166 case Tag::AUTH_REQUEST_MORE
:
1167 case Tag::AUTH_DONE
:
1168 case Tag::AUTH_SIGNATURE
:
1169 case Tag::CLIENT_IDENT
:
1170 case Tag::SERVER_IDENT
:
1171 case Tag::IDENT_MISSING_FEATURES
:
1172 case Tag::SESSION_RECONNECT
:
1173 case Tag::SESSION_RESET
:
1174 case Tag::SESSION_RETRY
:
1175 case Tag::SESSION_RETRY_GLOBAL
:
1176 case Tag::SESSION_RECONNECT_OK
:
1177 case Tag::KEEPALIVE2
:
1178 case Tag::KEEPALIVE2_ACK
:
1181 return handle_frame_payload();
1183 return handle_message();
1185 lderr(cct
) << __func__
1186 << " received unknown tag=" << static_cast<uint32_t>(next_tag
)
1195 CtPtr
ProtocolV2::read_frame_segment() {
1196 ldout(cct
, 20) << __func__
<< dendl
;
1197 ceph_assert(!rx_segments_desc
.empty());
1199 // description of current segment to read
1200 const auto& cur_rx_desc
= rx_segments_desc
.at(rx_segments_data
.size());
1201 rx_buffer_t rx_buffer
;
1203 rx_buffer
= buffer::ptr_node::create(buffer::create_aligned(
1204 get_onwire_size(cur_rx_desc
.length
), cur_rx_desc
.alignment
));
1205 } catch (std::bad_alloc
&) {
1206 // Catching because of potential issues with satisfying alignment.
1207 ldout(cct
, 20) << __func__
<< " can't allocate aligned rx_buffer "
1208 << " len=" << get_onwire_size(cur_rx_desc
.length
)
1209 << " align=" << cur_rx_desc
.alignment
1214 return READ_RXBUF(std::move(rx_buffer
), handle_read_frame_segment
);
1217 CtPtr
ProtocolV2::handle_read_frame_segment(rx_buffer_t
&&rx_buffer
, int r
) {
1218 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1221 ldout(cct
, 1) << __func__
<< " read frame segment failed r=" << r
<< " ("
1222 << cpp_strerror(r
) << ")" << dendl
;
1226 rx_segments_data
.emplace_back();
1227 rx_segments_data
.back().push_back(std::move(rx_buffer
));
1229 // decrypt incoming data
1230 // FIXME: if (auth_meta->is_mode_secure()) {
1231 if (session_stream_handlers
.rx
) {
1232 ceph_assert(session_stream_handlers
.rx
);
1234 auto& new_seg
= rx_segments_data
.back();
1235 if (new_seg
.length()) {
1236 auto padded
= session_stream_handlers
.rx
->authenticated_decrypt_update(
1237 std::move(new_seg
), segment_t::DEFAULT_ALIGNMENT
);
1238 const auto idx
= rx_segments_data
.size() - 1;
1240 padded
.splice(0, rx_segments_desc
[idx
].length
, &new_seg
);
1242 ldout(cct
, 20) << __func__
1243 << " unpadded new_seg.length()=" << new_seg
.length()
1248 if (rx_segments_desc
.size() == rx_segments_data
.size()) {
1249 // OK, all segments planned to read are read. Can go with epilogue.
1250 return READ(get_epilogue_size(), handle_read_frame_epilogue_main
);
1252 // TODO: for makeshift only. This will be more generic and throttled
1253 return read_frame_segment();
1257 CtPtr
ProtocolV2::handle_frame_payload() {
1258 ceph_assert(!rx_segments_data
.empty());
1259 auto& payload
= rx_segments_data
.back();
1261 ldout(cct
, 30) << __func__
<< "\n";
1262 payload
.hexdump(*_dout
);
1267 return handle_hello(payload
);
1268 case Tag::AUTH_REQUEST
:
1269 return handle_auth_request(payload
);
1270 case Tag::AUTH_BAD_METHOD
:
1271 return handle_auth_bad_method(payload
);
1272 case Tag::AUTH_REPLY_MORE
:
1273 return handle_auth_reply_more(payload
);
1274 case Tag::AUTH_REQUEST_MORE
:
1275 return handle_auth_request_more(payload
);
1276 case Tag::AUTH_DONE
:
1277 return handle_auth_done(payload
);
1278 case Tag::AUTH_SIGNATURE
:
1279 return handle_auth_signature(payload
);
1280 case Tag::CLIENT_IDENT
:
1281 return handle_client_ident(payload
);
1282 case Tag::SERVER_IDENT
:
1283 return handle_server_ident(payload
);
1284 case Tag::IDENT_MISSING_FEATURES
:
1285 return handle_ident_missing_features(payload
);
1286 case Tag::SESSION_RECONNECT
:
1287 return handle_reconnect(payload
);
1288 case Tag::SESSION_RESET
:
1289 return handle_session_reset(payload
);
1290 case Tag::SESSION_RETRY
:
1291 return handle_session_retry(payload
);
1292 case Tag::SESSION_RETRY_GLOBAL
:
1293 return handle_session_retry_global(payload
);
1294 case Tag::SESSION_RECONNECT_OK
:
1295 return handle_reconnect_ok(payload
);
1296 case Tag::KEEPALIVE2
:
1297 return handle_keepalive2(payload
);
1298 case Tag::KEEPALIVE2_ACK
:
1299 return handle_keepalive2_ack(payload
);
1301 return handle_message_ack(payload
);
1303 return handle_wait(payload
);
1310 CtPtr
ProtocolV2::ready() {
1311 ldout(cct
, 25) << __func__
<< dendl
;
1313 reconnecting
= false;
1316 // make sure no pending tick timer
1317 if (connection
->last_tick_id
) {
1318 connection
->center
->delete_time_event(connection
->last_tick_id
);
1320 connection
->last_tick_id
= connection
->center
->create_time_event(
1321 connection
->inactive_timeout_us
, connection
->tick_handler
);
1324 std::lock_guard
<std::mutex
> l(connection
->write_lock
);
1326 if (!out_queue
.empty()) {
1327 connection
->center
->dispatch_event_external(connection
->write_handler
);
1331 connection
->maybe_start_delay_thread();
1334 ldout(cct
, 1) << __func__
<< " entity=" << peer_name
<< " client_cookie="
1335 << std::hex
<< client_cookie
<< " server_cookie="
1336 << server_cookie
<< std::dec
<< " in_seq=" << in_seq
1337 << " out_seq=" << out_seq
<< dendl
;
1341 return CONTINUE(read_frame
);
1344 CtPtr
ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t
&&buffer
, int r
)
1346 ldout(cct
, 20) << __func__
<< " r=" << r
<< dendl
;
1349 ldout(cct
, 1) << __func__
<< " read data error " << dendl
;
1355 // FIXME: if (auth_meta->is_mode_secure()) {
1356 if (session_stream_handlers
.rx
) {
1357 ldout(cct
, 1) << __func__
<< " read frame epilogue bytes="
1358 << get_epilogue_size() << dendl
;
1360 // decrypt epilogue and authenticate entire frame.
1361 ceph::bufferlist epilogue_bl
;
1363 epilogue_bl
.push_back(std::move(buffer
));
1366 session_stream_handlers
.rx
->authenticated_decrypt_update_final(
1367 std::move(epilogue_bl
), segment_t::DEFAULT_ALIGNMENT
);
1368 } catch (ceph::crypto::onwire::MsgAuthError
&e
) {
1369 ldout(cct
, 5) << __func__
<< " message authentication failed: "
1370 << e
.what() << dendl
;
1375 reinterpret_cast<epilogue_plain_block_t
&>(*epilogue_bl
.c_str());
1376 late_flags
= epilogue
.late_flags
;
1378 auto& epilogue
= reinterpret_cast<epilogue_plain_block_t
&>(*buffer
->c_str());
1380 for (std::uint8_t idx
= 0; idx
< rx_segments_data
.size(); idx
++) {
1381 const __u32 expected_crc
= epilogue
.crc_values
[idx
];
1382 const __u32 calculated_crc
= rx_segments_data
[idx
].crc32c(-1);
1383 if (expected_crc
!= calculated_crc
) {
1384 ldout(cct
, 5) << __func__
<< " message integrity check failed: "
1385 << " expected_crc=" << expected_crc
1386 << " calculated_crc=" << calculated_crc
1390 ldout(cct
, 20) << __func__
<< " message integrity check success: "
1391 << " expected_crc=" << expected_crc
1392 << " calculated_crc=" << calculated_crc
1396 late_flags
= epilogue
.late_flags
;
1399 // we do have a mechanism that allows transmitter to start sending message
1400 // and abort after putting entire data field on wire. This will be used by
1401 // the kernel client to avoid unnecessary buffering.
1402 if (late_flags
& FRAME_FLAGS_LATEABRT
) {
1405 return CONTINUE(read_frame
);
1407 return handle_read_frame_dispatch();
1411 CtPtr
ProtocolV2::handle_message() {
1412 ldout(cct
, 20) << __func__
<< dendl
;
1413 ceph_assert(state
== THROTTLE_DONE
);
1415 #if defined(WITH_EVENTTRACE)
1416 utime_t ltt_recv_stamp
= ceph_clock_now();
1418 recv_stamp
= ceph_clock_now();
1420 // we need to get the size before std::moving segments data
1421 const size_t cur_msg_size
= get_current_msg_size();
1422 auto msg_frame
= MessageFrame::Decode(std::move(rx_segments_data
));
1424 // XXX: paranoid copy just to avoid oops
1425 ceph_msg_header2 current_header
= msg_frame
.header();
1427 ldout(cct
, 5) << __func__
1428 << " got " << msg_frame
.front_len()
1429 << " + " << msg_frame
.middle_len()
1430 << " + " << msg_frame
.data_len()
1432 << " envelope type=" << current_header
.type
1433 << " src " << peer_name
1434 << " off " << current_header
.data_off
1438 ceph_msg_header header
{current_header
.seq
,
1440 current_header
.type
,
1441 current_header
.priority
,
1442 current_header
.version
,
1443 init_le32(msg_frame
.front_len()),
1444 init_le32(msg_frame
.middle_len()),
1445 init_le32(msg_frame
.data_len()),
1446 current_header
.data_off
,
1448 current_header
.compat_version
,
1449 current_header
.reserved
,
1451 ceph_msg_footer footer
{init_le32(0), init_le32(0),
1452 init_le32(0), init_le64(0), current_header
.flags
};
1454 Message
*message
= decode_message(cct
, 0, header
, footer
,
1460 ldout(cct
, 1) << __func__
<< " decode message failed " << dendl
;
1463 state
= READ_MESSAGE_COMPLETE
;
1468 message
->set_byte_throttler(connection
->policy
.throttler_bytes
);
1469 message
->set_message_throttler(connection
->policy
.throttler_messages
);
1471 // store reservation size in message, so we don't get confused
1472 // by messages entering the dispatch queue through other paths.
1473 message
->set_dispatch_throttle_size(cur_msg_size
);
1475 message
->set_recv_stamp(recv_stamp
);
1476 message
->set_throttle_stamp(throttle_stamp
);
1477 message
->set_recv_complete_stamp(ceph_clock_now());
1479 // check received seq#. if it is old, drop the message.
1480 // note that incoming messages may skip ahead. this is convenient for the
1481 // client side queueing because messages can't be renumbered, but the (kernel)
1482 // client will occasionally pull a message out of the sent queue to send
1483 // elsewhere. in that case it doesn't matter if we "got" it or not.
1484 uint64_t cur_seq
= in_seq
;
1485 if (message
->get_seq() <= cur_seq
) {
1486 ldout(cct
, 0) << __func__
<< " got old message " << message
->get_seq()
1487 << " <= " << cur_seq
<< " " << message
<< " " << *message
1488 << ", discarding" << dendl
;
1490 if (connection
->has_feature(CEPH_FEATURE_RECONNECT_SEQ
) &&
1491 cct
->_conf
->ms_die_on_old_message
) {
1492 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1496 if (message
->get_seq() > cur_seq
+ 1) {
1497 ldout(cct
, 0) << __func__
<< " missed message? skipped from seq "
1498 << cur_seq
<< " to " << message
->get_seq() << dendl
;
1499 if (cct
->_conf
->ms_die_on_skipped_message
) {
1500 ceph_assert(0 == "skipped incoming seq");
1504 #if defined(WITH_EVENTTRACE)
1505 if (message
->get_type() == CEPH_MSG_OSD_OP
||
1506 message
->get_type() == CEPH_MSG_OSD_OPREPLY
) {
1507 utime_t ltt_processed_stamp
= ceph_clock_now();
1508 double usecs_elapsed
=
1509 (ltt_processed_stamp
.to_nsec() - ltt_recv_stamp
.to_nsec()) / 1000;
1511 if (message
->get_type() == CEPH_MSG_OSD_OP
)
1512 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OP",
1515 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OPREPLY",
1520 // note last received message.
1521 in_seq
= message
->get_seq();
1522 ldout(cct
, 5) << __func__
<< " received message m=" << message
1523 << " seq=" << message
->get_seq()
1524 << " from=" << message
->get_source() << " type=" << header
.type
1525 << " " << *message
<< dendl
;
1527 bool need_dispatch_writer
= false;
1528 if (!connection
->policy
.lossy
) {
1530 need_dispatch_writer
= true;
1535 ceph::mono_time fast_dispatch_time
;
1537 if (connection
->is_blackhole()) {
1538 ldout(cct
, 10) << __func__
<< " blackhole " << *message
<< dendl
;
1543 connection
->logger
->inc(l_msgr_recv_messages
);
1544 connection
->logger
->inc(
1546 cur_msg_size
+ sizeof(ceph_msg_header
) + sizeof(ceph_msg_footer
));
1548 messenger
->ms_fast_preprocess(message
);
1549 fast_dispatch_time
= ceph::mono_clock::now();
1550 connection
->logger
->tinc(l_msgr_running_recv_time
,
1551 fast_dispatch_time
- connection
->recv_start_time
);
1552 if (connection
->delay_state
) {
1553 double delay_period
= 0;
1554 if (rand() % 10000 < cct
->_conf
->ms_inject_delay_probability
* 10000.0) {
1556 cct
->_conf
->ms_inject_delay_max
* (double)(rand() % 10000) / 10000.0;
1557 ldout(cct
, 1) << "queue_received will delay after "
1558 << (ceph_clock_now() + delay_period
) << " on " << message
1559 << " " << *message
<< dendl
;
1561 connection
->delay_state
->queue(delay_period
, message
);
1562 } else if (messenger
->ms_can_fast_dispatch(message
)) {
1563 connection
->lock
.unlock();
1564 connection
->dispatch_queue
->fast_dispatch(message
);
1565 connection
->recv_start_time
= ceph::mono_clock::now();
1566 connection
->logger
->tinc(l_msgr_running_fast_dispatch_time
,
1567 connection
->recv_start_time
- fast_dispatch_time
);
1568 connection
->lock
.lock();
1569 // we might have been reused by another connection
1570 // let's check if that is the case
1571 if (state
!= READY
) {
1572 // yes, that was the case, let's do nothing
1576 connection
->dispatch_queue
->enqueue(message
, message
->get_priority(),
1577 connection
->conn_id
);
1580 handle_message_ack(current_header
.ack_seq
);
1583 if (need_dispatch_writer
&& connection
->is_connected()) {
1584 connection
->center
->dispatch_event_external(connection
->write_handler
);
1587 return CONTINUE(read_frame
);
1591 CtPtr
ProtocolV2::throttle_message() {
1592 ldout(cct
, 20) << __func__
<< dendl
;
1594 if (connection
->policy
.throttler_messages
) {
1595 ldout(cct
, 10) << __func__
<< " wants " << 1
1596 << " message from policy throttler "
1597 << connection
->policy
.throttler_messages
->get_current()
1598 << "/" << connection
->policy
.throttler_messages
->get_max()
1600 if (!connection
->policy
.throttler_messages
->get_or_fail()) {
1601 ldout(cct
, 10) << __func__
<< " wants 1 message from policy throttle "
1602 << connection
->policy
.throttler_messages
->get_current()
1603 << "/" << connection
->policy
.throttler_messages
->get_max()
1604 << " failed, just wait." << dendl
;
1605 // following thread pool deal with th full message queue isn't a
1606 // short time, so we can wait a ms.
1607 if (connection
->register_time_events
.empty()) {
1608 connection
->register_time_events
.insert(
1609 connection
->center
->create_time_event(1000,
1610 connection
->wakeup_handler
));
1616 state
= THROTTLE_BYTES
;
1617 return CONTINUE(throttle_bytes
);
1620 CtPtr
ProtocolV2::throttle_bytes() {
1621 ldout(cct
, 20) << __func__
<< dendl
;
1623 const size_t cur_msg_size
= get_current_msg_size();
1625 if (connection
->policy
.throttler_bytes
) {
1626 ldout(cct
, 10) << __func__
<< " wants " << cur_msg_size
1627 << " bytes from policy throttler "
1628 << connection
->policy
.throttler_bytes
->get_current() << "/"
1629 << connection
->policy
.throttler_bytes
->get_max() << dendl
;
1630 if (!connection
->policy
.throttler_bytes
->get_or_fail(cur_msg_size
)) {
1631 ldout(cct
, 10) << __func__
<< " wants " << cur_msg_size
1632 << " bytes from policy throttler "
1633 << connection
->policy
.throttler_bytes
->get_current()
1634 << "/" << connection
->policy
.throttler_bytes
->get_max()
1635 << " failed, just wait." << dendl
;
1636 // following thread pool deal with th full message queue isn't a
1637 // short time, so we can wait a ms.
1638 if (connection
->register_time_events
.empty()) {
1639 connection
->register_time_events
.insert(
1640 connection
->center
->create_time_event(
1641 1000, connection
->wakeup_handler
));
1648 state
= THROTTLE_DISPATCH_QUEUE
;
1649 return CONTINUE(throttle_dispatch_queue
);
1652 CtPtr
ProtocolV2::throttle_dispatch_queue() {
1653 ldout(cct
, 20) << __func__
<< dendl
;
1655 const size_t cur_msg_size
= get_current_msg_size();
1657 if (!connection
->dispatch_queue
->dispatch_throttler
.get_or_fail(
1660 << __func__
<< " wants " << cur_msg_size
1661 << " bytes from dispatch throttle "
1662 << connection
->dispatch_queue
->dispatch_throttler
.get_current() << "/"
1663 << connection
->dispatch_queue
->dispatch_throttler
.get_max()
1664 << " failed, just wait." << dendl
;
1665 // following thread pool deal with th full message queue isn't a
1666 // short time, so we can wait a ms.
1667 if (connection
->register_time_events
.empty()) {
1668 connection
->register_time_events
.insert(
1669 connection
->center
->create_time_event(1000,
1670 connection
->wakeup_handler
));
1676 throttle_stamp
= ceph_clock_now();
1677 state
= THROTTLE_DONE
;
1679 return read_frame_segment();
1682 CtPtr
ProtocolV2::handle_keepalive2(ceph::bufferlist
&payload
)
1684 ldout(cct
, 20) << __func__
1685 << " payload.length()=" << payload
.length() << dendl
;
1687 if (state
!= READY
) {
1688 lderr(cct
) << __func__
<< " not in ready state!" << dendl
;
1692 auto keepalive_frame
= KeepAliveFrame::Decode(payload
);
1694 ldout(cct
, 30) << __func__
<< " got KEEPALIVE2 tag ..." << dendl
;
1696 connection
->write_lock
.lock();
1697 auto keepalive_ack_frame
= KeepAliveFrameAck::Encode(keepalive_frame
.timestamp());
1698 if (!append_frame(keepalive_ack_frame
)) {
1699 connection
->write_lock
.unlock();
1702 connection
->write_lock
.unlock();
1704 ldout(cct
, 20) << __func__
<< " got KEEPALIVE2 "
1705 << keepalive_frame
.timestamp() << dendl
;
1706 connection
->set_last_keepalive(ceph_clock_now());
1708 if (is_connected()) {
1709 connection
->center
->dispatch_event_external(connection
->write_handler
);
1712 return CONTINUE(read_frame
);
1715 CtPtr
ProtocolV2::handle_keepalive2_ack(ceph::bufferlist
&payload
)
1717 ldout(cct
, 20) << __func__
1718 << " payload.length()=" << payload
.length() << dendl
;
1720 if (state
!= READY
) {
1721 lderr(cct
) << __func__
<< " not in ready state!" << dendl
;
1725 auto keepalive_ack_frame
= KeepAliveFrameAck::Decode(payload
);
1726 connection
->set_last_keepalive_ack(keepalive_ack_frame
.timestamp());
1727 ldout(cct
, 20) << __func__
<< " got KEEPALIVE_ACK" << dendl
;
1729 return CONTINUE(read_frame
);
1732 CtPtr
ProtocolV2::handle_message_ack(ceph::bufferlist
&payload
)
1734 ldout(cct
, 20) << __func__
1735 << " payload.length()=" << payload
.length() << dendl
;
1737 if (state
!= READY
) {
1738 lderr(cct
) << __func__
<< " not in ready state!" << dendl
;
1742 auto ack
= AckFrame::Decode(payload
);
1743 handle_message_ack(ack
.seq());
1744 return CONTINUE(read_frame
);
1747 /* Client Protocol Methods */
1749 CtPtr
ProtocolV2::start_client_banner_exchange() {
1750 ldout(cct
, 20) << __func__
<< dendl
;
1754 state
= BANNER_CONNECTING
;
1756 global_seq
= messenger
->get_global_seq();
1758 return _banner_exchange(CONTINUATION(post_client_banner_exchange
));
1761 CtPtr
ProtocolV2::post_client_banner_exchange() {
1762 ldout(cct
, 20) << __func__
<< dendl
;
1764 state
= AUTH_CONNECTING
;
1766 return send_auth_request();
1769 CtPtr
ProtocolV2::send_auth_request(std::vector
<uint32_t> &allowed_methods
) {
1770 ceph_assert(messenger
->auth_client
);
1771 ldout(cct
, 20) << __func__
<< " peer_type " << (int)connection
->peer_type
1772 << " auth_client " << messenger
->auth_client
<< dendl
;
1775 vector
<uint32_t> preferred_modes
;
1776 auto am
= auth_meta
;
1777 connection
->lock
.unlock();
1778 int r
= messenger
->auth_client
->get_auth_request(
1779 connection
, am
.get(),
1780 &am
->auth_method
, &preferred_modes
, &bl
);
1781 connection
->lock
.lock();
1782 if (state
!= AUTH_CONNECTING
) {
1783 ldout(cct
, 1) << __func__
<< " state changed!" << dendl
;
1787 ldout(cct
, 0) << __func__
<< " get_initial_auth_request returned " << r
1790 connection
->dispatch_queue
->queue_reset(connection
);
1796 auto frame
= AuthRequestFrame::Encode(auth_meta
->auth_method
, preferred_modes
,
1798 return WRITE(frame
, "auth request", read_frame
);
1801 CtPtr
ProtocolV2::handle_auth_bad_method(ceph::bufferlist
&payload
) {
1802 ldout(cct
, 20) << __func__
1803 << " payload.length()=" << payload
.length() << dendl
;
1805 if (state
!= AUTH_CONNECTING
) {
1806 lderr(cct
) << __func__
<< " not in auth connect state!" << dendl
;
1810 auto bad_method
= AuthBadMethodFrame::Decode(payload
);
1811 ldout(cct
, 1) << __func__
<< " method=" << bad_method
.method()
1812 << " result " << cpp_strerror(bad_method
.result())
1813 << ", allowed methods=" << bad_method
.allowed_methods()
1814 << ", allowed modes=" << bad_method
.allowed_modes()
1816 ceph_assert(messenger
->auth_client
);
1817 auto am
= auth_meta
;
1818 connection
->lock
.unlock();
1819 int r
= messenger
->auth_client
->handle_auth_bad_method(
1822 bad_method
.method(), bad_method
.result(),
1823 bad_method
.allowed_methods(),
1824 bad_method
.allowed_modes());
1825 connection
->lock
.lock();
1826 if (state
!= AUTH_CONNECTING
|| r
< 0) {
1829 return send_auth_request(bad_method
.allowed_methods());
1832 CtPtr
ProtocolV2::handle_auth_reply_more(ceph::bufferlist
&payload
)
1834 ldout(cct
, 20) << __func__
1835 << " payload.length()=" << payload
.length() << dendl
;
1837 if (state
!= AUTH_CONNECTING
) {
1838 lderr(cct
) << __func__
<< " not in auth connect state!" << dendl
;
1842 auto auth_more
= AuthReplyMoreFrame::Decode(payload
);
1843 ldout(cct
, 5) << __func__
1844 << " auth reply more len=" << auth_more
.auth_payload().length()
1846 ceph_assert(messenger
->auth_client
);
1847 ceph::bufferlist reply
;
1848 auto am
= auth_meta
;
1849 connection
->lock
.unlock();
1850 int r
= messenger
->auth_client
->handle_auth_reply_more(
1851 connection
, am
.get(), auth_more
.auth_payload(), &reply
);
1852 connection
->lock
.lock();
1853 if (state
!= AUTH_CONNECTING
) {
1854 ldout(cct
, 1) << __func__
<< " state changed!" << dendl
;
1858 lderr(cct
) << __func__
<< " auth_client handle_auth_reply_more returned "
1862 auto more_reply
= AuthRequestMoreFrame::Encode(reply
);
1863 return WRITE(more_reply
, "auth request more", read_frame
);
1866 CtPtr
ProtocolV2::handle_auth_done(ceph::bufferlist
&payload
)
1868 ldout(cct
, 20) << __func__
1869 << " payload.length()=" << payload
.length() << dendl
;
1871 if (state
!= AUTH_CONNECTING
) {
1872 lderr(cct
) << __func__
<< " not in auth connect state!" << dendl
;
1876 auto auth_done
= AuthDoneFrame::Decode(payload
);
1878 ceph_assert(messenger
->auth_client
);
1879 auto am
= auth_meta
;
1880 connection
->lock
.unlock();
1881 int r
= messenger
->auth_client
->handle_auth_done(
1884 auth_done
.global_id(),
1885 auth_done
.con_mode(),
1886 auth_done
.auth_payload(),
1888 &am
->connection_secret
);
1889 connection
->lock
.lock();
1890 if (state
!= AUTH_CONNECTING
) {
1891 ldout(cct
, 1) << __func__
<< " state changed!" << dendl
;
1897 auth_meta
->con_mode
= auth_done
.con_mode();
1898 session_stream_handlers
= \
1899 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct
, *auth_meta
, false);
1901 state
= AUTH_CONNECTING_SIGN
;
1903 const auto sig
= auth_meta
->session_key
.empty() ? sha256_digest_t() :
1904 auth_meta
->session_key
.hmac_sha256(cct
, pre_auth
.rxbuf
);
1905 auto sig_frame
= AuthSignatureFrame::Encode(sig
);
1906 pre_auth
.enabled
= false;
1907 pre_auth
.rxbuf
.clear();
1908 return WRITE(sig_frame
, "auth signature", read_frame
);
1911 CtPtr
ProtocolV2::finish_client_auth() {
1912 if (!server_cookie
) {
1913 ceph_assert(connect_seq
== 0);
1914 state
= SESSION_CONNECTING
;
1915 return send_client_ident();
1916 } else { // reconnecting to previous session
1917 state
= SESSION_RECONNECTING
;
1918 ceph_assert(connect_seq
> 0);
1919 return send_reconnect();
1923 CtPtr
ProtocolV2::send_client_ident() {
1924 ldout(cct
, 20) << __func__
<< dendl
;
1926 if (!connection
->policy
.lossy
&& !client_cookie
) {
1927 client_cookie
= ceph::util::generate_random_number
<uint64_t>(1, -1ll);
1931 if (connection
->policy
.lossy
) {
1932 flags
|= CEPH_MSG_CONNECT_LOSSY
;
1935 auto client_ident
= ClientIdentFrame::Encode(
1936 messenger
->get_myaddrs(),
1937 connection
->target_addr
,
1938 messenger
->get_myname().num(),
1940 connection
->policy
.features_supported
,
1941 connection
->policy
.features_required
| msgr2_required
,
1945 ldout(cct
, 5) << __func__
<< " sending identification: "
1946 << "addrs=" << messenger
->get_myaddrs()
1947 << " target=" << connection
->target_addr
1948 << " gid=" << messenger
->get_myname().num()
1949 << " global_seq=" << global_seq
1950 << " features_supported=" << std::hex
1951 << connection
->policy
.features_supported
1952 << " features_required="
1953 << (connection
->policy
.features_required
| msgr2_required
)
1954 << " flags=" << flags
1955 << " cookie=" << client_cookie
<< std::dec
<< dendl
;
1959 return WRITE(client_ident
, "client ident", read_frame
);
1962 CtPtr
ProtocolV2::send_reconnect() {
1963 ldout(cct
, 20) << __func__
<< dendl
;
1965 auto reconnect
= ReconnectFrame::Encode(messenger
->get_myaddrs(),
1972 ldout(cct
, 5) << __func__
<< " reconnect to session: client_cookie="
1973 << std::hex
<< client_cookie
<< " server_cookie="
1974 << server_cookie
<< std::dec
1975 << " gs=" << global_seq
<< " cs=" << connect_seq
1976 << " ms=" << in_seq
<< dendl
;
1980 return WRITE(reconnect
, "reconnect", read_frame
);
1983 CtPtr
ProtocolV2::handle_ident_missing_features(ceph::bufferlist
&payload
)
1985 ldout(cct
, 20) << __func__
1986 << " payload.length()=" << payload
.length() << dendl
;
1988 if (state
!= SESSION_CONNECTING
) {
1989 lderr(cct
) << __func__
<< " not in session connect state!" << dendl
;
1993 auto ident_missing
=
1994 IdentMissingFeaturesFrame::Decode(payload
);
1995 lderr(cct
) << __func__
1996 << " client does not support all server features: " << std::hex
1997 << ident_missing
.features() << std::dec
<< dendl
;
2002 CtPtr
ProtocolV2::handle_session_reset(ceph::bufferlist
&payload
)
2004 ldout(cct
, 20) << __func__
2005 << " payload.length()=" << payload
.length() << dendl
;
2007 if (state
!= SESSION_RECONNECTING
) {
2008 lderr(cct
) << __func__
<< " not in session reconnect state!" << dendl
;
2012 auto reset
= ResetFrame::Decode(payload
);
2014 ldout(cct
, 1) << __func__
<< " received session reset full=" << reset
.full()
2024 state
= SESSION_CONNECTING
;
2025 return send_client_ident();
2028 CtPtr
ProtocolV2::handle_session_retry(ceph::bufferlist
&payload
)
2030 ldout(cct
, 20) << __func__
2031 << " payload.length()=" << payload
.length() << dendl
;
2033 if (state
!= SESSION_RECONNECTING
) {
2034 lderr(cct
) << __func__
<< " not in session reconnect state!" << dendl
;
2038 auto retry
= RetryFrame::Decode(payload
);
2039 connect_seq
= retry
.connect_seq() + 1;
2041 ldout(cct
, 1) << __func__
2042 << " received session retry connect_seq=" << retry
.connect_seq()
2043 << ", inc to cs=" << connect_seq
<< dendl
;
2045 return send_reconnect();
2048 CtPtr
ProtocolV2::handle_session_retry_global(ceph::bufferlist
&payload
)
2050 ldout(cct
, 20) << __func__
2051 << " payload.length()=" << payload
.length() << dendl
;
2053 if (state
!= SESSION_RECONNECTING
) {
2054 lderr(cct
) << __func__
<< " not in session reconnect state!" << dendl
;
2058 auto retry
= RetryGlobalFrame::Decode(payload
);
2059 global_seq
= messenger
->get_global_seq(retry
.global_seq());
2061 ldout(cct
, 1) << __func__
<< " received session retry global global_seq="
2062 << retry
.global_seq() << ", choose new gs=" << global_seq
2065 return send_reconnect();
2068 CtPtr
ProtocolV2::handle_wait(ceph::bufferlist
&payload
) {
2069 ldout(cct
, 20) << __func__
2070 << " received WAIT (connection race)"
2071 << " payload.length()=" << payload
.length()
2074 if (state
!= SESSION_CONNECTING
&& state
!= SESSION_RECONNECTING
) {
2075 lderr(cct
) << __func__
<< " not in session (re)connect state!" << dendl
;
2080 WaitFrame::Decode(payload
);
2084 CtPtr
ProtocolV2::handle_reconnect_ok(ceph::bufferlist
&payload
)
2086 ldout(cct
, 20) << __func__
2087 << " payload.length()=" << payload
.length() << dendl
;
2089 if (state
!= SESSION_RECONNECTING
) {
2090 lderr(cct
) << __func__
<< " not in session reconnect state!" << dendl
;
2094 auto reconnect_ok
= ReconnectOkFrame::Decode(payload
);
2095 ldout(cct
, 5) << __func__
2096 << " reconnect accepted: sms=" << reconnect_ok
.msg_seq()
2099 out_seq
= discard_requeued_up_to(out_seq
, reconnect_ok
.msg_seq());
2101 backoff
= utime_t();
2102 ldout(cct
, 10) << __func__
<< " reconnect success " << connect_seq
2103 << ", lossy = " << connection
->policy
.lossy
<< ", features "
2104 << connection
->get_features() << dendl
;
2106 if (connection
->delay_state
) {
2107 ceph_assert(connection
->delay_state
->ready());
2110 connection
->dispatch_queue
->queue_connect(connection
);
2111 messenger
->ms_deliver_handle_fast_connect(connection
);
2116 CtPtr
ProtocolV2::handle_server_ident(ceph::bufferlist
&payload
)
2118 ldout(cct
, 20) << __func__
2119 << " payload.length()=" << payload
.length() << dendl
;
2121 if (state
!= SESSION_CONNECTING
) {
2122 lderr(cct
) << __func__
<< " not in session connect state!" << dendl
;
2126 auto server_ident
= ServerIdentFrame::Decode(payload
);
2127 ldout(cct
, 5) << __func__
<< " received server identification:"
2128 << " addrs=" << server_ident
.addrs()
2129 << " gid=" << server_ident
.gid()
2130 << " global_seq=" << server_ident
.global_seq()
2131 << " features_supported=" << std::hex
2132 << server_ident
.supported_features()
2133 << " features_required=" << server_ident
.required_features()
2134 << " flags=" << server_ident
.flags() << " cookie=" << std::dec
2135 << server_ident
.cookie() << dendl
;
2137 // is this who we intended to talk to?
2138 // be a bit forgiving here, since we may be connecting based on addresses parsed out
2139 // of mon_host or something.
2140 if (!server_ident
.addrs().contains(connection
->target_addr
)) {
2141 ldout(cct
,1) << __func__
<< " peer identifies as " << server_ident
.addrs()
2142 << ", does not include " << connection
->target_addr
<< dendl
;
2146 server_cookie
= server_ident
.cookie();
2148 connection
->set_peer_addrs(server_ident
.addrs());
2149 peer_name
= entity_name_t(connection
->get_peer_type(), server_ident
.gid());
2150 connection
->set_features(server_ident
.supported_features() &
2151 connection
->policy
.features_supported
);
2152 peer_global_seq
= server_ident
.global_seq();
2154 connection
->policy
.lossy
= server_ident
.flags() & CEPH_MSG_CONNECT_LOSSY
;
2156 backoff
= utime_t();
2157 ldout(cct
, 10) << __func__
<< " connect success " << connect_seq
2158 << ", lossy = " << connection
->policy
.lossy
<< ", features "
2159 << connection
->get_features() << dendl
;
2161 if (connection
->delay_state
) {
2162 ceph_assert(connection
->delay_state
->ready());
2165 connection
->dispatch_queue
->queue_connect(connection
);
2166 messenger
->ms_deliver_handle_fast_connect(connection
);
2171 /* Server Protocol Methods */
2173 CtPtr
ProtocolV2::start_server_banner_exchange() {
2174 ldout(cct
, 20) << __func__
<< dendl
;
2178 state
= BANNER_ACCEPTING
;
2180 return _banner_exchange(CONTINUATION(post_server_banner_exchange
));
2183 CtPtr
ProtocolV2::post_server_banner_exchange() {
2184 ldout(cct
, 20) << __func__
<< dendl
;
2186 state
= AUTH_ACCEPTING
;
2188 return CONTINUE(read_frame
);
2191 CtPtr
ProtocolV2::handle_auth_request(ceph::bufferlist
&payload
) {
2192 ldout(cct
, 20) << __func__
<< " payload.length()=" << payload
.length()
2195 if (state
!= AUTH_ACCEPTING
) {
2196 lderr(cct
) << __func__
<< " not in auth accept state!" << dendl
;
2200 auto request
= AuthRequestFrame::Decode(payload
);
2201 ldout(cct
, 10) << __func__
<< " AuthRequest(method=" << request
.method()
2202 << ", preferred_modes=" << request
.preferred_modes()
2203 << ", payload_len=" << request
.auth_payload().length() << ")"
2205 auth_meta
->auth_method
= request
.method();
2206 auth_meta
->con_mode
= messenger
->auth_server
->pick_con_mode(
2207 connection
->get_peer_type(), auth_meta
->auth_method
,
2208 request
.preferred_modes());
2209 if (auth_meta
->con_mode
== CEPH_CON_MODE_UNKNOWN
) {
2210 return _auth_bad_method(-EOPNOTSUPP
);
2212 return _handle_auth_request(request
.auth_payload(), false);
2215 CtPtr
ProtocolV2::_auth_bad_method(int r
)
2218 std::vector
<uint32_t> allowed_methods
;
2219 std::vector
<uint32_t> allowed_modes
;
2220 messenger
->auth_server
->get_supported_auth_methods(
2221 connection
->get_peer_type(), &allowed_methods
, &allowed_modes
);
2222 ldout(cct
, 1) << __func__
<< " auth_method " << auth_meta
->auth_method
2223 << " r " << cpp_strerror(r
)
2224 << ", allowed_methods " << allowed_methods
2225 << ", allowed_modes " << allowed_modes
2227 auto bad_method
= AuthBadMethodFrame::Encode(auth_meta
->auth_method
, r
,
2228 allowed_methods
, allowed_modes
);
2229 return WRITE(bad_method
, "bad auth method", read_frame
);
2232 CtPtr
ProtocolV2::_handle_auth_request(bufferlist
& auth_payload
, bool more
)
2234 if (!messenger
->auth_server
) {
2238 auto am
= auth_meta
;
2239 connection
->lock
.unlock();
2240 int r
= messenger
->auth_server
->handle_auth_request(
2241 connection
, am
.get(),
2242 more
, am
->auth_method
, auth_payload
,
2244 connection
->lock
.lock();
2245 if (state
!= AUTH_ACCEPTING
&& state
!= AUTH_ACCEPTING_MORE
) {
2246 ldout(cct
, 1) << __func__
2247 << " state changed while accept, it must be mark_down"
2249 ceph_assert(state
== CLOSED
);
2254 state
= AUTH_ACCEPTING_SIGN
;
2256 auto auth_done
= AuthDoneFrame::Encode(connection
->peer_global_id
,
2257 auth_meta
->con_mode
,
2259 return WRITE(auth_done
, "auth done", finish_auth
);
2260 } else if (r
== 0) {
2261 state
= AUTH_ACCEPTING_MORE
;
2263 auto more
= AuthReplyMoreFrame::Encode(reply
);
2264 return WRITE(more
, "auth reply more", read_frame
);
2265 } else if (r
== -EBUSY
) {
2266 // kick the client and maybe they'll come back later
2269 return _auth_bad_method(r
);
2273 CtPtr
ProtocolV2::finish_auth()
2275 ceph_assert(auth_meta
);
2276 // TODO: having a possibility to check whether we're server or client could
2277 // allow reusing finish_auth().
2278 session_stream_handlers
= \
2279 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct
, *auth_meta
, true);
2281 const auto sig
= auth_meta
->session_key
.empty() ? sha256_digest_t() :
2282 auth_meta
->session_key
.hmac_sha256(cct
, pre_auth
.rxbuf
);
2283 auto sig_frame
= AuthSignatureFrame::Encode(sig
);
2284 pre_auth
.enabled
= false;
2285 pre_auth
.rxbuf
.clear();
2286 return WRITE(sig_frame
, "auth signature", read_frame
);
2289 CtPtr
ProtocolV2::handle_auth_request_more(ceph::bufferlist
&payload
)
2291 ldout(cct
, 20) << __func__
2292 << " payload.length()=" << payload
.length() << dendl
;
2294 if (state
!= AUTH_ACCEPTING_MORE
) {
2295 lderr(cct
) << __func__
<< " not in auth accept more state!" << dendl
;
2299 auto auth_more
= AuthRequestMoreFrame::Decode(payload
);
2300 return _handle_auth_request(auth_more
.auth_payload(), true);
2303 CtPtr
ProtocolV2::handle_auth_signature(ceph::bufferlist
&payload
)
2305 ldout(cct
, 20) << __func__
2306 << " payload.length()=" << payload
.length() << dendl
;
2308 if (state
!= AUTH_ACCEPTING_SIGN
&& state
!= AUTH_CONNECTING_SIGN
) {
2309 lderr(cct
) << __func__
2310 << " pre-auth verification signature seen in wrong state!"
2315 auto sig_frame
= AuthSignatureFrame::Decode(payload
);
2317 const auto actual_tx_sig
= auth_meta
->session_key
.empty() ?
2318 sha256_digest_t() : auth_meta
->session_key
.hmac_sha256(cct
, pre_auth
.txbuf
);
2319 if (sig_frame
.signature() != actual_tx_sig
) {
2320 ldout(cct
, 2) << __func__
<< " pre-auth signature mismatch"
2321 << " actual_tx_sig=" << actual_tx_sig
2322 << " sig_frame.signature()=" << sig_frame
.signature()
2326 ldout(cct
, 20) << __func__
<< " pre-auth signature success"
2327 << " sig_frame.signature()=" << sig_frame
.signature()
2329 pre_auth
.txbuf
.clear();
2332 if (state
== AUTH_ACCEPTING_SIGN
) {
2333 // server had sent AuthDone and client responded with correct pre-auth
2334 // signature. we can start accepting new sessions/reconnects.
2335 state
= SESSION_ACCEPTING
;
2336 return CONTINUE(read_frame
);
2337 } else if (state
== AUTH_CONNECTING_SIGN
) {
2338 // this happened at client side
2339 return finish_client_auth();
2341 ceph_abort("state corruption");
2345 CtPtr
ProtocolV2::handle_client_ident(ceph::bufferlist
&payload
)
2347 ldout(cct
, 20) << __func__
2348 << " payload.length()=" << payload
.length() << dendl
;
2350 if (state
!= SESSION_ACCEPTING
) {
2351 lderr(cct
) << __func__
<< " not in session accept state!" << dendl
;
2355 auto client_ident
= ClientIdentFrame::Decode(payload
);
2357 ldout(cct
, 5) << __func__
<< " received client identification:"
2358 << " addrs=" << client_ident
.addrs()
2359 << " target=" << client_ident
.target_addr()
2360 << " gid=" << client_ident
.gid()
2361 << " global_seq=" << client_ident
.global_seq()
2362 << " features_supported=" << std::hex
2363 << client_ident
.supported_features()
2364 << " features_required=" << client_ident
.required_features()
2365 << " flags=" << client_ident
.flags()
2366 << " cookie=" << client_ident
.cookie() << std::dec
<< dendl
;
2368 if (client_ident
.addrs().empty() ||
2369 client_ident
.addrs().front() == entity_addr_t()) {
2370 ldout(cct
,5) << __func__
<< " oops, client_ident.addrs() is empty" << dendl
;
2371 return _fault(); // a v2 peer should never do this
2373 if (!messenger
->get_myaddrs().contains(client_ident
.target_addr())) {
2374 ldout(cct
,5) << __func__
<< " peer is trying to reach "
2375 << client_ident
.target_addr()
2376 << " which is not us (" << messenger
->get_myaddrs() << ")"
2381 connection
->set_peer_addrs(client_ident
.addrs());
2382 connection
->target_addr
= connection
->_infer_target_addr(client_ident
.addrs());
2384 peer_name
= entity_name_t(connection
->get_peer_type(), client_ident
.gid());
2385 connection
->set_peer_id(client_ident
.gid());
2387 client_cookie
= client_ident
.cookie();
2389 uint64_t feat_missing
=
2390 (connection
->policy
.features_required
| msgr2_required
) &
2391 ~(uint64_t)client_ident
.supported_features();
2393 ldout(cct
, 1) << __func__
<< " peer missing required features " << std::hex
2394 << feat_missing
<< std::dec
<< dendl
;
2395 auto ident_missing_features
=
2396 IdentMissingFeaturesFrame::Encode(feat_missing
);
2398 return WRITE(ident_missing_features
, "ident missing features", read_frame
);
2401 connection_features
=
2402 client_ident
.supported_features() & connection
->policy
.features_supported
;
2404 peer_global_seq
= client_ident
.global_seq();
2406 if (connection
->policy
.server
&&
2407 connection
->policy
.lossy
&&
2408 !connection
->policy
.register_lossy_clients
) {
2409 // incoming lossy client, no need to register this connection
2411 // Looks good so far, let's check if there is already an existing connection
2413 connection
->lock
.unlock();
2414 AsyncConnectionRef existing
= messenger
->lookup_conn(
2415 *connection
->peer_addrs
);
2418 existing
->protocol
->proto_type
!= 2) {
2419 ldout(cct
,1) << __func__
<< " existing " << existing
<< " proto "
2420 << existing
->protocol
.get() << " version is "
2421 << existing
->protocol
->proto_type
<< ", marking down"
2423 existing
->mark_down();
2427 connection
->inject_delay();
2429 connection
->lock
.lock();
2430 if (state
!= SESSION_ACCEPTING
) {
2431 ldout(cct
, 1) << __func__
2432 << " state changed while accept, it must be mark_down"
2434 ceph_assert(state
== CLOSED
);
2439 return handle_existing_connection(existing
);
2443 // if everything is OK reply with server identification
2444 return send_server_ident();
2447 CtPtr
ProtocolV2::handle_reconnect(ceph::bufferlist
&payload
)
2449 ldout(cct
, 20) << __func__
2450 << " payload.length()=" << payload
.length() << dendl
;
2452 if (state
!= SESSION_ACCEPTING
) {
2453 lderr(cct
) << __func__
<< " not in session accept state!" << dendl
;
2457 auto reconnect
= ReconnectFrame::Decode(payload
);
2459 ldout(cct
, 5) << __func__
2460 << " received reconnect:"
2461 << " client_cookie=" << std::hex
<< reconnect
.client_cookie()
2462 << " server_cookie=" << reconnect
.server_cookie() << std::dec
2463 << " gs=" << reconnect
.global_seq()
2464 << " cs=" << reconnect
.connect_seq()
2465 << " ms=" << reconnect
.msg_seq()
2468 // Should we check if one of the ident.addrs match connection->target_addr
2469 // as we do in ProtocolV1?
2470 connection
->set_peer_addrs(reconnect
.addrs());
2471 connection
->target_addr
= connection
->_infer_target_addr(reconnect
.addrs());
2472 peer_global_seq
= reconnect
.global_seq();
2474 connection
->lock
.unlock();
2475 AsyncConnectionRef existing
= messenger
->lookup_conn(*connection
->peer_addrs
);
2478 existing
->protocol
->proto_type
!= 2) {
2479 ldout(cct
,1) << __func__
<< " existing " << existing
<< " proto "
2480 << existing
->protocol
.get() << " version is "
2481 << existing
->protocol
->proto_type
<< ", marking down" << dendl
;
2482 existing
->mark_down();
2486 connection
->inject_delay();
2488 connection
->lock
.lock();
2489 if (state
!= SESSION_ACCEPTING
) {
2490 ldout(cct
, 1) << __func__
2491 << " state changed while accept, it must be mark_down"
2493 ceph_assert(state
== CLOSED
);
2498 // there is no existing connection therefore cannot reconnect to previous
2500 ldout(cct
, 0) << __func__
2501 << " no existing connection exists, reseting client" << dendl
;
2502 auto reset
= ResetFrame::Encode(true);
2503 return WRITE(reset
, "session reset", read_frame
);
2506 std::lock_guard
<std::mutex
> l(existing
->lock
);
2508 ProtocolV2
*exproto
= dynamic_cast<ProtocolV2
*>(existing
->protocol
.get());
2510 ldout(cct
, 1) << __func__
<< " existing=" << existing
<< dendl
;
2514 if (exproto
->state
== CLOSED
) {
2515 ldout(cct
, 5) << __func__
<< " existing " << existing
2516 << " already closed. Reseting client" << dendl
;
2517 auto reset
= ResetFrame::Encode(true);
2518 return WRITE(reset
, "session reset", read_frame
);
2521 if (exproto
->replacing
) {
2522 ldout(cct
, 1) << __func__
2523 << " existing racing replace happened while replacing."
2524 << " existing=" << existing
<< dendl
;
2525 auto retry
= RetryGlobalFrame::Encode(exproto
->peer_global_seq
);
2526 return WRITE(retry
, "session retry", read_frame
);
2529 if (exproto
->client_cookie
!= reconnect
.client_cookie()) {
2530 ldout(cct
, 1) << __func__
<< " existing=" << existing
2531 << " client cookie mismatch, I must have reseted:"
2532 << " cc=" << std::hex
<< exproto
->client_cookie
2533 << " rcc=" << reconnect
.client_cookie()
2534 << ", reseting client." << std::dec
2536 auto reset
= ResetFrame::Encode(connection
->policy
.resetcheck
);
2537 return WRITE(reset
, "session reset", read_frame
);
2538 } else if (exproto
->server_cookie
== 0) {
2539 // this happens when:
2540 // - a connects to b
2541 // - a sends client_ident
2542 // - b gets client_ident, sends server_ident and sets cookie X
2543 // - connection fault
2544 // - b reconnects to a with cookie X, connect_seq=1
2545 // - a has cookie==0
2546 ldout(cct
, 1) << __func__
<< " I was a client and didn't received the"
2547 << " server_ident. Asking peer to resume session"
2548 << " establishment" << dendl
;
2549 auto reset
= ResetFrame::Encode(false);
2550 return WRITE(reset
, "session reset", read_frame
);
2553 if (exproto
->peer_global_seq
> reconnect
.global_seq()) {
2554 ldout(cct
, 5) << __func__
2555 << " stale global_seq: sgs=" << exproto
->peer_global_seq
2556 << " cgs=" << reconnect
.global_seq()
2557 << ", ask client to retry global" << dendl
;
2558 auto retry
= RetryGlobalFrame::Encode(exproto
->peer_global_seq
);
2562 return WRITE(retry
, "session retry", read_frame
);
2565 if (exproto
->connect_seq
> reconnect
.connect_seq()) {
2566 ldout(cct
, 5) << __func__
2567 << " stale connect_seq scs=" << exproto
->connect_seq
2568 << " ccs=" << reconnect
.connect_seq()
2569 << " , ask client to retry" << dendl
;
2570 auto retry
= RetryFrame::Encode(exproto
->connect_seq
);
2571 return WRITE(retry
, "session retry", read_frame
);
2574 if (exproto
->connect_seq
== reconnect
.connect_seq()) {
2575 // reconnect race: both peers are sending reconnect messages
2576 if (existing
->peer_addrs
->msgr2_addr() >
2577 messenger
->get_myaddrs().msgr2_addr() &&
2578 !existing
->policy
.server
) {
2579 // the existing connection wins
2582 << " reconnect race detected, this connection loses to existing="
2583 << existing
<< dendl
;
2585 auto wait
= WaitFrame::Encode();
2586 return WRITE(wait
, "wait", read_frame
);
2588 // this connection wins
2589 ldout(cct
, 1) << __func__
2590 << " reconnect race detected, replacing existing="
2591 << existing
<< " socket by this connection's socket"
2596 ldout(cct
, 1) << __func__
<< " reconnect to existing=" << existing
<< dendl
;
2598 reconnecting
= true;
2600 // everything looks good
2601 exproto
->connect_seq
= reconnect
.connect_seq();
2602 exproto
->message_seq
= reconnect
.msg_seq();
2604 return reuse_connection(existing
, exproto
);
2607 CtPtr
ProtocolV2::handle_existing_connection(const AsyncConnectionRef
& existing
) {
2608 ldout(cct
, 20) << __func__
<< " existing=" << existing
<< dendl
;
2610 std::lock_guard
<std::mutex
> l(existing
->lock
);
2612 ProtocolV2
*exproto
= dynamic_cast<ProtocolV2
*>(existing
->protocol
.get());
2614 ldout(cct
, 1) << __func__
<< " existing=" << existing
<< dendl
;
2618 if (exproto
->state
== CLOSED
) {
2619 ldout(cct
, 1) << __func__
<< " existing " << existing
<< " already closed."
2621 return send_server_ident();
2624 if (exproto
->replacing
) {
2625 ldout(cct
, 1) << __func__
2626 << " existing racing replace happened while replacing."
2627 << " existing=" << existing
<< dendl
;
2628 auto wait
= WaitFrame::Encode();
2629 return WRITE(wait
, "wait", read_frame
);
2632 if (exproto
->peer_global_seq
> peer_global_seq
) {
2633 ldout(cct
, 1) << __func__
<< " this is a stale connection, peer_global_seq="
2635 << " existing->peer_global_seq=" << exproto
->peer_global_seq
2636 << ", stopping this connection." << dendl
;
2638 connection
->dispatch_queue
->queue_reset(connection
);
2642 if (existing
->policy
.lossy
) {
2643 // existing connection can be thrown out in favor of this one
2645 << __func__
<< " existing=" << existing
2646 << " is a lossy channel. Stopping existing in favor of this connection"
2648 existing
->protocol
->stop();
2649 existing
->dispatch_queue
->queue_reset(existing
.get());
2650 return send_server_ident();
2653 if (exproto
->server_cookie
&& exproto
->client_cookie
&&
2654 exproto
->client_cookie
!= client_cookie
) {
2655 // Found previous session
2656 // peer has reseted and we're going to reuse the existing connection
2657 // by replacing the communication socket
2658 ldout(cct
, 1) << __func__
<< " found previous session existing=" << existing
2659 << ", peer must have reseted." << dendl
;
2660 if (connection
->policy
.resetcheck
) {
2661 exproto
->reset_session();
2663 return reuse_connection(existing
, exproto
);
2666 if (exproto
->client_cookie
== client_cookie
) {
2667 // session establishment interrupted between client_ident and server_ident,
2669 ldout(cct
, 1) << __func__
<< " found previous session existing=" << existing
2670 << ", continuing session establishment." << dendl
;
2671 return reuse_connection(existing
, exproto
);
2674 if (exproto
->state
== READY
|| exproto
->state
== STANDBY
) {
2675 ldout(cct
, 1) << __func__
<< " existing=" << existing
2676 << " is READY/STANDBY, lets reuse it" << dendl
;
2677 return reuse_connection(existing
, exproto
);
2680 // Looks like a connection race: server and client are both connecting to
2681 // each other at the same time.
2682 if (connection
->peer_addrs
->msgr2_addr() <
2683 messenger
->get_myaddrs().msgr2_addr() ||
2684 existing
->policy
.server
) {
2685 // this connection wins
2686 ldout(cct
, 1) << __func__
2687 << " connection race detected, replacing existing="
2688 << existing
<< " socket by this connection's socket" << dendl
;
2689 return reuse_connection(existing
, exproto
);
2691 // the existing connection wins
2694 << " connection race detected, this connection loses to existing="
2695 << existing
<< dendl
;
2696 ceph_assert(connection
->peer_addrs
->msgr2_addr() >
2697 messenger
->get_myaddrs().msgr2_addr());
2699 // make sure we follow through with opening the existing
2700 // connection (if it isn't yet open) since we know the peer
2701 // has something to send to us.
2702 existing
->send_keepalive();
2703 auto wait
= WaitFrame::Encode();
2704 return WRITE(wait
, "wait", read_frame
);
2708 CtPtr
ProtocolV2::reuse_connection(const AsyncConnectionRef
& existing
,
2709 ProtocolV2
*exproto
) {
2710 ldout(cct
, 20) << __func__
<< " existing=" << existing
2711 << " reconnect=" << reconnecting
<< dendl
;
2713 connection
->inject_delay();
2715 std::lock_guard
<std::mutex
> l(existing
->write_lock
);
2717 connection
->center
->delete_file_event(connection
->cs
.fd(),
2718 EVENT_READABLE
| EVENT_WRITABLE
);
2720 if (existing
->delay_state
) {
2721 existing
->delay_state
->flush();
2722 ceph_assert(!connection
->delay_state
);
2724 exproto
->reset_recv_state();
2725 exproto
->pre_auth
.enabled
= false;
2727 if (!reconnecting
) {
2728 exproto
->client_cookie
= client_cookie
;
2729 exproto
->peer_name
= peer_name
;
2730 exproto
->connection_features
= connection_features
;
2731 existing
->set_features(connection_features
);
2733 exproto
->peer_global_seq
= peer_global_seq
;
2735 ceph_assert(connection
->center
->in_thread());
2736 auto temp_cs
= std::move(connection
->cs
);
2737 EventCenter
*new_center
= connection
->center
;
2738 Worker
*new_worker
= connection
->worker
;
2739 // we can steal the session_stream_handlers under the assumption
2740 // this happens in the event center's thread as there should be
2741 // no user outside its boundaries (simlarly to e.g. outgoing_bl).
2742 auto temp_stream_handlers
= std::move(session_stream_handlers
);
2743 exproto
->auth_meta
= auth_meta
;
2745 ldout(messenger
->cct
, 5) << __func__
<< " stop myself to swap existing"
2748 // avoid _stop shutdown replacing socket
2749 // queue a reset on the new connection, which we're dumping for the old
2752 connection
->dispatch_queue
->queue_reset(connection
);
2754 exproto
->can_write
= false;
2755 exproto
->write_in_progress
= false;
2756 exproto
->reconnecting
= reconnecting
;
2757 exproto
->replacing
= true;
2758 existing
->state_offset
= 0;
2759 // avoid previous thread modify event
2760 exproto
->state
= NONE
;
2761 existing
->state
= AsyncConnection::STATE_NONE
;
2762 // Discard existing prefetch buffer in `recv_buf`
2763 existing
->recv_start
= existing
->recv_end
= 0;
2764 // there shouldn't exist any buffer
2765 ceph_assert(connection
->recv_start
== connection
->recv_end
);
2767 auto deactivate_existing
= std::bind(
2772 temp_stream_handlers
=std::move(temp_stream_handlers
)
2773 ](ConnectedSocket
&cs
) mutable {
2774 // we need to delete time event in original thread
2776 std::lock_guard
<std::mutex
> l(existing
->lock
);
2777 existing
->write_lock
.lock();
2778 exproto
->requeue_sent();
2779 // XXX: do we really need the locking for `outgoing_bl`? There is
2780 // a comment just above its definition saying "lockfree, only used
2781 // in own thread". I'm following lockfull schema just in the case.
2782 // From performance point of view it should be fine – this happens
2783 // far away from hot paths.
2784 existing
->outgoing_bl
.clear();
2785 existing
->open_write
= false;
2786 exproto
->session_stream_handlers
= std::move(temp_stream_handlers
);
2787 existing
->write_lock
.unlock();
2788 if (exproto
->state
== NONE
) {
2789 existing
->shutdown_socket();
2790 existing
->cs
= std::move(cs
);
2791 existing
->worker
->references
--;
2792 new_worker
->references
++;
2793 existing
->logger
= new_worker
->get_perf_counter();
2794 existing
->worker
= new_worker
;
2795 existing
->center
= new_center
;
2796 if (existing
->delay_state
)
2797 existing
->delay_state
->set_center(new_center
);
2798 } else if (exproto
->state
== CLOSED
) {
2799 auto back_to_close
= std::bind(
2800 [](ConnectedSocket
&cs
) mutable { cs
.close(); }, std::move(cs
));
2801 new_center
->submit_to(new_center
->get_id(),
2802 std::move(back_to_close
), true);
2809 // Before changing existing->center, it may already exists some
2810 // events in existing->center's queue. Then if we mark down
2811 // `existing`, it will execute in another thread and clean up
2812 // connection. Previous event will result in segment fault
2813 auto transfer_existing
= [existing
, exproto
]() mutable {
2814 std::lock_guard
<std::mutex
> l(existing
->lock
);
2815 if (exproto
->state
== CLOSED
) return;
2816 ceph_assert(exproto
->state
== NONE
);
2818 exproto
->state
= SESSION_ACCEPTING
;
2819 // we have called shutdown_socket above
2820 ceph_assert(existing
->last_tick_id
== 0);
2821 // restart timer since we are going to re-build connection
2822 existing
->last_connect_started
= ceph::coarse_mono_clock::now();
2823 existing
->last_tick_id
= existing
->center
->create_time_event(
2824 existing
->connect_timeout_us
, existing
->tick_handler
);
2825 existing
->state
= AsyncConnection::STATE_CONNECTION_ESTABLISHED
;
2826 existing
->center
->create_file_event(existing
->cs
.fd(), EVENT_READABLE
,
2827 existing
->read_handler
);
2828 if (!exproto
->reconnecting
) {
2829 exproto
->run_continuation(exproto
->send_server_ident());
2831 exproto
->run_continuation(exproto
->send_reconnect_ok());
2834 if (existing
->center
->in_thread())
2835 transfer_existing();
2837 existing
->center
->submit_to(existing
->center
->get_id(),
2838 std::move(transfer_existing
), true);
2840 std::move(temp_cs
));
2842 existing
->center
->submit_to(existing
->center
->get_id(),
2843 std::move(deactivate_existing
), true);
2847 CtPtr
ProtocolV2::send_server_ident() {
2848 ldout(cct
, 20) << __func__
<< dendl
;
2850 // this is required for the case when this connection is being replaced
2851 out_seq
= discard_requeued_up_to(out_seq
, 0);
2854 if (!connection
->policy
.lossy
) {
2855 server_cookie
= ceph::util::generate_random_number
<uint64_t>(1, -1ll);
2859 if (connection
->policy
.lossy
) {
2860 flags
= flags
| CEPH_MSG_CONNECT_LOSSY
;
2863 uint64_t gs
= messenger
->get_global_seq();
2864 auto server_ident
= ServerIdentFrame::Encode(
2865 messenger
->get_myaddrs(),
2866 messenger
->get_myname().num(),
2868 connection
->policy
.features_supported
,
2869 connection
->policy
.features_required
| msgr2_required
,
2873 ldout(cct
, 5) << __func__
<< " sending identification:"
2874 << " addrs=" << messenger
->get_myaddrs()
2875 << " gid=" << messenger
->get_myname().num()
2876 << " global_seq=" << gs
<< " features_supported=" << std::hex
2877 << connection
->policy
.features_supported
2878 << " features_required="
2879 << (connection
->policy
.features_required
| msgr2_required
)
2880 << " flags=" << flags
<< " cookie=" << std::dec
<< server_cookie
2883 connection
->lock
.unlock();
2884 // Because "replacing" will prevent other connections preempt this addr,
2885 // it's safe that here we don't acquire Connection's lock
2886 ssize_t r
= messenger
->accept_conn(connection
);
2888 connection
->inject_delay();
2890 connection
->lock
.lock();
2893 ldout(cct
, 1) << __func__
<< " existing race replacing process for addr = "
2894 << connection
->peer_addrs
->msgr2_addr()
2895 << " just fail later one(this)" << dendl
;
2896 connection
->inject_delay();
2899 if (state
!= SESSION_ACCEPTING
) {
2900 ldout(cct
, 1) << __func__
2901 << " state changed while accept_conn, it must be mark_down"
2903 ceph_assert(state
== CLOSED
|| state
== NONE
);
2904 messenger
->unregister_conn(connection
);
2905 connection
->inject_delay();
2909 connection
->set_features(connection_features
);
2912 connection
->dispatch_queue
->queue_accept(connection
);
2913 messenger
->ms_deliver_handle_fast_accept(connection
);
2917 return WRITE(server_ident
, "server ident", server_ready
);
2920 CtPtr
ProtocolV2::server_ready() {
2921 ldout(cct
, 20) << __func__
<< dendl
;
2923 if (connection
->delay_state
) {
2924 ceph_assert(connection
->delay_state
->ready());
2930 CtPtr
ProtocolV2::send_reconnect_ok() {
2931 ldout(cct
, 20) << __func__
<< dendl
;
2933 out_seq
= discard_requeued_up_to(out_seq
, message_seq
);
2935 uint64_t ms
= in_seq
;
2936 auto reconnect_ok
= ReconnectOkFrame::Encode(ms
);
2938 ldout(cct
, 5) << __func__
<< " sending reconnect_ok: msg_seq=" << ms
<< dendl
;
2940 connection
->lock
.unlock();
2941 // Because "replacing" will prevent other connections preempt this addr,
2942 // it's safe that here we don't acquire Connection's lock
2943 ssize_t r
= messenger
->accept_conn(connection
);
2945 connection
->inject_delay();
2947 connection
->lock
.lock();
2950 ldout(cct
, 1) << __func__
<< " existing race replacing process for addr = "
2951 << connection
->peer_addrs
->msgr2_addr()
2952 << " just fail later one(this)" << dendl
;
2953 connection
->inject_delay();
2956 if (state
!= SESSION_ACCEPTING
) {
2957 ldout(cct
, 1) << __func__
2958 << " state changed while accept_conn, it must be mark_down"
2960 ceph_assert(state
== CLOSED
|| state
== NONE
);
2961 messenger
->unregister_conn(connection
);
2962 connection
->inject_delay();
2967 connection
->dispatch_queue
->queue_accept(connection
);
2968 messenger
->ms_deliver_handle_fast_accept(connection
);
2972 return WRITE(reconnect_ok
, "reconnect ok", server_ready
);