1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <boost/range/join.hpp>
7 #include <fmt/chrono.h>
10 #include "messages/MOSDPing.h"
11 #include "messages/MOSDFailure.h"
13 #include "crimson/common/config_proxy.h"
14 #include "crimson/common/formatter.h"
15 #include "crimson/net/Connection.h"
16 #include "crimson/net/Messenger.h"
17 #include "crimson/osd/shard_services.h"
18 #include "crimson/mon/MonClient.h"
20 #include "osd/OSDMap.h"
24 using crimson::common::local_conf
;
27 seastar::logger
& logger() {
28 return crimson::get_logger(ceph_subsys_osd
);
// Heartbeat constructor: stores references to the shard services, mon
// client and the front/back heartbeat messengers.
// NOTE(review): listing lines 37-39/41/43-49 are missing — most of the
// member-initializer list and the timer callback body are truncated;
// reconstruct from the class declaration before compiling.
32 Heartbeat::Heartbeat(osd_id_t whoami
,
33 crimson::osd::ShardServices
& service
,
34 crimson::mon::Client
& monc
,
35 crimson::net::Messenger
&front_msgr
,
36 crimson::net::Messenger
&back_msgr
)
40 front_msgr
{front_msgr
},
// Periodic timer work: fire-and-forget the heartbeat send.
42 // do this in background
45 (void)send_heartbeats();
// start(): bind both heartbeat messengers, set lossy-client policy for
// OSD peers, and arm the periodic heartbeat timer.
// NOTE(review): the for-loop body (original line ~57, presumably
// clearing the port on each joined address) and the continuation after
// when_all_succeed are missing from this listing.
50 seastar::future
<> Heartbeat::start(entity_addrvec_t front_addrs
,
51 entity_addrvec_t back_addrs
)
53 logger().info("heartbeat: start front_addrs={}, back_addrs={}",
54 front_addrs
, back_addrs
);
55 // i only care about the address, so any unused port would work
56 for (auto& addr
: boost::join(front_addrs
.v
, back_addrs
.v
)) {
60 using crimson::net::SocketPolicy
;
61 front_msgr
.set_policy(entity_name_t::TYPE_OSD
,
62 SocketPolicy::lossy_client(0));
63 back_msgr
.set_policy(entity_name_t::TYPE_OSD
,
64 SocketPolicy::lossy_client(0));
65 return seastar::when_all_succeed(start_messenger(front_msgr
,
67 start_messenger(back_msgr
,
71 std::chrono::seconds(local_conf()->osd_heartbeat_interval
));
// start_messenger(): bind the given messenger to addrs and start it with
// this Heartbeat as the dispatcher; on bind error, log it.
// NOTE(review): the error-handler tail (original lines 84-86) is missing.
76 Heartbeat::start_messenger(crimson::net::Messenger
& msgr
,
77 const entity_addrvec_t
& addrs
)
79 return msgr
.bind(addrs
).safe_then([this, &msgr
]() mutable {
80 return msgr
.start({this});
81 }, crimson::net::Messenger::bind_ertr::all_same_way(
82 [addrs
] (const std::error_code
& e
) {
83 logger().error("heartbeat messenger bind({}): {}", addrs
, e
);
// stop(): close the dispatch gate, then shut down both messengers.
// NOTE(review): original lines 89/91-93/97/99-100 are missing — likely
// the timer cancel and messenger stop calls plus closing braces.
88 seastar::future
<> Heartbeat::stop()
90 logger().info("{}", __func__
);
94 return gate
.close().then([this] {
95 return seastar::when_all_succeed(front_msgr
.shutdown(),
96 back_msgr
.shutdown());
98 return seastar::now();
102 const entity_addrvec_t
& Heartbeat::get_front_addrs() const
104 return front_msgr
.get_myaddrs();
107 const entity_addrvec_t
& Heartbeat::get_back_addrs() const
109 return back_msgr
.get_myaddrs();
112 crimson::net::Messenger
& Heartbeat::get_front_msgr() const
117 crimson::net::Messenger
& Heartbeat::get_back_msgr() const
122 void Heartbeat::add_peer(osd_id_t _peer
, epoch_t epoch
)
124 assert(whoami
!= _peer
);
125 auto [iter
, added
] = peers
.try_emplace(_peer
, *this, _peer
);
126 auto& peer
= iter
->second
;
127 peer
.set_epoch_added(epoch
);
// remove_down_peers(): drop peers that the current osdmap reports as
// down; collect (and presumably return) peers that are still up but were
// not re-added in this epoch.
// NOTE(review): original lines 131/137-147 are missing — the erase of
// down peers, the iterator advance, and the `return old_osds;` tail are
// not visible in this listing.
130 Heartbeat::osds_t
Heartbeat::remove_down_peers()
132 osds_t old_osds
; // osds not added in this epoch
133 for (auto i
= peers
.begin(); i
!= peers
.end(); ) {
134 auto osdmap
= service
.get_map();
135 const auto& [osd
, peer
] = *i
;
136 if (!osdmap
->is_up(osd
)) {
139 if (peer
.get_epoch_added() < osdmap
->get_epoch()) {
140 old_osds
.push_back(osd
);
// add_reporter_peers(): ensure a fully-connected reporter set — the
// next/previous up OSDs plus `mon_osd_min_down_reporters` OSDs drawn
// from distinct failure-domain subtrees.
// NOTE(review): original lines 149/152/154-158 are missing — the
// `osds_t want;` declaration and the add_peer calls inside the two if
// branches are not visible.
148 void Heartbeat::add_reporter_peers(int whoami
)
150 auto osdmap
= service
.get_map();
151 // include next and previous up osds to ensure we have a fully-connected set
153 if (auto next
= osdmap
->get_next_up_osd_after(whoami
); next
>= 0) {
156 if (auto prev
= osdmap
->get_previous_up_osd_before(whoami
); prev
>= 0) {
159 // make sure we have at least **min_down** osds coming from different
160 // subtree level (e.g., hosts) for fast failure detection.
161 auto min_down
= local_conf().get_val
<uint64_t>("mon_osd_min_down_reporters");
162 auto subtree
= local_conf().get_val
<string
>("mon_osd_reporter_subtree_level");
163 osdmap
->get_random_up_osds_by_subtree(
164 whoami
, subtree
, min_down
, want
, &want
);
165 auto epoch
= osdmap
->get_epoch();
166 for (int osd
: want
) {
167 add_peer(osd
, epoch
);
// update_peers(): refresh reporter peers, prune down peers, then trim
// extras / top up with successive up OSDs until `osd_heartbeat_min_peers`
// is satisfied.
// NOTE(review): original lines 172/177/180-184/191-193 are missing —
// the loop body removing `extra` peers (and its break) is not visible.
171 void Heartbeat::update_peers(int whoami
)
173 const auto min_peers
= static_cast<size_t>(
174 local_conf().get_val
<int64_t>("osd_heartbeat_min_peers"));
175 add_reporter_peers(whoami
);
176 auto extra
= remove_down_peers();
178 for (auto& osd
: extra
) {
179 if (peers
.size() <= min_peers
) {
185 auto osdmap
= service
.get_map();
186 auto epoch
= osdmap
->get_epoch();
187 for (auto next
= osdmap
->get_next_up_osd_after(whoami
);
188 peers
.size() < min_peers
&& next
>= 0 && next
!= whoami
;
189 next
= osdmap
->get_next_up_osd_after(next
)) {
190 add_peer(next
, epoch
);
194 Heartbeat::osds_t
Heartbeat::get_peers() const
197 osds
.reserve(peers
.size());
198 for (auto& peer
: peers
) {
199 osds
.push_back(peer
.first
);
204 void Heartbeat::remove_peer(osd_id_t peer
)
206 assert(peers
.count(peer
) == 1);
// ms_dispatch(): entry point for incoming heartbeat messages; runs the
// handler inside the dispatch gate so stop() can drain in-flight work.
// Returns an engaged future when the message was consumed.
// NOTE(review): original lines 212/216/218-219/221-222/224 are missing —
// the `case` label for the ping message type, the default branch setting
// `dispatched = false`, and the closing braces are not visible.
210 std::optional
<seastar::future
<>>
211 Heartbeat::ms_dispatch(crimson::net::ConnectionRef conn
, MessageRef m
)
213 bool dispatched
= true;
214 gate
.dispatch_in_background(__func__
, *this, [this, conn
, &m
, &dispatched
] {
215 switch (m
->get_type()) {
217 return handle_osd_ping(conn
, boost::static_pointer_cast
<MOSDPing
>(m
));
220 return seastar::now();
223 return (dispatched
? std::make_optional(seastar::now()) : std::nullopt
);
226 void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn
, bool is_replace
)
228 auto peer
= conn
->get_peer_id();
229 if (conn
->get_peer_type() != entity_name_t::TYPE_OSD
||
230 peer
== entity_name_t::NEW
) {
233 if (auto found
= peers
.find(peer
);
234 found
!= peers
.end()) {
235 found
->second
.handle_reset(conn
, is_replace
);
239 void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn
)
241 auto peer
= conn
->get_peer_id();
242 if (conn
->get_peer_type() != entity_name_t::TYPE_OSD
||
243 peer
== entity_name_t::NEW
) {
246 if (auto found
= peers
.find(peer
);
247 found
!= peers
.end()) {
248 found
->second
.handle_connect(conn
);
252 void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn
)
254 auto peer
= conn
->get_peer_id();
255 if (conn
->get_peer_type() != entity_name_t::TYPE_OSD
||
256 peer
== entity_name_t::NEW
) {
259 if (auto found
= peers
.find(peer
);
260 found
!= peers
.end()) {
261 found
->second
.handle_accept(conn
);
265 seastar::future
<> Heartbeat::handle_osd_ping(crimson::net::ConnectionRef conn
,
270 return handle_ping(conn
, m
);
271 case MOSDPing::PING_REPLY
:
272 return handle_reply(conn
, m
);
273 case MOSDPing::YOU_DIED
:
274 return handle_you_died();
276 return seastar::now();
// handle_ping(): answer an incoming PING with a PING_REPLY (padded to
// osd_heartbeat_min_size), then possibly share a newer osdmap.
// NOTE(review): original lines 281-282/285/287/290-292/294/298-300 are
// missing — the `Ref<MOSDPing> m` parameter, the reply's remaining
// constructor arguments and padding are not visible in this listing.
280 seastar::future
<> Heartbeat::handle_ping(crimson::net::ConnectionRef conn
,
283 auto min_message
= static_cast<uint32_t>(
284 local_conf()->osd_heartbeat_min_size
);
286 crimson::make_message
<MOSDPing
>(
288 service
.get_map()->get_epoch(),
289 MOSDPing::PING_REPLY
,
293 service
.get_up_epoch(),
295 return conn
->send(std::move(reply
)
296 ).then([this, m
, conn
] {
297 return maybe_share_osdmap(conn
, m
);
// maybe_share_osdmap(): if our osdmap is newer than both the peer's
// advertised epoch and the last epoch we sent it, push the incremental
// maps; track the last epoch sent per peer to avoid duplicates.
// NOTE(review): original lines 303-304/311/313/318/320-321/326/328-329/
// 338/340 are missing — the `Ref<MOSDPing> m` parameter, two log
// arguments, and several closing braces are not visible.
301 seastar::future
<> Heartbeat::maybe_share_osdmap(
302 crimson::net::ConnectionRef conn
,
305 const osd_id_t from
= m
->get_source().num();
306 const epoch_t osdmap_epoch
= service
.get_map()->get_epoch();
307 const epoch_t peer_epoch
= m
->map_epoch
;
308 auto found
= peers
.find(from
);
309 if (found
== peers
.end()) {
310 return seastar::now();
312 auto& peer
= found
->second
;
314 if (peer_epoch
> peer
.get_last_epoch_sent()) {
315 logger().debug("{} updating session's last epoch sent "
316 "from {} to peer's (id: {}) map epoch of {}",
317 __func__
, peer
.get_last_epoch_sent(),
319 peer
.set_last_epoch_sent(peer_epoch
);
322 if (osdmap_epoch
<= peer
.get_last_epoch_sent()) {
323 logger().info("{} latest epoch sent {} is already later "
324 "than osdmap epoch of {}",
325 __func__
, peer
.get_last_epoch_sent(),
327 return seastar::now();
330 logger().info("{} peer id: {} epoch is {} while osdmap is {}",
331 __func__
, from
, m
->map_epoch
, osdmap_epoch
);
332 if (osdmap_epoch
> m
->map_epoch
) {
333 logger().debug("{} sharing osdmap epoch of {} with peer id {}",
334 __func__
, osdmap_epoch
, from
);
335 // Peer's newest map is m->map_epoch. Therefore it misses
336 // the osdmaps in the range of `m->map_epoch` to `osdmap_epoch`.
337 return service
.send_incremental_map_to_osd(from
, m
->map_epoch
);
339 return seastar::now();
342 seastar::future
<> Heartbeat::handle_reply(crimson::net::ConnectionRef conn
,
345 const osd_id_t from
= m
->get_source().num();
346 auto found
= peers
.find(from
);
347 if (found
== peers
.end()) {
349 return seastar::now();
351 auto& peer
= found
->second
;
352 return peer
.handle_reply(conn
, m
353 ).then([this, conn
, m
] {
354 return maybe_share_osdmap(conn
, m
);
358 seastar::future
<> Heartbeat::handle_you_died()
360 // TODO: ask for newer osdmap
361 return seastar::now();
364 void Heartbeat::heartbeat_check()
366 failure_queue_t failure_queue
;
367 const auto now
= clock::now();
368 for (const auto& [osd
, peer
] : peers
) {
369 auto failed_since
= peer
.failed_since(now
);
370 if (!clock::is_zero(failed_since
)) {
371 failure_queue
.emplace(osd
, failed_since
);
374 if (!failure_queue
.empty()) {
375 // send_failures can run in background, because
376 // 1. After the execution of send_failures, no msg is actually
377 // sent, which means the sending operation is not done,
378 // which further seems to involve problems risks that when
379 // osd shuts down, the left part of the sending operation
380 // may reference OSD and Heartbeat instances that are already
381 // deleted. However, remaining work of that sending operation
382 // involves no reference back to OSD or Heartbeat instances,
383 // which means it wouldn't involve the above risks.
384 // 2. messages are sent in order, if later checks find out
385 // the previous "failed" peers to be healthy, that "still
386 // alive" messages would be sent after the previous "osd
387 // failure" messages which is totally safe.
388 (void)send_failures(std::move(failure_queue
));
392 seastar::future
<> Heartbeat::send_heartbeats()
394 const auto mnow
= service
.get_mnow();
395 const auto now
= clock::now();
397 std::vector
<seastar::future
<>> futures
;
398 for (auto& [osd
, peer
] : peers
) {
399 peer
.send_heartbeat(now
, mnow
, futures
);
401 return seastar::when_all_succeed(futures
.begin(), futures
.end());
404 seastar::future
<> Heartbeat::send_failures(failure_queue_t
&& failure_queue
)
406 std::vector
<seastar::future
<>> futures
;
407 const auto now
= clock::now();
408 for (auto [osd
, failed_since
] : failure_queue
) {
409 failing_peers
.add_pending(osd
, failed_since
, now
, futures
);
412 return seastar::when_all_succeed(futures
.begin(), futures
.end());
// print(): formats this Heartbeat for ostream output.
// NOTE(review): the body (original lines 416-419) is missing from this
// listing; the emitted text cannot be determined from here.
415 void Heartbeat::print(std::ostream
& out
) const
// Connection destructor.
// NOTE(review): the body (original lines 421-426) is missing — it
// presumably tears down `conn`; confirm against upstream before use.
420 Heartbeat::Connection::~Connection()
427 bool Heartbeat::Connection::matches(crimson::net::ConnectionRef _conn
) const
429 return (conn
&& conn
== _conn
);
// accepted(): resolve an accepted (peer-initiated) connection against
// the one we initiated — adopt it if it matches the expected peer
// address, or confirm it if it is already ours.
// NOTE(review): original lines 433-434/439-440/442-445 are missing — the
// enclosing guard (likely `if (!conn)`), the set_connected() calls and
// closing braces are not visible in this listing.
432 void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn
)
435 if (accepted_conn
->get_peer_addr() == listener
.get_peer_addr(type
)) {
436 logger().info("Heartbeat::Connection::accepted(): "
437 "{} racing resolved", *this);
438 conn
= accepted_conn
;
441 } else if (conn
== accepted_conn
) {
446 void Heartbeat::Connection::replaced()
448 assert(!is_connected
);
449 auto replaced_conn
= conn
;
450 // set the racing connection, will be handled by handle_accept()
451 conn
= msgr
.connect(replaced_conn
->get_peer_addr(),
452 replaced_conn
->get_peer_name());
453 racing_detected
= true;
454 logger().warn("Heartbeat::Connection::replaced(): {} racing", *this);
455 assert(conn
!= replaced_conn
);
// reset(): the connection dropped — clear connected state and either
// reconnect (no race, or we are the winner side) or wait for the peer
// to connect to us (race lost).
// NOTE(review): original lines 459-461/464/466-467/471-473 are missing —
// the opening brace, the reconnect call in the winner branch, and the
// closing braces are not visible in this listing.
458 void Heartbeat::Connection::reset()
462 is_connected
= false;
463 listener
.decrease_connected();
465 if (!racing_detected
|| is_winner_side
) {
468 logger().info("Heartbeat::Connection::reset(): "
469 "{} racing detected and lose, "
470 "waiting for peer connect me", *this);
474 seastar::future
<> Heartbeat::Connection::send(MessageURef msg
)
476 assert(is_connected
);
477 return conn
->send(std::move(msg
));
// validate(): check that the established connection still targets the
// peer's current heartbeat address; if the address changed, reset and
// reconnect.
// NOTE(review): original lines 481/488/490-493 are missing — the action
// taken on mismatch after logging (likely marking the connection down
// and resetting) is not visible in this listing.
480 void Heartbeat::Connection::validate()
482 assert(is_connected
);
483 auto peer_addr
= listener
.get_peer_addr(type
);
484 if (conn
->get_peer_addr() != peer_addr
) {
485 logger().info("Heartbeat::Connection::validate(): "
486 "{} has new address {} over {}, reset",
487 *this, peer_addr
, conn
->get_peer_addr());
489 racing_detected
= false;
// retry(): clear race state and attempt a fresh connection.
// NOTE(review): the rest of the body (original lines 495/497-506) is
// missing from this listing.
494 void Heartbeat::Connection::retry()
496 racing_detected
= false;
507 void Heartbeat::Connection::set_connected()
509 assert(!is_connected
);
511 listener
.increase_connected();
514 void Heartbeat::Connection::connect()
517 auto addr
= listener
.get_peer_addr(type
);
518 conn
= msgr
.connect(addr
, entity_name_t(CEPH_ENTITY_TYPE_OSD
, peer
));
519 if (conn
->is_connected()) {
// failed_since(): if the session is UNHEALTHY, return the time the peer
// has been unresponsive since (earliest of last front/back rx, falling
// back to the first tx if nothing was ever received); otherwise return
// clock::zero().
// NOTE(review): original lines 526/536/540-542/544-546 are missing — the
// `} else {` between the two error logs, the `return failed_since;`
// path, and closing braces are not visible in this listing.
524 Heartbeat::clock::time_point
525 Heartbeat::Session::failed_since(Heartbeat::clock::time_point now
) const
527 if (do_health_screen(now
) == health_state::UNHEALTHY
) {
528 auto oldest_deadline
= ping_history
.begin()->second
.deadline
;
529 auto failed_since
= std::min(last_rx_back
, last_rx_front
);
530 if (clock::is_zero(failed_since
)) {
531 logger().error("Heartbeat::Session::failed_since(): no reply from osd.{} "
532 "ever on either front or back, first ping sent {} "
533 "(oldest deadline {})",
534 peer
, first_tx
, oldest_deadline
);
535 failed_since
= first_tx
;
537 logger().error("Heartbeat::Session::failed_since(): no reply from osd.{} "
538 "since back {} front {} (oldest deadline {})",
539 peer
, last_rx_back
, last_rx_front
, oldest_deadline
);
543 return clock::zero();
// set_inactive_history(): while disconnected, seed a single synthetic
// ping_history entry (deadline = now + osd_heartbeat_grace) so the
// grace-period accounting still runs for an unreachable peer.
// NOTE(review): original lines 548-549 and 557-559 are missing — likely
// an opening brace plus a precondition assert, and closing braces.
547 void Heartbeat::Session::set_inactive_history(clock::time_point now
)
550 if (ping_history
.empty()) {
551 const utime_t sent_stamp
{now
};
552 const auto deadline
=
553 now
+ std::chrono::seconds(local_conf()->osd_heartbeat_grace
);
554 ping_history
.emplace(sent_stamp
, reply_t
{deadline
, 0});
555 } else { // the entry is already added
556 assert(ping_history
.size() == 1);
560 Heartbeat::Peer::Peer(Heartbeat
& heartbeat
, osd_id_t peer
)
561 : ConnectionListener(2), heartbeat
{heartbeat
}, peer
{peer
}, session
{peer
},
562 con_front(peer
, heartbeat
.whoami
> peer
, Connection::type_t::front
,
563 heartbeat
.front_msgr
, *this),
564 con_back(peer
, heartbeat
.whoami
> peer
, Connection::type_t::back
,
565 heartbeat
.back_msgr
, *this)
567 logger().info("Heartbeat::Peer: osd.{} added", peer
);
570 Heartbeat::Peer::~Peer()
572 logger().info("Heartbeat::Peer: osd.{} removed", peer
);
// send_heartbeat(): if the session is active, ping now and validate the
// connections; otherwise log the pending state and keep the inactive
// grace-history alive so an unreachable peer can still fail.
// NOTE(review): original lines 578-579/583-585/587/592-594/598-602 are
// missing — the for_each_conn lambda bodies and the branch structure
// between the started/pending paths are not visible in this listing.
575 void Heartbeat::Peer::send_heartbeat(
576 clock::time_point now
, ceph::signedspan mnow
,
577 std::vector
<seastar::future
<>>& futures
)
580 if (session
.is_started()) {
581 do_send_heartbeat(now
, mnow
, &futures
);
582 for_each_conn([] (auto& conn
) {
586 // we should send MOSDPing but still cannot at this moment
588 // we have already pending for a entire heartbeat interval
589 logger().warn("Heartbeat::Peer::send_heartbeat(): "
590 "heartbeat to osd.{} is still pending...", peer
);
591 for_each_conn([] (auto& conn
) {
595 logger().info("Heartbeat::Peer::send_heartbeat(): "
596 "heartbeat to osd.{} is pending send...", peer
);
597 session
.set_inactive_history(now
);
// Peer::handle_reply(): match the reply's connection to front/back,
// record the pong, and cancel a pending failure report once the session
// screens healthy again.
// NOTE(review): original lines 605/609-610/614-615/617/622-623/625 are
// missing — the `type_t type;` declaration, the back-branch assignment,
// and the final else/closing braces are not visible in this listing.
603 seastar::future
<> Heartbeat::Peer::handle_reply(
604 crimson::net::ConnectionRef conn
, Ref
<MOSDPing
> m
)
606 if (!session
.is_started()) {
607 // we haven't sent any ping yet
608 return seastar::now();
611 if (con_front
.matches(conn
)) {
612 type
= type_t::front
;
613 } else if (con_back
.matches(conn
)) {
616 return seastar::now();
618 const auto now
= clock::now();
619 if (session
.on_pong(m
->ping_stamp
, type
, now
)) {
620 if (session
.do_health_screen(now
) == Session::health_state::HEALTHY
) {
621 return heartbeat
.failing_peers
.cancel_one(peer
);
624 return seastar::now();
627 entity_addr_t
Heartbeat::Peer::get_peer_addr(type_t type
)
629 const auto osdmap
= heartbeat
.service
.get_map();
630 if (type
== type_t::front
) {
631 return osdmap
->get_hb_front_addrs(peer
).front();
633 return osdmap
->get_hb_back_addrs(peer
).front();
// on_connected(): both sides are up — start the session, and if a
// heartbeat was deferred while disconnected, send it now.
// NOTE(review): original lines 638/640/642/645-647 are missing — the log
// call's second argument and the guard around the pending-send flush
// (likely `if (pending_send) {`) are not visible in this listing.
637 void Heartbeat::Peer::on_connected()
639 logger().info("Heartbeat::Peer: osd.{} connected (send={})",
641 session
.on_connected();
643 pending_send
= false;
644 do_send_heartbeat(clock::now(), heartbeat
.service
.get_mnow(), nullptr);
648 void Heartbeat::Peer::on_disconnected()
650 logger().info("Heartbeat::Peer: osd.{} disconnected", peer
);
651 session
.on_disconnected();
// do_send_heartbeat(): record the ping (stamp + grace deadline) in the
// session, then send an MOSDPing on each connection; futures==nullptr
// means fire-and-forget.
// NOTE(review): original lines 658/669-672/674-675/677-680 are missing —
// the remaining MOSDPing constructor arguments (op, stamps, mnow,
// min_message padding) and the nullptr-futures branch are not visible.
654 void Heartbeat::Peer::do_send_heartbeat(
655 Heartbeat::clock::time_point now
,
656 ceph::signedspan mnow
,
657 std::vector
<seastar::future
<>>* futures
)
659 const utime_t sent_stamp
{now
};
660 const auto deadline
=
661 now
+ std::chrono::seconds(local_conf()->osd_heartbeat_grace
);
662 session
.on_ping(sent_stamp
, deadline
);
663 for_each_conn([&, this] (auto& conn
) {
664 auto min_message
= static_cast<uint32_t>(
665 local_conf()->osd_heartbeat_min_size
);
666 auto ping
= crimson::make_message
<MOSDPing
>(
667 heartbeat
.monc
.get_fsid(),
668 heartbeat
.service
.get_map()->get_epoch(),
673 heartbeat
.service
.get_up_epoch(),
676 futures
->push_back(conn
.send(std::move(ping
)));
// add_pending(): queue an MOSDFailure report for `peer` to the monitors
// unless one is already pending; records it in failure_pending so it
// can be cancelled by a later healthy reply.
// NOTE(review): original lines 682/686/688-689/695/703-705 are missing —
// the `osd_id_t peer` parameter, the early `return false;`, one
// MOSDFailure argument line, and the final `return true;` are not
// visible in this listing.
681 bool Heartbeat::FailingPeers::add_pending(
683 clock::time_point failed_since
,
684 clock::time_point now
,
685 std::vector
<seastar::future
<>>& futures
)
687 if (failure_pending
.count(peer
)) {
690 auto failed_for
= std::chrono::duration_cast
<std::chrono::seconds
>(
691 now
- failed_since
).count();
692 auto osdmap
= heartbeat
.service
.get_map();
693 auto failure_report
=
694 crimson::make_message
<MOSDFailure
>(heartbeat
.monc
.get_fsid(),
696 osdmap
->get_addrs(peer
),
697 static_cast<int>(failed_for
),
698 osdmap
->get_epoch());
699 failure_pending
.emplace(peer
, failure_info_t
{failed_since
,
700 osdmap
->get_addrs(peer
)});
701 futures
.push_back(heartbeat
.monc
.send_message(std::move(failure_report
)));
702 logger().info("{}: osd.{} failed for {}", __func__
, peer
, failed_for
);
706 seastar::future
<> Heartbeat::FailingPeers::cancel_one(osd_id_t peer
)
708 if (auto pending
= failure_pending
.find(peer
);
709 pending
!= failure_pending
.end()) {
710 auto fut
= send_still_alive(peer
, pending
->second
.addrs
);
711 failure_pending
.erase(peer
);
714 return seastar::now();
// send_still_alive(): retract a previously reported failure by sending
// an MOSDFailure with FLAG_ALIVE to the monitors.
// NOTE(review): the return-type line (original 716-717), lines 720 and
// 723-725 (remaining MOSDFailure arguments, presumably osd/addrs), and
// the closing brace (original 730) are missing — this definition runs
// past the visible end of the listing.
718 Heartbeat::FailingPeers::send_still_alive(
719 osd_id_t osd
, const entity_addrvec_t
& addrs
)
721 auto still_alive
= crimson::make_message
<MOSDFailure
>(
722 heartbeat
.monc
.get_fsid(),
726 heartbeat
.service
.get_map()->get_epoch(),
727 MOSDFailure::FLAG_ALIVE
);
728 logger().info("{}: osd.{}", __func__
, osd
);
729 return heartbeat
.monc
.send_message(std::move(still_alive
));