}
}
-void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn)
+void Heartbeat::ms_handle_connect(  // connect-event handler; remainder of the body is outside this hunk
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id prv_shard)  // shard id supplied by the caller, verified against the running shard below
{
+ ceph_assert_always(seastar::this_shard_id() == prv_shard);  // the handler must execute on prv_shard
auto peer = conn->get_peer_id();
if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
peer == entity_name_t::NEW) {
}
}
-void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn)
+void Heartbeat::ms_handle_accept(  // accept-event handler; forwards to the matching entry in `peers`
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id prv_shard,  // shard id supplied by the caller, verified against the running shard below
+ bool is_replace)  // passed through unchanged to the peer's handle_accept()
{
+ ceph_assert_always(seastar::this_shard_id() == prv_shard);  // the handler must execute on prv_shard
auto peer = conn->get_peer_id();
if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
peer == entity_name_t::NEW) {
}
if (auto found = peers.find(peer);
found != peers.end()) {
- found->second.handle_accept(conn);
+ found->second.handle_accept(conn, is_replace);  // only a peer already present in `peers` is notified
}
}
Ref<MOSDPing> m)
{
const osd_id_t from = m->get_source().num();
- const epoch_t osdmap_epoch = service.get_map()->get_epoch();
- const epoch_t peer_epoch = m->map_epoch;
+ const epoch_t current_osdmap_epoch = service.get_map()->get_epoch();  // epoch of the map held by `service` right now
auto found = peers.find(from);
if (found == peers.end()) {
return seastar::now();
}
auto& peer = found->second;
- if (peer_epoch > peer.get_last_epoch_sent()) {
- logger().debug("{} updating session's last epoch sent "
- "from {} to peer's (id: {}) map epoch of {}",
- __func__, peer.get_last_epoch_sent(),
- from, peer_epoch);
- peer.set_last_epoch_sent(peer_epoch);
+ if (m->map_epoch > peer.get_projected_epoch()) {  // the ping carries a newer epoch than the one projected for this peer
+ logger().debug("{} updating peer {} session's projected_epoch "
+ "from {} to ping map epoch of {}",
+ __func__, from, peer.get_projected_epoch(),
+ m->map_epoch);
+ peer.set_projected_epoch(m->map_epoch);  // raise the projection to what the peer reported
}
- if (osdmap_epoch <= peer.get_last_epoch_sent()) {
- logger().info("{} latest epoch sent {} is already later "
- "than osdmap epoch of {}",
- __func__ , peer.get_last_epoch_sent(),
- osdmap_epoch);
+ if (current_osdmap_epoch <= peer.get_projected_epoch()) {  // nothing newer to share: projection is at or past our epoch
+ logger().debug("{} peer {} projected_epoch {} is already later "
+ "than our osdmap epoch of {}",
+ __func__ , from, peer.get_projected_epoch(),
+ current_osdmap_epoch);
return seastar::now();
}
- logger().info("{} peer id: {} epoch is {} while osdmap is {}",
- __func__ , from, m->map_epoch, osdmap_epoch);
- if (osdmap_epoch > m->map_epoch) {
- logger().debug("{} sharing osdmap epoch of {} with peer id {}",
- __func__, osdmap_epoch, from);
- // Peer's newest map is m->map_epoch. Therfore it misses
- // the osdmaps in the range of `m->map_epoch` to `osdmap_epoch`.
- return service.send_incremental_map_to_osd(from, m->map_epoch);
- }
- return seastar::now();
+ const epoch_t send_from = peer.get_projected_epoch();  // capture the start epoch before the projection is advanced
+ logger().debug("{} sending peer {} peer maps from projected epoch {} through "
+ "local osdmap epoch {}",
+ __func__,
+ from,
+ send_from,
+ current_osdmap_epoch);
+ peer.set_projected_epoch(current_osdmap_epoch);  // advance the projection before the send is issued
+ return service.send_incremental_map_to_osd(from, send_from);  // the returned future is handed back to the caller
}
seastar::future<> Heartbeat::handle_reply(crimson::net::ConnectionRef conn,
return (conn && conn == _conn);
}
-void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn)
+bool Heartbeat::Connection::accepted(  // returns true iff accepted_conn was adopted as this Connection's conn
+ crimson::net::ConnectionRef accepted_conn,
+ bool is_replace)  // when true, a race is recorded via racing_detected
{
- if (!conn) {
- if (accepted_conn->get_peer_addr() == listener.get_peer_addr(type)) {
- logger().info("Heartbeat::Connection::accepted(): "
- "{} racing resolved", *this);
- conn = accepted_conn;
- set_connected();
+ ceph_assert(accepted_conn);
+ ceph_assert(accepted_conn != conn);  // the accepted connection must differ from the one already held
+ if (accepted_conn->get_peer_addr() != listener.get_peer_addr(type)) {  // address is not the one the listener reports for this type
+ return false;
+ }
+
+ if (is_replace) {
+ logger().info("Heartbeat::Connection::accepted(): "
+ "{} racing", *this);
+ racing_detected = true;
+ }
+ if (conn) {
+ // there is no assumption about the ordering of the reset and accept
+ // events for the 2 racing connections.
+ if (is_connected) {
+ logger().warn("Heartbeat::Connection::accepted(): "
+ "{} is accepted while connected, is_replace={}",
+ *this, is_replace);
+ conn->mark_down();  // the previously connected conn is marked down before being replaced
+ set_unconnected();  // clears conn and is_connected so set_connected() below passes its !is_connected assert
}
- } else if (conn == accepted_conn) {
- set_connected();
}
+ conn = accepted_conn;  // overwrites any previous conn, including one that never reached connected state
+ set_connected();
+ return true;
}
-void Heartbeat::Connection::replaced()
+void Heartbeat::Connection::reset(bool is_replace)  // drops the held conn; reconnects unless a replacing accept or the peer's connect is awaited
{
- assert(!is_connected);
- auto replaced_conn = conn;
- // set the racing connection, will be handled by handle_accept()
- conn = msgr.connect(replaced_conn->get_peer_addr(),
- replaced_conn->get_peer_name());
- racing_detected = true;
- logger().warn("Heartbeat::Connection::replaced(): {} racing", *this);
- assert(conn != replaced_conn);
-}
+ if (is_replace) {
+ logger().info("Heartbeat::Connection::reset(): "
+ "{} racing, waiting for the replacing accept",
+ *this);
+ racing_detected = true;
+ }
-void Heartbeat::Connection::reset()
-{
- conn = nullptr;
if (is_connected) {
- is_connected = false;
- listener.decrease_connected();
+ set_unconnected();  // clears conn, is_connected and decrements the listener's count
+ } else {
+ conn = nullptr;  // not connected: only the reference needs to be dropped
}
- if (!racing_detected || is_winner_side) {
+
+ if (is_replace) {
+ // waiting for the replacing accept event
+ } else if (!racing_detected || is_winner_side) {
connect();
- } else {
+ } else { // racing_detected && !is_winner_side
logger().info("Heartbeat::Connection::reset(): "
"{} racing detected and lose, "
"waiting for peer connect me", *this);
void Heartbeat::Connection::set_connected()
{
+ assert(conn);  // a connection must already be assigned before it can be flagged connected
assert(!is_connected);
+ ceph_assert(conn->is_connected());  // the underlying connection itself must also report connected
is_connected = true;
listener.increase_connected();
}
+void Heartbeat::Connection::set_unconnected()  // counterpart of set_connected(): drops conn and clears the connected flag
+{
+ assert(conn);  // only valid while a connection is held ...
+ assert(is_connected);  // ... and currently flagged as connected
+ conn = nullptr;
+ is_connected = false;
+ listener.decrease_connected();  // undoes the increase_connected() performed by set_connected()
+}
+
void Heartbeat::Connection::connect()
{
assert(!conn);
}
}
+void Heartbeat::Peer::handle_reset(  // resets every Connection of this peer that matches conn
+ crimson::net::ConnectionRef conn, bool is_replace)
+{
+ int cnt = 0;  // number of Connections that matched conn
+ for_each_conn([&] (auto& _conn) {
+ if (_conn.matches(conn)) {
+ ++cnt;
+ _conn.reset(is_replace);
+ }
+ });
+
+ if (cnt == 0) {  // no match: logged at info level only, no further action
+ logger().info("Heartbeat::Peer::handle_reset(): {} ignores conn, is_replace={} -- {}",
+ *this, is_replace, *conn);
+ } else if (cnt > 1) {  // more than one match is reported as an error
+ logger().error("Heartbeat::Peer::handle_reset(): {} handles conn {} times -- {}",
+ *this, cnt, *conn);
+ }
+}
+
+void Heartbeat::Peer::handle_connect(crimson::net::ConnectionRef conn)  // notifies every Connection of this peer that matches conn
+{
+ int cnt = 0;  // number of Connections that matched conn
+ for_each_conn([&] (auto& _conn) {
+ if (_conn.matches(conn)) {
+ ++cnt;
+ _conn.connected();
+ }
+ });
+
+ if (cnt == 0) {  // no Connection matched conn
+ logger().error("Heartbeat::Peer::handle_connect(): {} ignores conn -- {}",
+ *this, *conn);
+ conn->mark_down();  // the unmatched connection is marked down
+ } else if (cnt > 1) {  // more than one match is reported as an error
+ logger().error("Heartbeat::Peer::handle_connect(): {} handles conn {} times -- {}",
+ *this, cnt, *conn);
+ }
+}
+
+void Heartbeat::Peer::handle_accept(crimson::net::ConnectionRef conn, bool is_replace)  // offers conn to every Connection of this peer
+{
+ int cnt = 0;  // number of Connections that adopted conn
+ for_each_conn([&] (auto& _conn) {
+ if (_conn.accepted(conn, is_replace)) {  // accepted() returns true only when it takes conn
+ ++cnt;
+ }
+ });
+
+ if (cnt == 0) {  // nobody adopted conn: warn only, conn is not marked down here
+ logger().warn("Heartbeat::Peer::handle_accept(): {} ignores conn -- {}",
+ *this, *conn);
+ } else if (cnt > 1) {  // more than one adoption is reported as an error
+ logger().error("Heartbeat::Peer::handle_accept(): {} handles conn {} times -- {}",
+ *this, cnt, *conn);
+ }
+}
+
seastar::future<> Heartbeat::Peer::handle_reply(
crimson::net::ConnectionRef conn, Ref<MOSDPing> m)
{