#include "messages/MOSDFailure.h"
#include "crimson/common/config_proxy.h"
+#include "crimson/common/formatter.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
#include "crimson/osd/shard_services.h"
}
}
-Heartbeat::Heartbeat(const crimson::osd::ShardServices& service,
+Heartbeat::Heartbeat(osd_id_t whoami,
+ const crimson::osd::ShardServices& service,
crimson::mon::Client& monc,
crimson::net::MessengerRef front_msgr,
crimson::net::MessengerRef back_msgr)
- : service{service},
+ : whoami{whoami},
+ service{service},
monc{monc},
front_msgr{front_msgr},
back_msgr{back_msgr},
// do this in background
- timer{[this] { (void)send_heartbeats(); }}
+ timer{[this] {
+ heartbeat_check();
+ (void)send_heartbeats();
+ }},
+ failing_peers{*this}
{}
seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
using crimson::net::SocketPolicy;
front_msgr->set_policy(entity_name_t::TYPE_OSD,
- SocketPolicy::stateless_server(0));
+ SocketPolicy::lossy_client(0));
back_msgr->set_policy(entity_name_t::TYPE_OSD,
- SocketPolicy::stateless_server(0));
- return seastar::when_all_succeed(start_messenger(*front_msgr, front_addrs),
- start_messenger(*back_msgr, back_addrs))
- .then([this] {
+ SocketPolicy::lossy_client(0));
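+  // note: with two void futures, when_all_succeed() yields a
+  // future<std::tuple<>>, hence the then_unpack() below to discard
+  // the empty tuple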
+ return seastar::when_all_succeed(start_messenger(*front_msgr,
+ front_addrs),
+ start_messenger(*back_msgr,
+ back_addrs))
+ .then_unpack([this] {
timer.arm_periodic(
std::chrono::seconds(local_conf()->osd_heartbeat_interval));
});
{
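+  // try_bind() returns an errorator future: safe_then() takes the
+  // success continuation plus a handler covering the bind error set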
return msgr.try_bind(addrs,
local_conf()->ms_bind_port_min,
- local_conf()->ms_bind_port_max).then([&msgr, this] {
- return msgr.start(this);
- });
+ local_conf()->ms_bind_port_max)
+ .safe_then([this, &msgr]() mutable {
+ return msgr.start({this});
+ }, crimson::net::Messenger::bind_ertr::all_same_way(
+ [] (const std::error_code& e) {
+ logger().error("heartbeat messenger try_bind(): address range is unavailable.");
+ ceph_abort();
+ }));
}
seastar::future<> Heartbeat::stop()
{
- return seastar::when_all_succeed(front_msgr->shutdown(),
- back_msgr->shutdown());
+ logger().info("{}", __func__);
+ timer.cancel();
+ front_msgr->stop();
+ back_msgr->stop();
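+  // drain in-flight dispatches before shutting the messengers down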
+ return gate.close().then([this] {
+ return seastar::when_all_succeed(front_msgr->shutdown(),
+ back_msgr->shutdown());
+ }).then_unpack([] {
+ return seastar::now();
+ });
}
const entity_addrvec_t& Heartbeat::get_front_addrs() const
}
}
-void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
+void Heartbeat::add_peer(osd_id_t _peer, epoch_t epoch)
{
- auto [peer_info, added] = peers.try_emplace(peer);
- auto& info = peer_info->second;
- info.epoch = epoch;
- if (added) {
- logger().info("add_peer({})", peer);
- auto osdmap = service.get_osdmap_service().get_map();
- // TODO: use addrs
- peer_info->second.con_front = front_msgr->connect(
- osdmap->get_hb_front_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
- peer_info->second.con_back = back_msgr->connect(
- osdmap->get_hb_back_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
- }
+ assert(whoami != _peer);
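+  // a newly emplaced Peer constructs its front and back Connections
+  // right away (see Heartbeat::Peer::Peer() below)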
+ auto [iter, added] = peers.try_emplace(_peer, *this, _peer);
+ auto& peer = iter->second;
+ peer.set_epoch(epoch);
}
-seastar::future<Heartbeat::osds_t> Heartbeat::remove_down_peers()
+Heartbeat::osds_t Heartbeat::remove_down_peers()
{
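+  // prune peers that the current osdmap reports down; return the ones
+  // whose epoch is stale so update_peers() can consider trimming them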
- osds_t osds;
- for (auto& peer : peers) {
- osds.push_back(peer.first);
- }
- return seastar::map_reduce(std::move(osds),
- [this](auto& osd) {
- auto osdmap = service.get_osdmap_service().get_map();
- if (!osdmap->is_up(osd)) {
- return remove_peer(osd).then([] {
- return seastar::make_ready_future<osd_id_t>(-1);
- });
- } else if (peers[osd].epoch < osdmap->get_epoch()) {
- return seastar::make_ready_future<osd_id_t>(osd);
- } else {
- return seastar::make_ready_future<osd_id_t>(-1);
- }
- }, osds_t{},
- [](osds_t&& extras, osd_id_t extra) {
- if (extra >= 0) {
- extras.push_back(extra);
+ osds_t old_osds; // osds not added in this epoch
+ for (auto i = peers.begin(); i != peers.end(); ) {
+ auto osdmap = service.get_osdmap_service().get_map();
+ const auto& [osd, peer] = *i;
+ if (!osdmap->is_up(osd)) {
+ i = peers.erase(i);
+ } else {
+ if (peer.get_epoch() < osdmap->get_epoch()) {
+ old_osds.push_back(osd);
}
- return std::move(extras);
- });
+ ++i;
+ }
+ }
+ return old_osds;
}
void Heartbeat::add_reporter_peers(int whoami)
};
}
-seastar::future<> Heartbeat::update_peers(int whoami)
+void Heartbeat::update_peers(int whoami)
{
const auto min_peers = static_cast<size_t>(
local_conf().get_val<int64_t>("osd_heartbeat_min_peers"));
add_reporter_peers(whoami);
- return remove_down_peers().then([=](osds_t&& extra) {
- // too many?
- struct iteration_state {
- osds_t::const_iterator where;
- osds_t::const_iterator end;
- };
- return seastar::do_with(iteration_state{extra.begin(),extra.end()},
- [=](iteration_state& s) {
- return seastar::do_until(
- [min_peers, &s, this] {
- return peers.size() <= min_peers || s.where == s.end; },
- [&s, this] {
- return remove_peer(*s.where); }
- );
- });
- }).then([=] {
- // or too few?
- auto osdmap = service.get_osdmap_service().get_map();
- auto epoch = osdmap->get_epoch();
- for (auto next = osdmap->get_next_up_osd_after(whoami);
- peers.size() < min_peers && next >= 0 && next != whoami;
- next = osdmap->get_next_up_osd_after(next)) {
- add_peer(next, epoch);
+ auto extra = remove_down_peers();
+ // too many?
+ for (auto& osd : extra) {
+ if (peers.size() <= min_peers) {
+ break;
}
- });
+ remove_peer(osd);
+ }
+ // or too few?
+ auto osdmap = service.get_osdmap_service().get_map();
+ auto epoch = osdmap->get_epoch();
+ for (auto next = osdmap->get_next_up_osd_after(whoami);
+ peers.size() < min_peers && next >= 0 && next != whoami;
+ next = osdmap->get_next_up_osd_after(next)) {
+ add_peer(next, epoch);
+ }
}
-seastar::future<> Heartbeat::remove_peer(osd_id_t peer)
+Heartbeat::osds_t Heartbeat::get_peers() const
{
- auto found = peers.find(peer);
- assert(found != peers.end());
- logger().info("remove_peer({})", peer);
- return seastar::when_all_succeed(found->second.con_front->close(),
- found->second.con_back->close()).then(
- [this, peer] {
- peers.erase(peer);
+ osds_t osds;
+ osds.reserve(peers.size());
+ for (auto& peer : peers) {
+ osds.push_back(peer.first);
+ }
+ return osds;
+}
+
+void Heartbeat::remove_peer(osd_id_t peer)
+{
+ assert(peers.count(peer) == 1);
+ peers.erase(peer);
+}
+
+std::optional<seastar::future<>>
+Heartbeat::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
+{
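+  // returning std::nullopt tells the dispatcher chain that this
+  // message was not consumed here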
+ bool dispatched = true;
+ gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
+ switch (m->get_type()) {
+ case MSG_OSD_PING:
+ return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m));
+ default:
+ dispatched = false;
return seastar::now();
- });
+ }
+ });
+ return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
-seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn,
- MessageRef m)
+void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
{
- switch (m->get_type()) {
- case MSG_OSD_PING:
- return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m));
- default:
- return seastar::now();
+ auto peer = conn->get_peer_id();
+ if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
+ peer == entity_name_t::NEW) {
+ return;
+ }
+ if (auto found = peers.find(peer);
+ found != peers.end()) {
+ found->second.handle_reset(conn, is_replace);
}
}
-seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn)
+void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn)
{
- auto found = std::find_if(peers.begin(), peers.end(),
- [conn](const peers_map_t::value_type& peer) {
- return (peer.second.con_front == conn ||
- peer.second.con_back == conn);
- });
- if (found == peers.end()) {
- return seastar::now();
+ auto peer = conn->get_peer_id();
+ if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
+ peer == entity_name_t::NEW) {
+ return;
+ }
+ if (auto found = peers.find(peer);
+ found != peers.end()) {
+ found->second.handle_connect(conn);
}
- const auto peer = found->first;
- const auto epoch = found->second.epoch;
- return remove_peer(peer).then([peer, epoch, this] {
- add_peer(peer, epoch);
- });
}
-seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn,
+void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn)
+{
+ auto peer = conn->get_peer_id();
+ if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
+ peer == entity_name_t::NEW) {
+ return;
+ }
+ if (auto found = peers.find(peer);
+ found != peers.end()) {
+ found->second.handle_accept(conn);
+ }
+}
+
+seastar::future<> Heartbeat::handle_osd_ping(crimson::net::ConnectionRef conn,
Ref<MOSDPing> m)
{
switch (m->op) {
}
}
-seastar::future<> Heartbeat::handle_ping(crimson::net::Connection* conn,
+seastar::future<> Heartbeat::handle_ping(crimson::net::ConnectionRef conn,
Ref<MOSDPing> m)
{
auto min_message = static_cast<uint32_t>(
return conn->send(reply);
}
-seastar::future<> Heartbeat::handle_reply(crimson::net::Connection* conn,
+seastar::future<> Heartbeat::handle_reply(crimson::net::ConnectionRef conn,
Ref<MOSDPing> m)
{
const osd_id_t from = m->get_source().num();
return seastar::now();
}
auto& peer = found->second;
- auto ping = peer.ping_history.find(m->ping_stamp);
- if (ping == peer.ping_history.end()) {
- // old replies, deprecated by newly sent pings.
- return seastar::now();
- }
- const auto now = clock::now();
- auto& unacked = ping->second.unacknowledged;
- if (conn == peer.con_back.get()) {
- peer.last_rx_back = now;
- unacked--;
- } else if (conn == peer.con_front.get()) {
- peer.last_rx_front = now;
- unacked--;
- }
- if (unacked == 0) {
- peer.ping_history.erase(peer.ping_history.begin(), ++ping);
- }
- if (peer.is_healthy(now)) {
- // cancel false reports
- failure_queue.erase(from);
- if (auto pending = failure_pending.find(from);
- pending != failure_pending.end()) {
- return send_still_alive(from, pending->second.addrs);
- }
- }
- return seastar::now();
+ return peer.handle_reply(conn, m);
}
seastar::future<> Heartbeat::handle_you_died()
return seastar::now();
}
+void Heartbeat::heartbeat_check()
+{
+ failure_queue_t failure_queue;
+ const auto now = clock::now();
+ for (const auto& [osd, peer] : peers) {
+ auto failed_since = peer.failed_since(now);
+ if (!clock::is_zero(failed_since)) {
+ failure_queue.emplace(osd, failed_since);
+ }
+ }
+ if (!failure_queue.empty()) {
+    // send_failures() can run in the background, because
+    // 1. When send_failures() returns, the messages are only queued
+    //    rather than actually sent, so the remaining send work could,
+    //    in principle, outlive the OSD and Heartbeat instances during
+    //    shutdown. However, that remaining work holds no reference
+    //    back to the OSD or Heartbeat instances, so the risk of
+    //    use-after-free does not arise.
+    // 2. Messages are sent in order, so if a later check finds a
+    //    previously "failed" peer to be healthy again, the "still
+    //    alive" message is guaranteed to follow the earlier "osd
+    //    failure" message, which is safe.
+ (void)send_failures(std::move(failure_queue));
+ }
+}
+
seastar::future<> Heartbeat::send_heartbeats()
{
- using peers_item_t = typename peers_map_t::value_type;
- return seastar::parallel_for_each(peers,
- [this](peers_item_t& item) {
- const auto mnow = service.get_mnow();
- const auto now = clock::now();
- const auto deadline =
- now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
- auto& info = item.second;
- info.last_tx = now;
- if (clock::is_zero(info.first_tx)) {
- info.first_tx = now;
- }
- const utime_t sent_stamp{now};
- [[maybe_unused]] auto [reply, added] =
- info.ping_history.emplace(sent_stamp, reply_t{deadline, 0});
- std::vector<crimson::net::ConnectionRef> conns{info.con_front,
- info.con_back};
- return seastar::parallel_for_each(std::move(conns),
- [sent_stamp, mnow, &reply=reply->second, this] (auto con) {
- if (con) {
- auto min_message = static_cast<uint32_t>(
- local_conf()->osd_heartbeat_min_size);
- auto ping = make_message<MOSDPing>(
- monc.get_fsid(),
- service.get_osdmap_service().get_map()->get_epoch(),
- MOSDPing::PING,
- sent_stamp,
- mnow,
- mnow,
- service.get_osdmap_service().get_up_epoch(),
- min_message);
- return con->send(ping).then([&reply] {
- reply.unacknowledged++;
- return seastar::now();
- });
- } else {
- return seastar::now();
- }
- });
- });
+ const auto mnow = service.get_mnow();
+ const auto now = clock::now();
+
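+  // collect one send-future per connection and wait for all of them below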
+ std::vector<seastar::future<>> futures;
+ for (auto& [osd, peer] : peers) {
+ peer.send_heartbeat(now, mnow, futures);
+ }
+ return seastar::when_all_succeed(futures.begin(), futures.end());
}
-seastar::future<> Heartbeat::send_failures()
+seastar::future<> Heartbeat::send_failures(failure_queue_t&& failure_queue)
{
- using failure_item_t = typename failure_queue_t::value_type;
- return seastar::parallel_for_each(failure_queue,
- [this](failure_item_t& failure_item) {
- auto [osd, failed_since] = failure_item;
- if (failure_pending.count(osd)) {
- return seastar::now();
- }
- auto failed_for = chrono::duration_cast<chrono::seconds>(
- clock::now() - failed_since).count();
- auto osdmap = service.get_osdmap_service().get_map();
- auto failure_report =
- make_message<MOSDFailure>(monc.get_fsid(),
- osd,
- osdmap->get_addrs(osd),
- static_cast<int>(failed_for),
- osdmap->get_epoch());
- failure_pending.emplace(osd, failure_info_t{failed_since,
- osdmap->get_addrs(osd)});
- return monc.send_message(failure_report);
- }).then([this] {
- failure_queue.clear();
- return seastar::now();
+ std::vector<seastar::future<>> futures;
+ const auto now = clock::now();
+ for (auto [osd, failed_since] : failure_queue) {
+ failing_peers.add_pending(osd, failed_since, now, futures);
+ }
+
+ return seastar::when_all_succeed(futures.begin(), futures.end());
+}
+
+void Heartbeat::print(std::ostream& out) const
+{
+ out << "heartbeat";
+}
+
+Heartbeat::Connection::~Connection()
+{
+ if (conn) {
+ conn->mark_down();
+ }
+}
+
+bool Heartbeat::Connection::matches(crimson::net::ConnectionRef _conn) const
+{
+ return (conn && conn == _conn);
+}
+
+void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn)
+{
+ if (!conn) {
+ if (accepted_conn->get_peer_addr() == listener.get_peer_addr(type)) {
+ logger().info("Heartbeat::Connection::accepted(): "
+ "{} racing resolved", *this);
+ conn = accepted_conn;
+ set_connected();
+ }
+ } else if (conn == accepted_conn) {
+ set_connected();
+ }
+}
+
+void Heartbeat::Connection::replaced()
+{
+ assert(!is_connected);
+ auto replaced_conn = conn;
+ // set the racing connection, will be handled by handle_accept()
+ conn = msgr.connect(replaced_conn->get_peer_addr(),
+ replaced_conn->get_peer_name());
+ racing_detected = true;
+ logger().warn("Heartbeat::Connection::replaced(): {} racing", *this);
+ assert(conn != replaced_conn);
+ assert(conn->is_connected());
+}
+
+void Heartbeat::Connection::reset()
+{
+ conn = nullptr;
+ if (is_connected) {
+ is_connected = false;
+ listener.decrease_connected();
+ }
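+  // is_winner_side is (whoami > peer), see Peer's constructor: after a
+  // detected race only the winner side reconnects actively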
+ if (!racing_detected || is_winner_side) {
+ connect();
+ } else {
+ logger().info("Heartbeat::Connection::reset(): "
+ "{} racing detected and lose, "
+ "waiting for peer connect me", *this);
+ }
+}
+
+seastar::future<> Heartbeat::Connection::send(MessageRef msg)
+{
+ assert(is_connected);
+ return conn->send(msg);
+}
+
+void Heartbeat::Connection::validate()
+{
+ assert(is_connected);
+ auto peer_addr = listener.get_peer_addr(type);
+ if (conn->get_peer_addr() != peer_addr) {
+ logger().info("Heartbeat::Connection::validate(): "
+ "{} has new address {} over {}, reset",
+ *this, peer_addr, conn->get_peer_addr());
+ conn->mark_down();
+ racing_detected = false;
+ reset();
+ }
+}
+
+void Heartbeat::Connection::retry()
+{
+ racing_detected = false;
+ if (!is_connected) {
+ if (conn) {
+ conn->mark_down();
+ reset();
+ } else {
+ connect();
+ }
+ }
+}
+
+void Heartbeat::Connection::set_connected()
+{
+ assert(!is_connected);
+ is_connected = true;
+ listener.increase_connected();
+}
+
+void Heartbeat::Connection::connect()
+{
+ assert(!conn);
+ auto addr = listener.get_peer_addr(type);
+ conn = msgr.connect(addr, entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
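+  // msgr.connect() may return an existing connection that is already
+  // established, hence the check below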
+ if (conn->is_connected()) {
+ set_connected();
+ }
+}
+
+Heartbeat::clock::time_point
+Heartbeat::Session::failed_since(Heartbeat::clock::time_point now) const
+{
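+  // UNHEALTHY means the oldest outstanding ping has passed its deadline
+  // (cf. the removed PeerInfo::is_unhealthy() below)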
+ if (do_health_screen(now) == health_state::UNHEALTHY) {
+ auto oldest_deadline = ping_history.begin()->second.deadline;
+ auto failed_since = std::min(last_rx_back, last_rx_front);
+ if (clock::is_zero(failed_since)) {
+ logger().error("Heartbeat::Session::failed_since(): no reply from osd.{} "
+ "ever on either front or back, first ping sent {} "
+ "(oldest deadline {})",
+ peer, first_tx, oldest_deadline);
+ failed_since = first_tx;
+ } else {
+ logger().error("Heartbeat::Session::failed_since(): no reply from osd.{} "
+ "since back {} front {} (oldest deadline {})",
+ peer, last_rx_back, last_rx_front, oldest_deadline);
+ }
+ return failed_since;
+ } else {
+ return clock::zero();
+ }
+}
+
+void Heartbeat::Session::set_inactive_history(clock::time_point now)
+{
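+  // while disconnected, keep exactly one synthetic ping_history entry so
+  // failure detection still has a deadline to judge this peer against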
+ assert(!connected);
+ if (ping_history.empty()) {
+ const utime_t sent_stamp{now};
+ const auto deadline =
+ now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+ ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+ } else { // the entry is already added
+ assert(ping_history.size() == 1);
+ }
+}
+
+Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer)
+ : ConnectionListener(2), heartbeat{heartbeat}, peer{peer}, session{peer},
+ con_front(peer, heartbeat.whoami > peer, Connection::type_t::front,
+ *heartbeat.front_msgr, *this),
+ con_back(peer, heartbeat.whoami > peer, Connection::type_t::back,
+ *heartbeat.back_msgr, *this)
+{
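+  // ConnectionListener(2): presumably both the front and the back
+  // connection must be established before on_connected() fires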
+ logger().info("Heartbeat::Peer: osd.{} added", peer);
+}
+
+Heartbeat::Peer::~Peer()
+{
+ logger().info("Heartbeat::Peer: osd.{} removed", peer);
+}
+
+void Heartbeat::Peer::send_heartbeat(
+ clock::time_point now, ceph::signedspan mnow,
+ std::vector<seastar::future<>>& futures)
+{
+ session.set_tx(now);
+ if (session.is_started()) {
+ do_send_heartbeat(now, mnow, &futures);
+ for_each_conn([] (auto& conn) {
+ conn.validate();
});
+ } else {
+    // an MOSDPing is due, but we cannot send it at this moment
+    if (pending_send) {
+      // we have already been pending for an entire heartbeat interval
+ logger().warn("Heartbeat::Peer::send_heartbeat(): "
+ "heartbeat to osd.{} is still pending...", peer);
+ for_each_conn([] (auto& conn) {
+ conn.retry();
+ });
+ } else {
+      logger().info("Heartbeat::Peer::send_heartbeat(): "
+                    "heartbeat to osd.{} is deferred until connected", peer);
+ session.set_inactive_history(now);
+ pending_send = true;
+ }
+ }
}
-seastar::future<> Heartbeat::send_still_alive(osd_id_t osd,
- const entity_addrvec_t& addrs)
+seastar::future<> Heartbeat::Peer::handle_reply(
+ crimson::net::ConnectionRef conn, Ref<MOSDPing> m)
{
- auto still_alive = make_message<MOSDFailure>(
- monc.get_fsid(),
- osd,
- addrs,
- 0,
- service.get_osdmap_service().get_map()->get_epoch(),
- MOSDFailure::FLAG_ALIVE);
- return monc.send_message(still_alive).then([=] {
- failure_pending.erase(osd);
+ if (!session.is_started()) {
+ // we haven't sent any ping yet
return seastar::now();
- });
+ }
+ type_t type;
+ if (con_front.matches(conn)) {
+ type = type_t::front;
+ } else if (con_back.matches(conn)) {
+ type = type_t::back;
+ } else {
+ return seastar::now();
+ }
+ const auto now = clock::now();
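+  // a false return from on_pong() means a stale reply, deprecated by
+  // newly sent pings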
+ if (session.on_pong(m->ping_stamp, type, now)) {
+ if (session.do_health_screen(now) == Session::health_state::HEALTHY) {
+ return heartbeat.failing_peers.cancel_one(peer);
+ }
+ }
+ return seastar::now();
}
-bool Heartbeat::PeerInfo::is_unhealthy(clock::time_point now) const
+entity_addr_t Heartbeat::Peer::get_peer_addr(type_t type)
{
- if (ping_history.empty()) {
- // we haven't sent a ping yet or we have got all replies,
- // in either way we are safe and healthy for now
- return false;
+ const auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+ if (type == type_t::front) {
+ return osdmap->get_hb_front_addrs(peer).front();
} else {
- auto oldest_ping = ping_history.begin();
- return now > oldest_ping->second.deadline;
+ return osdmap->get_hb_back_addrs(peer).front();
}
}
-bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const
+void Heartbeat::Peer::on_connected()
{
- if (con_front && clock::is_zero(last_rx_front)) {
- return false;
+ logger().info("Heartbeat::Peer: osd.{} connected (send={})",
+ peer, pending_send);
+ session.on_connected();
+ if (pending_send) {
+ pending_send = false;
+ do_send_heartbeat(clock::now(), heartbeat.service.get_mnow(), nullptr);
}
- if (con_back && clock::is_zero(last_rx_back)) {
+}
+
+void Heartbeat::Peer::on_disconnected()
+{
+ logger().info("Heartbeat::Peer: osd.{} disconnected", peer);
+ session.on_disconnected();
+}
+
+void Heartbeat::Peer::do_send_heartbeat(
+ Heartbeat::clock::time_point now,
+ ceph::signedspan mnow,
+ std::vector<seastar::future<>>* futures)
+{
+ const utime_t sent_stamp{now};
+ const auto deadline =
+ now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+ session.on_ping(sent_stamp, deadline);
+ for_each_conn([&, this] (auto& conn) {
+ auto min_message = static_cast<uint32_t>(
+ local_conf()->osd_heartbeat_min_size);
+ auto ping = make_message<MOSDPing>(
+ heartbeat.monc.get_fsid(),
+ heartbeat.service.get_osdmap_service().get_map()->get_epoch(),
+ MOSDPing::PING,
+ sent_stamp,
+ mnow,
+ mnow,
+ heartbeat.service.get_osdmap_service().get_up_epoch(),
+ min_message);
+    if (futures) {
+      futures->push_back(conn.send(std::move(ping)));
+    } else {
+      // the caller cannot wait here (e.g. on_connected()); send in the
+      // background so the pending ping is not silently dropped
+      (void)conn.send(std::move(ping));
+    }
+ });
+}
+
+bool Heartbeat::FailingPeers::add_pending(
+ osd_id_t peer,
+ clock::time_point failed_since,
+ clock::time_point now,
+ std::vector<seastar::future<>>& futures)
+{
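+  // report each failing peer to the monitors at most once, until the
+  // report is canceled via cancel_one()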
+ if (failure_pending.count(peer)) {
return false;
}
- // only declare to be healthy until we have received the first
- // replies from both front/back connections
- return !is_unhealthy(now);
+ auto failed_for = chrono::duration_cast<chrono::seconds>(
+ now - failed_since).count();
+ auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+ auto failure_report =
+ make_message<MOSDFailure>(heartbeat.monc.get_fsid(),
+ peer,
+ osdmap->get_addrs(peer),
+ static_cast<int>(failed_for),
+ osdmap->get_epoch());
+ failure_pending.emplace(peer, failure_info_t{failed_since,
+ osdmap->get_addrs(peer)});
+ futures.push_back(heartbeat.monc.send_message(failure_report));
+ logger().info("{}: osd.{} failed for {}", __func__, peer, failed_for);
+ return true;
+}
+
+seastar::future<> Heartbeat::FailingPeers::cancel_one(osd_id_t peer)
+{
+ if (auto pending = failure_pending.find(peer);
+ pending != failure_pending.end()) {
+ auto fut = send_still_alive(peer, pending->second.addrs);
+ failure_pending.erase(peer);
+ return fut;
+ }
+ return seastar::now();
+}
+
+seastar::future<>
+Heartbeat::FailingPeers::send_still_alive(
+ osd_id_t osd, const entity_addrvec_t& addrs)
+{
+ auto still_alive = make_message<MOSDFailure>(
+ heartbeat.monc.get_fsid(),
+ osd,
+ addrs,
+ 0,
+ heartbeat.service.get_osdmap_service().get_map()->get_epoch(),
+ MOSDFailure::FLAG_ALIVE);
+ logger().info("{}: osd.{}", __func__, osd);
+ return heartbeat.monc.send_message(still_alive);
}