]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/ProtocolV2.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / msg / async / ProtocolV2.cc
index 4b03f5ebcf46ce25550372a847cd9f09590f3241..8fc02db6e559a24616dbe49afe0daaca23a95306 100644 (file)
@@ -145,7 +145,7 @@ void ProtocolV2::reset_session() {
 
   connection->dispatch_queue->discard_queue(connection->conn_id);
   discard_out_queue();
-  connection->outcoming_bl.clear();
+  connection->outgoing_bl.clear();
 
   connection->dispatch_queue->queue_remote_reset(connection);
 
@@ -195,6 +195,7 @@ void ProtocolV2::requeue_sent() {
     ldout(cct, 5) << __func__ << " requeueing message m=" << m
                   << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
                   << *m << dendl;
+    m->clear_payload();
     rq.emplace_front(out_queue_entry_t{false, m});
   }
 }
@@ -221,12 +222,36 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
   return count;
 }
 
-void ProtocolV2::reset_recv_state() {
+void ProtocolV2::reset_security() {
+  ldout(cct, 5) << __func__ << dendl;
+
   auth_meta.reset(new AuthConnectionMeta);
-  session_stream_handlers.tx.reset(nullptr);
   session_stream_handlers.rx.reset(nullptr);
-  pre_auth.txbuf.clear();
+  session_stream_handlers.tx.reset(nullptr);
   pre_auth.rxbuf.clear();
+  pre_auth.txbuf.clear();
+}
+
+// it's expected the `write_lock` is held while calling this method.
+void ProtocolV2::reset_recv_state() {
+  ldout(cct, 5) << __func__ << dendl;
+
+  if (!connection->center->in_thread()) {
+    // execute in the same thread that uses the rx/tx handlers. We need
+    // to do the warp because holding `write_lock` is not enough as
+    // `write_event()` unlocks it just before calling `write_message()`.
+    // `submit_to()` here is NOT blocking.
+    connection->center->submit_to(connection->center->get_id(), [this] {
+      ldout(cct, 5) << "reset_recv_state (warped) reseting crypto handlers"
+                    << dendl;
+      // Possibly unnecessary. See the comment in `deactivate_existing`.
+      std::lock_guard<std::mutex> l(connection->lock);
+      std::lock_guard<std::mutex> wl(connection->write_lock);
+      reset_security();
+    }, /* always_async = */true);
+  } else {
+    reset_security();
+  }
 
   // clean read and write callbacks
   connection->pendingReadLen.reset();
@@ -383,13 +408,8 @@ void ProtocolV2::prepare_send_message(uint64_t features,
   ldout(cct, 20) << __func__ << " m=" << *m << dendl;
 
   // associate message with Connection (for benefit of encode_payload)
-  if (m->empty_payload()) {
-    ldout(cct, 20) << __func__ << " encoding features " << features << " " << m
-                   << " " << *m << dendl;
-  } else {
-    ldout(cct, 20) << __func__ << " half-reencoding features " << features
-                   << " " << m << " " << *m << dendl;
-  }
+  ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
+                << features << " " << m  << " " << *m << dendl;
 
   // encode and copy out of *m
   m->encode(features, 0);
@@ -422,6 +442,7 @@ void ProtocolV2::send_message(Message *m) {
   } else {
     ldout(cct, 5) << __func__ << " enqueueing message m=" << m
                   << " type=" << m->get_type() << " " << *m << dendl;
+    m->queue_start = ceph::mono_clock::now();
     m->trace.event("async enqueueing message");
     out_queue[m->get_priority()].emplace_back(
       out_queue_entry_t{is_prepared, m});
@@ -512,7 +533,7 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
                             m->get_payload(),
                             m->get_middle(),
                             m->get_data());
-  connection->outcoming_bl.append(message.get_buffer(session_stream_handlers));
+  connection->outgoing_bl.append(message.get_buffer(session_stream_handlers));
 
   ldout(cct, 5) << __func__ << " sending message m=" << m
                 << " seq=" << m->get_seq() << " " << *m << dendl;
@@ -522,21 +543,24 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
                  << " src=" << entity_name_t(messenger->get_myname())
                  << " off=" << header2.data_off
                  << dendl;
-  ssize_t total_send_size = connection->outcoming_bl.length();
+  ssize_t total_send_size = connection->outgoing_bl.length();
   ssize_t rc = connection->_try_send(more);
   if (rc < 0) {
     ldout(cct, 1) << __func__ << " error sending " << m << ", "
                   << cpp_strerror(rc) << dendl;
   } else {
     connection->logger->inc(
-        l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
+        l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
     ldout(cct, 10) << __func__ << " sending " << m
                    << (rc ? " continuely." : " done.") << dendl;
   }
+
+#if defined(WITH_EVENTTRACE)
   if (m->get_type() == CEPH_MSG_OSD_OP)
     OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
   else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
     OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
+#endif
   m->put();
 
   return rc;
@@ -545,12 +569,12 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
 void ProtocolV2::append_keepalive() {
   ldout(cct, 10) << __func__ << dendl;
   auto keepalive_frame = KeepAliveFrame::Encode();
-  connection->outcoming_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
+  connection->outgoing_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
 }
 
 void ProtocolV2::append_keepalive_ack(utime_t &timestamp) {
   auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp);
-  connection->outcoming_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+  connection->outgoing_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
 }
 
 void ProtocolV2::handle_message_ack(uint64_t seq) {
@@ -564,6 +588,7 @@ void ProtocolV2::handle_message_ack(uint64_t seq) {
   static const int max_pending = 128;
   int i = 0;
   Message *pending[max_pending];
+  auto now = ceph::mono_clock::now();
   connection->write_lock.lock();
   while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
     Message *m = sent.front();
@@ -574,6 +599,7 @@ void ProtocolV2::handle_message_ack(uint64_t seq) {
                    << dendl;
   }
   connection->write_lock.unlock();
+  connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
   for (int k = 0; k < i; k++) {
     pending[k]->put();
   }
@@ -611,6 +637,12 @@ void ProtocolV2::write_event() {
         prepare_send_message(connection->get_features(), out_entry.m);
       }
 
+      if (out_entry.m->queue_start != ceph::mono_time()) {
+        connection->logger->tinc(l_msgr_send_messages_queue_lat,
+                                ceph::mono_clock::now() -
+                                out_entry.m->queue_start);
+      }
+
       r = write_message(out_entry.m, more);
 
       connection->write_lock.lock();
@@ -619,8 +651,11 @@ void ProtocolV2::write_event() {
       } else if (r < 0) {
         ldout(cct, 1) << __func__ << " send msg failed" << dendl;
         break;
-      } else if (r > 0)
+      } else if (r > 0) {
+       // Outbound message in-progress, thread will be re-awoken
+       // when the outbound socket is writeable again
         break;
+      }
     } while (can_write);
     write_in_progress = false;
 
@@ -628,10 +663,8 @@ void ProtocolV2::write_event() {
     if (r == 0) {
       uint64_t left = ack_left;
       if (left) {
-        ceph_le64 s;
-        s = in_seq;
         auto ack = AckFrame::Encode(in_seq);
-        connection->outcoming_bl.append(ack.get_buffer(session_stream_handlers));
+        connection->outgoing_bl.append(ack.get_buffer(session_stream_handlers));
         ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                        << " messages" << dendl;
         ack_left -= left;
@@ -791,7 +824,7 @@ CtPtr ProtocolV2::_banner_exchange(CtRef callback) {
 }
 
 CtPtr ProtocolV2::_wait_for_peer_banner() {
-  unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
+  unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
   return READ(banner_len, _handle_peer_banner);
 }
 
@@ -819,7 +852,7 @@ CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) {
   uint16_t payload_len;
   bufferlist bl;
   buffer->set_offset(banner_prefix_len);
-  buffer->set_length(sizeof(__le16));
+  buffer->set_length(sizeof(ceph_le16));
   bl.push_back(std::move(buffer));
   auto ti = bl.cbegin();
   try {
@@ -1357,8 +1390,8 @@ CtPtr ProtocolV2::handle_message() {
   ldout(cct, 20) << __func__ << dendl;
   ceph_assert(state == THROTTLE_DONE);
 
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ltt_recv_stamp = ceph_clock_now();
+#if defined(WITH_EVENTTRACE)
+  utime_t ltt_recv_stamp = ceph_clock_now();
 #endif
   recv_stamp = ceph_clock_now();
 
@@ -1446,7 +1479,7 @@ CtPtr ProtocolV2::handle_message() {
     }
   }
 
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+#if defined(WITH_EVENTTRACE)
   if (message->get_type() == CEPH_MSG_OSD_OP ||
       message->get_type() == CEPH_MSG_OSD_OPREPLY) {
     utime_t ltt_processed_stamp = ceph_clock_now();
@@ -1477,15 +1510,23 @@ CtPtr ProtocolV2::handle_message() {
 
   state = READY;
 
+  ceph::mono_time fast_dispatch_time;
+
+  if (connection->is_blackhole()) {
+    ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
+    message->put();
+    goto out;
+  }
+
   connection->logger->inc(l_msgr_recv_messages);
   connection->logger->inc(
       l_msgr_recv_bytes,
       cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
 
   messenger->ms_fast_preprocess(message);
-  auto fast_dispatch_time = ceph::mono_clock::now();
+  fast_dispatch_time = ceph::mono_clock::now();
   connection->logger->tinc(l_msgr_running_recv_time,
-                           fast_dispatch_time - connection->recv_start_time);
+                          fast_dispatch_time - connection->recv_start_time);
   if (connection->delay_state) {
     double delay_period = 0;
     if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
@@ -1503,6 +1544,12 @@ CtPtr ProtocolV2::handle_message() {
     connection->logger->tinc(l_msgr_running_fast_dispatch_time,
                              connection->recv_start_time - fast_dispatch_time);
     connection->lock.lock();
+    // we might have been reused by another connection
+    // let's check if that is the case
+    if (state != READY) {
+      // yes, that was the case, let's do nothing
+      return nullptr;
+    }
   } else {
     connection->dispatch_queue->enqueue(message, message->get_priority(),
                                         connection->conn_id);
@@ -1510,13 +1557,7 @@ CtPtr ProtocolV2::handle_message() {
 
   handle_message_ack(current_header.ack_seq);
 
-  // we might have been reused by another connection
-  // let's check if that is the case
-  if (state != READY) {
-    // yes, that was the case, let's do nothing
-    return nullptr;
-  }
-
+ out:
   if (need_dispatch_writer && connection->is_connected()) {
     connection->center->dispatch_event_external(connection->write_handler);
   }
@@ -1700,9 +1741,9 @@ CtPtr ProtocolV2::post_client_banner_exchange() {
 }
 
 CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
+  ceph_assert(messenger->auth_client);
   ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
                 << " auth_client " << messenger->auth_client << dendl;
-  ceph_assert(messenger->auth_client);
 
   bufferlist bl;
   vector<uint32_t> preferred_modes;
@@ -2271,7 +2312,7 @@ CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload)
     // this happened at client side
     return finish_client_auth();
   } else {
-    ceph_assert_always("state corruption" == nullptr);
+    ceph_abort("state corruption");
   }
 }
 
@@ -2336,34 +2377,41 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
 
   peer_global_seq = client_ident.global_seq();
 
-  // Looks good so far, let's check if there is already an existing connection
-  // to this peer.
-
-  connection->lock.unlock();
-  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
-
-  if (existing &&
-      existing->protocol->proto_type != 2) {
-    ldout(cct,1) << __func__ << " existing " << existing << " proto "
-                << existing->protocol.get() << " version is "
-                << existing->protocol->proto_type << ", marking down" << dendl;
-    existing->mark_down();
-    existing = nullptr;
-  }
+  if (connection->policy.server &&
+      connection->policy.lossy &&
+      !connection->policy.register_lossy_clients) {
+    // incoming lossy client, no need to register this connection
+  } else {
+    // Looks good so far, let's check if there is already an existing connection
+    // to this peer.
+    connection->lock.unlock();
+    AsyncConnectionRef existing = messenger->lookup_conn(
+      *connection->peer_addrs);
+
+    if (existing &&
+       existing->protocol->proto_type != 2) {
+      ldout(cct,1) << __func__ << " existing " << existing << " proto "
+                  << existing->protocol.get() << " version is "
+                  << existing->protocol->proto_type << ", marking down"
+                  << dendl;
+      existing->mark_down();
+      existing = nullptr;
+    }
 
-  connection->inject_delay();
+    connection->inject_delay();
 
-  connection->lock.lock();
-  if (state != SESSION_ACCEPTING) {
-    ldout(cct, 1) << __func__
-                  << " state changed while accept, it must be mark_down"
-                  << dendl;
-    ceph_assert(state == CLOSED);
-    return _fault();
-  }
+    connection->lock.lock();
+    if (state != SESSION_ACCEPTING) {
+      ldout(cct, 1) << __func__
+                   << " state changed while accept, it must be mark_down"
+                   << dendl;
+      ceph_assert(state == CLOSED);
+      return _fault();
+    }
 
-  if (existing) {
-    return handle_existing_connection(existing);
+    if (existing) {
+      return handle_existing_connection(existing);
+    }
   }
 
   // if everything is OK reply with server identification
@@ -2530,7 +2578,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
   return reuse_connection(existing, exproto);
 }
 
-CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
+CtPtr ProtocolV2::handle_existing_connection(const AsyncConnectionRef& existing) {
   ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
 
   std::lock_guard<std::mutex> l(existing->lock);
@@ -2631,7 +2679,7 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
   }
 }
 
-CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
+CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
                                    ProtocolV2 *exproto) {
   ldout(cct, 20) << __func__ << " existing=" << existing
                  << " reconnect=" << reconnecting << dendl;
@@ -2658,16 +2706,19 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
   }
   exproto->peer_global_seq = peer_global_seq;
 
+  ceph_assert(connection->center->in_thread());
   auto temp_cs = std::move(connection->cs);
   EventCenter *new_center = connection->center;
   Worker *new_worker = connection->worker;
+  // we can steal the session_stream_handlers under the assumption
+  // this happens in the event center's thread as there should be
+  // no user outside its boundaries (simlarly to e.g. outgoing_bl).
+  auto temp_stream_handlers = std::move(session_stream_handlers);
+  exproto->auth_meta = auth_meta;
 
   ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
                            << dendl;
 
-  std::swap(exproto->session_stream_handlers, session_stream_handlers);
-  exproto->auth_meta = auth_meta;
-
   // avoid _stop shutdown replacing socket
   // queue a reset on the new connection, which we're dumping for the old
   stop();
@@ -2688,14 +2739,25 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
   ceph_assert(connection->recv_start == connection->recv_end);
 
   auto deactivate_existing = std::bind(
-      [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
+      [ existing,
+        new_worker,
+        new_center,
+        exproto,
+        temp_stream_handlers=std::move(temp_stream_handlers)
+      ](ConnectedSocket &cs) mutable {
         // we need to delete time event in original thread
         {
           std::lock_guard<std::mutex> l(existing->lock);
           existing->write_lock.lock();
           exproto->requeue_sent();
-          existing->outcoming_bl.clear();
+          // XXX: do we really need the locking for `outgoing_bl`? There is
+          // a comment just above its definition saying "lockfree, only used
+          // in own thread". I'm following lockfull schema just in the case.
+          // From performance point of view it should be fine – this happens
+          // far away from hot paths.
+          existing->outgoing_bl.clear();
           existing->open_write = false;
+          exproto->session_stream_handlers = std::move(temp_stream_handlers);
           existing->write_lock.unlock();
           if (exproto->state == NONE) {
             existing->shutdown_socket();