1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
19 #include "include/Context.h"
20 #include "include/random.h"
21 #include "common/errno.h"
22 #include "AsyncMessenger.h"
23 #include "AsyncConnection.h"
25 #include "ProtocolV1.h"
26 #include "ProtocolV2.h"
28 #include "messages/MOSDOp.h"
29 #include "messages/MOSDOpReply.h"
30 #include "common/EventTrace.h"
32 // Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
33 #define SEQ_MASK 0x7fffffff
35 #define dout_subsys ceph_subsys_ms
37 #define dout_prefix _conn_prefix(_dout)
38 std::ostream
& AsyncConnection::_conn_prefix(std::ostream
*_dout
) {
39 return *_dout
<< "-- " << async_msgr
->get_myaddrs() << " >> "
40 << *peer_addrs
<< " conn(" << this
41 << (msgr2
? " msgr2=" : " legacy=")
43 << " " << ceph_con_mode_name(protocol
->auth_meta
->con_mode
)
45 << " s=" << get_state_name(state
)
46 << " l=" << policy
.lossy
// 1. Don't dispatch any event after close! Doing so may keep the
//    AsyncConnection alive even after the AsyncMessenger is destroyed.
53 const uint32_t AsyncConnection::TCP_PREFETCH_MIN_SIZE
= 512;
55 class C_time_wakeup
: public EventCallback
{
56 AsyncConnectionRef conn
;
59 explicit C_time_wakeup(AsyncConnectionRef c
): conn(c
) {}
60 void do_request(uint64_t fd_or_id
) override
{
61 conn
->wakeup_from(fd_or_id
);
65 class C_handle_read
: public EventCallback
{
66 AsyncConnectionRef conn
;
69 explicit C_handle_read(AsyncConnectionRef c
): conn(c
) {}
70 void do_request(uint64_t fd_or_id
) override
{
75 class C_handle_write
: public EventCallback
{
76 AsyncConnectionRef conn
;
79 explicit C_handle_write(AsyncConnectionRef c
): conn(c
) {}
80 void do_request(uint64_t fd
) override
{
85 class C_handle_write_callback
: public EventCallback
{
86 AsyncConnectionRef conn
;
89 explicit C_handle_write_callback(AsyncConnectionRef c
) : conn(c
) {}
90 void do_request(uint64_t fd
) override
{ conn
->handle_write_callback(); }
93 class C_clean_handler
: public EventCallback
{
94 AsyncConnectionRef conn
;
96 explicit C_clean_handler(AsyncConnectionRef c
): conn(c
) {}
97 void do_request(uint64_t id
) override
{
103 class C_tick_wakeup
: public EventCallback
{
104 AsyncConnectionRef conn
;
107 explicit C_tick_wakeup(AsyncConnectionRef c
): conn(c
) {}
108 void do_request(uint64_t fd_or_id
) override
{
109 conn
->tick(fd_or_id
);
114 AsyncConnection::AsyncConnection(CephContext
*cct
, AsyncMessenger
*m
, DispatchQueue
*q
,
115 Worker
*w
, bool m2
, bool local
)
116 : Connection(cct
, m
),
117 delay_state(NULL
), async_msgr(m
), conn_id(q
->get_id()),
118 logger(w
->get_perf_counter()),
119 labeled_logger(w
->get_labeled_perf_counter()),
120 state(STATE_NONE
), port(-1),
121 dispatch_queue(q
), recv_buf(NULL
),
122 recv_max_prefetch(std::max
<int64_t>(msgr
->cct
->_conf
->ms_tcp_prefetch_max_size
, TCP_PREFETCH_MIN_SIZE
)),
123 recv_start(0), recv_end(0),
124 last_active(ceph::coarse_mono_clock::now()),
125 connect_timeout_us(cct
->_conf
->ms_connection_ready_timeout
*1000*1000),
126 inactive_timeout_us(cct
->_conf
->ms_connection_idle_timeout
*1000*1000),
127 msgr2(m2
), state_offset(0),
128 worker(w
), center(&w
->center
),read_buffer(nullptr)
130 #ifdef UNIT_TESTS_BUILT
131 this->interceptor
= m
->interceptor
;
133 read_handler
= new C_handle_read(this);
134 write_handler
= new C_handle_write(this);
135 write_callback_handler
= new C_handle_write_callback(this);
136 wakeup_handler
= new C_time_wakeup(this);
137 tick_handler
= new C_tick_wakeup(this);
138 // double recv_max_prefetch see "read_until"
139 recv_buf
= new char[2*recv_max_prefetch
];
141 protocol
= std::unique_ptr
<Protocol
>(new LoopbackProtocolV1(this));
143 protocol
= std::unique_ptr
<Protocol
>(new ProtocolV2(this));
145 protocol
= std::unique_ptr
<Protocol
>(new ProtocolV1(this));
147 logger
->inc(l_msgr_created_connections
);
150 AsyncConnection::~AsyncConnection()
154 ceph_assert(!delay_state
);
157 int AsyncConnection::get_con_mode() const
159 return protocol
->get_con_mode();
162 bool AsyncConnection::is_msgr2() const
164 return protocol
->proto_type
== 2;
167 void AsyncConnection::maybe_start_delay_thread()
170 async_msgr
->cct
->_conf
.with_val
<std::string
>(
171 "ms_inject_delay_type",
172 [this](const std::string
& s
) {
173 if (s
.find(ceph_entity_type_name(peer_type
)) != std::string::npos
) {
174 ldout(msgr
->cct
, 1) << __func__
<< " setting up a delay queue"
176 delay_state
= new DelayedDelivery(async_msgr
, center
, dispatch_queue
,
184 ssize_t
AsyncConnection::read(unsigned len
, char *buffer
,
185 std::function
<void(char *, ssize_t
)> callback
) {
186 ldout(async_msgr
->cct
, 20) << __func__
187 << (pendingReadLen
? " continue" : " start")
188 << " len=" << len
<< dendl
;
189 ssize_t r
= read_until(len
, buffer
);
191 readCallback
= callback
;
192 pendingReadLen
= len
;
193 read_buffer
= buffer
;
198 // Because this func will be called multi times to populate
199 // the needed buffer, so the passed in bufferptr must be the same.
200 // Normally, only "read_message" will pass existing bufferptr in
202 // And it will uses readahead method to reduce small read overhead,
203 // "recv_buf" is used to store read buffer
205 // return the remaining bytes, 0 means this buffer is finished
206 // else return < 0 means error
207 ssize_t
AsyncConnection::read_until(unsigned len
, char *p
)
209 ldout(async_msgr
->cct
, 25) << __func__
<< " len is " << len
<< " state_offset is "
210 << state_offset
<< dendl
;
212 if (async_msgr
->cct
->_conf
->ms_inject_socket_failures
&& cs
) {
213 if (rand() % async_msgr
->cct
->_conf
->ms_inject_socket_failures
== 0) {
214 ldout(async_msgr
->cct
, 0) << __func__
<< " injecting socket failure" << dendl
;
220 uint64_t left
= len
- state_offset
;
221 if (recv_end
> recv_start
) {
222 uint64_t to_read
= std::min
<uint64_t>(recv_end
- recv_start
, left
);
223 memcpy(p
, recv_buf
+recv_start
, to_read
);
224 recv_start
+= to_read
;
226 ldout(async_msgr
->cct
, 25) << __func__
<< " got " << to_read
<< " in buffer "
227 << " left is " << left
<< " buffer still has "
228 << recv_end
- recv_start
<< dendl
;
233 state_offset
+= to_read
;
236 recv_end
= recv_start
= 0;
237 /* nothing left in the prefetch buffer */
238 if (left
> (uint64_t)recv_max_prefetch
) {
239 /* this was a large read, we don't prefetch for these */
241 r
= read_bulk(p
+state_offset
, left
);
242 ldout(async_msgr
->cct
, 25) << __func__
<< " read_bulk left is " << left
<< " got " << r
<< dendl
;
244 ldout(async_msgr
->cct
, 1) << __func__
<< " read failed" << dendl
;
246 } else if (r
== static_cast<int>(left
)) {
255 r
= read_bulk(recv_buf
+recv_end
, recv_max_prefetch
);
256 ldout(async_msgr
->cct
, 25) << __func__
<< " read_bulk recv_end is " << recv_end
257 << " left is " << left
<< " got " << r
<< dendl
;
259 ldout(async_msgr
->cct
, 1) << __func__
<< " read failed" << dendl
;
263 if (r
>= static_cast<int>(left
)) {
264 recv_start
= len
- state_offset
;
265 memcpy(p
+state_offset
, recv_buf
, recv_start
);
271 memcpy(p
+state_offset
, recv_buf
, recv_end
-recv_start
);
272 state_offset
+= (recv_end
- recv_start
);
273 recv_end
= recv_start
= 0;
275 ldout(async_msgr
->cct
, 25) << __func__
<< " need len " << len
<< " remaining "
276 << len
- state_offset
<< " bytes" << dendl
;
277 return len
- state_offset
;
280 /* return -1 means `fd` occurs error or closed, it should be closed
281 * return 0 means EAGAIN or EINTR */
282 ssize_t
AsyncConnection::read_bulk(char *buf
, unsigned len
)
286 nread
= cs
.read(buf
, len
);
288 if (nread
== -EAGAIN
) {
290 } else if (nread
== -EINTR
) {
293 ldout(async_msgr
->cct
, 1) << __func__
<< " reading from fd=" << cs
.fd()
294 << " : "<< nread
<< " " << strerror(nread
) << dendl
;
297 } else if (nread
== 0) {
298 ldout(async_msgr
->cct
, 1) << __func__
<< " peer close file descriptor "
305 ssize_t
AsyncConnection::write(ceph::buffer::list
&bl
,
306 std::function
<void(ssize_t
)> callback
,
309 std::unique_lock
<std::mutex
> l(write_lock
);
310 outgoing_bl
.claim_append(bl
);
311 ssize_t r
= _try_send(more
);
313 writeCallback
= callback
;
318 // return the remaining bytes, it may larger than the length of ptr
319 // else return < 0 means error
320 ssize_t
AsyncConnection::_try_send(bool more
)
322 if (async_msgr
->cct
->_conf
->ms_inject_socket_failures
&& cs
) {
323 if (rand() % async_msgr
->cct
->_conf
->ms_inject_socket_failures
== 0) {
324 ldout(async_msgr
->cct
, 0) << __func__
<< " injecting socket failure" << dendl
;
329 ceph_assert(center
->in_thread());
330 ldout(async_msgr
->cct
, 25) << __func__
<< " cs.send " << outgoing_bl
.length()
331 << " bytes" << dendl
;
332 // network block would make ::send return EAGAIN, that would make here looks
333 // like do not call cs.send() and r = 0
335 if (likely(!inject_network_congestion())) {
336 r
= cs
.send(outgoing_bl
, more
);
339 ldout(async_msgr
->cct
, 1) << __func__
<< " send error: " << cpp_strerror(r
) << dendl
;
343 ldout(async_msgr
->cct
, 10) << __func__
<< " sent bytes " << r
344 << " remaining bytes " << outgoing_bl
.length() << dendl
;
346 if (!open_write
&& is_queued()) {
347 center
->create_file_event(cs
.fd(), EVENT_WRITABLE
, write_handler
);
351 if (open_write
&& !is_queued()) {
352 center
->delete_file_event(cs
.fd(), EVENT_WRITABLE
);
355 center
->dispatch_event_external(write_callback_handler
);
359 return outgoing_bl
.length();
362 void AsyncConnection::inject_delay() {
363 if (async_msgr
->cct
->_conf
->ms_inject_internal_delays
) {
364 ldout(async_msgr
->cct
, 10) << __func__
<< " sleep for " <<
365 async_msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
367 t
.set_from_double(async_msgr
->cct
->_conf
->ms_inject_internal_delays
);
372 bool AsyncConnection::inject_network_congestion() const {
373 return (async_msgr
->cct
->_conf
->ms_inject_network_congestion
> 0 &&
374 rand() % async_msgr
->cct
->_conf
->ms_inject_network_congestion
!= 0);
377 void AsyncConnection::process() {
378 std::lock_guard
<std::mutex
> l(lock
);
379 last_active
= ceph::coarse_mono_clock::now();
380 recv_start_time
= ceph::mono_clock::now();
382 ldout(async_msgr
->cct
, 20) << __func__
<< dendl
;
386 ldout(async_msgr
->cct
, 20) << __func__
<< " enter none state" << dendl
;
390 ldout(async_msgr
->cct
, 20) << __func__
<< " socket closed" << dendl
;
393 case STATE_CONNECTING
: {
394 ceph_assert(!policy
.server
);
396 // clear timer (if any) since we are connecting/re-connecting
398 center
->delete_time_event(last_tick_id
);
400 last_connect_started
= ceph::coarse_mono_clock::now();
401 last_tick_id
= center
->create_time_event(
402 connect_timeout_us
, tick_handler
);
405 center
->delete_file_event(cs
.fd(), EVENT_READABLE
| EVENT_WRITABLE
);
410 opts
.priority
= async_msgr
->get_socket_priority();
411 if (async_msgr
->cct
->_conf
->mon_use_min_delay_socket
) {
412 if (async_msgr
->get_mytype() == CEPH_ENTITY_TYPE_MON
&&
414 opts
.priority
= SOCKET_PRIORITY_MIN_DELAY
;
417 opts
.connect_bind_addr
= msgr
->get_myaddrs().front();
418 ssize_t r
= worker
->connect(target_addr
, opts
, &cs
);
424 center
->create_file_event(cs
.fd(), EVENT_READABLE
, read_handler
);
425 state
= STATE_CONNECTING_RE
;
427 case STATE_CONNECTING_RE
: {
428 ssize_t r
= cs
.is_connected();
430 ldout(async_msgr
->cct
, 1) << __func__
<< " reconnect failed to "
431 << target_addr
<< dendl
;
432 if (r
== -ECONNREFUSED
) {
433 ldout(async_msgr
->cct
, 2)
434 << __func__
<< " connection refused!" << dendl
;
435 dispatch_queue
->queue_refused(this);
440 ldout(async_msgr
->cct
, 10)
441 << __func__
<< " nonblock connect inprogress" << dendl
;
442 if (async_msgr
->get_stack()->nonblock_connect_need_writable_event()) {
443 center
->create_file_event(cs
.fd(), EVENT_WRITABLE
,
446 logger
->tinc(l_msgr_running_recv_time
,
447 ceph::mono_clock::now() - recv_start_time
);
451 center
->delete_file_event(cs
.fd(), EVENT_WRITABLE
);
452 ldout(async_msgr
->cct
, 10)
453 << __func__
<< " connect successfully, ready to send banner" << dendl
;
454 state
= STATE_CONNECTION_ESTABLISHED
;
458 case STATE_ACCEPTING
: {
459 center
->create_file_event(cs
.fd(), EVENT_READABLE
, read_handler
);
460 state
= STATE_CONNECTION_ESTABLISHED
;
461 if (async_msgr
->cct
->_conf
->mon_use_min_delay_socket
) {
462 if (async_msgr
->get_mytype() == CEPH_ENTITY_TYPE_MON
&&
464 cs
.set_priority(cs
.fd(), SOCKET_PRIORITY_MIN_DELAY
,
465 target_addr
.get_family());
471 case STATE_CONNECTION_ESTABLISHED
: {
472 if (pendingReadLen
) {
473 ssize_t r
= read(*pendingReadLen
, read_buffer
, readCallback
);
474 if (r
<= 0) { // read all bytes, or an error occured
475 pendingReadLen
.reset();
476 char *buf_tmp
= read_buffer
;
477 read_buffer
= nullptr;
478 readCallback(buf_tmp
, r
);
480 logger
->tinc(l_msgr_running_recv_time
,
481 ceph::mono_clock::now() - recv_start_time
);
488 protocol
->read_event();
490 logger
->tinc(l_msgr_running_recv_time
,
491 ceph::mono_clock::now() - recv_start_time
);
494 bool AsyncConnection::is_connected() {
495 return protocol
->is_connected();
498 void AsyncConnection::connect(const entity_addrvec_t
&addrs
, int type
,
499 entity_addr_t
&target
) {
501 std::lock_guard
<std::mutex
> l(lock
);
503 set_peer_addrs(addrs
);
504 policy
= msgr
->get_policy(type
);
505 target_addr
= target
;
509 void AsyncConnection::_connect()
511 ldout(async_msgr
->cct
, 10) << __func__
<< dendl
;
513 state
= STATE_CONNECTING
;
515 // rescheduler connection in order to avoid lock dep
516 // may called by external thread(send_message)
517 center
->dispatch_event_external(read_handler
);
520 void AsyncConnection::accept(ConnectedSocket socket
,
521 const entity_addr_t
&listen_addr
,
522 const entity_addr_t
&peer_addr
)
524 ldout(async_msgr
->cct
, 10) << __func__
<< " sd=" << socket
.fd()
525 << " listen_addr " << listen_addr
526 << " peer_addr " << peer_addr
<< dendl
;
527 ceph_assert(socket
.fd() >= 0);
529 std::lock_guard
<std::mutex
> l(lock
);
530 cs
= std::move(socket
);
531 socket_addr
= listen_addr
;
532 target_addr
= peer_addr
; // until we know better
533 state
= STATE_ACCEPTING
;
535 // rescheduler connection in order to avoid lock dep
536 center
->dispatch_event_external(read_handler
);
539 int AsyncConnection::send_message(Message
*m
)
541 FUNCTRACE(async_msgr
->cct
);
542 lgeneric_subdout(async_msgr
->cct
, ms
,
543 1) << "-- " << async_msgr
->get_myaddrs() << " --> "
544 << get_peer_addrs() << " -- "
545 << *m
<< " -- " << m
<< " con "
549 if (is_blackhole()) {
550 lgeneric_subdout(async_msgr
->cct
, ms
, 0) << __func__
<< ceph_entity_type_name(peer_type
)
551 << " blackhole " << *m
<< dendl
;
556 // optimistic think it's ok to encode(actually may broken now)
557 if (!m
->get_priority())
558 m
->set_priority(async_msgr
->get_default_send_priority());
560 m
->get_header().src
= async_msgr
->get_myname();
561 m
->set_connection(this);
563 #if defined(WITH_EVENTTRACE)
564 if (m
->get_type() == CEPH_MSG_OSD_OP
)
565 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OP_BEGIN", true);
566 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
567 OID_EVENT_TRACE_WITH_MSG(m
, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
570 if (is_loopback
) { //loopback connection
571 ldout(async_msgr
->cct
, 20) << __func__
<< " " << *m
<< " local" << dendl
;
572 std::lock_guard
<std::mutex
> l(write_lock
);
573 if (protocol
->is_connected()) {
574 dispatch_queue
->local_delivery(m
, m
->get_priority());
576 ldout(async_msgr
->cct
, 10) << __func__
<< " loopback connection closed."
577 << " Drop message " << m
<< dendl
;
583 // we don't want to consider local message here, it's too lightweight which
585 logger
->inc(l_msgr_send_messages
);
587 protocol
->send_message(m
);
591 entity_addr_t
AsyncConnection::_infer_target_addr(const entity_addrvec_t
& av
)
593 // pick the first addr of the same address family as socket_addr. it could be
594 // an any: or v2: addr, we don't care. it should not be a v1 addr.
595 for (auto& i
: av
.v
) {
599 if (i
.get_family() == socket_addr
.get_family()) {
600 ldout(async_msgr
->cct
,10) << __func__
<< " " << av
<< " -> " << i
<< dendl
;
604 ldout(async_msgr
->cct
,10) << __func__
<< " " << av
<< " -> nothing to match "
605 << socket_addr
<< dendl
;
609 void AsyncConnection::fault()
614 // queue delayed items immediately
616 delay_state
->flush();
618 recv_start
= recv_end
= 0;
623 void AsyncConnection::_stop() {
624 writeCallback
.reset();
625 dispatch_queue
->discard_queue(conn_id
);
626 async_msgr
->unregister_conn(this);
627 worker
->release_worker();
629 state
= STATE_CLOSED
;
633 // Make sure in-queue events will been processed
634 center
->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
637 bool AsyncConnection::is_queued() const {
638 return outgoing_bl
.length();
641 void AsyncConnection::shutdown_socket() {
642 for (auto &&t
: register_time_events
) center
->delete_time_event(t
);
643 register_time_events
.clear();
645 center
->delete_time_event(last_tick_id
);
649 center
->delete_file_event(cs
.fd(), EVENT_READABLE
| EVENT_WRITABLE
);
655 void AsyncConnection::DelayedDelivery::do_request(uint64_t id
)
657 Message
*m
= nullptr;
659 std::lock_guard
<std::mutex
> l(delay_lock
);
660 register_time_events
.erase(id
);
663 if (delay_queue
.empty())
665 m
= delay_queue
.front();
666 delay_queue
.pop_front();
668 if (msgr
->ms_can_fast_dispatch(m
)) {
669 dispatch_queue
->fast_dispatch(m
);
671 dispatch_queue
->enqueue(m
, m
->get_priority(), conn_id
);
675 void AsyncConnection::DelayedDelivery::discard() {
676 stop_dispatch
= true;
677 center
->submit_to(center
->get_id(),
679 std::lock_guard
<std::mutex
> l(delay_lock
);
680 while (!delay_queue
.empty()) {
681 Message
*m
= delay_queue
.front();
682 dispatch_queue
->dispatch_throttle_release(
683 m
->get_dispatch_throttle_size());
685 delay_queue
.pop_front();
687 for (auto i
: register_time_events
)
688 center
->delete_time_event(i
);
689 register_time_events
.clear();
690 stop_dispatch
= false;
695 void AsyncConnection::DelayedDelivery::flush() {
696 stop_dispatch
= true;
698 center
->get_id(), [this] () mutable {
699 std::lock_guard
<std::mutex
> l(delay_lock
);
700 while (!delay_queue
.empty()) {
701 Message
*m
= delay_queue
.front();
702 if (msgr
->ms_can_fast_dispatch(m
)) {
703 dispatch_queue
->fast_dispatch(m
);
705 dispatch_queue
->enqueue(m
, m
->get_priority(), conn_id
);
707 delay_queue
.pop_front();
709 for (auto i
: register_time_events
)
710 center
->delete_time_event(i
);
711 register_time_events
.clear();
712 stop_dispatch
= false;
716 void AsyncConnection::send_keepalive()
718 protocol
->send_keepalive();
721 void AsyncConnection::mark_down()
723 ldout(async_msgr
->cct
, 1) << __func__
<< dendl
;
724 std::lock_guard
<std::mutex
> l(lock
);
728 void AsyncConnection::handle_write()
730 ldout(async_msgr
->cct
, 10) << __func__
<< dendl
;
731 protocol
->write_event();
734 void AsyncConnection::handle_write_callback() {
735 std::lock_guard
<std::mutex
> l(lock
);
736 last_active
= ceph::coarse_mono_clock::now();
737 recv_start_time
= ceph::mono_clock::now();
740 auto callback
= *writeCallback
;
741 writeCallback
.reset();
749 void AsyncConnection::stop(bool queue_reset
) {
751 bool need_queue_reset
= (state
!= STATE_CLOSED
) && queue_reset
;
754 if (need_queue_reset
) dispatch_queue
->queue_reset(this);
757 void AsyncConnection::cleanup() {
760 delete write_handler
;
761 delete write_callback_handler
;
762 delete wakeup_handler
;
770 void AsyncConnection::wakeup_from(uint64_t id
)
773 register_time_events
.erase(id
);
778 void AsyncConnection::tick(uint64_t id
)
780 auto now
= ceph::coarse_mono_clock::now();
781 ldout(async_msgr
->cct
, 20) << __func__
<< " last_id=" << last_tick_id
782 << " last_active=" << last_active
<< dendl
;
783 std::lock_guard
<std::mutex
> l(lock
);
785 if (!is_connected()) {
786 if (connect_timeout_us
<=
787 (uint64_t)std::chrono::duration_cast
<std::chrono::microseconds
>
788 (now
- last_connect_started
).count()) {
789 ldout(async_msgr
->cct
, 1) << __func__
<< " see no progress in more than "
790 << connect_timeout_us
791 << " us during connecting to "
792 << target_addr
<< ", fault."
795 labeled_logger
->inc(l_msgr_connection_ready_timeouts
);
797 last_tick_id
= center
->create_time_event(connect_timeout_us
, tick_handler
);
800 auto idle_period
= std::chrono::duration_cast
<std::chrono::microseconds
>
801 (now
- last_active
).count();
802 if (inactive_timeout_us
< (uint64_t)idle_period
) {
803 ldout(async_msgr
->cct
, 1) << __func__
<< " idle (" << idle_period
804 << ") for more than " << inactive_timeout_us
808 labeled_logger
->inc(l_msgr_connection_idle_timeouts
);
810 last_tick_id
= center
->create_time_event(inactive_timeout_us
, tick_handler
);