]>
git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/heartbeat.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
7 #include <seastar/core/future.hh>
8 #include "common/ceph_time.h"
9 #include "crimson/common/gated.h"
10 #include "crimson/net/Dispatcher.h"
11 #include "crimson/net/Fwd.h"
15 namespace crimson::osd
{
19 namespace crimson::mon
{
23 template<typename Message
> using Ref
= boost::intrusive_ptr
<Message
>;
25 class Heartbeat
: public crimson::net::Dispatcher
{
29 Heartbeat(osd_id_t whoami
,
30 const crimson::osd::ShardServices
& service
,
31 crimson::mon::Client
& monc
,
32 crimson::net::MessengerRef front_msgr
,
33 crimson::net::MessengerRef back_msgr
);
35 seastar::future
<> start(entity_addrvec_t front
,
36 entity_addrvec_t back
);
37 seastar::future
<> stop();
39 using osds_t
= std::vector
<osd_id_t
>;
40 void add_peer(osd_id_t peer
, epoch_t epoch
);
41 void update_peers(int whoami
);
42 void remove_peer(osd_id_t peer
);
43 osds_t
get_peers() const;
45 const entity_addrvec_t
& get_front_addrs() const;
46 const entity_addrvec_t
& get_back_addrs() const;
48 crimson::net::MessengerRef
get_front_msgr() const;
49 crimson::net::MessengerRef
get_back_msgr() const;
50 void set_require_authorizer(bool);
53 std::optional
<seastar::future
<>> ms_dispatch(
54 crimson::net::ConnectionRef conn
, MessageRef m
) override
;
55 void ms_handle_reset(crimson::net::ConnectionRef conn
, bool is_replace
) override
;
56 void ms_handle_connect(crimson::net::ConnectionRef conn
) override
;
57 void ms_handle_accept(crimson::net::ConnectionRef conn
) override
;
59 void print(std::ostream
&) const;
61 seastar::future
<> handle_osd_ping(crimson::net::ConnectionRef conn
,
63 seastar::future
<> handle_ping(crimson::net::ConnectionRef conn
,
65 seastar::future
<> handle_reply(crimson::net::ConnectionRef conn
,
67 seastar::future
<> handle_you_died();
70 /// @return peers not added in this epoch
71 osds_t
remove_down_peers();
72 /// add enough reporters for fast failure detection
73 void add_reporter_peers(int whoami
);
75 seastar::future
<> start_messenger(crimson::net::Messenger
& msgr
,
76 const entity_addrvec_t
& addrs
);
78 const osd_id_t whoami
;
79 const crimson::osd::ShardServices
& service
;
80 crimson::mon::Client
& monc
;
81 crimson::net::MessengerRef front_msgr
;
82 crimson::net::MessengerRef back_msgr
;
84 seastar::timer
<seastar::lowres_clock
> timer
;
85 // use real_clock so it can be converted to utime_t
86 using clock
= ceph::coarse_real_clock
;
88 class ConnectionListener
;
92 using peers_map_t
= std::map
<osd_id_t
, Peer
>;
95 // osds which are considered failed
96 // osd_id => when was the last time that both front and back pings were acked
98 // use for calculating how long the OSD has been unresponsive
99 using failure_queue_t
= std::map
<osd_id_t
, clock::time_point
>;
100 seastar::future
<> send_failures(failure_queue_t
&& failure_queue
);
101 seastar::future
<> send_heartbeats();
102 void heartbeat_check();
104 // osds we've reported to monitor as failed ones, but they are not marked down
106 crimson::common::Gated gate
;
110 FailingPeers(Heartbeat
& heartbeat
) : heartbeat(heartbeat
) {}
111 bool add_pending(osd_id_t peer
,
112 clock::time_point failed_since
,
113 clock::time_point now
,
114 std::vector
<seastar::future
<>>& futures
);
115 seastar::future
<> cancel_one(osd_id_t peer
);
118 seastar::future
<> send_still_alive(osd_id_t
, const entity_addrvec_t
&);
120 Heartbeat
& heartbeat
;
122 struct failure_info_t
{
123 clock::time_point failed_since
;
124 entity_addrvec_t addrs
;
126 std::map
<osd_id_t
, failure_info_t
> failure_pending
;
130 inline std::ostream
& operator<<(std::ostream
& out
, const Heartbeat
& hb
) {
136 * Event driven interface for Heartbeat::Peer to be notified when both hb_front
137 * and hb_back are connected, or connection is lost.
139 class Heartbeat::ConnectionListener
{
141 ConnectionListener(size_t connections
) : connections
{connections
} {}
143 void increase_connected() {
144 assert(connected
< connections
);
146 if (connected
== connections
) {
150 void decrease_connected() {
151 assert(connected
> 0);
152 if (connected
== connections
) {
// Which heartbeat channel a connection serves.
enum class type_t { front, back };
158 virtual entity_addr_t
get_peer_addr(type_t
) = 0;
161 virtual void on_connected() = 0;
162 virtual void on_disconnected() = 0;
165 const size_t connections
;
166 size_t connected
= 0;
169 class Heartbeat::Connection
{
171 using type_t
= ConnectionListener::type_t
;
172 Connection(osd_id_t peer
, bool is_winner_side
, type_t type
,
173 crimson::net::Messenger
& msgr
,
174 ConnectionListener
& listener
)
175 : peer
{peer
}, type
{type
},
176 msgr
{msgr
}, listener
{listener
},
177 is_winner_side
{is_winner_side
} {
180 Connection(const Connection
&) = delete;
181 Connection(Connection
&&) = delete;
182 Connection
& operator=(const Connection
&) = delete;
183 Connection
& operator=(Connection
&&) = delete;
187 bool matches(crimson::net::ConnectionRef _conn
) const;
191 void accepted(crimson::net::ConnectionRef
);
194 seastar::future
<> send(MessageURef msg
);
196 // retry connection if still pending
200 void set_connected();
205 crimson::net::Messenger
& msgr
;
206 ConnectionListener
& listener
;
209 * Resolve the following racing when both me and peer are trying to connect
210 * each other symmetrically, under SocketPolicy::lossy_client:
214 * |-[1]----> <----[2]-|
219 * |-[1]x> / \ <x[2]-|
220 * |<-[2]--- ---[1]->|
221 * |(reset#1) (reset#2)|
222 * |(reconnectB) (reconnectA)|
223 * |-[2]---> <---[1]-|
225 * (remote close populated)
227 * |(reset#2) (reset#1)|
231 * Our solution is to remember if such racing happened recently, and
232 * establish connection asymmetrically only from the winner side whose osd-id
235 const bool is_winner_side
;
236 bool racing_detected
= false;
238 crimson::net::ConnectionRef conn
;
239 bool is_connected
= false;
241 friend std::ostream
& operator<<(std::ostream
& os
, const Connection
& c
) {
242 if (c
.type
== type_t::front
) {
243 return os
<< "con_front(osd." << c
.peer
<< ")";
245 return os
<< "con_back(osd." << c
.peer
<< ")";
251 * Track the ping history and ping reply (the pong) from the same session, clean up
252 * history once hb_front or hb_back loses connection and restart the session once
253 * both connections are connected again.
255 * We cannot simply remove the entire Heartbeat::Peer once hb_front or hb_back
256 * loses connection, because we would end up with the following deadloop:
260 * hb_front reset <--(network)--- hb_front close
263 * remove Peer B (dead loop!) remove Peer A
266 * hb_back close ----(network)---> hb_back reset
268 class Heartbeat::Session
{
270 Session(osd_id_t peer
) : peer
{peer
} {}
272 void set_epoch(epoch_t epoch_
) { epoch
= epoch_
; }
273 epoch_t
get_epoch() const { return epoch
; }
274 bool is_started() const { return connected
; }
275 bool pinged() const {
276 if (clock::is_zero(first_tx
)) {
277 // I can never receive a pong without sending any ping message first.
278 assert(clock::is_zero(last_rx_front
) &&
279 clock::is_zero(last_rx_back
));
286 enum class health_state
{
291 health_state
do_health_screen(clock::time_point now
) const {
293 // we are not healthy nor unhealthy because we haven't sent anything yet
294 return health_state::UNKNOWN
;
295 } else if (!ping_history
.empty() && ping_history
.begin()->second
.deadline
< now
) {
296 return health_state::UNHEALTHY
;
297 } else if (!clock::is_zero(last_rx_front
) &&
298 !clock::is_zero(last_rx_back
)) {
299 // only declare to be healthy until we have received the first
300 // replies from both front/back connections
301 return health_state::HEALTHY
;
303 return health_state::UNKNOWN
;
307 clock::time_point
failed_since(clock::time_point now
) const;
309 void set_tx(clock::time_point now
) {
316 void on_connected() {
319 ping_history
.clear();
322 void on_ping(const utime_t
& sent_stamp
,
323 const clock::time_point
& deadline
) {
325 [[maybe_unused
]] auto [reply
, added
] =
326 ping_history
.emplace(sent_stamp
, reply_t
{deadline
, 2});
329 bool on_pong(const utime_t
& ping_stamp
,
330 Connection::type_t type
,
331 clock::time_point now
) {
333 auto ping
= ping_history
.find(ping_stamp
);
334 if (ping
== ping_history
.end()) {
335 // old replies, deprecated by newly sent pings.
338 auto& unacked
= ping
->second
.unacknowledged
;
340 if (type
== Connection::type_t::front
) {
348 ping_history
.erase(ping_history
.begin(), ++ping
);
353 void on_disconnected() {
356 if (!ping_history
.empty()) {
357 // we lost our ping_history of the last session, but still need to keep
358 // the oldest deadline for unhealthy check.
359 auto oldest
= ping_history
.begin();
360 auto sent_stamp
= oldest
->first
;
361 auto deadline
= oldest
->second
.deadline
;
362 ping_history
.clear();
363 ping_history
.emplace(sent_stamp
, reply_t
{deadline
, 0});
367 // maintain an entry in ping_history for unhealthy check
368 void set_inactive_history(clock::time_point
);
372 bool connected
= false;
373 // time we sent our first ping request
374 clock::time_point first_tx
;
375 // last time we sent a ping request
376 clock::time_point last_tx
;
377 // last time we got a ping reply on the front side
378 clock::time_point last_rx_front
;
379 // last time we got a ping reply on the back side
380 clock::time_point last_rx_back
;
381 // most recent epoch we wanted this peer
385 clock::time_point deadline
;
386 // one sent over front conn, another sent over back conn
387 uint8_t unacknowledged
= 0;
389 // history of inflight pings, arranging by timestamp we sent
390 std::map
<utime_t
, reply_t
> ping_history
;
393 class Heartbeat::Peer final
: private Heartbeat::ConnectionListener
{
395 Peer(Heartbeat
&, osd_id_t
);
397 Peer(Peer
&&) = delete;
398 Peer(const Peer
&) = delete;
399 Peer
& operator=(Peer
&&) = delete;
400 Peer
& operator=(const Peer
&) = delete;
402 void set_epoch(epoch_t epoch
) { session
.set_epoch(epoch
); }
403 epoch_t
get_epoch() const { return session
.get_epoch(); }
405 // if failure, return time_point since last active
406 // else, return clock::zero()
407 clock::time_point
failed_since(clock::time_point now
) const {
408 return session
.failed_since(now
);
411 clock::time_point
, ceph::signedspan
, std::vector
<seastar::future
<>>&);
412 seastar::future
<> handle_reply(crimson::net::ConnectionRef
, Ref
<MOSDPing
>);
413 void handle_reset(crimson::net::ConnectionRef conn
, bool is_replace
) {
414 for_each_conn([&] (auto& _conn
) {
415 if (_conn
.matches(conn
)) {
424 void handle_connect(crimson::net::ConnectionRef conn
) {
425 for_each_conn([&] (auto& _conn
) {
426 if (_conn
.matches(conn
)) {
431 void handle_accept(crimson::net::ConnectionRef conn
) {
432 for_each_conn([&] (auto& _conn
) {
433 _conn
.accepted(conn
);
438 entity_addr_t
get_peer_addr(type_t type
) override
;
439 void on_connected() override
;
440 void on_disconnected() override
;
441 void do_send_heartbeat(
442 clock::time_point
, ceph::signedspan
, std::vector
<seastar::future
<>>*);
444 template <typename Func
>
445 void for_each_conn(Func
&& f
) {
450 Heartbeat
& heartbeat
;
453 // if need to send heartbeat when session connected
454 bool pending_send
= false;
455 Connection con_front
;