]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/ProtocolV1.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / msg / async / ProtocolV1.cc
index 714eff74df8a775abd0dd03fb7b093a3a9271290..1911d31e7e28746a8e0249a0920b80ed726301a4 100644 (file)
@@ -246,7 +246,8 @@ void ProtocolV1::send_message(Message *m) {
     out_q[m->get_priority()].emplace_back(std::move(bl), m);
     ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                    << dendl;
-    if (can_write != WriteStatus::REPLACING) {
+    if (can_write != WriteStatus::REPLACING && !write_in_progress) {
+      write_in_progress = true;
       connection->center->dispatch_event_external(connection->write_handler);
     }
   }
@@ -352,6 +353,7 @@ void ProtocolV1::write_event() {
       } else if (r > 0)
         break;
     } while (can_write == WriteStatus::CANWRITE);
+    write_in_progress = false;
     connection->write_lock.unlock();
 
     // if r > 0 mean data still lefted, so no need _try_send.
@@ -382,6 +384,7 @@ void ProtocolV1::write_event() {
       return;
     }
   } else {
+    write_in_progress = false;
     connection->write_lock.unlock();
     connection->lock.lock();
     connection->write_lock.lock();
@@ -972,8 +975,6 @@ CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
     }
   }
 
-  message->set_connection(connection);
-
 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
   if (message->get_type() == CEPH_MSG_OSD_OP ||
       message->get_type() == CEPH_MSG_OSD_OPREPLY) {
@@ -1175,6 +1176,7 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
 }
 
 void ProtocolV1::requeue_sent() {
+  write_in_progress = false;
   if (sent.empty()) {
     return;
   }
@@ -1234,6 +1236,7 @@ void ProtocolV1::discard_out_queue() {
     }
   }
   out_q.clear();
+  write_in_progress = false;
 }
 
 void ProtocolV1::reset_recv_state() {
@@ -2260,6 +2263,7 @@ CtPtr ProtocolV1::replace(AsyncConnectionRef existing,
         << __func__ << " stop myself to swap existing" << dendl;
     exproto->can_write = WriteStatus::REPLACING;
     exproto->replacing = true;
+    exproto->write_in_progress = false;
     existing->state_offset = 0;
     // avoid previous thread modify event
     exproto->state = NONE;