]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/AsyncConnection.cc
update sources to 12.2.7
[ceph.git] / ceph / src / msg / async / AsyncConnection.cc
index 8eec5968458529a4a41b2a056fcdc6ab08ab4dda..80231cccf4b9b3a1f8e832eb8af3538f02b753a5 100644 (file)
@@ -122,9 +122,9 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
                                  Worker *w)
   : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
     logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
-    out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
+    state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
     dispatch_queue(q), can_write(WriteStatus::NOWRITE),
-    open_write(false), keepalive(false), recv_buf(NULL),
+    keepalive(false), recv_buf(NULL),
     recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
     recv_start(0), recv_end(0),
     last_active(ceph::coarse_mono_clock::now()),
@@ -203,6 +203,7 @@ ssize_t AsyncConnection::_try_send(bool more)
     }
   }
 
+  assert(center->in_thread());
   ssize_t r = cs.send(outcoming_bl, more);
   if (r < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
@@ -213,21 +214,13 @@ ssize_t AsyncConnection::_try_send(bool more)
                              << " remaining bytes " << outcoming_bl.length() << dendl;
 
   if (!open_write && is_queued()) {
-    if (center->in_thread()) {
-      center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
-      open_write = true;
-    } else {
-      center->dispatch_event_external(write_handler);
-    }
+    center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
+    open_write = true;
   }
 
   if (open_write && !is_queued()) {
-    if (center->in_thread()) {
-      center->delete_file_event(cs.fd(), EVENT_WRITABLE);
-      open_write = false;
-    } else {
-      center->dispatch_event_external(write_handler);
-    }
+    center->delete_file_event(cs.fd(), EVENT_WRITABLE);
+    open_write = false;
     if (state_after_send != STATE_NONE)
       center->dispatch_event_external(read_handler);
   }
@@ -336,6 +329,7 @@ void AsyncConnection::process()
   bool need_dispatch_writer = false;
   std::lock_guard<std::mutex> l(lock);
   last_active = ceph::coarse_mono_clock::now();
+  auto recv_start_time = ceph::mono_clock::now();
   do {
     ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl;
     prev_state = state;
@@ -437,6 +431,7 @@ void AsyncConnection::process()
 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
           ltt_recv_stamp = ceph_clock_now();
 #endif
+          recv_stamp = ceph_clock_now();
           ldout(async_msgr->cct, 20) << __func__ << " begin MSG" << dendl;
           ceph_msg_header header;
           ceph_msg_header_old oldheader;
@@ -492,7 +487,6 @@ void AsyncConnection::process()
           front.clear();
           middle.clear();
           data.clear();
-          recv_stamp = ceph_clock_now();
           current_header = header;
           state = STATE_OPEN_MESSAGE_THROTTLE_MESSAGE;
           break;
@@ -734,7 +728,7 @@ void AsyncConnection::process()
           // side queueing because messages can't be renumbered, but the (kernel) client will
           // occasionally pull a message out of the sent queue to send elsewhere.  in that case
           // it doesn't matter if we "got" it or not.
-          uint64_t cur_seq = in_seq.read();
+          uint64_t cur_seq = in_seq;
           if (message->get_seq() <= cur_seq) {
             ldout(async_msgr->cct,0) << __func__ << " got old message "
                     << message->get_seq() << " <= " << cur_seq << " " << message << " " << *message
@@ -766,13 +760,13 @@ void AsyncConnection::process()
 #endif
 
           // note last received message.
-          in_seq.set(message->get_seq());
+          in_seq = message->get_seq();
          ldout(async_msgr->cct, 5) << " rx " << message->get_source() << " seq "
                                     << message->get_seq() << " " << message
                                    << " " << *message << dendl;
 
           if (!policy.lossy) {
-            ack_left.inc();
+            ack_left++;
             need_dispatch_writer = true;
           }
           state = STATE_OPEN;
@@ -781,6 +775,8 @@ void AsyncConnection::process()
           logger->inc(l_msgr_recv_bytes, cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
 
           async_msgr->ms_fast_preprocess(message);
+          auto fast_dispatch_time = ceph::mono_clock::now();
+          logger->tinc(l_msgr_running_recv_time, fast_dispatch_time - recv_start_time);
           if (delay_state) {
             utime_t release = message->get_recv_stamp();
             double delay_period = 0;
@@ -794,6 +790,9 @@ void AsyncConnection::process()
           } else if (async_msgr->ms_can_fast_dispatch(message)) {
             lock.unlock();
             dispatch_queue->fast_dispatch(message);
+            recv_start_time = ceph::mono_clock::now();
+            logger->tinc(l_msgr_running_fast_dispatch_time,
+                         recv_start_time - fast_dispatch_time);
             lock.lock();
           } else {
             dispatch_queue->enqueue(message, message->get_priority(), conn_id);
@@ -845,6 +844,8 @@ void AsyncConnection::process()
 
   if (need_dispatch_writer && is_connected())
     center->dispatch_event_external(write_handler);
+
+  logger->tinc(l_msgr_running_recv_time, ceph::mono_clock::now() - recv_start_time);
   return;
 
  fail:
@@ -976,16 +977,17 @@ ssize_t AsyncConnection::_process_connection()
                                 << " not " << peer_addr
                                 << " - presumably this is the same node!" << dendl;
           } else {
-            ldout(async_msgr->cct, 0) << __func__ << " connect claims to be "
-                                << paddr << " not " << peer_addr << " - wrong node!" << dendl;
-            goto fail;
+            ldout(async_msgr->cct, 10) << __func__ << " connect claims to be "
+                                      << paddr << " not " << peer_addr << dendl;
+           goto fail;
           }
         }
 
         ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl;
         lock.unlock();
         async_msgr->learned_addr(peer_addr_for_me);
-        if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+        if (async_msgr->cct->_conf->ms_inject_internal_delays
+            && async_msgr->cct->_conf->ms_inject_socket_failures) {
           if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
             ldout(msgr->cct, 10) << __func__ << " sleep for "
                                  << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
@@ -1024,8 +1026,7 @@ ssize_t AsyncConnection::_process_connection()
 
     case STATE_CONNECTING_SEND_CONNECT_MSG:
       {
-        if (!got_bad_auth) {
-          delete authorizer;
+        if (!authorizer) {
           authorizer = async_msgr->get_authorizer(peer_type, false);
         }
         bufferlist bl;
@@ -1105,7 +1106,15 @@ ssize_t AsyncConnection::_process_connection()
           }
 
           authorizer_reply.append(state_buffer, connect_reply.authorizer_len);
-          bufferlist::iterator iter = authorizer_reply.begin();
+
+         if (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
+           ldout(async_msgr->cct,10) << __func__ << " connect got auth challenge" << dendl;
+           authorizer->add_challenge(async_msgr->cct, authorizer_reply);
+           state = STATE_CONNECTING_SEND_CONNECT_MSG;
+           break;
+         }
+
+          auto iter = authorizer_reply.begin();
           if (authorizer && !authorizer->verify_reply(iter)) {
             ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
             goto fail;
@@ -1134,7 +1143,7 @@ ssize_t AsyncConnection::_process_connection()
 
         newly_acked_seq = *((uint64_t*)state_buffer);
         ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
-                            << " vs out_seq " << out_seq.read() << dendl;
+                            << " vs out_seq " << out_seq << dendl;
         discard_requeued_up_to(newly_acked_seq);
         //while (newly_acked_seq > out_seq.read()) {
         //  Message *m = _get_next_outgoing(NULL);
@@ -1147,7 +1156,7 @@ ssize_t AsyncConnection::_process_connection()
         //}
 
         bufferlist bl;
-        uint64_t s = in_seq.read();
+        uint64_t s = in_seq;
         bl.append((char*)&s, sizeof(s));
         r = try_send(bl);
         if (r == 0) {
@@ -1418,6 +1427,8 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co
   if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
     ldout(async_msgr->cct, 0) << __func__ << " connect got RESETSESSION" << dendl;
     was_session_reset();
+    // see was_session_reset
+    outcoming_bl.clear();
     state = STATE_CONNECTING_SEND_CONNECT_MSG;
   }
   if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
@@ -1481,20 +1492,32 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
   // require signatures for cephx?
   if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
     if (peer_type == CEPH_ENTITY_TYPE_OSD ||
-        peer_type == CEPH_ENTITY_TYPE_MDS) {
+        peer_type == CEPH_ENTITY_TYPE_MDS ||
+       peer_type == CEPH_ENTITY_TYPE_MGR) {
       if (async_msgr->cct->_conf->cephx_require_signatures ||
           async_msgr->cct->_conf->cephx_cluster_require_signatures) {
         ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
         policy.features_required |= CEPH_FEATURE_MSG_AUTH;
       }
+      if (async_msgr->cct->_conf->cephx_require_version >= 2 ||
+         async_msgr->cct->_conf->cephx_cluster_require_version >= 2) {
+        ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring cephx v2 feature bit for cluster" << dendl;
+        policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
+      }
     } else {
       if (async_msgr->cct->_conf->cephx_require_signatures ||
           async_msgr->cct->_conf->cephx_service_require_signatures) {
         ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for service" << dendl;
         policy.features_required |= CEPH_FEATURE_MSG_AUTH;
       }
+      if (async_msgr->cct->_conf->cephx_require_version >= 2 ||
+         async_msgr->cct->_conf->cephx_service_require_version >= 2) {
+        ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring cephx v2 feature bit for service" << dendl;
+        policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
+      }
     }
   }
+
   uint64_t feat_missing = policy.features_required & ~(uint64_t)connect.features;
   if (feat_missing) {
     ldout(async_msgr->cct, 1) << __func__ << " peer missing required features "
@@ -1505,12 +1528,26 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
   lock.unlock();
 
   bool authorizer_valid;
-  if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl,
-                               authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) {
+  bool need_challenge = HAVE_FEATURE(connect.features, CEPHX_V2);
+  bool had_challenge = (bool)authorizer_challenge;
+  if (!async_msgr->verify_authorizer(
+       this, peer_type, connect.authorizer_protocol, authorizer_bl,
+       authorizer_reply, authorizer_valid, session_key,
+       need_challenge ? &authorizer_challenge : nullptr) ||
+      !authorizer_valid) {
     lock.lock();
-    ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
+    char tag;
+    if (need_challenge && !had_challenge && authorizer_challenge) {
+      ldout(async_msgr->cct,0) << __func__ << ": challenging authorizer"
+                              << dendl;
+      assert(authorizer_reply.length());
+      tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER;
+    } else {
+      ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
+      tag = CEPH_MSGR_TAG_BADAUTHORIZER;
+    }
     session_security.reset();
-    return _reply_accept(CEPH_MSGR_TAG_BADAUTHORIZER, connect, reply, authorizer_reply);
+    return _reply_accept(tag, connect, reply, authorizer_reply);
   }
 
   // We've verified the authorizer for this AsyncConnection, so set up the session security structure.  PLR
@@ -1535,8 +1572,15 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     // connection's lock
     existing->lock.lock();  // skip lockdep check (we are locking a second AsyncConnection here)
 
-    if (existing->replacing || existing->state == STATE_CLOSED) {
-      ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
+    if (existing->state == STATE_CLOSED) {
+      ldout(async_msgr->cct, 1) << __func__ << " existing already closed." << dendl;
+      existing->lock.unlock();
+      existing = NULL;
+      goto open;
+    }
+
+    if (existing->replacing) {
+      ldout(async_msgr->cct, 1) << __func__ << " existing racing replace happened while replacing."
                                 << " existing_state=" << get_state_name(existing->state) << dendl;
       reply.global_seq = existing->peer_global_seq;
       r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
@@ -1672,13 +1716,10 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
 
     center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
 
-    // Clean up output buffer
-    existing->outcoming_bl.clear();
     if (existing->delay_state) {
       existing->delay_state->flush();
       assert(!delay_state);
     }
-    existing->requeue_sent();
     existing->reset_recv_state();
 
     auto temp_cs = std::move(cs);
@@ -1691,7 +1732,6 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     dispatch_queue->queue_reset(this);
     ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
     existing->can_write = WriteStatus::REPLACING;
-    existing->open_write = false;
     existing->replacing = true;
     existing->state_offset = 0;
     // avoid previous thread modify event
@@ -1701,11 +1741,18 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     // there shouldn't exist any buffer
     assert(recv_start == recv_end);
 
+    existing->authorizer_challenge.reset();
+
     auto deactivate_existing = std::bind(
         [existing, new_worker, new_center, connect, reply, authorizer_reply](ConnectedSocket &cs) mutable {
       // we need to delete time event in original thread
       {
         std::lock_guard<std::mutex> l(existing->lock);
+        existing->write_lock.lock();
+        existing->requeue_sent();
+        existing->outcoming_bl.clear();
+        existing->open_write = false;
+        existing->write_lock.unlock();
         if (existing->state == STATE_NONE) {
           existing->shutdown_socket();
           existing->cs = std::move(cs);
@@ -1763,7 +1810,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
   connect_seq = connect.connect_seq + 1;
   peer_global_seq = connect.global_seq;
   ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
-                             << connect_seq << " in_seq=" << in_seq.read() << ", sending READY" << dendl;
+                             << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl;
 
   int next_state;
 
@@ -1776,7 +1823,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     next_state = STATE_ACCEPTING_READY;
     discard_requeued_up_to(0);
     is_reset_from_peer = false;
-    in_seq.set(0);
+    in_seq = 0;
   }
 
   // send READY reply
@@ -1801,7 +1848,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
 
   if (reply.tag == CEPH_MSGR_TAG_SEQ) {
-    uint64_t s = in_seq.read();
+    uint64_t s = in_seq;
     reply_bl.append((char*)&s, sizeof(s));
   }
 
@@ -1821,7 +1868,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
   }
   if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
     ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl;
-    assert(state == STATE_CLOSED);
+    assert(state == STATE_CLOSED || state == STATE_NONE);
     goto fail_registered;
   }
 
@@ -1934,16 +1981,7 @@ int AsyncConnection::send_message(Message *m)
     ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer previous "
                               << f << " != " << get_features() << dendl;
   }
-  if (!is_queued() && can_write == WriteStatus::CANWRITE && async_msgr->cct->_conf->ms_async_send_inline) {
-    if (!bl.length())
-      prepare_send_message(get_features(), m, bl);
-    logger->inc(l_msgr_send_messages_inline);
-    if (write_message(m, bl, false) < 0) {
-      ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
-      // we want to handle fault within internal thread
-      center->dispatch_event_external(write_handler);
-    }
-  } else if (can_write == WriteStatus::CLOSED) {
+  if (can_write == WriteStatus::CLOSED) {
     ldout(async_msgr->cct, 10) << __func__ << " connection closed."
                                << " Drop message " << m << dendl;
     m->put();
@@ -1969,7 +2007,7 @@ void AsyncConnection::requeue_sent()
     ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend "
                                << " (" << m->get_seq() << ")" << dendl;
     rq.push_front(make_pair(bufferlist(), m));
-    out_seq.dec();
+    out_seq--;
   }
 }
 
@@ -1988,7 +2026,7 @@ void AsyncConnection::discard_requeued_up_to(uint64_t seq)
                          << " <= " << seq << ", discarding" << dendl;
     p.second->put();
     rq.pop_front();
-    out_seq.inc();
+    out_seq++;
   }
   if (rq.empty())
     out_q.erase(CEPH_MSG_PRIO_HIGHEST);
@@ -1996,7 +2034,7 @@ void AsyncConnection::discard_requeued_up_to(uint64_t seq)
 
 /*
  * Tears down the AsyncConnection's message queues, and removes them from the DispatchQueue
- * Must hold pipe_lock prior to calling.
+ * Must hold write_lock prior to calling.
  */
 void AsyncConnection::discard_out_queue()
 {
@@ -2013,7 +2051,6 @@ void AsyncConnection::discard_out_queue()
       r->second->put();
     }
   out_q.clear();
-  outcoming_bl.clear();
 }
 
 int AsyncConnection::randomize_out_seq()
@@ -2025,11 +2062,11 @@ int AsyncConnection::randomize_out_seq()
     int seq_error = get_random_bytes((char *)&rand_seq, sizeof(rand_seq));
     rand_seq &= SEQ_MASK;
     lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
-    out_seq.set(rand_seq);
+    out_seq = rand_seq;
     return seq_error;
   } else {
     // previously, seq #'s always started at 0.
-    out_seq.set(0);
+    out_seq = 0;
     return 0;
   }
 }
@@ -2065,7 +2102,7 @@ void AsyncConnection::fault()
   outcoming_bl.clear();
   if (!once_ready && !is_queued() &&
       state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
-    ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half "
+    ldout(async_msgr->cct, 10) << __func__ << " with nothing to send and in the half "
                               << " accept state just closed" << dendl;
     write_lock.unlock();
     _stop();
@@ -2074,7 +2111,7 @@ void AsyncConnection::fault()
   }
   reset_recv_state();
   if (policy.standby && !is_queued() && state != STATE_WAIT) {
-    ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
+    ldout(async_msgr->cct, 10) << __func__ << " with nothing to send, going to standby" << dendl;
     state = STATE_STANDBY;
     write_lock.unlock();
     return;
@@ -2121,17 +2158,20 @@ void AsyncConnection::was_session_reset()
     delay_state->discard();
   dispatch_queue->discard_queue(conn_id);
   discard_out_queue();
+  // note: we need to clear outcoming_bl here, but was_session_reset may be
+  // called by other thread, so let caller clear this itself!
+  // outcoming_bl.clear();
 
   dispatch_queue->queue_remote_reset(this);
 
   if (randomize_out_seq()) {
-    ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl;
+    ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
   }
 
-  in_seq.set(0);
+  in_seq = 0;
   connect_seq = 0;
   // it's safe to directly set 0, double locked
-  ack_left.set(0);
+  ack_left = 0;
   once_ready = false;
   can_write = WriteStatus::NOWRITE;
 }
@@ -2184,14 +2224,8 @@ void AsyncConnection::prepare_send_message(uint64_t features, Message *m, buffer
 ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
 {
   FUNCTRACE();
-  assert(can_write == WriteStatus::CANWRITE);
-  m->set_seq(out_seq.inc());
-
-  if (!policy.lossy) {
-    // put on sent list
-    sent.push_back(m);
-    m->get();
-  }
+  assert(center->in_thread());
+  m->set_seq(++out_seq);
 
   if (msgr->crcflags & MSG_CRC_HEADER)
     m->calc_header_crc();
@@ -2242,9 +2276,8 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
                              << " off " << header.data_off << dendl;
 
   if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
-    std::list<buffer::ptr>::const_iterator pb;
-    for (pb = bl.buffers().begin(); pb != bl.buffers().end(); ++pb) {
-      outcoming_bl.append((char*)pb->c_str(), pb->length());
+    for (const auto &pb : bl.buffers()) {
+      outcoming_bl.append((char*)pb.c_str(), pb.length());
     }
   } else {
     outcoming_bl.claim_append(bl);  
@@ -2268,16 +2301,18 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
   }
 
   m->trace.event("async writing message");
-  logger->inc(l_msgr_send_bytes, outcoming_bl.length() - original_bl_len);
   ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
                              << " " << m << dendl;
+  ssize_t total_send_size = outcoming_bl.length();
   ssize_t rc = _try_send(more);
   if (rc < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
                               << cpp_strerror(rc) << dendl;
   } else if (rc == 0) {
+    logger->inc(l_msgr_send_bytes, total_send_size - original_bl_len);
     ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
   } else {
+    logger->inc(l_msgr_send_bytes, total_send_size - outcoming_bl.length());
     ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continuely." << dendl;
   }
   if (m->get_type() == CEPH_MSG_OSD_OP)
@@ -2441,41 +2476,52 @@ void AsyncConnection::handle_write()
       keepalive = false;
     }
 
-    while (1) {
+    auto start = ceph::mono_clock::now();
+    bool more;
+    do {
       bufferlist data;
       Message *m = _get_next_outgoing(&data);
       if (!m)
         break;
 
+      if (!policy.lossy) {
+        // put on sent list
+        sent.push_back(m);
+        m->get();
+      }
+      more = _has_next_outgoing();
+      write_lock.unlock();
+
       // send_message or requeue messages may not encode message
       if (!data.length())
         prepare_send_message(get_features(), m, data);
 
-      r = write_message(m, data, _has_next_outgoing());
+      r = write_message(m, data, more);
       if (r < 0) {
         ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
-        write_lock.unlock();
         goto fail;
-      } else if (r > 0) {
-        break;
       }
-    }
+      write_lock.lock();
+      if (r > 0)
+        break;
+    } while (can_write == WriteStatus::CANWRITE);
+    write_lock.unlock();
 
-    uint64_t left = ack_left.read();
+    uint64_t left = ack_left;
     if (left) {
       ceph_le64 s;
-      s = in_seq.read();
+      s = in_seq;
       outcoming_bl.append(CEPH_MSGR_TAG_ACK);
       outcoming_bl.append((char*)&s, sizeof(s));
       ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
-      ack_left.sub(left);
-      left = ack_left.read();
+      ack_left -= left;
+      left = ack_left;
       r = _try_send(left);
     } else if (is_queued()) {
       r = _try_send();
     }
 
-    write_lock.unlock();
+    logger->tinc(l_msgr_running_send_time, ceph::mono_clock::now() - start);
     if (r < 0) {
       ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
       goto fail;
@@ -2522,7 +2568,6 @@ void AsyncConnection::tick(uint64_t id)
   auto now = ceph::coarse_mono_clock::now();
   ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
                              << " last_active" << last_active << dendl;
-  assert(last_tick_id == id);
   std::lock_guard<std::mutex> l(lock);
   last_tick_id = 0;
   auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();