]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/osd/heartbeat.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / osd / heartbeat.cc
index a728c327fcc867730ce901ddc4acf3599f29050d..266e56533c3bb1579afdf26fc2db9ce87aa0d42e 100644 (file)
@@ -236,8 +236,11 @@ void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replac
   }
 }
 
-void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn)
+void Heartbeat::ms_handle_connect(
+    crimson::net::ConnectionRef conn,
+    seastar::shard_id prv_shard)
 {
+  ceph_assert_always(seastar::this_shard_id() == prv_shard);
   auto peer = conn->get_peer_id();
   if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
       peer == entity_name_t::NEW) {
@@ -249,8 +252,12 @@ void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn)
   }
 }
 
-void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn)
+void Heartbeat::ms_handle_accept(
+    crimson::net::ConnectionRef conn,
+    seastar::shard_id prv_shard,
+    bool is_replace)
 {
+  ceph_assert_always(seastar::this_shard_id() == prv_shard);
   auto peer = conn->get_peer_id();
   if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
       peer == entity_name_t::NEW) {
@@ -258,7 +265,7 @@ void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn)
   }
   if (auto found = peers.find(peer);
       found != peers.end()) {
-    found->second.handle_accept(conn);
+    found->second.handle_accept(conn, is_replace);
   }
 }
 
@@ -303,40 +310,38 @@ seastar::future<> Heartbeat::maybe_share_osdmap(
   Ref<MOSDPing> m)
 {
   const osd_id_t from = m->get_source().num();
-  const epoch_t osdmap_epoch = service.get_map()->get_epoch();
-  const epoch_t peer_epoch = m->map_epoch;
+  const epoch_t current_osdmap_epoch = service.get_map()->get_epoch();
   auto found = peers.find(from);
   if (found == peers.end()) {
     return seastar::now();
   }
   auto& peer = found->second;
 
-  if (peer_epoch > peer.get_last_epoch_sent()) {
-    logger().debug("{} updating session's last epoch sent "
-                   "from {} to peer's (id: {}) map epoch of {}",
-                   __func__, peer.get_last_epoch_sent(),
-                   from, peer_epoch);
-    peer.set_last_epoch_sent(peer_epoch);
+  if (m->map_epoch > peer.get_projected_epoch()) {
+    logger().debug("{} updating peer {} session's projected_epoch"
+                   "from {} to ping map epoch of {}",
+                   __func__, from, peer.get_projected_epoch(),
+                   m->map_epoch);
+    peer.set_projected_epoch(m->map_epoch);
   }
 
-  if (osdmap_epoch <= peer.get_last_epoch_sent()) {
-    logger().info("{} latest epoch sent {} is already later "
-                  "than osdmap epoch of {}",
-                  __func__ , peer.get_last_epoch_sent(),
-                  osdmap_epoch);
+  if (current_osdmap_epoch <= peer.get_projected_epoch()) {
+    logger().debug("{} peer {} projected_epoch {} is already later "
+                  "than our osdmap epoch of {}",
+                  __func__ , from, peer.get_projected_epoch(),
+                  current_osdmap_epoch);
     return seastar::now();
   }
 
-  logger().info("{} peer id: {} epoch is {} while osdmap is {}",
-                __func__ , from, m->map_epoch, osdmap_epoch);
-  if (osdmap_epoch > m->map_epoch) {
-    logger().debug("{} sharing osdmap epoch of {} with peer id {}",
-                   __func__, osdmap_epoch, from);
-    // Peer's newest map is m->map_epoch. Therfore it misses
-    // the osdmaps in the range of `m->map_epoch` to `osdmap_epoch`.
-    return service.send_incremental_map_to_osd(from, m->map_epoch);
-  }
-  return seastar::now();
+  const epoch_t send_from = peer.get_projected_epoch();
+  logger().debug("{} sending peer {} peer maps from projected epoch {} through "
+                "local osdmap epoch {}",
+                __func__,
+                from,
+                send_from,
+                current_osdmap_epoch);
+  peer.set_projected_epoch(current_osdmap_epoch);
+  return service.send_incremental_map_to_osd(from, send_from);
 }
 
 seastar::future<> Heartbeat::handle_reply(crimson::net::ConnectionRef conn,
@@ -429,42 +434,57 @@ bool Heartbeat::Connection::matches(crimson::net::ConnectionRef _conn) const
   return (conn && conn == _conn);
 }
 
-void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn)
+bool Heartbeat::Connection::accepted(
+    crimson::net::ConnectionRef accepted_conn,
+    bool is_replace)
 {
-  if (!conn) {
-    if (accepted_conn->get_peer_addr() == listener.get_peer_addr(type)) {
-      logger().info("Heartbeat::Connection::accepted(): "
-                    "{} racing resolved", *this);
-      conn = accepted_conn;
-      set_connected();
+  ceph_assert(accepted_conn);
+  ceph_assert(accepted_conn != conn);
+  if (accepted_conn->get_peer_addr() != listener.get_peer_addr(type)) {
+    return false;
+  }
+
+  if (is_replace) {
+    logger().info("Heartbeat::Connection::accepted(): "
+                  "{} racing", *this);
+    racing_detected = true;
+  }
+  if (conn) {
+    // there is no assumption about the ordering of the reset and accept
+    // events for the 2 racing connections.
+    if (is_connected) {
+      logger().warn("Heartbeat::Connection::accepted(): "
+                    "{} is accepted while connected, is_replace={}",
+                    *this, is_replace);
+      conn->mark_down();
+      set_unconnected();
     }
-  } else if (conn == accepted_conn) {
-    set_connected();
   }
+  conn = accepted_conn;
+  set_connected();
+  return true;
 }
 
-void Heartbeat::Connection::replaced()
+void Heartbeat::Connection::reset(bool is_replace)
 {
-  assert(!is_connected);
-  auto replaced_conn = conn;
-  // set the racing connection, will be handled by handle_accept()
-  conn = msgr.connect(replaced_conn->get_peer_addr(),
-                      replaced_conn->get_peer_name());
-  racing_detected = true;
-  logger().warn("Heartbeat::Connection::replaced(): {} racing", *this);
-  assert(conn != replaced_conn);
-}
+  if (is_replace) {
+    logger().info("Heartbeat::Connection::reset(): "
+                  "{} racing, waiting for the replacing accept",
+                  *this);
+    racing_detected = true;
+  }
 
-void Heartbeat::Connection::reset()
-{
-  conn = nullptr;
   if (is_connected) {
-    is_connected = false;
-    listener.decrease_connected();
+    set_unconnected();
+  } else {
+    conn = nullptr;
   }
-  if (!racing_detected || is_winner_side) {
+
+  if (is_replace) {
+    // waiting for the replacing accept event
+  } else if (!racing_detected || is_winner_side) {
     connect();
-  } else {
+  } else { // racing_detected && !is_winner_side
     logger().info("Heartbeat::Connection::reset(): "
                   "{} racing detected and lose, "
                   "waiting for peer connect me", *this);
@@ -506,11 +526,22 @@ void Heartbeat::Connection::retry()
 
 void Heartbeat::Connection::set_connected()
 {
+  assert(conn);
   assert(!is_connected);
+  ceph_assert(conn->is_connected());
   is_connected = true;
   listener.increase_connected();
 }
 
+void Heartbeat::Connection::set_unconnected()
+{
+  assert(conn);
+  assert(is_connected);
+  conn = nullptr;
+  is_connected = false;
+  listener.decrease_connected();
+}
+
 void Heartbeat::Connection::connect()
 {
   assert(!conn);
@@ -600,6 +631,64 @@ void Heartbeat::Peer::send_heartbeat(
   }
 }
 
+void Heartbeat::Peer::handle_reset(
+    crimson::net::ConnectionRef conn, bool is_replace)
+{
+  int cnt = 0;
+  for_each_conn([&] (auto& _conn) {
+    if (_conn.matches(conn)) {
+      ++cnt;
+      _conn.reset(is_replace);
+    }
+  });
+
+  if (cnt == 0) {
+    logger().info("Heartbeat::Peer::handle_reset(): {} ignores conn, is_replace={} -- {}",
+                  *this, is_replace, *conn);
+  } else if (cnt > 1) {
+    logger().error("Heartbeat::Peer::handle_reset(): {} handles conn {} times -- {}",
+                  *this, cnt, *conn);
+  }
+}
+
+void Heartbeat::Peer::handle_connect(crimson::net::ConnectionRef conn)
+{
+  int cnt = 0;
+  for_each_conn([&] (auto& _conn) {
+    if (_conn.matches(conn)) {
+      ++cnt;
+      _conn.connected();
+    }
+  });
+
+  if (cnt == 0) {
+    logger().error("Heartbeat::Peer::handle_connect(): {} ignores conn -- {}",
+                   *this, *conn);
+    conn->mark_down();
+  } else if (cnt > 1) {
+    logger().error("Heartbeat::Peer::handle_connect(): {} handles conn {} times -- {}",
+                  *this, cnt, *conn);
+  }
+}
+
+void Heartbeat::Peer::handle_accept(crimson::net::ConnectionRef conn, bool is_replace)
+{
+  int cnt = 0;
+  for_each_conn([&] (auto& _conn) {
+    if (_conn.accepted(conn, is_replace)) {
+      ++cnt;
+    }
+  });
+
+  if (cnt == 0) {
+    logger().warn("Heartbeat::Peer::handle_accept(): {} ignores conn -- {}",
+                  *this, *conn);
+  } else if (cnt > 1) {
+    logger().error("Heartbeat::Peer::handle_accept(): {} handles conn {} times -- {}",
+                  *this, cnt, *conn);
+  }
+}
+
 seastar::future<> Heartbeat::Peer::handle_reply(
     crimson::net::ConnectionRef conn, Ref<MOSDPing> m)
 {