git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/osd/heartbeat.cc
update source to Ceph Pacific 16.2.2
index 0fb5eff7dcc93f41b6e8a46a8788df9a0c78ad63..81ec06ecd5de5421d9237a78fd01657912650b17 100644 (file)
@@ -9,6 +9,7 @@
 #include "messages/MOSDFailure.h"
 
 #include "crimson/common/config_proxy.h"
+#include "crimson/common/formatter.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/Messenger.h"
 #include "crimson/osd/shard_services.h"
@@ -24,16 +25,22 @@ namespace {
   }
 }
 
-Heartbeat::Heartbeat(const crimson::osd::ShardServices& service,
+Heartbeat::Heartbeat(osd_id_t whoami,
+                     const crimson::osd::ShardServices& service,
                      crimson::mon::Client& monc,
                      crimson::net::MessengerRef front_msgr,
                      crimson::net::MessengerRef back_msgr)
-  : service{service},
+  : whoami{whoami},
+    service{service},
     monc{monc},
     front_msgr{front_msgr},
     back_msgr{back_msgr},
     // do this in background
-    timer{[this] { (void)send_heartbeats(); }}
+    timer{[this] {
+      heartbeat_check();
+      (void)send_heartbeats();
+    }},
+    failing_peers{*this}
 {}
 
 seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
@@ -47,12 +54,14 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
 
   using crimson::net::SocketPolicy;
   front_msgr->set_policy(entity_name_t::TYPE_OSD,
-                         SocketPolicy::stateless_server(0));
+                         SocketPolicy::lossy_client(0));
   back_msgr->set_policy(entity_name_t::TYPE_OSD,
-                        SocketPolicy::stateless_server(0));
-  return seastar::when_all_succeed(start_messenger(*front_msgr, front_addrs),
-                                   start_messenger(*back_msgr, back_addrs))
-    .then([this] {
+                        SocketPolicy::lossy_client(0));
+  return seastar::when_all_succeed(start_messenger(*front_msgr,
+                                                  front_addrs),
+                                   start_messenger(*back_msgr,
+                                                  back_addrs))
+    .then_unpack([this] {
       timer.arm_periodic(
         std::chrono::seconds(local_conf()->osd_heartbeat_interval));
     });
@@ -64,15 +73,28 @@ Heartbeat::start_messenger(crimson::net::Messenger& msgr,
 {
   return msgr.try_bind(addrs,
                        local_conf()->ms_bind_port_min,
-                       local_conf()->ms_bind_port_max).then([&msgr, this] {
-    return msgr.start(this);
-  });
+                       local_conf()->ms_bind_port_max)
+  .safe_then([this, &msgr]() mutable {
+    return msgr.start({this});
+  }, crimson::net::Messenger::bind_ertr::all_same_way(
+      [] (const std::error_code& e) {
+    logger().error("heartbeat messenger try_bind(): address range is unavailable.");
+    ceph_abort();
+  }));
 }
 
 seastar::future<> Heartbeat::stop()
 {
-  return seastar::when_all_succeed(front_msgr->shutdown(),
-                                   back_msgr->shutdown());
+  logger().info("{}", __func__);
+  timer.cancel();
+  front_msgr->stop();
+  back_msgr->stop();
+  return gate.close().then([this] {
+    return seastar::when_all_succeed(front_msgr->shutdown(),
+                                    back_msgr->shutdown());
+  }).then_unpack([] {
+    return seastar::now();
+  });
 }
 
 const entity_addrvec_t& Heartbeat::get_front_addrs() const
@@ -93,47 +115,30 @@ void Heartbeat::set_require_authorizer(bool require_authorizer)
   }
 }
 
-void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
+void Heartbeat::add_peer(osd_id_t _peer, epoch_t epoch)
 {
-  auto [peer_info, added] = peers.try_emplace(peer);
-  auto& info = peer_info->second;
-  info.epoch = epoch;
-  if (added) {
-    logger().info("add_peer({})", peer);
-    auto osdmap = service.get_osdmap_service().get_map();
-    // TODO: use addrs
-    peer_info->second.con_front = front_msgr->connect(
-        osdmap->get_hb_front_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
-    peer_info->second.con_back = back_msgr->connect(
-        osdmap->get_hb_back_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
-  }
+  assert(whoami != _peer);
+  auto [iter, added] = peers.try_emplace(_peer, *this, _peer);
+  auto& peer = iter->second;
+  peer.set_epoch(epoch);
 }
 
-seastar::future<Heartbeat::osds_t> Heartbeat::remove_down_peers()
+Heartbeat::osds_t Heartbeat::remove_down_peers()
 {
-  osds_t osds;
-  for (auto& peer : peers) {
-    osds.push_back(peer.first);
-  }
-  return seastar::map_reduce(std::move(osds),
-    [this](auto& osd) {
-      auto osdmap = service.get_osdmap_service().get_map();
-      if (!osdmap->is_up(osd)) {
-        return remove_peer(osd).then([] {
-          return seastar::make_ready_future<osd_id_t>(-1);
-        });
-      } else if (peers[osd].epoch < osdmap->get_epoch()) {
-        return seastar::make_ready_future<osd_id_t>(osd);
-      } else {
-        return seastar::make_ready_future<osd_id_t>(-1);
-      }
-    }, osds_t{},
-    [](osds_t&& extras, osd_id_t extra) {
-      if (extra >= 0) {
-        extras.push_back(extra);
+  osds_t old_osds; // peers whose epoch is older than the current osdmap epoch
+  for (auto i = peers.begin(); i != peers.end(); ) {
+    auto osdmap = service.get_osdmap_service().get_map();
+    const auto& [osd, peer] = *i;
+    if (!osdmap->is_up(osd)) {
+      i = peers.erase(i);
+    } else {
+      if (peer.get_epoch() < osdmap->get_epoch()) {
+        old_osds.push_back(osd);
       }
-      return std::move(extras);
-    });
+      ++i;
+    }
+  }
+  return old_osds;
 }
 
 void Heartbeat::add_reporter_peers(int whoami)
@@ -159,80 +164,101 @@ void Heartbeat::add_reporter_peers(int whoami)
   };
 }
 
-seastar::future<> Heartbeat::update_peers(int whoami)
+void Heartbeat::update_peers(int whoami)
 {
   const auto min_peers = static_cast<size_t>(
     local_conf().get_val<int64_t>("osd_heartbeat_min_peers"));
   add_reporter_peers(whoami);
-  return remove_down_peers().then([=](osds_t&& extra) {
-    // too many?
-    struct iteration_state {
-      osds_t::const_iterator where;
-      osds_t::const_iterator end;
-    };
-    return seastar::do_with(iteration_state{extra.begin(),extra.end()},
-      [=](iteration_state& s) {
-        return seastar::do_until(
-          [min_peers, &s, this] {
-            return peers.size() <= min_peers || s.where == s.end; },
-          [&s, this] {
-            return remove_peer(*s.where); }
-        );
-    });
-  }).then([=] {
-    // or too few?
-    auto osdmap = service.get_osdmap_service().get_map();
-    auto epoch = osdmap->get_epoch();
-    for (auto next = osdmap->get_next_up_osd_after(whoami);
-      peers.size() < min_peers && next >= 0 && next != whoami;
-      next = osdmap->get_next_up_osd_after(next)) {
-      add_peer(next, epoch);
+  auto extra = remove_down_peers();
+  // too many?
+  for (auto& osd : extra) {
+    if (peers.size() <= min_peers) {
+      break;
     }
-  });
+    remove_peer(osd);
+  }
+  // or too few?
+  auto osdmap = service.get_osdmap_service().get_map();
+  auto epoch = osdmap->get_epoch();
+  for (auto next = osdmap->get_next_up_osd_after(whoami);
+    peers.size() < min_peers && next >= 0 && next != whoami;
+    next = osdmap->get_next_up_osd_after(next)) {
+    add_peer(next, epoch);
+  }
 }
 
-seastar::future<> Heartbeat::remove_peer(osd_id_t peer)
+Heartbeat::osds_t Heartbeat::get_peers() const
 {
-  auto found = peers.find(peer);
-  assert(found != peers.end());
-  logger().info("remove_peer({})", peer);
-  return seastar::when_all_succeed(found->second.con_front->close(),
-                                   found->second.con_back->close()).then(
-    [this, peer] {
-      peers.erase(peer);
+  osds_t osds;
+  osds.reserve(peers.size());
+  for (auto& peer : peers) {
+    osds.push_back(peer.first);
+  }
+  return osds;
+}
+
+void Heartbeat::remove_peer(osd_id_t peer)
+{
+  assert(peers.count(peer) == 1);
+  peers.erase(peer);
+}
+
+std::optional<seastar::future<>>
+Heartbeat::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
+{
+  bool dispatched = true;
+  gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
+    switch (m->get_type()) {
+    case MSG_OSD_PING:
+      return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m));
+    default:
+      dispatched = false;
       return seastar::now();
-    });
+    }
+  });
+  return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
 }
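
The rewritten ms_dispatch returns std::optional<seastar::future<>>: a std::nullopt result signals "not handled here", letting several dispatchers be chained over one connection. A minimal sketch of that contract (hypothetical simplified types, not crimson's actual Dispatcher interface):

    #include <functional>
    #include <optional>
    #include <vector>

    // Each dispatcher either handles a message type or declines with nullopt.
    using Dispatcher = std::function<std::optional<int>(int msg_type)>;

    std::optional<int> dispatch_chain(const std::vector<Dispatcher>& chain,
                                      int msg_type) {
      for (const auto& d : chain) {
        if (auto handled = d(msg_type)) {
          return handled;  // first dispatcher that accepts the message wins
        }
      }
      return std::nullopt;  // no dispatcher handled the message
    }
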
 
-seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn,
-                                         MessageRef m)
+void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
 {
-  switch (m->get_type()) {
-  case MSG_OSD_PING:
-    return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m));
-  default:
-    return seastar::now();
+  auto peer = conn->get_peer_id();
+  if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
+      peer == entity_name_t::NEW) {
+    return;
+  }
+  if (auto found = peers.find(peer);
+      found != peers.end()) {
+    found->second.handle_reset(conn, is_replace);
   }
 }
 
-seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn)
+void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn)
 {
-  auto found = std::find_if(peers.begin(), peers.end(),
-                            [conn](const peers_map_t::value_type& peer) {
-                              return (peer.second.con_front == conn ||
-                                      peer.second.con_back == conn);
-                            });
-  if (found == peers.end()) {
-    return seastar::now();
+  auto peer = conn->get_peer_id();
+  if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
+      peer == entity_name_t::NEW) {
+    return;
+  }
+  if (auto found = peers.find(peer);
+      found != peers.end()) {
+    found->second.handle_connect(conn);
   }
-  const auto peer = found->first;
-  const auto epoch = found->second.epoch;
-  return remove_peer(peer).then([peer, epoch, this] {
-    add_peer(peer, epoch);
-  });
 }
 
-seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn,
+void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn)
+{
+  auto peer = conn->get_peer_id();
+  if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
+      peer == entity_name_t::NEW) {
+    return;
+  }
+  if (auto found = peers.find(peer);
+      found != peers.end()) {
+    found->second.handle_accept(conn);
+  }
+}
+
+seastar::future<> Heartbeat::handle_osd_ping(crimson::net::ConnectionRef conn,
                                              Ref<MOSDPing> m)
 {
   switch (m->op) {
@@ -247,7 +273,7 @@ seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn,
   }
 }
 
-seastar::future<> Heartbeat::handle_ping(crimson::net::Connection* conn,
+seastar::future<> Heartbeat::handle_ping(crimson::net::ConnectionRef conn,
                                          Ref<MOSDPing> m)
 {
   auto min_message = static_cast<uint32_t>(
@@ -265,7 +291,7 @@ seastar::future<> Heartbeat::handle_ping(crimson::net::Connection* conn,
   return conn->send(reply);
 }
 
-seastar::future<> Heartbeat::handle_reply(crimson::net::Connection* conn,
+seastar::future<> Heartbeat::handle_reply(crimson::net::ConnectionRef conn,
                                           Ref<MOSDPing> m)
 {
   const osd_id_t from = m->get_source().num();
@@ -275,32 +301,7 @@ seastar::future<> Heartbeat::handle_reply(crimson::net::Connection* conn,
     return seastar::now();
   }
   auto& peer = found->second;
-  auto ping = peer.ping_history.find(m->ping_stamp);
-  if (ping == peer.ping_history.end()) {
-    // old replies, deprecated by newly sent pings.
-    return seastar::now();
-  }
-  const auto now = clock::now();
-  auto& unacked = ping->second.unacknowledged;
-  if (conn == peer.con_back.get()) {
-    peer.last_rx_back = now;
-    unacked--;
-  } else if (conn == peer.con_front.get()) {
-    peer.last_rx_front = now;
-    unacked--;
-  }
-  if (unacked == 0) {
-    peer.ping_history.erase(peer.ping_history.begin(), ++ping);
-  }
-  if (peer.is_healthy(now)) {
-    // cancel false reports
-    failure_queue.erase(from);
-    if (auto pending = failure_pending.find(from);
-        pending != failure_pending.end()) {
-      return send_still_alive(from, pending->second.addrs);
-    }
-  }
-  return seastar::now();
+  return peer.handle_reply(conn, m);
 }
 
 seastar::future<> Heartbeat::handle_you_died()
@@ -309,114 +310,371 @@ seastar::future<> Heartbeat::handle_you_died()
   return seastar::now();
 }
 
+void Heartbeat::heartbeat_check()
+{
+  failure_queue_t failure_queue;
+  const auto now = clock::now();
+  for (const auto& [osd, peer] : peers) {
+    auto failed_since = peer.failed_since(now);
+    if (!clock::is_zero(failed_since)) {
+      failure_queue.emplace(osd, failed_since);
+    }
+  }
+  if (!failure_queue.empty()) {
+    // send_failures can run in the background, because:
+    //         1. When send_failures returns, no message has actually been
+    //            sent yet, i.e. the sending operation is still in flight.
+    //            At first glance this looks risky: if the OSD shuts down,
+    //            the remaining part of the send could reference OSD and
+    //            Heartbeat instances that are already deleted. However,
+    //            the remaining work of that sending operation holds no
+    //            references back to the OSD or Heartbeat instances, so
+    //            it does not run into the above risk.
+    //         2. Messages are sent in order, so if a later check finds a
+    //            previously "failed" peer to be healthy, the "still alive"
+    //            message is sent after the earlier "osd failure" message,
+    //            which is safe.
+    (void)send_failures(std::move(failure_queue));
+  }
+}
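
heartbeat_check deliberately discards the future from send_failures, the same fire-and-forget pattern the constructor's timer uses for send_heartbeats. Elsewhere in this diff (ms_dispatch) such background work is tracked by a gate so that stop() can wait for it. Below is a minimal, self-contained sketch of that pattern using seastar's public gate API (an assumed example, not the crimson gate wrapper used in this file):

    #include <seastar/core/app-template.hh>
    #include <seastar/core/do_with.hh>
    #include <seastar/core/gate.hh>
    #include <seastar/core/sleep.hh>
    #include <chrono>
    #include <iostream>

    int main(int argc, char** argv) {
      seastar::app_template app;
      return app.run(argc, argv, [] {
        return seastar::do_with(seastar::gate{}, [](seastar::gate& g) {
          // fire-and-forget: the future is discarded, but the gate tracks it
          (void)seastar::with_gate(g, [] {
            return seastar::sleep(std::chrono::milliseconds(10)).then([] {
              std::cout << "failure reports sent\n";
            });
          });
          // shutdown path: close() resolves only after all gated work is done
          return g.close();
        });
      });
    }
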
+
 seastar::future<> Heartbeat::send_heartbeats()
 {
-  using peers_item_t = typename peers_map_t::value_type;
-  return seastar::parallel_for_each(peers,
-    [this](peers_item_t& item) {
-      const auto mnow = service.get_mnow();
-      const auto now = clock::now();
-      const auto deadline =
-        now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
-      auto& info = item.second;
-      info.last_tx = now;
-      if (clock::is_zero(info.first_tx)) {
-        info.first_tx = now;
-      }
-      const utime_t sent_stamp{now};
-      [[maybe_unused]] auto [reply, added] =
-        info.ping_history.emplace(sent_stamp, reply_t{deadline, 0});
-      std::vector<crimson::net::ConnectionRef> conns{info.con_front,
-                                                     info.con_back};
-      return seastar::parallel_for_each(std::move(conns),
-        [sent_stamp, mnow, &reply=reply->second, this] (auto con) {
-          if (con) {
-            auto min_message = static_cast<uint32_t>(
-              local_conf()->osd_heartbeat_min_size);
-            auto ping = make_message<MOSDPing>(
-             monc.get_fsid(),
-             service.get_osdmap_service().get_map()->get_epoch(),
-             MOSDPing::PING,
-             sent_stamp,
-             mnow,
-             mnow,
-             service.get_osdmap_service().get_up_epoch(),
-             min_message);
-            return con->send(ping).then([&reply] {
-              reply.unacknowledged++;
-              return seastar::now();
-            });
-          } else {
-            return seastar::now();
-          }
-        });
-    });
+  const auto mnow = service.get_mnow();
+  const auto now = clock::now();
+
+  std::vector<seastar::future<>> futures;
+  for (auto& [osd, peer] : peers) {
+    peer.send_heartbeat(now, mnow, futures);
+  }
+  return seastar::when_all_succeed(futures.begin(), futures.end());
 }
 
-seastar::future<> Heartbeat::send_failures()
+seastar::future<> Heartbeat::send_failures(failure_queue_t&& failure_queue)
 {
-  using failure_item_t = typename failure_queue_t::value_type;
-  return seastar::parallel_for_each(failure_queue,
-    [this](failure_item_t& failure_item) {
-      auto [osd, failed_since] = failure_item;
-      if (failure_pending.count(osd)) {
-        return seastar::now();
-      }
-      auto failed_for = chrono::duration_cast<chrono::seconds>(
-        clock::now() - failed_since).count();
-      auto osdmap = service.get_osdmap_service().get_map();
-      auto failure_report =
-        make_message<MOSDFailure>(monc.get_fsid(),
-                                  osd,
-                                  osdmap->get_addrs(osd),
-                                  static_cast<int>(failed_for),
-                                  osdmap->get_epoch());
-      failure_pending.emplace(osd, failure_info_t{failed_since,
-                                                  osdmap->get_addrs(osd)});
-      return monc.send_message(failure_report);
-    }).then([this] {
-      failure_queue.clear();
-      return seastar::now();
+  std::vector<seastar::future<>> futures;
+  const auto now = clock::now();
+  for (auto [osd, failed_since] : failure_queue) {
+    failing_peers.add_pending(osd, failed_since, now, futures);
+  }
+
+  return seastar::when_all_succeed(futures.begin(), futures.end());
+}
+
+void Heartbeat::print(std::ostream& out) const
+{
+  out << "heartbeat";
+}
+
+Heartbeat::Connection::~Connection()
+{
+  if (conn) {
+    conn->mark_down();
+  }
+}
+
+bool Heartbeat::Connection::matches(crimson::net::ConnectionRef _conn) const
+{
+  return (conn && conn == _conn);
+}
+
+void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn)
+{
+  if (!conn) {
+    if (accepted_conn->get_peer_addr() == listener.get_peer_addr(type)) {
+      logger().info("Heartbeat::Connection::accepted(): "
+                    "{} racing resolved", *this);
+      conn = accepted_conn;
+      set_connected();
+    }
+  } else if (conn == accepted_conn) {
+    set_connected();
+  }
+}
+
+void Heartbeat::Connection::replaced()
+{
+  assert(!is_connected);
+  auto replaced_conn = conn;
+  // set the racing connection, will be handled by handle_accept()
+  conn = msgr.connect(replaced_conn->get_peer_addr(),
+                      replaced_conn->get_peer_name());
+  racing_detected = true;
+  logger().warn("Heartbeat::Connection::replaced(): {} racing", *this);
+  assert(conn != replaced_conn);
+  assert(conn->is_connected());
+}
+
+void Heartbeat::Connection::reset()
+{
+  conn = nullptr;
+  if (is_connected) {
+    is_connected = false;
+    listener.decrease_connected();
+  }
+  if (!racing_detected || is_winner_side) {
+    connect();
+  } else {
+    logger().info("Heartbeat::Connection::reset(): "
+                  "{} racing detected and lost, "
+                  "waiting for the peer to connect to me", *this);
+  }
+}
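
Race resolution here is deterministic: as the Peer constructor later in this diff shows, each Connection is built with `heartbeat.whoami > peer` as its winner flag, so after a detected race exactly one OSD keeps re-connecting while the other waits to be accepted. A one-function sketch of the tie-break:

    // Tie-break sketch: of the two racing OSDs, only the one with the higher
    // id (the "winner") re-connects; the "loser" waits for the peer's accept.
    bool is_winner_side(int whoami, int peer_id) {
      return whoami > peer_id;  // mirrors `heartbeat.whoami > peer` in Peer's ctor
    }
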
+
+seastar::future<> Heartbeat::Connection::send(MessageRef msg)
+{
+  assert(is_connected);
+  return conn->send(msg);
+}
+
+void Heartbeat::Connection::validate()
+{
+  assert(is_connected);
+  auto peer_addr = listener.get_peer_addr(type);
+  if (conn->get_peer_addr() != peer_addr) {
+    logger().info("Heartbeat::Connection::validate(): "
+                  "{} has new address {} over {}, reset",
+                  *this, peer_addr, conn->get_peer_addr());
+    conn->mark_down();
+    racing_detected = false;
+    reset();
+  }
+}
+
+void Heartbeat::Connection::retry()
+{
+  racing_detected = false;
+  if (!is_connected) {
+    if (conn) {
+      conn->mark_down();
+      reset();
+    } else {
+      connect();
+    }
+  }
+}
+
+void Heartbeat::Connection::set_connected()
+{
+  assert(!is_connected);
+  is_connected = true;
+  listener.increase_connected();
+}
+
+void Heartbeat::Connection::connect()
+{
+  assert(!conn);
+  auto addr = listener.get_peer_addr(type);
+  conn = msgr.connect(addr, entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
+  if (conn->is_connected()) {
+    set_connected();
+  }
+}
+
+Heartbeat::clock::time_point
+Heartbeat::Session::failed_since(Heartbeat::clock::time_point now) const
+{
+  if (do_health_screen(now) == health_state::UNHEALTHY) {
+    auto oldest_deadline = ping_history.begin()->second.deadline;
+    auto failed_since = std::min(last_rx_back, last_rx_front);
+    if (clock::is_zero(failed_since)) {
+      logger().error("Heartbeat::Session::failed_since(): no reply from osd.{} "
+                     "ever on either front or back, first ping sent {} "
+                     "(oldest deadline {})",
+                     peer, first_tx, oldest_deadline);
+      failed_since = first_tx;
+    } else {
+      logger().error("Heartbeat::Session::failed_since(): no reply from osd.{} "
+                     "since back {} front {} (oldest deadline {})",
+                     peer, last_rx_back, last_rx_front, oldest_deadline);
+    }
+    return failed_since;
+  } else {
+    return clock::zero();
+  }
+}
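
failed_since is driven by ping_history, an ordered map from send stamp to a {deadline, unacknowledged} record: the session is unhealthy once the oldest outstanding ping's deadline has passed, which is the same check the removed PeerInfo::is_unhealthy performed below. A self-contained sketch of that bookkeeping (simplified types; the real key is a utime_t send stamp):

    #include <chrono>
    #include <map>

    using Clock = std::chrono::steady_clock;

    struct reply_t {
      Clock::time_point deadline;  // the reply is overdue after this point
      int unacknowledged;          // connections that have not replied yet
    };

    // Keyed by send stamp, so begin() is always the oldest outstanding ping.
    using ping_history_t = std::map<Clock::time_point, reply_t>;

    bool is_unhealthy(const ping_history_t& ping_history, Clock::time_point now) {
      if (ping_history.empty()) {
        return false;  // no outstanding pings, nothing can be overdue
      }
      return now > ping_history.begin()->second.deadline;
    }
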
+
+void Heartbeat::Session::set_inactive_history(clock::time_point now)
+{
+  assert(!connected);
+  if (ping_history.empty()) {
+    const utime_t sent_stamp{now};
+    const auto deadline =
+      now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+    ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+  } else { // the entry is already added
+    assert(ping_history.size() == 1);
+  }
+}
+
+Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer)
+  : ConnectionListener(2), heartbeat{heartbeat}, peer{peer}, session{peer},
+  con_front(peer, heartbeat.whoami > peer, Connection::type_t::front,
+            *heartbeat.front_msgr, *this),
+  con_back(peer, heartbeat.whoami > peer, Connection::type_t::back,
+           *heartbeat.back_msgr, *this)
+{
+  logger().info("Heartbeat::Peer: osd.{} added", peer);
+}
+
+Heartbeat::Peer::~Peer()
+{
+  logger().info("Heartbeat::Peer: osd.{} removed", peer);
+}
+
+void Heartbeat::Peer::send_heartbeat(
+    clock::time_point now, ceph::signedspan mnow,
+    std::vector<seastar::future<>>& futures)
+{
+  session.set_tx(now);
+  if (session.is_started()) {
+    do_send_heartbeat(now, mnow, &futures);
+    for_each_conn([] (auto& conn) {
+      conn.validate();
     });
+  } else {
+    // we should send an MOSDPing, but cannot at this moment
+    if (pending_send) {
+      // we have already been pending for an entire heartbeat interval
+      logger().warn("Heartbeat::Peer::send_heartbeat(): "
+                    "heartbeat to osd.{} is still pending...", peer);
+      for_each_conn([] (auto& conn) {
+        conn.retry();
+      });
+    } else {
+      logger().info("Heartbeat::Peer::send_heartbeat(): "
+                    "heartbeat to osd.{} is pending send...", peer);
+      session.set_inactive_history(now);
+      pending_send = true;
+    }
+  }
 }
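
The pending_send flag above implements a small two-state tick protocol: the first tick while disconnected records an inactive history entry and marks the heartbeat pending; a second tick in that state forces the connections to retry; on_connected() later replays the deferred ping. A compact sketch of the per-tick decision (hypothetical simplified form):

    // Sketch of the decision made on each timer tick in Peer::send_heartbeat.
    enum class Action { send_now, mark_pending, retry_conns };

    Action on_tick(bool session_started, bool pending_send) {
      if (session_started) {
        return Action::send_now;     // connections are up: ping front and back
      }
      if (pending_send) {
        return Action::retry_conns;  // pending a full interval already: retry
      }
      return Action::mark_pending;   // owe a ping; send it once connected
    }
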
 
-seastar::future<> Heartbeat::send_still_alive(osd_id_t osd,
-                                              const entity_addrvec_t& addrs)
+seastar::future<> Heartbeat::Peer::handle_reply(
+    crimson::net::ConnectionRef conn, Ref<MOSDPing> m)
 {
-  auto still_alive = make_message<MOSDFailure>(
-    monc.get_fsid(),
-    osd,
-    addrs,
-    0,
-    service.get_osdmap_service().get_map()->get_epoch(),
-    MOSDFailure::FLAG_ALIVE);
-  return monc.send_message(still_alive).then([=] {
-    failure_pending.erase(osd);
+  if (!session.is_started()) {
+    // we haven't sent any ping yet
     return seastar::now();
-  });
+  }
+  type_t type;
+  if (con_front.matches(conn)) {
+    type = type_t::front;
+  } else if (con_back.matches(conn)) {
+    type = type_t::back;
+  } else {
+    return seastar::now();
+  }
+  const auto now = clock::now();
+  if (session.on_pong(m->ping_stamp, type, now)) {
+    if (session.do_health_screen(now) == Session::health_state::HEALTHY) {
+      return heartbeat.failing_peers.cancel_one(peer);
+    }
+  }
+  return seastar::now();
 }
 
-bool Heartbeat::PeerInfo::is_unhealthy(clock::time_point now) const
+entity_addr_t Heartbeat::Peer::get_peer_addr(type_t type)
 {
-  if (ping_history.empty()) {
-    // we haven't sent a ping yet or we have got all replies,
-    // in either way we are safe and healthy for now
-    return false;
+  const auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+  if (type == type_t::front) {
+    return osdmap->get_hb_front_addrs(peer).front();
   } else {
-    auto oldest_ping = ping_history.begin();
-    return now > oldest_ping->second.deadline;
+    return osdmap->get_hb_back_addrs(peer).front();
   }
 }
 
-bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const
+void Heartbeat::Peer::on_connected()
 {
-  if (con_front && clock::is_zero(last_rx_front)) {
-    return false;
+  logger().info("Heartbeat::Peer: osd.{} connected (send={})",
+                peer, pending_send);
+  session.on_connected();
+  if (pending_send) {
+    pending_send = false;
+    do_send_heartbeat(clock::now(), heartbeat.service.get_mnow(), nullptr);
   }
-  if (con_back && clock::is_zero(last_rx_back)) {
+}
+
+void Heartbeat::Peer::on_disconnected()
+{
+  logger().info("Heartbeat::Peer: osd.{} disconnected", peer);
+  session.on_disconnected();
+}
+
+void Heartbeat::Peer::do_send_heartbeat(
+    Heartbeat::clock::time_point now,
+    ceph::signedspan mnow,
+    std::vector<seastar::future<>>* futures)
+{
+  const utime_t sent_stamp{now};
+  const auto deadline =
+    now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+  session.on_ping(sent_stamp, deadline);
+  for_each_conn([&, this] (auto& conn) {
+    auto min_message = static_cast<uint32_t>(
+      local_conf()->osd_heartbeat_min_size);
+    auto ping = make_message<MOSDPing>(
+      heartbeat.monc.get_fsid(),
+      heartbeat.service.get_osdmap_service().get_map()->get_epoch(),
+      MOSDPing::PING,
+      sent_stamp,
+      mnow,
+      mnow,
+      heartbeat.service.get_osdmap_service().get_up_epoch(),
+      min_message);
+    if (futures) {
+      futures->push_back(conn.send(std::move(ping)));
+    }
+  });
+}
+
+bool Heartbeat::FailingPeers::add_pending(
+  osd_id_t peer,
+  clock::time_point failed_since,
+  clock::time_point now,
+  std::vector<seastar::future<>>& futures)
+{
+  if (failure_pending.count(peer)) {
     return false;
   }
-  // only declare to be healthy until we have received the first
-  // replies from both front/back connections
-  return !is_unhealthy(now);
+  auto failed_for = chrono::duration_cast<chrono::seconds>(
+      now - failed_since).count();
+  auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+  auto failure_report =
+      make_message<MOSDFailure>(heartbeat.monc.get_fsid(),
+                                peer,
+                                osdmap->get_addrs(peer),
+                                static_cast<int>(failed_for),
+                                osdmap->get_epoch());
+  failure_pending.emplace(peer, failure_info_t{failed_since,
+                                               osdmap->get_addrs(peer)});
+  futures.push_back(heartbeat.monc.send_message(failure_report));
+  logger().info("{}: osd.{} failed for {}", __func__, peer, failed_for);
+  return true;
+}
+
+seastar::future<> Heartbeat::FailingPeers::cancel_one(osd_id_t peer)
+{
+  if (auto pending = failure_pending.find(peer);
+      pending != failure_pending.end()) {
+    auto fut = send_still_alive(peer, pending->second.addrs);
+    failure_pending.erase(peer);
+    return fut;
+  }
+  return seastar::now();
+}
+
+seastar::future<>
+Heartbeat::FailingPeers::send_still_alive(
+    osd_id_t osd, const entity_addrvec_t& addrs)
+{
+  auto still_alive = make_message<MOSDFailure>(
+    heartbeat.monc.get_fsid(),
+    osd,
+    addrs,
+    0,
+    heartbeat.service.get_osdmap_service().get_map()->get_epoch(),
+    MOSDFailure::FLAG_ALIVE);
+  logger().info("{}: osd.{}", __func__, osd);
+  return heartbeat.monc.send_message(still_alive);
 }