1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
19 #include "include/Context.h"
20 #include "common/errno.h"
21 #include "AsyncMessenger.h"
22 #include "AsyncConnection.h"
24 #include "messages/MOSDOp.h"
25 #include "messages/MOSDOpReply.h"
26 #include "common/EventTrace.h"
28 // Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
29 #define SEQ_MASK 0x7fffffff
31 #define dout_subsys ceph_subsys_ms
33 #define dout_prefix _conn_prefix(_dout)
34 ostream
& AsyncConnection::_conn_prefix(std::ostream
*_dout
) {
35 return *_dout
<< "-- " << async_msgr
->get_myinst().addr
<< " >> " << peer_addr
<< " conn(" << this
37 << " s=" << get_state_name(state
)
38 << " pgs=" << peer_global_seq
39 << " cs=" << connect_seq
40 << " l=" << policy
.lossy
45 // 1. Don't dispatch any event when closed! It may cause AsyncConnection alive even if AsyncMessenger dead
47 const int AsyncConnection::TCP_PREFETCH_MIN_SIZE
= 512;
48 const int ASYNC_COALESCE_THRESHOLD
= 256;
50 class C_time_wakeup
: public EventCallback
{
51 AsyncConnectionRef conn
;
54 explicit C_time_wakeup(AsyncConnectionRef c
): conn(c
) {}
55 void do_request(int fd_or_id
) override
{
56 conn
->wakeup_from(fd_or_id
);
60 class C_handle_read
: public EventCallback
{
61 AsyncConnectionRef conn
;
64 explicit C_handle_read(AsyncConnectionRef c
): conn(c
) {}
65 void do_request(int fd_or_id
) override
{
70 class C_handle_write
: public EventCallback
{
71 AsyncConnectionRef conn
;
74 explicit C_handle_write(AsyncConnectionRef c
): conn(c
) {}
75 void do_request(int fd
) override
{
80 class C_clean_handler
: public EventCallback
{
81 AsyncConnectionRef conn
;
83 explicit C_clean_handler(AsyncConnectionRef c
): conn(c
) {}
84 void do_request(int id
) override
{
90 class C_tick_wakeup
: public EventCallback
{
91 AsyncConnectionRef conn
;
94 explicit C_tick_wakeup(AsyncConnectionRef c
): conn(c
) {}
95 void do_request(int fd_or_id
) override
{
100 static void alloc_aligned_buffer(bufferlist
& data
, unsigned len
, unsigned off
)
102 // create a buffer to read into that matches the data alignment
104 if (off
& ~CEPH_PAGE_MASK
) {
107 head
= MIN(CEPH_PAGE_SIZE
- (off
& ~CEPH_PAGE_MASK
), left
);
108 data
.push_back(buffer::create(head
));
111 unsigned middle
= left
& CEPH_PAGE_MASK
;
113 data
.push_back(buffer::create_page_aligned(middle
));
117 data
.push_back(buffer::create(left
));
121 AsyncConnection::AsyncConnection(CephContext
*cct
, AsyncMessenger
*m
, DispatchQueue
*q
,
123 : Connection(cct
, m
), delay_state(NULL
), async_msgr(m
), conn_id(q
->get_id()),
124 logger(w
->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
125 state(STATE_NONE
), state_after_send(STATE_NONE
), port(-1),
126 dispatch_queue(q
), can_write(WriteStatus::NOWRITE
),
127 keepalive(false), recv_buf(NULL
),
128 recv_max_prefetch(MAX(msgr
->cct
->_conf
->ms_tcp_prefetch_max_size
, TCP_PREFETCH_MIN_SIZE
)),
129 recv_start(0), recv_end(0),
130 last_active(ceph::coarse_mono_clock::now()),
131 inactive_timeout_us(cct
->_conf
->ms_tcp_read_timeout
*1000*1000),
132 authorizer(NULL
), replacing(false),
133 is_reset_from_peer(false), once_ready(false), state_buffer(NULL
), state_offset(0),
134 worker(w
), center(&w
->center
)
136 read_handler
= new C_handle_read(this);
137 write_handler
= new C_handle_write(this);
138 wakeup_handler
= new C_time_wakeup(this);
139 tick_handler
= new C_tick_wakeup(this);
140 memset(msgvec
, 0, sizeof(msgvec
));
141 // double recv_max_prefetch see "read_until"
142 recv_buf
= new char[2*recv_max_prefetch
];
143 state_buffer
= new char[4096];
144 logger
->inc(l_msgr_created_connections
);
147 AsyncConnection::~AsyncConnection()
149 assert(out_q
.empty());
150 assert(sent
.empty());
155 delete[] state_buffer
;
156 assert(!delay_state
);
159 void AsyncConnection::maybe_start_delay_thread()
162 auto pos
= async_msgr
->cct
->_conf
->get_val
<std::string
>("ms_inject_delay_type").find(ceph_entity_type_name(peer_type
));
163 if (pos
!= string::npos
) {
164 ldout(msgr
->cct
, 1) << __func__
<< " setting up a delay queue" << dendl
;
165 delay_state
= new DelayedDelivery(async_msgr
, center
, dispatch_queue
, conn_id
);
170 /* return -1 means `fd` occurs error or closed, it should be closed
171 * return 0 means EAGAIN or EINTR */
172 ssize_t
AsyncConnection::read_bulk(char *buf
, unsigned len
)
176 nread
= cs
.read(buf
, len
);
178 if (nread
== -EAGAIN
) {
180 } else if (nread
== -EINTR
) {
183 ldout(async_msgr
->cct
, 1) << __func__
<< " reading from fd=" << cs
.fd()
184 << " : "<< strerror(nread
) << dendl
;
187 } else if (nread
== 0) {
188 ldout(async_msgr
->cct
, 1) << __func__
<< " peer close file descriptor "
195 // return the remaining bytes, it may larger than the length of ptr
196 // else return < 0 means error
197 ssize_t
AsyncConnection::_try_send(bool more
)
199 if (async_msgr
->cct
->_conf
->ms_inject_socket_failures
&& cs
) {
200 if (rand() % async_msgr
->cct
->_conf
->ms_inject_socket_failures
== 0) {
201 ldout(async_msgr
->cct
, 0) << __func__
<< " injecting socket failure" << dendl
;
206 assert(center
->in_thread());
207 ssize_t r
= cs
.send(outcoming_bl
, more
);
209 ldout(async_msgr
->cct
, 1) << __func__
<< " send error: " << cpp_strerror(r
) << dendl
;
213 ldout(async_msgr
->cct
, 10) << __func__
<< " sent bytes " << r
214 << " remaining bytes " << outcoming_bl
.length() << dendl
;
216 if (!open_write
&& is_queued()) {
217 center
->create_file_event(cs
.fd(), EVENT_WRITABLE
, write_handler
);
221 if (open_write
&& !is_queued()) {
222 center
->delete_file_event(cs
.fd(), EVENT_WRITABLE
);
224 if (state_after_send
!= STATE_NONE
)
225 center
->dispatch_event_external(read_handler
);
228 return outcoming_bl
.length();
231 // Because this func will be called multi times to populate
232 // the needed buffer, so the passed in bufferptr must be the same.
233 // Normally, only "read_message" will pass existing bufferptr in
235 // And it will uses readahead method to reduce small read overhead,
236 // "recv_buf" is used to store read buffer
238 // return the remaining bytes, 0 means this buffer is finished
239 // else return < 0 means error
240 ssize_t
AsyncConnection::read_until(unsigned len
, char *p
)
242 ldout(async_msgr
->cct
, 25) << __func__
<< " len is " << len
<< " state_offset is "
243 << state_offset
<< dendl
;
245 if (async_msgr
->cct
->_conf
->ms_inject_socket_failures
&& cs
) {
246 if (rand() % async_msgr
->cct
->_conf
->ms_inject_socket_failures
== 0) {
247 ldout(async_msgr
->cct
, 0) << __func__
<< " injecting socket failure" << dendl
;
253 uint64_t left
= len
- state_offset
;
254 if (recv_end
> recv_start
) {
255 uint64_t to_read
= MIN(recv_end
- recv_start
, left
);
256 memcpy(p
, recv_buf
+recv_start
, to_read
);
257 recv_start
+= to_read
;
259 ldout(async_msgr
->cct
, 25) << __func__
<< " got " << to_read
<< " in buffer "
260 << " left is " << left
<< " buffer still has "
261 << recv_end
- recv_start
<< dendl
;
265 state_offset
+= to_read
;
268 recv_end
= recv_start
= 0;
269 /* nothing left in the prefetch buffer */
270 if (len
> recv_max_prefetch
) {
271 /* this was a large read, we don't prefetch for these */
273 r
= read_bulk(p
+state_offset
, left
);
274 ldout(async_msgr
->cct
, 25) << __func__
<< " read_bulk left is " << left
<< " got " << r
<< dendl
;
276 ldout(async_msgr
->cct
, 1) << __func__
<< " read failed" << dendl
;
278 } else if (r
== static_cast<int>(left
)) {
287 r
= read_bulk(recv_buf
+recv_end
, recv_max_prefetch
);
288 ldout(async_msgr
->cct
, 25) << __func__
<< " read_bulk recv_end is " << recv_end
289 << " left is " << left
<< " got " << r
<< dendl
;
291 ldout(async_msgr
->cct
, 1) << __func__
<< " read failed" << dendl
;
295 if (r
>= static_cast<int>(left
)) {
296 recv_start
= len
- state_offset
;
297 memcpy(p
+state_offset
, recv_buf
, recv_start
);
303 memcpy(p
+state_offset
, recv_buf
, recv_end
-recv_start
);
304 state_offset
+= (recv_end
- recv_start
);
305 recv_end
= recv_start
= 0;
307 ldout(async_msgr
->cct
, 25) << __func__
<< " need len " << len
<< " remaining "
308 << len
- state_offset
<< " bytes" << dendl
;
309 return len
- state_offset
;
312 void AsyncConnection::inject_delay() {
313 if (async_msgr
->cct
->_conf
->ms_inject_internal_delays
) {
314 ldout(async_msgr
->cct
, 10) << __func__
<< " sleep for " <<
315 async_msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
317 t
.set_from_double(async_msgr
->cct
->_conf
->ms_inject_internal_delays
);
322 void AsyncConnection::process()
325 int prev_state
= state
;
326 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
327 utime_t ltt_recv_stamp
= ceph_clock_now();
329 bool need_dispatch_writer
= false;
330 std::lock_guard
<std::mutex
> l(lock
);
331 last_active
= ceph::coarse_mono_clock::now();
332 auto recv_start_time
= ceph::mono_clock::now();
334 ldout(async_msgr
->cct
, 20) << __func__
<< " prev state is " << get_state_name(prev_state
) << dendl
;
340 r
= read_until(sizeof(tag
), &tag
);
342 ldout(async_msgr
->cct
, 1) << __func__
<< " read tag failed" << dendl
;
348 if (tag
== CEPH_MSGR_TAG_KEEPALIVE
) {
349 ldout(async_msgr
->cct
, 20) << __func__
<< " got KEEPALIVE" << dendl
;
350 set_last_keepalive(ceph_clock_now());
351 } else if (tag
== CEPH_MSGR_TAG_KEEPALIVE2
) {
352 state
= STATE_OPEN_KEEPALIVE2
;
353 } else if (tag
== CEPH_MSGR_TAG_KEEPALIVE2_ACK
) {
354 state
= STATE_OPEN_KEEPALIVE2_ACK
;
355 } else if (tag
== CEPH_MSGR_TAG_ACK
) {
356 state
= STATE_OPEN_TAG_ACK
;
357 } else if (tag
== CEPH_MSGR_TAG_MSG
) {
358 state
= STATE_OPEN_MESSAGE_HEADER
;
359 } else if (tag
== CEPH_MSGR_TAG_CLOSE
) {
360 state
= STATE_OPEN_TAG_CLOSE
;
362 ldout(async_msgr
->cct
, 0) << __func__
<< " bad tag " << (int)tag
<< dendl
;
369 case STATE_OPEN_KEEPALIVE2
:
372 r
= read_until(sizeof(*t
), state_buffer
);
374 ldout(async_msgr
->cct
, 1) << __func__
<< " read keeplive timespec failed" << dendl
;
380 ldout(async_msgr
->cct
, 30) << __func__
<< " got KEEPALIVE2 tag ..." << dendl
;
381 t
= (ceph_timespec
*)state_buffer
;
382 utime_t kp_t
= utime_t(*t
);
384 _append_keepalive_or_ack(true, &kp_t
);
386 ldout(async_msgr
->cct
, 20) << __func__
<< " got KEEPALIVE2 " << kp_t
<< dendl
;
387 set_last_keepalive(ceph_clock_now());
388 need_dispatch_writer
= true;
393 case STATE_OPEN_KEEPALIVE2_ACK
:
396 r
= read_until(sizeof(*t
), state_buffer
);
398 ldout(async_msgr
->cct
, 1) << __func__
<< " read keeplive timespec failed" << dendl
;
404 t
= (ceph_timespec
*)state_buffer
;
405 set_last_keepalive_ack(utime_t(*t
));
406 ldout(async_msgr
->cct
, 20) << __func__
<< " got KEEPALIVE_ACK" << dendl
;
411 case STATE_OPEN_TAG_ACK
:
414 r
= read_until(sizeof(*seq
), state_buffer
);
416 ldout(async_msgr
->cct
, 1) << __func__
<< " read ack seq failed" << dendl
;
422 seq
= (ceph_le64
*)state_buffer
;
423 ldout(async_msgr
->cct
, 20) << __func__
<< " got ACK" << dendl
;
429 case STATE_OPEN_MESSAGE_HEADER
:
431 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
432 ltt_recv_stamp
= ceph_clock_now();
434 recv_stamp
= ceph_clock_now();
435 ldout(async_msgr
->cct
, 20) << __func__
<< " begin MSG" << dendl
;
436 ceph_msg_header header
;
437 ceph_msg_header_old oldheader
;
438 __u32 header_crc
= 0;
440 if (has_feature(CEPH_FEATURE_NOSRCADDR
))
441 len
= sizeof(header
);
443 len
= sizeof(oldheader
);
445 r
= read_until(len
, state_buffer
);
447 ldout(async_msgr
->cct
, 1) << __func__
<< " read message header failed" << dendl
;
453 ldout(async_msgr
->cct
, 20) << __func__
<< " got MSG header" << dendl
;
455 if (has_feature(CEPH_FEATURE_NOSRCADDR
)) {
456 header
= *((ceph_msg_header
*)state_buffer
);
457 if (msgr
->crcflags
& MSG_CRC_HEADER
)
458 header_crc
= ceph_crc32c(0, (unsigned char *)&header
,
459 sizeof(header
) - sizeof(header
.crc
));
461 oldheader
= *((ceph_msg_header_old
*)state_buffer
);
463 memcpy(&header
, &oldheader
, sizeof(header
));
464 header
.src
= oldheader
.src
.name
;
465 header
.reserved
= oldheader
.reserved
;
466 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
467 header
.crc
= oldheader
.crc
;
468 header_crc
= ceph_crc32c(0, (unsigned char *)&oldheader
, sizeof(oldheader
) - sizeof(oldheader
.crc
));
472 ldout(async_msgr
->cct
, 20) << __func__
<< " got envelope type=" << header
.type
473 << " src " << entity_name_t(header
.src
)
474 << " front=" << header
.front_len
475 << " data=" << header
.data_len
476 << " off " << header
.data_off
<< dendl
;
479 if (msgr
->crcflags
& MSG_CRC_HEADER
&& header_crc
!= header
.crc
) {
480 ldout(async_msgr
->cct
,0) << __func__
<< " got bad header crc "
481 << header_crc
<< " != " << header
.crc
<< dendl
;
490 current_header
= header
;
491 state
= STATE_OPEN_MESSAGE_THROTTLE_MESSAGE
;
495 case STATE_OPEN_MESSAGE_THROTTLE_MESSAGE
:
497 if (policy
.throttler_messages
) {
498 ldout(async_msgr
->cct
, 10) << __func__
<< " wants " << 1 << " message from policy throttler "
499 << policy
.throttler_messages
->get_current() << "/"
500 << policy
.throttler_messages
->get_max() << dendl
;
501 if (!policy
.throttler_messages
->get_or_fail()) {
502 ldout(async_msgr
->cct
, 10) << __func__
<< " wants 1 message from policy throttle "
503 << policy
.throttler_messages
->get_current() << "/"
504 << policy
.throttler_messages
->get_max() << " failed, just wait." << dendl
;
505 // following thread pool deal with th full message queue isn't a
506 // short time, so we can wait a ms.
507 if (register_time_events
.empty())
508 register_time_events
.insert(center
->create_time_event(1000, wakeup_handler
));
513 state
= STATE_OPEN_MESSAGE_THROTTLE_BYTES
;
517 case STATE_OPEN_MESSAGE_THROTTLE_BYTES
:
519 cur_msg_size
= current_header
.front_len
+ current_header
.middle_len
+ current_header
.data_len
;
521 if (policy
.throttler_bytes
) {
522 ldout(async_msgr
->cct
, 10) << __func__
<< " wants " << cur_msg_size
<< " bytes from policy throttler "
523 << policy
.throttler_bytes
->get_current() << "/"
524 << policy
.throttler_bytes
->get_max() << dendl
;
525 if (!policy
.throttler_bytes
->get_or_fail(cur_msg_size
)) {
526 ldout(async_msgr
->cct
, 10) << __func__
<< " wants " << cur_msg_size
<< " bytes from policy throttler "
527 << policy
.throttler_bytes
->get_current() << "/"
528 << policy
.throttler_bytes
->get_max() << " failed, just wait." << dendl
;
529 // following thread pool deal with th full message queue isn't a
530 // short time, so we can wait a ms.
531 if (register_time_events
.empty())
532 register_time_events
.insert(center
->create_time_event(1000, wakeup_handler
));
538 state
= STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE
;
542 case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE
:
545 if (!dispatch_queue
->dispatch_throttler
.get_or_fail(cur_msg_size
)) {
546 ldout(async_msgr
->cct
, 10) << __func__
<< " wants " << cur_msg_size
<< " bytes from dispatch throttle "
547 << dispatch_queue
->dispatch_throttler
.get_current() << "/"
548 << dispatch_queue
->dispatch_throttler
.get_max() << " failed, just wait." << dendl
;
549 // following thread pool deal with th full message queue isn't a
550 // short time, so we can wait a ms.
551 if (register_time_events
.empty())
552 register_time_events
.insert(center
->create_time_event(1000, wakeup_handler
));
557 throttle_stamp
= ceph_clock_now();
558 state
= STATE_OPEN_MESSAGE_READ_FRONT
;
562 case STATE_OPEN_MESSAGE_READ_FRONT
:
565 unsigned front_len
= current_header
.front_len
;
568 front
.push_back(buffer::create(front_len
));
570 r
= read_until(front_len
, front
.c_str());
572 ldout(async_msgr
->cct
, 1) << __func__
<< " read message front failed" << dendl
;
578 ldout(async_msgr
->cct
, 20) << __func__
<< " got front " << front
.length() << dendl
;
580 state
= STATE_OPEN_MESSAGE_READ_MIDDLE
;
583 case STATE_OPEN_MESSAGE_READ_MIDDLE
:
586 unsigned middle_len
= current_header
.middle_len
;
588 if (!middle
.length())
589 middle
.push_back(buffer::create(middle_len
));
591 r
= read_until(middle_len
, middle
.c_str());
593 ldout(async_msgr
->cct
, 1) << __func__
<< " read message middle failed" << dendl
;
598 ldout(async_msgr
->cct
, 20) << __func__
<< " got middle " << middle
.length() << dendl
;
601 state
= STATE_OPEN_MESSAGE_READ_DATA_PREPARE
;
604 case STATE_OPEN_MESSAGE_READ_DATA_PREPARE
:
607 unsigned data_len
= le32_to_cpu(current_header
.data_len
);
608 unsigned data_off
= le32_to_cpu(current_header
.data_off
);
611 map
<ceph_tid_t
,pair
<bufferlist
,int> >::iterator p
= rx_buffers
.find(current_header
.tid
);
612 if (p
!= rx_buffers
.end()) {
613 ldout(async_msgr
->cct
,10) << __func__
<< " seleting rx buffer v " << p
->second
.second
614 << " at offset " << data_off
615 << " len " << p
->second
.first
.length() << dendl
;
616 data_buf
= p
->second
.first
;
617 // make sure it's big enough
618 if (data_buf
.length() < data_len
)
619 data_buf
.push_back(buffer::create(data_len
- data_buf
.length()));
620 data_blp
= data_buf
.begin();
622 ldout(async_msgr
->cct
,20) << __func__
<< " allocating new rx buffer at offset " << data_off
<< dendl
;
623 alloc_aligned_buffer(data_buf
, data_len
, data_off
);
624 data_blp
= data_buf
.begin();
629 state
= STATE_OPEN_MESSAGE_READ_DATA
;
632 case STATE_OPEN_MESSAGE_READ_DATA
:
634 while (msg_left
> 0) {
635 bufferptr bp
= data_blp
.get_current_ptr();
636 unsigned read
= MIN(bp
.length(), msg_left
);
637 r
= read_until(read
, bp
.c_str());
639 ldout(async_msgr
->cct
, 1) << __func__
<< " read data error " << dendl
;
645 data_blp
.advance(read
);
646 data
.append(bp
, 0, read
);
653 state
= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
;
656 case STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
:
658 ceph_msg_footer footer
;
659 ceph_msg_footer_old old_footer
;
662 if (has_feature(CEPH_FEATURE_MSG_AUTH
))
663 len
= sizeof(footer
);
665 len
= sizeof(old_footer
);
667 r
= read_until(len
, state_buffer
);
669 ldout(async_msgr
->cct
, 1) << __func__
<< " read footer data error " << dendl
;
675 if (has_feature(CEPH_FEATURE_MSG_AUTH
)) {
676 footer
= *((ceph_msg_footer
*)state_buffer
);
678 old_footer
= *((ceph_msg_footer_old
*)state_buffer
);
679 footer
.front_crc
= old_footer
.front_crc
;
680 footer
.middle_crc
= old_footer
.middle_crc
;
681 footer
.data_crc
= old_footer
.data_crc
;
683 footer
.flags
= old_footer
.flags
;
685 int aborted
= (footer
.flags
& CEPH_MSG_FOOTER_COMPLETE
) == 0;
686 ldout(async_msgr
->cct
, 10) << __func__
<< " aborted = " << aborted
<< dendl
;
688 ldout(async_msgr
->cct
, 0) << __func__
<< " got " << front
.length() << " + " << middle
.length() << " + " << data
.length()
689 << " byte message.. ABORTED" << dendl
;
693 ldout(async_msgr
->cct
, 20) << __func__
<< " got " << front
.length() << " + " << middle
.length()
694 << " + " << data
.length() << " byte message" << dendl
;
695 Message
*message
= decode_message(async_msgr
->cct
, async_msgr
->crcflags
, current_header
, footer
,
696 front
, middle
, data
, this);
698 ldout(async_msgr
->cct
, 1) << __func__
<< " decode message failed " << dendl
;
703 // Check the signature if one should be present. A zero return indicates success. PLR
706 if (session_security
.get() == NULL
) {
707 ldout(async_msgr
->cct
, 10) << __func__
<< " no session security set" << dendl
;
709 if (session_security
->check_message_signature(message
)) {
710 ldout(async_msgr
->cct
, 0) << __func__
<< " Signature check failed" << dendl
;
715 message
->set_byte_throttler(policy
.throttler_bytes
);
716 message
->set_message_throttler(policy
.throttler_messages
);
718 // store reservation size in message, so we don't get confused
719 // by messages entering the dispatch queue through other paths.
720 message
->set_dispatch_throttle_size(cur_msg_size
);
722 message
->set_recv_stamp(recv_stamp
);
723 message
->set_throttle_stamp(throttle_stamp
);
724 message
->set_recv_complete_stamp(ceph_clock_now());
726 // check received seq#. if it is old, drop the message.
727 // note that incoming messages may skip ahead. this is convenient for the client
728 // side queueing because messages can't be renumbered, but the (kernel) client will
729 // occasionally pull a message out of the sent queue to send elsewhere. in that case
730 // it doesn't matter if we "got" it or not.
731 uint64_t cur_seq
= in_seq
;
732 if (message
->get_seq() <= cur_seq
) {
733 ldout(async_msgr
->cct
,0) << __func__
<< " got old message "
734 << message
->get_seq() << " <= " << cur_seq
<< " " << message
<< " " << *message
735 << ", discarding" << dendl
;
737 if (has_feature(CEPH_FEATURE_RECONNECT_SEQ
) && async_msgr
->cct
->_conf
->ms_die_on_old_message
)
738 assert(0 == "old msgs despite reconnect_seq feature");
741 if (message
->get_seq() > cur_seq
+ 1) {
742 ldout(async_msgr
->cct
, 0) << __func__
<< " missed message? skipped from seq "
743 << cur_seq
<< " to " << message
->get_seq() << dendl
;
744 if (async_msgr
->cct
->_conf
->ms_die_on_skipped_message
)
745 assert(0 == "skipped incoming seq");
748 message
->set_connection(this);
750 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
751 if (message
->get_type() == CEPH_MSG_OSD_OP
|| message
->get_type() == CEPH_MSG_OSD_OPREPLY
) {
752 utime_t ltt_processed_stamp
= ceph_clock_now();
753 double usecs_elapsed
= (ltt_processed_stamp
.to_nsec()-ltt_recv_stamp
.to_nsec())/1000;
755 if (message
->get_type() == CEPH_MSG_OSD_OP
)
756 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OP", false);
758 OID_ELAPSED_WITH_MSG(message
, usecs_elapsed
, "TIME_TO_DECODE_OSD_OPREPLY", false);
762 // note last received message.
763 in_seq
= message
->get_seq();
764 ldout(async_msgr
->cct
, 5) << " rx " << message
->get_source() << " seq "
765 << message
->get_seq() << " " << message
766 << " " << *message
<< dendl
;
770 need_dispatch_writer
= true;
774 logger
->inc(l_msgr_recv_messages
);
775 logger
->inc(l_msgr_recv_bytes
, cur_msg_size
+ sizeof(ceph_msg_header
) + sizeof(ceph_msg_footer
));
777 async_msgr
->ms_fast_preprocess(message
);
778 auto fast_dispatch_time
= ceph::mono_clock::now();
779 logger
->tinc(l_msgr_running_recv_time
, fast_dispatch_time
- recv_start_time
);
781 utime_t release
= message
->get_recv_stamp();
782 double delay_period
= 0;
783 if (rand() % 10000 < async_msgr
->cct
->_conf
->ms_inject_delay_probability
* 10000.0) {
784 delay_period
= async_msgr
->cct
->_conf
->ms_inject_delay_max
* (double)(rand() % 10000) / 10000.0;
785 release
+= delay_period
;
786 ldout(async_msgr
->cct
, 1) << "queue_received will delay until " << release
<< " on "
787 << message
<< " " << *message
<< dendl
;
789 delay_state
->queue(delay_period
, release
, message
);
790 } else if (async_msgr
->ms_can_fast_dispatch(message
)) {
792 dispatch_queue
->fast_dispatch(message
);
793 recv_start_time
= ceph::mono_clock::now();
794 logger
->tinc(l_msgr_running_fast_dispatch_time
,
795 recv_start_time
- fast_dispatch_time
);
798 dispatch_queue
->enqueue(message
, message
->get_priority(), conn_id
);
801 // clean up local buffer references
810 case STATE_OPEN_TAG_CLOSE
:
812 ldout(async_msgr
->cct
, 20) << __func__
<< " got CLOSE" << dendl
;
819 ldout(async_msgr
->cct
, 20) << __func__
<< " enter STANDY" << dendl
;
826 ldout(async_msgr
->cct
, 20) << __func__
<< " enter none state" << dendl
;
832 ldout(async_msgr
->cct
, 20) << __func__
<< " socket closed" << dendl
;
838 ldout(async_msgr
->cct
, 1) << __func__
<< " enter wait state, failing" << dendl
;
844 if (_process_connection() < 0)
849 } while (prev_state
!= state
);
851 if (need_dispatch_writer
&& is_connected())
852 center
->dispatch_event_external(write_handler
);
854 logger
->tinc(l_msgr_running_recv_time
, ceph::mono_clock::now() - recv_start_time
);
861 ssize_t
AsyncConnection::_process_connection()
866 case STATE_WAIT_SEND
:
868 std::lock_guard
<std::mutex
> l(write_lock
);
869 if (!outcoming_bl
.length()) {
870 assert(state_after_send
);
871 state
= state_after_send
;
872 state_after_send
= STATE_NONE
;
877 case STATE_CONNECTING
:
879 assert(!policy
.server
);
881 // reset connect state variables
884 authorizer_buf
.clear();
885 memset(&connect_msg
, 0, sizeof(connect_msg
));
886 memset(&connect_reply
, 0, sizeof(connect_reply
));
888 global_seq
= async_msgr
->get_global_seq();
889 // close old socket. this is safe because we stopped the reader thread above.
891 center
->delete_file_event(cs
.fd(), EVENT_READABLE
|EVENT_WRITABLE
);
896 opts
.priority
= async_msgr
->get_socket_priority();
897 opts
.connect_bind_addr
= msgr
->get_myaddr();
898 r
= worker
->connect(get_peer_addr(), opts
, &cs
);
902 center
->create_file_event(cs
.fd(), EVENT_READABLE
, read_handler
);
903 state
= STATE_CONNECTING_RE
;
907 case STATE_CONNECTING_RE
:
909 r
= cs
.is_connected();
911 ldout(async_msgr
->cct
, 1) << __func__
<< " reconnect failed " << dendl
;
912 if (r
== -ECONNREFUSED
) {
913 ldout(async_msgr
->cct
, 2) << __func__
<< " connection refused!" << dendl
;
914 dispatch_queue
->queue_refused(this);
918 ldout(async_msgr
->cct
, 10) << __func__
<< " nonblock connect inprogress" << dendl
;
919 if (async_msgr
->get_stack()->nonblock_connect_need_writable_event())
920 center
->create_file_event(cs
.fd(), EVENT_WRITABLE
, read_handler
);
924 center
->delete_file_event(cs
.fd(), EVENT_WRITABLE
);
925 ldout(async_msgr
->cct
, 10) << __func__
<< " connect successfully, ready to send banner" << dendl
;
928 bl
.append(CEPH_BANNER
, strlen(CEPH_BANNER
));
931 state
= STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY
;
932 ldout(async_msgr
->cct
, 10) << __func__
<< " connect write banner done: "
933 << get_peer_addr() << dendl
;
935 state
= STATE_WAIT_SEND
;
936 state_after_send
= STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY
;
937 ldout(async_msgr
->cct
, 10) << __func__
<< " connect wait for write banner: "
938 << get_peer_addr() << dendl
;
946 case STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY
:
948 entity_addr_t paddr
, peer_addr_for_me
;
950 unsigned banner_len
= strlen(CEPH_BANNER
);
951 unsigned need_len
= banner_len
+ sizeof(ceph_entity_addr
)*2;
952 r
= read_until(need_len
, state_buffer
);
954 ldout(async_msgr
->cct
, 1) << __func__
<< " read banner and identify addresses failed" << dendl
;
960 if (memcmp(state_buffer
, CEPH_BANNER
, banner_len
)) {
961 ldout(async_msgr
->cct
, 0) << __func__
<< " connect protocol error (bad banner) on peer "
962 << get_peer_addr() << dendl
;
967 bl
.append(state_buffer
+banner_len
, sizeof(ceph_entity_addr
)*2);
968 bufferlist::iterator p
= bl
.begin();
971 ::decode(peer_addr_for_me
, p
);
972 } catch (const buffer::error
& e
) {
973 lderr(async_msgr
->cct
) << __func__
<< " decode peer addr failed " << dendl
;
976 ldout(async_msgr
->cct
, 20) << __func__
<< " connect read peer addr "
977 << paddr
<< " on socket " << cs
.fd() << dendl
;
978 if (peer_addr
!= paddr
) {
979 if (paddr
.is_blank_ip() && peer_addr
.get_port() == paddr
.get_port() &&
980 peer_addr
.get_nonce() == paddr
.get_nonce()) {
981 ldout(async_msgr
->cct
, 0) << __func__
<< " connect claims to be " << paddr
982 << " not " << peer_addr
983 << " - presumably this is the same node!" << dendl
;
985 ldout(async_msgr
->cct
, 10) << __func__
<< " connect claims to be "
986 << paddr
<< " not " << peer_addr
<< dendl
;
991 ldout(async_msgr
->cct
, 20) << __func__
<< " connect peer addr for me is " << peer_addr_for_me
<< dendl
;
993 async_msgr
->learned_addr(peer_addr_for_me
);
994 if (async_msgr
->cct
->_conf
->ms_inject_internal_delays
995 && async_msgr
->cct
->_conf
->ms_inject_socket_failures
) {
996 if (rand() % async_msgr
->cct
->_conf
->ms_inject_socket_failures
== 0) {
997 ldout(msgr
->cct
, 10) << __func__
<< " sleep for "
998 << async_msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1000 t
.set_from_double(async_msgr
->cct
->_conf
->ms_inject_internal_delays
);
1006 if (state
!= STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY
) {
1007 ldout(async_msgr
->cct
, 1) << __func__
<< " state changed while learned_addr, mark_down or "
1008 << " replacing must be happened just now" << dendl
;
1012 ::encode(async_msgr
->get_myaddr(), myaddrbl
, 0); // legacy
1013 r
= try_send(myaddrbl
);
1015 state
= STATE_CONNECTING_SEND_CONNECT_MSG
;
1016 ldout(async_msgr
->cct
, 10) << __func__
<< " connect sent my addr "
1017 << async_msgr
->get_myaddr() << dendl
;
1019 state
= STATE_WAIT_SEND
;
1020 state_after_send
= STATE_CONNECTING_SEND_CONNECT_MSG
;
1021 ldout(async_msgr
->cct
, 10) << __func__
<< " connect send my addr done: "
1022 << async_msgr
->get_myaddr() << dendl
;
1024 ldout(async_msgr
->cct
, 2) << __func__
<< " connect couldn't write my addr, "
1025 << cpp_strerror(r
) << dendl
;
1032 case STATE_CONNECTING_SEND_CONNECT_MSG
:
1035 authorizer
= async_msgr
->get_authorizer(peer_type
, false);
1039 connect_msg
.features
= policy
.features_supported
;
1040 connect_msg
.host_type
= async_msgr
->get_myinst().name
.type();
1041 connect_msg
.global_seq
= global_seq
;
1042 connect_msg
.connect_seq
= connect_seq
;
1043 connect_msg
.protocol_version
= async_msgr
->get_proto_version(peer_type
, true);
1044 connect_msg
.authorizer_protocol
= authorizer
? authorizer
->protocol
: 0;
1045 connect_msg
.authorizer_len
= authorizer
? authorizer
->bl
.length() : 0;
1047 ldout(async_msgr
->cct
, 10) << __func__
<< " connect_msg.authorizer_len="
1048 << connect_msg
.authorizer_len
<< " protocol="
1049 << connect_msg
.authorizer_protocol
<< dendl
;
1050 connect_msg
.flags
= 0;
1052 connect_msg
.flags
|= CEPH_MSG_CONNECT_LOSSY
; // this is fyi, actually, server decides!
1053 bl
.append((char*)&connect_msg
, sizeof(connect_msg
));
1055 bl
.append(authorizer
->bl
.c_str(), authorizer
->bl
.length());
1057 ldout(async_msgr
->cct
, 10) << __func__
<< " connect sending gseq=" << global_seq
<< " cseq="
1058 << connect_seq
<< " proto=" << connect_msg
.protocol_version
<< dendl
;
1062 state
= STATE_CONNECTING_WAIT_CONNECT_REPLY
;
1063 ldout(async_msgr
->cct
,20) << __func__
<< " connect wrote (self +) cseq, waiting for reply" << dendl
;
1065 state
= STATE_WAIT_SEND
;
1066 state_after_send
= STATE_CONNECTING_WAIT_CONNECT_REPLY
;
1067 ldout(async_msgr
->cct
, 10) << __func__
<< " continue send reply " << dendl
;
1069 ldout(async_msgr
->cct
, 2) << __func__
<< " connect couldn't send reply "
1070 << cpp_strerror(r
) << dendl
;
1077 case STATE_CONNECTING_WAIT_CONNECT_REPLY
:
1079 r
= read_until(sizeof(connect_reply
), state_buffer
);
1081 ldout(async_msgr
->cct
, 1) << __func__
<< " read connect reply failed" << dendl
;
1087 connect_reply
= *((ceph_msg_connect_reply
*)state_buffer
);
1089 ldout(async_msgr
->cct
, 20) << __func__
<< " connect got reply tag " << (int)connect_reply
.tag
1090 << " connect_seq " << connect_reply
.connect_seq
<< " global_seq "
1091 << connect_reply
.global_seq
<< " proto " << connect_reply
.protocol_version
1092 << " flags " << (int)connect_reply
.flags
<< " features "
1093 << connect_reply
.features
<< dendl
;
1094 state
= STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH
;
1099 case STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH
:
1101 bufferlist authorizer_reply
;
1102 if (connect_reply
.authorizer_len
) {
1103 ldout(async_msgr
->cct
, 10) << __func__
<< " reply.authorizer_len=" << connect_reply
.authorizer_len
<< dendl
;
1104 assert(connect_reply
.authorizer_len
< 4096);
1105 r
= read_until(connect_reply
.authorizer_len
, state_buffer
);
1107 ldout(async_msgr
->cct
, 1) << __func__
<< " read connect reply authorizer failed" << dendl
;
1113 authorizer_reply
.append(state_buffer
, connect_reply
.authorizer_len
);
1115 if (connect_reply
.tag
== CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
) {
1116 ldout(async_msgr
->cct
,10) << __func__
<< " connect got auth challenge" << dendl
;
1117 authorizer
->add_challenge(async_msgr
->cct
, authorizer_reply
);
1118 state
= STATE_CONNECTING_SEND_CONNECT_MSG
;
1122 auto iter
= authorizer_reply
.begin();
1123 if (authorizer
&& !authorizer
->verify_reply(iter
)) {
1124 ldout(async_msgr
->cct
, 0) << __func__
<< " failed verifying authorize reply" << dendl
;
1128 r
= handle_connect_reply(connect_msg
, connect_reply
);
1132 // state must be changed!
1133 assert(state
!= STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH
);
1137 case STATE_CONNECTING_WAIT_ACK_SEQ
:
1139 uint64_t newly_acked_seq
= 0;
1141 r
= read_until(sizeof(newly_acked_seq
), state_buffer
);
1143 ldout(async_msgr
->cct
, 1) << __func__
<< " read connect ack seq failed" << dendl
;
1149 newly_acked_seq
= *((uint64_t*)state_buffer
);
1150 ldout(async_msgr
->cct
, 2) << __func__
<< " got newly_acked_seq " << newly_acked_seq
1151 << " vs out_seq " << out_seq
<< dendl
;
1152 discard_requeued_up_to(newly_acked_seq
);
1153 //while (newly_acked_seq > out_seq.read()) {
1154 // Message *m = _get_next_outgoing(NULL);
1156 // ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq()
1157 // << " " << *m << dendl;
1158 // assert(m->get_seq() <= newly_acked_seq);
1164 uint64_t s
= in_seq
;
1165 bl
.append((char*)&s
, sizeof(s
));
1168 state
= STATE_CONNECTING_READY
;
1169 ldout(async_msgr
->cct
, 10) << __func__
<< " send in_seq done " << dendl
;
1171 state_after_send
= STATE_CONNECTING_READY
;
1172 state
= STATE_WAIT_SEND
;
1173 ldout(async_msgr
->cct
, 10) << __func__
<< " continue send in_seq " << dendl
;
1180 case STATE_CONNECTING_READY
:
1183 peer_global_seq
= connect_reply
.global_seq
;
1184 policy
.lossy
= connect_reply
.flags
& CEPH_MSG_CONNECT_LOSSY
;
1188 assert(connect_seq
== connect_reply
.connect_seq
);
1189 backoff
= utime_t();
1190 set_features((uint64_t)connect_reply
.features
& (uint64_t)connect_msg
.features
);
1191 ldout(async_msgr
->cct
, 10) << __func__
<< " connect success " << connect_seq
1192 << ", lossy = " << policy
.lossy
<< ", features "
1193 << get_features() << dendl
;
1195 // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the connection. PLR
1197 if (authorizer
!= NULL
) {
1198 session_security
.reset(
1199 get_auth_session_handler(async_msgr
->cct
,
1200 authorizer
->protocol
,
1201 authorizer
->session_key
,
1204 // We have no authorizer, so we shouldn't be applying security to messages in this AsyncConnection. PLR
1205 session_security
.reset();
1209 assert(delay_state
->ready());
1210 dispatch_queue
->queue_connect(this);
1211 async_msgr
->ms_deliver_handle_fast_connect(this);
1213 // make sure no pending tick timer
1215 center
->delete_time_event(last_tick_id
);
1216 last_tick_id
= center
->create_time_event(inactive_timeout_us
, tick_handler
);
1218 // messages may have been queued between the last _try_send and the connection becoming ready
1219 // the write event may already have been notified, so we need to force the scheduler again
1221 can_write
= WriteStatus::CANWRITE
;
1223 center
->dispatch_event_external(write_handler
);
1224 write_lock
.unlock();
1225 maybe_start_delay_thread();
1229 case STATE_ACCEPTING
:
1232 center
->create_file_event(cs
.fd(), EVENT_READABLE
, read_handler
);
1234 bl
.append(CEPH_BANNER
, strlen(CEPH_BANNER
));
1236 ::encode(async_msgr
->get_myaddr(), bl
, 0); // legacy
1237 port
= async_msgr
->get_myaddr().get_port();
1238 ::encode(socket_addr
, bl
, 0); // legacy
1239 ldout(async_msgr
->cct
, 1) << __func__
<< " sd=" << cs
.fd() << " " << socket_addr
<< dendl
;
1243 state
= STATE_ACCEPTING_WAIT_BANNER_ADDR
;
1244 ldout(async_msgr
->cct
, 10) << __func__
<< " write banner and addr done: "
1245 << get_peer_addr() << dendl
;
1247 state
= STATE_WAIT_SEND
;
1248 state_after_send
= STATE_ACCEPTING_WAIT_BANNER_ADDR
;
1249 ldout(async_msgr
->cct
, 10) << __func__
<< " wait for write banner and addr: "
1250 << get_peer_addr() << dendl
;
1257 case STATE_ACCEPTING_WAIT_BANNER_ADDR
:
1260 entity_addr_t peer_addr
;
1262 r
= read_until(strlen(CEPH_BANNER
) + sizeof(ceph_entity_addr
), state_buffer
);
1264 ldout(async_msgr
->cct
, 1) << __func__
<< " read peer banner and addr failed" << dendl
;
1270 if (memcmp(state_buffer
, CEPH_BANNER
, strlen(CEPH_BANNER
))) {
1271 ldout(async_msgr
->cct
, 1) << __func__
<< " accept peer sent bad banner '" << state_buffer
1272 << "' (should be '" << CEPH_BANNER
<< "')" << dendl
;
1276 addr_bl
.append(state_buffer
+strlen(CEPH_BANNER
), sizeof(ceph_entity_addr
));
1278 bufferlist::iterator ti
= addr_bl
.begin();
1279 ::decode(peer_addr
, ti
);
1280 } catch (const buffer::error
& e
) {
1281 lderr(async_msgr
->cct
) << __func__
<< " decode peer_addr failed " << dendl
;
1285 ldout(async_msgr
->cct
, 10) << __func__
<< " accept peer addr is " << peer_addr
<< dendl
;
1286 if (peer_addr
.is_blank_ip()) {
1287 // peer apparently doesn't know what ip they have; figure it out for them.
1288 int port
= peer_addr
.get_port();
1289 peer_addr
.u
= socket_addr
.u
;
1290 peer_addr
.set_port(port
);
1291 ldout(async_msgr
->cct
, 0) << __func__
<< " accept peer addr is really " << peer_addr
1292 << " (socket is " << socket_addr
<< ")" << dendl
;
1294 set_peer_addr(peer_addr
); // so that connection_state gets set up
1295 state
= STATE_ACCEPTING_WAIT_CONNECT_MSG
;
1299 case STATE_ACCEPTING_WAIT_CONNECT_MSG
:
1301 r
= read_until(sizeof(connect_msg
), state_buffer
);
1303 ldout(async_msgr
->cct
, 1) << __func__
<< " read connect msg failed" << dendl
;
1309 connect_msg
= *((ceph_msg_connect
*)state_buffer
);
1310 state
= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH
;
1314 case STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH
:
1316 bufferlist authorizer_reply
;
1318 if (connect_msg
.authorizer_len
) {
1319 if (!authorizer_buf
.length())
1320 authorizer_buf
.push_back(buffer::create(connect_msg
.authorizer_len
));
1322 r
= read_until(connect_msg
.authorizer_len
, authorizer_buf
.c_str());
1324 ldout(async_msgr
->cct
, 1) << __func__
<< " read connect authorizer failed" << dendl
;
1331 ldout(async_msgr
->cct
, 20) << __func__
<< " accept got peer connect_seq "
1332 << connect_msg
.connect_seq
<< " global_seq "
1333 << connect_msg
.global_seq
<< dendl
;
1334 set_peer_type(connect_msg
.host_type
);
1335 policy
= async_msgr
->get_policy(connect_msg
.host_type
);
1336 ldout(async_msgr
->cct
, 10) << __func__
<< " accept of host_type " << connect_msg
.host_type
1337 << ", policy.lossy=" << policy
.lossy
<< " policy.server="
1338 << policy
.server
<< " policy.standby=" << policy
.standby
1339 << " policy.resetcheck=" << policy
.resetcheck
<< dendl
;
1341 r
= handle_connect_msg(connect_msg
, authorizer_buf
, authorizer_reply
);
1345 // state is changed by "handle_connect_msg"
1346 assert(state
!= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH
);
1350 case STATE_ACCEPTING_WAIT_SEQ
:
1352 uint64_t newly_acked_seq
;
1353 r
= read_until(sizeof(newly_acked_seq
), state_buffer
);
1355 ldout(async_msgr
->cct
, 1) << __func__
<< " read ack seq failed" << dendl
;
1356 goto fail_registered
;
1361 newly_acked_seq
= *((uint64_t*)state_buffer
);
1362 ldout(async_msgr
->cct
, 2) << __func__
<< " accept get newly_acked_seq " << newly_acked_seq
<< dendl
;
1363 discard_requeued_up_to(newly_acked_seq
);
1364 state
= STATE_ACCEPTING_READY
;
1368 case STATE_ACCEPTING_READY
:
1370 ldout(async_msgr
->cct
, 20) << __func__
<< " accept done" << dendl
;
1372 memset(&connect_msg
, 0, sizeof(connect_msg
));
1375 assert(delay_state
->ready());
1376 // make sure no pending tick timer
1378 center
->delete_time_event(last_tick_id
);
1379 last_tick_id
= center
->create_time_event(inactive_timeout_us
, tick_handler
);
1382 can_write
= WriteStatus::CANWRITE
;
1384 center
->dispatch_event_external(write_handler
);
1385 write_lock
.unlock();
1386 maybe_start_delay_thread();
1392 lderr(async_msgr
->cct
) << __func__
<< " bad state: " << state
<< dendl
;
1400 ldout(async_msgr
->cct
, 10) << "accept fault after register" << dendl
;
1407 int AsyncConnection::handle_connect_reply(ceph_msg_connect
&connect
, ceph_msg_connect_reply
&reply
)
1409 uint64_t feat_missing
;
1410 if (reply
.tag
== CEPH_MSGR_TAG_FEATURES
) {
1411 ldout(async_msgr
->cct
, 0) << __func__
<< " connect protocol feature mismatch, my "
1412 << std::hex
<< connect
.features
<< " < peer "
1413 << reply
.features
<< " missing "
1414 << (reply
.features
& ~policy
.features_supported
)
1415 << std::dec
<< dendl
;
1419 if (reply
.tag
== CEPH_MSGR_TAG_BADPROTOVER
) {
1420 ldout(async_msgr
->cct
, 0) << __func__
<< " connect protocol version mismatch, my "
1421 << connect
.protocol_version
<< " != " << reply
.protocol_version
1426 if (reply
.tag
== CEPH_MSGR_TAG_BADAUTHORIZER
) {
1427 ldout(async_msgr
->cct
,0) << __func__
<< " connect got BADAUTHORIZER" << dendl
;
1430 if (reply
.tag
== CEPH_MSGR_TAG_RESETSESSION
) {
1431 ldout(async_msgr
->cct
, 0) << __func__
<< " connect got RESETSESSION" << dendl
;
1432 was_session_reset();
1433 // see was_session_reset
1434 outcoming_bl
.clear();
1435 state
= STATE_CONNECTING_SEND_CONNECT_MSG
;
1437 if (reply
.tag
== CEPH_MSGR_TAG_RETRY_GLOBAL
) {
1438 global_seq
= async_msgr
->get_global_seq(reply
.global_seq
);
1439 ldout(async_msgr
->cct
, 5) << __func__
<< " connect got RETRY_GLOBAL "
1440 << reply
.global_seq
<< " chose new "
1441 << global_seq
<< dendl
;
1442 state
= STATE_CONNECTING_SEND_CONNECT_MSG
;
1444 if (reply
.tag
== CEPH_MSGR_TAG_RETRY_SESSION
) {
1445 assert(reply
.connect_seq
> connect_seq
);
1446 ldout(async_msgr
->cct
, 5) << __func__
<< " connect got RETRY_SESSION "
1447 << connect_seq
<< " -> "
1448 << reply
.connect_seq
<< dendl
;
1449 connect_seq
= reply
.connect_seq
;
1450 state
= STATE_CONNECTING_SEND_CONNECT_MSG
;
1452 if (reply
.tag
== CEPH_MSGR_TAG_WAIT
) {
1453 ldout(async_msgr
->cct
, 1) << __func__
<< " connect got WAIT (connection race)" << dendl
;
1457 feat_missing
= policy
.features_required
& ~(uint64_t)connect_reply
.features
;
1459 ldout(async_msgr
->cct
, 1) << __func__
<< " missing required features " << std::hex
1460 << feat_missing
<< std::dec
<< dendl
;
1464 if (reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1465 ldout(async_msgr
->cct
, 10) << __func__
<< " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl
;
1466 state
= STATE_CONNECTING_WAIT_ACK_SEQ
;
1468 if (reply
.tag
== CEPH_MSGR_TAG_READY
) {
1469 ldout(async_msgr
->cct
, 10) << __func__
<< " got CEPH_MSGR_TAG_READY " << dendl
;
1470 state
= STATE_CONNECTING_READY
;
1479 ssize_t
AsyncConnection::handle_connect_msg(ceph_msg_connect
&connect
, bufferlist
&authorizer_bl
,
1480 bufferlist
&authorizer_reply
)
1483 ceph_msg_connect_reply reply
;
1484 bufferlist reply_bl
;
1486 memset(&reply
, 0, sizeof(reply
));
1487 reply
.protocol_version
= async_msgr
->get_proto_version(peer_type
, false);
1490 ldout(async_msgr
->cct
, 10) << __func__
<< " accept my proto " << reply
.protocol_version
1491 << ", their proto " << connect
.protocol_version
<< dendl
;
1492 if (connect
.protocol_version
!= reply
.protocol_version
) {
1493 return _reply_accept(CEPH_MSGR_TAG_BADPROTOVER
, connect
, reply
, authorizer_reply
);
1495 // require signatures for cephx?
1496 if (connect
.authorizer_protocol
== CEPH_AUTH_CEPHX
) {
1497 if (peer_type
== CEPH_ENTITY_TYPE_OSD
||
1498 peer_type
== CEPH_ENTITY_TYPE_MDS
||
1499 peer_type
== CEPH_ENTITY_TYPE_MGR
) {
1500 if (async_msgr
->cct
->_conf
->cephx_require_signatures
||
1501 async_msgr
->cct
->_conf
->cephx_cluster_require_signatures
) {
1502 ldout(async_msgr
->cct
, 10) << __func__
<< " using cephx, requiring MSG_AUTH feature bit for cluster" << dendl
;
1503 policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
1505 if (async_msgr
->cct
->_conf
->cephx_require_version
>= 2 ||
1506 async_msgr
->cct
->_conf
->cephx_cluster_require_version
>= 2) {
1507 ldout(async_msgr
->cct
, 10) << __func__
<< " using cephx, requiring cephx v2 feature bit for cluster" << dendl
;
1508 policy
.features_required
|= CEPH_FEATUREMASK_CEPHX_V2
;
1511 if (async_msgr
->cct
->_conf
->cephx_require_signatures
||
1512 async_msgr
->cct
->_conf
->cephx_service_require_signatures
) {
1513 ldout(async_msgr
->cct
, 10) << __func__
<< " using cephx, requiring MSG_AUTH feature bit for service" << dendl
;
1514 policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
1516 if (async_msgr
->cct
->_conf
->cephx_require_version
>= 2 ||
1517 async_msgr
->cct
->_conf
->cephx_service_require_version
>= 2) {
1518 ldout(async_msgr
->cct
, 10) << __func__
<< " using cephx, requiring cephx v2 feature bit for service" << dendl
;
1519 policy
.features_required
|= CEPH_FEATUREMASK_CEPHX_V2
;
1524 uint64_t feat_missing
= policy
.features_required
& ~(uint64_t)connect
.features
;
1526 ldout(async_msgr
->cct
, 1) << __func__
<< " peer missing required features "
1527 << std::hex
<< feat_missing
<< std::dec
<< dendl
;
1528 return _reply_accept(CEPH_MSGR_TAG_FEATURES
, connect
, reply
, authorizer_reply
);
1533 bool authorizer_valid
;
1534 bool need_challenge
= HAVE_FEATURE(connect
.features
, CEPHX_V2
);
1535 bool had_challenge
= (bool)authorizer_challenge
;
1536 if (!async_msgr
->verify_authorizer(
1537 this, peer_type
, connect
.authorizer_protocol
, authorizer_bl
,
1538 authorizer_reply
, authorizer_valid
, session_key
,
1539 need_challenge
? &authorizer_challenge
: nullptr) ||
1540 !authorizer_valid
) {
1542 if (state
!= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
1543 ldout(async_msgr
->cct
, 1) << __func__
1544 << " state changed while verify_authorizer,"
1545 << " it must be mark_down"
1547 ceph_assert(state
== STATE_CLOSED
);
1551 if (need_challenge
&& !had_challenge
&& authorizer_challenge
) {
1552 ldout(async_msgr
->cct
,10) << __func__
<< ": challenging authorizer"
1554 assert(authorizer_reply
.length());
1555 tag
= CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
;
1557 ldout(async_msgr
->cct
,0) << __func__
<< ": got bad authorizer" << dendl
;
1558 tag
= CEPH_MSGR_TAG_BADAUTHORIZER
;
1560 session_security
.reset();
1561 return _reply_accept(tag
, connect
, reply
, authorizer_reply
);
1564 // We've verified the authorizer for this AsyncConnection, so set up the session security structure. PLR
1565 ldout(async_msgr
->cct
, 10) << __func__
<< " accept setting up session_security." << dendl
;
1568 AsyncConnectionRef existing
= async_msgr
->lookup_conn(peer_addr
);
1573 if (state
!= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
1574 ldout(async_msgr
->cct
, 1) << __func__
<< " state changed while accept, it must be mark_down" << dendl
;
1575 assert(state
== STATE_CLOSED
);
1579 if (existing
== this)
1582 // It is not possible for the existing connection to acquire this
1583 // connection's lock
1584 existing
->lock
.lock(); // skip lockdep check (we are locking a second AsyncConnection here)
1586 if (existing
->state
== STATE_CLOSED
) {
1587 ldout(async_msgr
->cct
, 1) << __func__
<< " existing already closed." << dendl
;
1588 existing
->lock
.unlock();
1593 if (existing
->replacing
) {
1594 ldout(async_msgr
->cct
, 1) << __func__
<< " existing racing replace happened while replacing."
1595 << " existing_state=" << get_state_name(existing
->state
) << dendl
;
1596 reply
.global_seq
= existing
->peer_global_seq
;
1597 r
= _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL
, connect
, reply
, authorizer_reply
);
1598 existing
->lock
.unlock();
1604 if (connect
.global_seq
< existing
->peer_global_seq
) {
1605 ldout(async_msgr
->cct
, 10) << __func__
<< " accept existing " << existing
1606 << ".gseq " << existing
->peer_global_seq
<< " > "
1607 << connect
.global_seq
<< ", RETRY_GLOBAL" << dendl
;
1608 reply
.global_seq
= existing
->peer_global_seq
; // so we can send it below..
1609 existing
->lock
.unlock();
1610 return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL
, connect
, reply
, authorizer_reply
);
1612 ldout(async_msgr
->cct
, 10) << __func__
<< " accept existing " << existing
1613 << ".gseq " << existing
->peer_global_seq
1614 << " <= " << connect
.global_seq
<< ", looks ok" << dendl
;
1617 if (existing
->policy
.lossy
) {
1618 ldout(async_msgr
->cct
, 0) << __func__
<< " accept replacing existing (lossy) channel (new one lossy="
1619 << policy
.lossy
<< ")" << dendl
;
1620 existing
->was_session_reset();
1624 ldout(async_msgr
->cct
, 0) << __func__
<< " accept connect_seq " << connect
.connect_seq
1625 << " vs existing csq=" << existing
->connect_seq
<< " existing_state="
1626 << get_state_name(existing
->state
) << dendl
;
1628 if (connect
.connect_seq
== 0 && existing
->connect_seq
> 0) {
1629 ldout(async_msgr
->cct
,0) << __func__
<< " accept peer reset, then tried to connect to us, replacing" << dendl
;
1630 // this is a hard reset from peer
1631 is_reset_from_peer
= true;
1632 if (policy
.resetcheck
)
1633 existing
->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
1637 if (connect
.connect_seq
< existing
->connect_seq
) {
1638 // old attempt, or we sent READY but they didn't get it.
1639 ldout(async_msgr
->cct
, 10) << __func__
<< " accept existing " << existing
<< ".cseq "
1640 << existing
->connect_seq
<< " > " << connect
.connect_seq
1641 << ", RETRY_SESSION" << dendl
;
1642 reply
.connect_seq
= existing
->connect_seq
+ 1;
1643 existing
->lock
.unlock();
1644 return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION
, connect
, reply
, authorizer_reply
);
1647 if (connect
.connect_seq
== existing
->connect_seq
) {
1648 // if the existing connection successfully opened, and/or
1649 // subsequently went to standby, then the peer should bump
1650 // their connect_seq and retry: this is not a connection race
1651 // we need to resolve here.
1652 if (existing
->state
== STATE_OPEN
||
1653 existing
->state
== STATE_STANDBY
) {
1654 ldout(async_msgr
->cct
, 10) << __func__
<< " accept connection race, existing " << existing
1655 << ".cseq " << existing
->connect_seq
<< " == "
1656 << connect
.connect_seq
<< ", OPEN|STANDBY, RETRY_SESSION" << dendl
;
1657 reply
.connect_seq
= existing
->connect_seq
+ 1;
1658 existing
->lock
.unlock();
1659 return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION
, connect
, reply
, authorizer_reply
);
1663 if (peer_addr
< async_msgr
->get_myaddr() || existing
->policy
.server
) {
1665 ldout(async_msgr
->cct
, 10) << __func__
<< " accept connection race, existing " << existing
1666 << ".cseq " << existing
->connect_seq
<< " == " << connect
.connect_seq
1667 << ", or we are server, replacing my attempt" << dendl
;
1670 // our existing outgoing wins
1671 ldout(async_msgr
->cct
,10) << __func__
<< " accept connection race, existing "
1672 << existing
<< ".cseq " << existing
->connect_seq
1673 << " == " << connect
.connect_seq
<< ", sending WAIT" << dendl
;
1674 assert(peer_addr
> async_msgr
->get_myaddr());
1675 existing
->lock
.unlock();
1676 return _reply_accept(CEPH_MSGR_TAG_WAIT
, connect
, reply
, authorizer_reply
);
1680 assert(connect
.connect_seq
> existing
->connect_seq
);
1681 assert(connect
.global_seq
>= existing
->peer_global_seq
);
1682 if (policy
.resetcheck
&& // RESETSESSION only used by servers; peers do not reset each other
1683 existing
->connect_seq
== 0) {
1684 ldout(async_msgr
->cct
, 0) << __func__
<< " accept we reset (peer sent cseq "
1685 << connect
.connect_seq
<< ", " << existing
<< ".cseq = "
1686 << existing
->connect_seq
<< "), sending RESETSESSION" << dendl
;
1687 existing
->lock
.unlock();
1688 return _reply_accept(CEPH_MSGR_TAG_RESETSESSION
, connect
, reply
, authorizer_reply
);
1692 ldout(async_msgr
->cct
, 10) << __func__
<< " accept peer sent cseq " << connect
.connect_seq
1693 << " > " << existing
->connect_seq
<< dendl
;
1696 else if (!replacing
&& connect
.connect_seq
> 0) {
1697 // we reset, and they are opening a new session
1698 ldout(async_msgr
->cct
, 0) << __func__
<< " accept we reset (peer sent cseq "
1699 << connect
.connect_seq
<< "), sending RESETSESSION" << dendl
;
1700 return _reply_accept(CEPH_MSGR_TAG_RESETSESSION
, connect
, reply
, authorizer_reply
);
1703 ldout(async_msgr
->cct
, 10) << __func__
<< " accept new session" << dendl
;
1710 ldout(async_msgr
->cct
, 10) << __func__
<< " accept replacing " << existing
<< dendl
;
1713 if (existing
->policy
.lossy
) {
1714 // disconnect from the Connection
1715 ldout(async_msgr
->cct
, 1) << __func__
<< " replacing on lossy channel, failing existing" << dendl
;
1717 existing
->dispatch_queue
->queue_reset(existing
.get());
1719 assert(can_write
== WriteStatus::NOWRITE
);
1720 existing
->write_lock
.lock();
1722 // reset the in_seq if this is a hard reset from peer,
1723 // otherwise we respect our original connection's value
1724 if (is_reset_from_peer
) {
1725 existing
->is_reset_from_peer
= true;
1728 center
->delete_file_event(cs
.fd(), EVENT_READABLE
|EVENT_WRITABLE
);
1730 if (existing
->delay_state
) {
1731 existing
->delay_state
->flush();
1732 assert(!delay_state
);
1734 existing
->reset_recv_state();
1736 auto temp_cs
= std::move(cs
);
1737 EventCenter
*new_center
= center
;
1738 Worker
*new_worker
= worker
;
1739 // avoid _stop shutdown replacing socket
1740 // queue a reset on the new connection, which we're dumping for the old
1743 dispatch_queue
->queue_reset(this);
1744 ldout(async_msgr
->cct
, 1) << __func__
<< " stop myself to swap existing" << dendl
;
1745 existing
->can_write
= WriteStatus::REPLACING
;
1746 existing
->replacing
= true;
1747 existing
->state_offset
= 0;
1748 // avoid previous thread modify event
1749 existing
->state
= STATE_NONE
;
1750 // Discard existing prefetch buffer in `recv_buf`
1751 existing
->recv_start
= existing
->recv_end
= 0;
1752 // there shouldn't exist any buffer
1753 assert(recv_start
== recv_end
);
1755 existing
->authorizer_challenge
.reset();
1757 auto deactivate_existing
= std::bind(
1758 [existing
, new_worker
, new_center
, connect
, reply
, authorizer_reply
](ConnectedSocket
&cs
) mutable {
1759 // we need to delete the time event in the original thread
1761 std::lock_guard
<std::mutex
> l(existing
->lock
);
1762 existing
->write_lock
.lock();
1763 existing
->requeue_sent();
1764 existing
->outcoming_bl
.clear();
1765 existing
->open_write
= false;
1766 existing
->write_lock
.unlock();
1767 if (existing
->state
== STATE_NONE
) {
1768 existing
->shutdown_socket();
1769 existing
->cs
= std::move(cs
);
1770 existing
->worker
->references
--;
1771 new_worker
->references
++;
1772 existing
->logger
= new_worker
->get_perf_counter();
1773 existing
->worker
= new_worker
;
1774 existing
->center
= new_center
;
1775 if (existing
->delay_state
)
1776 existing
->delay_state
->set_center(new_center
);
1777 } else if (existing
->state
== STATE_CLOSED
) {
1778 auto back_to_close
= std::bind(
1779 [](ConnectedSocket
&cs
) mutable { cs
.close(); }, std::move(cs
));
1780 new_center
->submit_to(
1781 new_center
->get_id(), std::move(back_to_close
), true);
1788 // Before changing existing->center, some events may already exist in existing->center's queue.
1789 // Then if we mark down `existing`, it will execute in another thread and clean up the connection.
1790 // A previously queued event would then result in a segmentation fault
1791 auto transfer_existing
= [existing
, connect
, reply
, authorizer_reply
]() mutable {
1792 std::lock_guard
<std::mutex
> l(existing
->lock
);
1793 if (existing
->state
== STATE_CLOSED
)
1795 assert(existing
->state
== STATE_NONE
);
1797 existing
->state
= STATE_ACCEPTING_WAIT_CONNECT_MSG
;
1798 existing
->center
->create_file_event(existing
->cs
.fd(), EVENT_READABLE
, existing
->read_handler
);
1799 reply
.global_seq
= existing
->peer_global_seq
;
1800 if (existing
->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL
, connect
, reply
, authorizer_reply
) < 0) {
1805 if (existing
->center
->in_thread())
1806 transfer_existing();
1808 existing
->center
->submit_to(
1809 existing
->center
->get_id(), std::move(transfer_existing
), true);
1810 }, std::move(temp_cs
));
1812 existing
->center
->submit_to(
1813 existing
->center
->get_id(), std::move(deactivate_existing
), true);
1814 existing
->write_lock
.unlock();
1815 existing
->lock
.unlock();
1818 existing
->lock
.unlock();
1821 connect_seq
= connect
.connect_seq
+ 1;
1822 peer_global_seq
= connect
.global_seq
;
1823 ldout(async_msgr
->cct
, 10) << __func__
<< " accept success, connect_seq = "
1824 << connect_seq
<< " in_seq=" << in_seq
<< ", sending READY" << dendl
;
1828 // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
1829 if ((connect
.features
& CEPH_FEATURE_RECONNECT_SEQ
) && !is_reset_from_peer
) {
1830 reply
.tag
= CEPH_MSGR_TAG_SEQ
;
1831 next_state
= STATE_ACCEPTING_WAIT_SEQ
;
1833 reply
.tag
= CEPH_MSGR_TAG_READY
;
1834 next_state
= STATE_ACCEPTING_READY
;
1835 discard_requeued_up_to(0);
1836 is_reset_from_peer
= false;
1841 reply
.features
= policy
.features_supported
;
1842 reply
.global_seq
= async_msgr
->get_global_seq();
1843 reply
.connect_seq
= connect_seq
;
1845 reply
.authorizer_len
= authorizer_reply
.length();
1847 reply
.flags
= reply
.flags
| CEPH_MSG_CONNECT_LOSSY
;
1849 set_features((uint64_t)reply
.features
& (uint64_t)connect
.features
);
1850 ldout(async_msgr
->cct
, 10) << __func__
<< " accept features " << get_features() << dendl
;
1852 session_security
.reset(
1853 get_auth_session_handler(async_msgr
->cct
, connect
.authorizer_protocol
,
1854 session_key
, get_features()));
1856 reply_bl
.append((char*)&reply
, sizeof(reply
));
1858 if (reply
.authorizer_len
)
1859 reply_bl
.append(authorizer_reply
.c_str(), authorizer_reply
.length());
1861 if (reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1862 uint64_t s
= in_seq
;
1863 reply_bl
.append((char*)&s
, sizeof(s
));
1867 // Because "replacing" prevents other connections from preempting this addr,
1868 // it is safe not to acquire the Connection's lock here
1869 r
= async_msgr
->accept_conn(this);
1876 ldout(async_msgr
->cct
, 1) << __func__
<< " existing race replacing process for addr=" << peer_addr
1877 << " just fail later one(this)" << dendl
;
1878 goto fail_registered
;
1880 if (state
!= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
1881 ldout(async_msgr
->cct
, 1) << __func__
<< " state changed while accept_conn, it must be mark_down" << dendl
;
1882 assert(state
== STATE_CLOSED
|| state
== STATE_NONE
);
1883 async_msgr
->unregister_conn(this);
1884 goto fail_registered
;
1887 r
= try_send(reply_bl
);
1889 goto fail_registered
;
1892 dispatch_queue
->queue_accept(this);
1893 async_msgr
->ms_deliver_handle_fast_accept(this);
1898 ldout(async_msgr
->cct
, 2) << __func__
<< " accept write reply msg done" << dendl
;
1900 state
= STATE_WAIT_SEND
;
1901 state_after_send
= next_state
;
1907 ldout(async_msgr
->cct
, 10) << __func__
<< " accept fault after register" << dendl
;
1911 ldout(async_msgr
->cct
, 10) << __func__
<< " failed to accept." << dendl
;
1915 void AsyncConnection::_connect()
1917 ldout(async_msgr
->cct
, 10) << __func__
<< " csq=" << connect_seq
<< dendl
;
1919 state
= STATE_CONNECTING
;
1920 // reschedule connection in order to avoid lock dependency
1921 // may be called by an external thread (send_message)
1922 center
->dispatch_event_external(read_handler
);
1925 void AsyncConnection::accept(ConnectedSocket socket
, entity_addr_t
&addr
)
1927 ldout(async_msgr
->cct
, 10) << __func__
<< " sd=" << socket
.fd() << dendl
;
1928 assert(socket
.fd() >= 0);
1930 std::lock_guard
<std::mutex
> l(lock
);
1931 cs
= std::move(socket
);
1933 state
= STATE_ACCEPTING
;
1934 // reschedule connection in order to avoid lock dependency
1935 center
->dispatch_event_external(read_handler
);
1938 int AsyncConnection::send_message(Message
*m
)
1941 lgeneric_subdout(async_msgr
->cct
, ms
,
1942 1) << "-- " << async_msgr
->get_myaddr() << " --> "
1943 << get_peer_addr() << " -- "
1944 << *m
<< " -- " << m
<< " con "
1945 << m
->get_connection().get()
1948 // optimistically assume it's ok to encode (it may actually be broken now)
1949 if (!m
->get_priority())
1950 m
->set_priority(async_msgr
->get_default_send_priority());
1952 m
->get_header().src
= async_msgr
->get_myname();
1953 m
->set_connection(this);
1955 if (m
->get_type() == CEPH_MSG_OSD_OP
)
1956 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OP_BEGIN", true);
1957 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
1958 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
1960 if (async_msgr
->get_myaddr() == get_peer_addr()) { //loopback connection
1961 ldout(async_msgr
->cct
, 20) << __func__
<< " " << *m
<< " local" << dendl
;
1962 std::lock_guard
<std::mutex
> l(write_lock
);
1963 if (can_write
!= WriteStatus::CLOSED
) {
1964 dispatch_queue
->local_delivery(m
, m
->get_priority());
1966 ldout(async_msgr
->cct
, 10) << __func__
<< " loopback connection closed."
1967 << " Drop message " << m
<< dendl
;
1973 last_active
= ceph::coarse_mono_clock::now();
1974 // we don't want to count local messages here; they are too lightweight and
1975 // may disturb users
1976 logger
->inc(l_msgr_send_messages
);
1979 uint64_t f
= get_features();
1981 // TODO: Currently not all messages supports reencode like MOSDMap, so here
1982 // only let fast dispatch support messages prepare message
1983 bool can_fast_prepare
= async_msgr
->ms_can_fast_dispatch(m
);
1984 if (can_fast_prepare
)
1985 prepare_send_message(f
, m
, bl
);
1987 std::lock_guard
<std::mutex
> l(write_lock
);
1988 // "features" changes will change the payload encoding
1989 if (can_fast_prepare
&& (can_write
== WriteStatus::NOWRITE
|| get_features() != f
)) {
1990 // ensure the correctness of message encoding
1992 m
->get_payload().clear();
1993 ldout(async_msgr
->cct
, 5) << __func__
<< " clear encoded buffer previous "
1994 << f
<< " != " << get_features() << dendl
;
1996 if (can_write
== WriteStatus::CLOSED
) {
1997 ldout(async_msgr
->cct
, 10) << __func__
<< " connection closed."
1998 << " Drop message " << m
<< dendl
;
2001 m
->trace
.event("async enqueueing message");
2002 out_q
[m
->get_priority()].emplace_back(std::move(bl
), m
);
2003 ldout(async_msgr
->cct
, 15) << __func__
<< " inline write is denied, reschedule m=" << m
<< dendl
;
2004 if (can_write
!= WriteStatus::REPLACING
)
2005 center
->dispatch_event_external(write_handler
);
2010 void AsyncConnection::requeue_sent()
2015 list
<pair
<bufferlist
, Message
*> >& rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
2016 while (!sent
.empty()) {
2017 Message
* m
= sent
.back();
2019 ldout(async_msgr
->cct
, 10) << __func__
<< " " << *m
<< " for resend "
2020 << " (" << m
->get_seq() << ")" << dendl
;
2021 rq
.push_front(make_pair(bufferlist(), m
));
2026 void AsyncConnection::discard_requeued_up_to(uint64_t seq
)
2028 ldout(async_msgr
->cct
, 10) << __func__
<< " " << seq
<< dendl
;
2029 std::lock_guard
<std::mutex
> l(write_lock
);
2030 if (out_q
.count(CEPH_MSG_PRIO_HIGHEST
) == 0)
2032 list
<pair
<bufferlist
, Message
*> >& rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
2033 while (!rq
.empty()) {
2034 pair
<bufferlist
, Message
*> p
= rq
.front();
2035 if (p
.second
->get_seq() == 0 || p
.second
->get_seq() > seq
)
2037 ldout(async_msgr
->cct
, 10) << __func__
<< " " << *(p
.second
) << " for resend seq " << p
.second
->get_seq()
2038 << " <= " << seq
<< ", discarding" << dendl
;
2044 out_q
.erase(CEPH_MSG_PRIO_HIGHEST
);
2048 * Tears down the AsyncConnection's message queues, and removes them from the DispatchQueue
2049 * Must hold write_lock prior to calling.
2051 void AsyncConnection::discard_out_queue()
2053 ldout(async_msgr
->cct
, 10) << __func__
<< " started" << dendl
;
2055 for (list
<Message
*>::iterator p
= sent
.begin(); p
!= sent
.end(); ++p
) {
2056 ldout(async_msgr
->cct
, 20) << __func__
<< " discard " << *p
<< dendl
;
2060 for (map
<int, list
<pair
<bufferlist
, Message
*> > >::iterator p
= out_q
.begin(); p
!= out_q
.end(); ++p
)
2061 for (list
<pair
<bufferlist
, Message
*> >::iterator r
= p
->second
.begin(); r
!= p
->second
.end(); ++r
) {
2062 ldout(async_msgr
->cct
, 20) << __func__
<< " discard " << r
->second
<< dendl
;
2068 int AsyncConnection::randomize_out_seq()
2070 if (get_features() & CEPH_FEATURE_MSG_AUTH
) {
2071 // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error
2072 // here. We'll check it on the call. PLR
2074 int seq_error
= get_random_bytes((char *)&rand_seq
, sizeof(rand_seq
));
2075 rand_seq
&= SEQ_MASK
;
2076 lsubdout(async_msgr
->cct
, ms
, 10) << __func__
<< " randomize_out_seq " << rand_seq
<< dendl
;
2080 // previously, seq #'s always started at 0.
// Central failure handler for the connection. Depending on policy and
// current state it either fails the connection outright (lossy), parks it
// in STATE_STANDBY, or schedules a reconnect with exponential backoff
// (bounded by ms_initial_backoff / ms_max_backoff).
// NOTE(review): this extract is missing many original lines (returns,
// braces, shutdown/requeue calls between the visible statements) — comments
// describe only the visible control flow.
2086 void AsyncConnection::fault()
// Already torn down (or never started): nothing to do.
2088 if (state
== STATE_CLOSED
|| state
== STATE_NONE
) {
2089 ldout(async_msgr
->cct
, 10) << __func__
<< " connection is already closed" << dendl
;
// Lossy policy (and not mid-connect): fail hard and notify via reset.
2093 if (policy
.lossy
&& !(state
>= STATE_CONNECTING
&& state
< STATE_CONNECTING_READY
)) {
2094 ldout(async_msgr
->cct
, 1) << __func__
<< " on lossy channel, failing" << dendl
;
2096 dispatch_queue
->queue_reset(this);
2101 can_write
= WriteStatus::NOWRITE
;
2105 // queue delayed items immediately
2107 delay_state
->flush();
2108 // requeue sent items
// Reset the receive buffer bookkeeping and pending outgoing bytes.
2110 recv_start
= recv_end
= 0;
2113 is_reset_from_peer
= false;
2114 outcoming_bl
.clear();
// Half-accepted connection with nothing queued: just close it.
2115 if (!once_ready
&& !is_queued() &&
2116 state
>=STATE_ACCEPTING
&& state
<= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH
) {
2117 ldout(async_msgr
->cct
, 10) << __func__
<< " with nothing to send and in the half "
2118 << " accept state just closed" << dendl
;
2119 write_lock
.unlock();
2121 dispatch_queue
->queue_reset(this);
// Standby policy with an empty queue: go idle instead of reconnecting.
2125 if (policy
.standby
&& !is_queued() && state
!= STATE_WAIT
) {
2126 ldout(async_msgr
->cct
, 10) << __func__
<< " with nothing to send, going to standby" << dendl
;
2127 state
= STATE_STANDBY
;
2128 write_lock
.unlock();
2132 write_lock
.unlock();
2133 if (!(state
>= STATE_CONNECTING
&& state
< STATE_CONNECTING_READY
) &&
2134 state
!= STATE_WAIT
) { // STATE_WAIT is coming from STATE_CONNECTING_*
2135 // policy maybe empty when state is in accept
2136 if (policy
.server
) {
2137 ldout(async_msgr
->cct
, 0) << __func__
<< " server, going to standby" << dendl
;
2138 state
= STATE_STANDBY
;
2140 ldout(async_msgr
->cct
, 0) << __func__
<< " initiating reconnect" << dendl
;
2142 state
= STATE_CONNECTING
;
// Immediate retry path: no backoff, kick the read handler right away.
2144 backoff
= utime_t();
2145 center
->dispatch_event_external(read_handler
);
// Otherwise compute the backoff before retrying the connect.
2147 if (state
== STATE_WAIT
) {
2148 backoff
.set_from_double(async_msgr
->cct
->_conf
->ms_max_backoff
);
2149 } else if (backoff
== utime_t()) {
2150 backoff
.set_from_double(async_msgr
->cct
->_conf
->ms_initial_backoff
);
// Clamp the (presumably doubled elsewhere) backoff to ms_max_backoff.
2153 if (backoff
> async_msgr
->cct
->_conf
->ms_max_backoff
)
2154 backoff
.set_from_double(async_msgr
->cct
->_conf
->ms_max_backoff
);
2157 state
= STATE_CONNECTING
;
2158 ldout(async_msgr
->cct
, 10) << __func__
<< " waiting " << backoff
<< dendl
;
// Schedule the reconnect attempt after `backoff` (time event takes usec).
2160 register_time_events
.insert(center
->create_time_event(
2161 backoff
.to_nsec()/1000, wakeup_handler
));
// Handle a session reset signaled by the peer: discard delayed and queued
// messages, notify dispatchers of the remote reset, re-randomize out_seq,
// and block further writes. Holds write_lock.
// NOTE(review): the extract elides the statements between the visible lines
// (e.g. the in_seq/out_seq resets hinted at by the comments below).
2165 void AsyncConnection::was_session_reset()
2167 ldout(async_msgr
->cct
,10) << __func__
<< " started" << dendl
;
2168 std::lock_guard
<std::mutex
> l(write_lock
);
// Throw away anything still waiting in the delayed-delivery queue.
2170 delay_state
->discard();
2171 dispatch_queue
->discard_queue(conn_id
);
2172 discard_out_queue();
2173 // note: we need to clear outcoming_bl here, but was_session_reset may be
2174 // called by other thread, so let caller clear this itself!
2175 // outcoming_bl.clear();
2177 dispatch_queue
->queue_remote_reset(this);
// Non-zero return means no random bytes were available; out_seq keeps
// whatever fallback value was set (logged below).
2179 if (randomize_out_seq()) {
2180 ldout(async_msgr
->cct
, 15) << __func__
<< " could not get random bytes to set seq number for session reset; set seq number to " << out_seq
<< dendl
;
2185 // it's safe to directly set 0, double locked
2188 can_write
= WriteStatus::NOWRITE
;
// Tear the connection down for good: flush delayed deliveries, discard all
// queues, unregister from the messenger and worker, and mark the state and
// write status CLOSED. A C_clean_handler event is dispatched so events
// already queued on the center still run before cleanup completes.
// NOTE(review): extract elides lines between the visible statements (e.g.
// the early return and socket shutdown) — confirm against the full source.
2191 void AsyncConnection::_stop()
// Already closed: nothing to do.
2193 if (state
== STATE_CLOSED
)
2197 delay_state
->flush();
2199 ldout(async_msgr
->cct
, 2) << __func__
<< dendl
;
2200 std::lock_guard
<std::mutex
> l(write_lock
);
2203 dispatch_queue
->discard_queue(conn_id
);
2204 discard_out_queue();
// Drop the messenger's reference to this connection and free the worker.
2205 async_msgr
->unregister_conn(this);
2206 worker
->release_worker();
2208 state
= STATE_CLOSED
;
2210 can_write
= WriteStatus::CLOSED
;
2212 // Make sure in-queue events will been processed
2213 center
->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
// Encode message `m` for the negotiated `features` and append its payload,
// middle and data sections to `bl`, ready for write_message().
// NOTE(review): the extract elides the set_connection call implied by the
// "associate message with Connection" comment and the else-branch structure
// around the encoding/half-reencoding logs.
2216 void AsyncConnection::prepare_send_message(uint64_t features
, Message
*m
, bufferlist
&bl
)
2218 ldout(async_msgr
->cct
, 20) << __func__
<< " m" << " " << *m
<< dendl
;
2220 // associate message with Connection (for benefit of encode_payload)
// Empty payload => full encode; otherwise only parts are re-encoded.
2221 if (m
->empty_payload())
2222 ldout(async_msgr
->cct
, 20) << __func__
<< " encoding features "
2223 << features
<< " " << m
<< " " << *m
<< dendl
;
2225 ldout(async_msgr
->cct
, 20) << __func__
<< " half-reencoding features "
2226 << features
<< " " << m
<< " " << *m
<< dendl
;
2228 // encode and copy out of *m
2229 m
->encode(features
, msgr
->crcflags
);
// Gather the three encoded sections into the caller's bufferlist.
2231 bl
.append(m
->get_payload());
2232 bl
.append(m
->get_middle());
2233 bl
.append(m
->get_data());
// Serialize message `m` (header, tag, body from `bl`, footer) into
// outcoming_bl and attempt to send it via _try_send(). Assigns the next
// out_seq, computes the header CRC if enabled, signs the message when
// session security is set up, and falls back to the legacy header/footer
// wire formats for peers lacking CEPH_FEATURE_NOSRCADDR /
// CEPH_FEATURE_MSG_AUTH. Must run on the event-center thread (asserted).
// Returns the result of _try_send (0 = fully sent; from the visible
// branches, negative = error, otherwise partial — remainder left queued).
// NOTE(review): the extract elides various lines (else branches, the final
// return) — comments describe only the visible code.
2236 ssize_t
AsyncConnection::write_message(Message
*m
, bufferlist
& bl
, bool more
)
2239 assert(center
->in_thread());
// Stamp the message with the next outgoing sequence number.
2240 m
->set_seq(++out_seq
);
2242 if (msgr
->crcflags
& MSG_CRC_HEADER
)
2243 m
->calc_header_crc();
2245 ceph_msg_header
& header
= m
->get_header();
2246 ceph_msg_footer
& footer
= m
->get_footer();
2248 // TODO: let sign_message could be reentry?
2249 // Now that we have all the crcs calculated, handle the
2250 // digital signature for the message, if the AsyncConnection has session
2251 // security set up. Some session security options do not
2252 // actually calculate and check the signature, but they should
2253 // handle the calls to sign_message and check_signature. PLR
2254 if (session_security
.get() == NULL
) {
2255 ldout(async_msgr
->cct
, 20) << __func__
<< " no session security" << dendl
;
2257 if (session_security
->sign_message(m
)) {
2258 ldout(async_msgr
->cct
, 20) << __func__
<< " failed to sign m="
2259 << m
<< "): sig = " << footer
.sig
<< dendl
;
2261 ldout(async_msgr
->cct
, 20) << __func__
<< " signed m=" << m
2262 << "): sig = " << footer
.sig
<< dendl
;
// Remember how much was already buffered so send-byte accounting below
// only counts this message's contribution.
2266 unsigned original_bl_len
= outcoming_bl
.length();
2268 outcoming_bl
.append(CEPH_MSGR_TAG_MSG
);
// New-style peers get the header as-is; old peers need the legacy
// ceph_msg_header_old layout with src/orig_src filled in and its own CRC.
2270 if (has_feature(CEPH_FEATURE_NOSRCADDR
)) {
2271 outcoming_bl
.append((char*)&header
, sizeof(header
));
2273 ceph_msg_header_old oldheader
;
2274 memcpy(&oldheader
, &header
, sizeof(header
));
2275 oldheader
.src
.name
= header
.src
;
2276 oldheader
.src
.addr
= get_peer_addr();
2277 oldheader
.orig_src
= oldheader
.src
;
2278 oldheader
.reserved
= header
.reserved
;
2279 oldheader
.crc
= ceph_crc32c(0, (unsigned char*)&oldheader
,
2280 sizeof(oldheader
) - sizeof(oldheader
.crc
));
2281 outcoming_bl
.append((char*)&oldheader
, sizeof(oldheader
));
2284 ldout(async_msgr
->cct
, 20) << __func__
<< " sending message type=" << header
.type
2285 << " src " << entity_name_t(header
.src
)
2286 << " front=" << header
.front_len
2287 << " data=" << header
.data_len
2288 << " off " << header
.data_off
<< dendl
;
// Small, fragmented payloads are copied into one contiguous run to avoid
// many tiny iovecs; larger ones are claimed (zero-copy) instead.
2290 if ((bl
.length() <= ASYNC_COALESCE_THRESHOLD
) && (bl
.buffers().size() > 1)) {
2291 for (const auto &pb
: bl
.buffers()) {
2292 outcoming_bl
.append((char*)pb
.c_str(), pb
.length());
2295 outcoming_bl
.claim_append(bl
);
2298 // send footer; if receiver doesn't support signatures, use the old footer format
2299 ceph_msg_footer_old old_footer
;
2300 if (has_feature(CEPH_FEATURE_MSG_AUTH
)) {
2301 outcoming_bl
.append((char*)&footer
, sizeof(footer
));
2303 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2304 old_footer
.front_crc
= footer
.front_crc
;
2305 old_footer
.middle_crc
= footer
.middle_crc
;
2306 old_footer
.data_crc
= footer
.data_crc
;
2308 old_footer
.front_crc
= old_footer
.middle_crc
= 0;
2310 old_footer
.data_crc
= msgr
->crcflags
& MSG_CRC_DATA
? footer
.data_crc
: 0;
2311 old_footer
.flags
= footer
.flags
;
2312 outcoming_bl
.append((char*)&old_footer
, sizeof(old_footer
));
2315 m
->trace
.event("async writing message");
2316 ldout(async_msgr
->cct
, 20) << __func__
<< " sending " << m
->get_seq()
2317 << " " << m
<< dendl
;
2318 ssize_t total_send_size
= outcoming_bl
.length();
2319 ssize_t rc
= _try_send(more
);
2321 ldout(async_msgr
->cct
, 1) << __func__
<< " error sending " << m
<< ", "
2322 << cpp_strerror(rc
) << dendl
;
// rc == 0: everything flushed — count this message's bytes as sent.
2323 } else if (rc
== 0) {
2324 logger
->inc(l_msgr_send_bytes
, total_send_size
- original_bl_len
);
2325 ldout(async_msgr
->cct
, 10) << __func__
<< " sending " << m
<< " done." << dendl
;
// Partial send: count only the bytes that actually left the buffer.
2327 logger
->inc(l_msgr_send_bytes
, total_send_size
- outcoming_bl
.length());
2328 ldout(async_msgr
->cct
, 10) << __func__
<< " sending " << m
<< " continuely." << dendl
;
// Emit OID event-trace markers for OSD op / op-reply messages.
2330 if (m
->get_type() == CEPH_MSG_OSD_OP
)
2331 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OP_END", false);
2332 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
2333 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OPREPLY_END", false);
// Reset mid-receive state after a fault/close. Based on how far the state
// machine had progressed for the in-flight message, release whatever was
// already taken from the three throttles: policy message throttle, policy
// byte throttle, and the dispatch-queue throttle (the latter two by
// cur_msg_size bytes).
// NOTE(review): the extract elides the body of the first (connecting-state)
// branch and other lines — comments cover only the visible code.
2339 void AsyncConnection::reset_recv_state()
2341 // clean up state internal variables and states
2342 if (state
>= STATE_CONNECTING_SEND_CONNECT_MSG
&&
2343 state
<= STATE_CONNECTING_READY
) {
// Past the message-throttle step: give back the one message slot.
2348 if (state
> STATE_OPEN_MESSAGE_THROTTLE_MESSAGE
&&
2349 state
<= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
2350 && policy
.throttler_messages
) {
2351 ldout(async_msgr
->cct
, 10) << __func__
<< " releasing " << 1
2352 << " message to policy throttler "
2353 << policy
.throttler_messages
->get_current() << "/"
2354 << policy
.throttler_messages
->get_max() << dendl
;
2355 policy
.throttler_messages
->put();
// Past the byte-throttle step: give back cur_msg_size bytes.
2357 if (state
> STATE_OPEN_MESSAGE_THROTTLE_BYTES
&&
2358 state
<= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
) {
2359 if (policy
.throttler_bytes
) {
2360 ldout(async_msgr
->cct
, 10) << __func__
<< " releasing " << cur_msg_size
2361 << " bytes to policy throttler "
2362 << policy
.throttler_bytes
->get_current() << "/"
2363 << policy
.throttler_bytes
->get_max() << dendl
;
2364 policy
.throttler_bytes
->put(cur_msg_size
);
// Past the dispatch-queue-throttle step: release those bytes too.
2367 if (state
> STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE
&&
2368 state
<= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
) {
2369 ldout(async_msgr
->cct
, 10) << __func__
<< " releasing " << cur_msg_size
2370 << " bytes to dispatch_queue throttler "
2371 << dispatch_queue
->dispatch_throttler
.get_current() << "/"
2372 << dispatch_queue
->dispatch_throttler
.get_max() << dendl
;
2373 dispatch_queue
->dispatch_throttle_release(cur_msg_size
);
// Process an ack from the peer: every message at the front of `sent` whose
// sequence number is <= `seq` has been received and can be dropped from the
// unacked list. Holds write_lock.
// NOTE(review): the extract elides the statements inside the loop that
// actually remove/release the message (presumably sent.pop_front() and
// m->put()) — confirm against the full source.
2377 void AsyncConnection::handle_ack(uint64_t seq
)
2379 ldout(async_msgr
->cct
, 15) << __func__
<< " got ack seq " << seq
<< dendl
;
2381 std::lock_guard
<std::mutex
> l(write_lock
);
// Acks are cumulative: pop everything up to and including `seq`.
2382 while (!sent
.empty() && sent
.front()->get_seq() <= seq
) {
2383 Message
* m
= sent
.front();
2385 ldout(async_msgr
->cct
, 10) << __func__
<< " got ack seq "
2386 << seq
<< " >= " << m
->get_seq() << " on "
2387 << m
<< " " << *m
<< dendl
;
// Timer callback for injected message delay: if the head of delay_queue is
// not yet due (and matches ms_inject_delay_msg_type when that is set),
// reschedule; otherwise pop it and dispatch — fast path when the messenger
// allows fast dispatch, else through the ordered dispatch queue.
// NOTE(review): the extract elides the reschedule statement after computing
// `t`, the early return, and closing braces — comments cover only what is
// visible.
2392 void AsyncConnection::DelayedDelivery::do_request(int id
)
2394 Message
*m
= nullptr;
2396 std::lock_guard
<std::mutex
> l(delay_lock
);
// This timer has fired; forget its registration id.
2397 register_time_events
.erase(id
);
2400 if (delay_queue
.empty())
2402 utime_t release
= delay_queue
.front().first
;
2403 m
= delay_queue
.front().second
;
2404 string delay_msg_type
= msgr
->cct
->_conf
->ms_inject_delay_msg_type
;
2405 utime_t now
= ceph_clock_now();
// Not yet due (and type matches the injection filter, if any): the
// remaining wait `t` is computed here.
2406 if ((release
> now
&&
2407 (delay_msg_type
.empty() || m
->get_type_name() == delay_msg_type
))) {
2408 utime_t t
= release
- now
;
2411 delay_queue
.pop_front();
2413 if (msgr
->ms_can_fast_dispatch(m
)) {
2414 dispatch_queue
->fast_dispatch(m
);
2416 dispatch_queue
->enqueue(m
, m
->get_priority(), conn_id
);
// Drain the delayed-delivery queue immediately: dispatch every queued
// message (fast path where allowed, dispatch queue otherwise) and cancel
// all pending delay timers. stop_dispatch is raised while flushing and
// cleared when done.
// NOTE(review): the extract elides the call wrapping the lambda (visible
// only as center->get_id(), ...) — presumably a submit-to-center call;
// confirm against the full source.
2420 void AsyncConnection::DelayedDelivery::flush() {
2421 stop_dispatch
= true;
2423 center
->get_id(), [this] () mutable {
2424 std::lock_guard
<std::mutex
> l(delay_lock
);
2425 while (!delay_queue
.empty()) {
2426 Message
*m
= delay_queue
.front().second
;
2427 if (msgr
->ms_can_fast_dispatch(m
)) {
2428 dispatch_queue
->fast_dispatch(m
);
2430 dispatch_queue
->enqueue(m
, m
->get_priority(), conn_id
);
2432 delay_queue
.pop_front();
// Cancel every outstanding delay timer now that the queue is empty.
2434 for (auto i
: register_time_events
)
2435 center
->delete_time_event(i
);
2436 register_time_events
.clear();
2437 stop_dispatch
= false;
// Request a keepalive: under write_lock, if the connection is not CLOSED,
// wake the write path (which appends the keepalive tag via
// _append_keepalive_or_ack when it runs).
// NOTE(review): the extract elides at least one line between 2445 and 2447
// (presumably setting a keepalive flag) — confirm against the full source.
2441 void AsyncConnection::send_keepalive()
2443 ldout(async_msgr
->cct
, 10) << __func__
<< dendl
;
2444 std::lock_guard
<std::mutex
> l(write_lock
);
2445 if (can_write
!= WriteStatus::CLOSED
) {
2447 center
->dispatch_event_external(write_handler
);
// Administratively close this connection. Takes the connection lock; the
// actual teardown call (presumably _stop()) is in lines elided from this
// extract — confirm against the full source.
2451 void AsyncConnection::mark_down()
2453 ldout(async_msgr
->cct
, 1) << __func__
<< dendl
;
2454 std::lock_guard
<std::mutex
> l(lock
);
// Append a keepalive (or keepalive ack) tag to outcoming_bl. Three wire
// forms are visible: KEEPALIVE2_ACK echoing the peer's timestamp `*tp`,
// KEEPALIVE2 with the current time (when the peer supports
// CEPH_FEATURE_MSGR_KEEPALIVE2), and the legacy bare KEEPALIVE tag.
// NOTE(review): the extract elides the opening `if (ack)` line that the
// first branch clearly belongs to — confirm against the full source.
2458 void AsyncConnection::_append_keepalive_or_ack(bool ack
, utime_t
*tp
)
2460 ldout(async_msgr
->cct
, 10) << __func__
<< dendl
;
// Ack branch: echo the timestamp the peer sent us.
2463 struct ceph_timespec ts
;
2464 tp
->encode_timeval(&ts
);
2465 outcoming_bl
.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK
);
2466 outcoming_bl
.append((char*)&ts
, sizeof(ts
));
2467 } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
// KEEPALIVE2: send our current wall-clock time with the tag.
2468 struct ceph_timespec ts
;
2469 utime_t t
= ceph_clock_now();
2470 t
.encode_timeval(&ts
);
2471 outcoming_bl
.append(CEPH_MSGR_TAG_KEEPALIVE2
);
2472 outcoming_bl
.append((char*)&ts
, sizeof(ts
));
// Legacy peers: bare keepalive tag, no timestamp.
2474 outcoming_bl
.append(CEPH_MSGR_TAG_KEEPALIVE
);
// Write-path event handler. When writing is allowed (CANWRITE) it loops
// draining the outgoing queue: pop the next message, encode it if needed
// (prepare_send_message), and send it (write_message); afterwards it sends
// any pending ack (TAG_ACK + seq) and records the send time in the perf
// counter. When not CANWRITE it tries to flush the buffered bytes, and on
// failure (or in STANDBY with queued data as a non-server) marks the
// connection faulted.
// NOTE(review): many original lines are elided from this extract (queue
// requeue on !lossy, error-handling branches, the declarations of `data`,
// `more`, `r`, `s`, `cs`, the ack_left bookkeeping, and the fault() calls)
// — comments describe only the visible code.
2478 void AsyncConnection::handle_write()
2480 ldout(async_msgr
->cct
, 10) << __func__
<< dendl
;
2484 if (can_write
== WriteStatus::CANWRITE
) {
// Fold any pending keepalive/ack tag into the outgoing buffer first.
2486 _append_keepalive_or_ack();
2490 auto start
= ceph::mono_clock::now();
// Drain loop: pull the next queued message (body in `data`).
2494 Message
*m
= _get_next_outgoing(&data
);
// Non-lossy connections track the message for possible resend.
2498 if (!policy
.lossy
) {
2503 more
= _has_next_outgoing();
2504 write_lock
.unlock();
2506 // send_message or requeue messages may not encode message
2508 prepare_send_message(get_features(), m
, data
);
2510 r
= write_message(m
, data
, more
);
2512 ldout(async_msgr
->cct
, 1) << __func__
<< " send msg failed" << dendl
;
2518 } while (can_write
== WriteStatus::CANWRITE
);
2519 write_lock
.unlock();
// Piggyback an ack for `left` received messages, if any are pending.
2521 uint64_t left
= ack_left
;
2525 outcoming_bl
.append(CEPH_MSGR_TAG_ACK
);
2526 outcoming_bl
.append((char*)&s
, sizeof(s
));
2527 ldout(async_msgr
->cct
, 10) << __func__
<< " try send msg ack, acked " << left
<< " messages" << dendl
;
2530 r
= _try_send(left
);
2531 } else if (is_queued()) {
2535 logger
->tinc(l_msgr_running_send_time
, ceph::mono_clock::now() - start
);
2537 ldout(async_msgr
->cct
, 1) << __func__
<< " send msg failed" << dendl
;
2541 write_lock
.unlock();
// Not CANWRITE: a non-server in STANDBY with queued data should reconnect.
2544 if (state
== STATE_STANDBY
&& !policy
.server
&& is_queued()) {
2545 ldout(async_msgr
->cct
, 10) << __func__
<< " policy.server is false" << dendl
;
2547 } else if (cs
&& state
!= STATE_NONE
&& state
!= STATE_CONNECTING
&& state
!= STATE_CONNECTING_RE
&& state
!= STATE_CLOSED
) {
2550 ldout(async_msgr
->cct
, 1) << __func__
<< " send outcoming bl failed" << dendl
;
2551 write_lock
.unlock();
2557 write_lock
.unlock();
// Time-event callback: deregister the fired timer `id`. The extract elides
// the surrounding lines (presumably lock handling and the follow-up
// processing call) — confirm against the full source.
2569 void AsyncConnection::wakeup_from(uint64_t id
)
2572 register_time_events
.erase(id
);
2577 void AsyncConnection::tick(uint64_t id
)
2579 auto now
= ceph::coarse_mono_clock::now();
2580 ldout(async_msgr
->cct
, 20) << __func__
<< " last_id=" << last_tick_id
2581 << " last_active" << last_active
<< dendl
;
2582 std::lock_guard
<std::mutex
> l(lock
);
2584 auto idle_period
= std::chrono::duration_cast
<std::chrono::microseconds
>(now
- last_active
).count();
2585 if (inactive_timeout_us
< (uint64_t)idle_period
) {
2586 ldout(async_msgr
->cct
, 1) << __func__
<< " idle(" << idle_period
<< ") more than "
2587 << inactive_timeout_us
2588 << " us, mark self fault." << dendl
;
2590 } else if (is_connected()) {
2591 last_tick_id
= center
->create_time_event(inactive_timeout_us
, tick_handler
);