]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/osd/heartbeat.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / osd / heartbeat.h
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#pragma once
5
6#include <cstdint>
7#include <seastar/core/future.hh>
8#include "common/ceph_time.h"
f67539c2 9#include "crimson/common/gated.h"
11fdf7f2
TL
10#include "crimson/net/Dispatcher.h"
11#include "crimson/net/Fwd.h"
12
13class MOSDPing;
11fdf7f2 14
9f95a23c
TL
15namespace crimson::osd {
16 class ShardServices;
17}
18
19namespace crimson::mon {
11fdf7f2
TL
20 class Client;
21}
22
// Ref<Message> is shorthand for boost::intrusive_ptr<Message>; it is used
// below for the MOSDPing parameters of the ping handlers.
template<typename Message> using Ref = boost::intrusive_ptr<Message>;
24
9f95a23c 25class Heartbeat : public crimson::net::Dispatcher {
11fdf7f2
TL
26public:
27 using osd_id_t = int;
28
f67539c2 29 Heartbeat(osd_id_t whoami,
1e59de90 30 crimson::osd::ShardServices& service,
9f95a23c 31 crimson::mon::Client& monc,
1e59de90
TL
32 crimson::net::Messenger &front_msgr,
33 crimson::net::Messenger &back_msgr);
11fdf7f2
TL
34
35 seastar::future<> start(entity_addrvec_t front,
36 entity_addrvec_t back);
37 seastar::future<> stop();
38
f67539c2 39 using osds_t = std::vector<osd_id_t>;
9f95a23c 40 void add_peer(osd_id_t peer, epoch_t epoch);
f67539c2
TL
41 void update_peers(int whoami);
42 void remove_peer(osd_id_t peer);
43 osds_t get_peers() const;
11fdf7f2
TL
44
45 const entity_addrvec_t& get_front_addrs() const;
46 const entity_addrvec_t& get_back_addrs() const;
47
1e59de90
TL
48 crimson::net::Messenger &get_front_msgr() const;
49 crimson::net::Messenger &get_back_msgr() const;
9f95a23c 50
11fdf7f2 51 // Dispatcher methods
f67539c2
TL
52 std::optional<seastar::future<>> ms_dispatch(
53 crimson::net::ConnectionRef conn, MessageRef m) override;
54 void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
55 void ms_handle_connect(crimson::net::ConnectionRef conn) override;
56 void ms_handle_accept(crimson::net::ConnectionRef conn) override;
11fdf7f2 57
f67539c2 58 void print(std::ostream&) const;
11fdf7f2 59private:
f67539c2 60 seastar::future<> handle_osd_ping(crimson::net::ConnectionRef conn,
11fdf7f2 61 Ref<MOSDPing> m);
f67539c2 62 seastar::future<> handle_ping(crimson::net::ConnectionRef conn,
11fdf7f2 63 Ref<MOSDPing> m);
f67539c2 64 seastar::future<> handle_reply(crimson::net::ConnectionRef conn,
11fdf7f2
TL
65 Ref<MOSDPing> m);
66 seastar::future<> handle_you_died();
67
11fdf7f2 68 /// remove down OSDs
f67539c2
TL
69 /// @return peers not added in this epoch
70 osds_t remove_down_peers();
11fdf7f2
TL
71 /// add enough reporters for fast failure detection
72 void add_reporter_peers(int whoami);
73
9f95a23c 74 seastar::future<> start_messenger(crimson::net::Messenger& msgr,
11fdf7f2 75 const entity_addrvec_t& addrs);
1e59de90
TL
76 seastar::future<> maybe_share_osdmap(crimson::net::ConnectionRef,
77 Ref<MOSDPing> m);
11fdf7f2 78private:
f67539c2 79 const osd_id_t whoami;
1e59de90 80 crimson::osd::ShardServices& service;
9f95a23c 81 crimson::mon::Client& monc;
1e59de90
TL
82 crimson::net::Messenger &front_msgr;
83 crimson::net::Messenger &back_msgr;
11fdf7f2
TL
84
85 seastar::timer<seastar::lowres_clock> timer;
86 // use real_clock so it can be converted to utime_t
87 using clock = ceph::coarse_real_clock;
88
f67539c2
TL
89 class ConnectionListener;
90 class Connection;
91 class Session;
92 class Peer;
93 using peers_map_t = std::map<osd_id_t, Peer>;
11fdf7f2
TL
94 peers_map_t peers;
95
96 // osds which are considered failed
97 // osd_id => when was the last time that both front and back pings were acked
f67539c2 98 // or sent.
11fdf7f2
TL
99 // use for calculating how long the OSD has been unresponsive
100 using failure_queue_t = std::map<osd_id_t, clock::time_point>;
f67539c2
TL
101 seastar::future<> send_failures(failure_queue_t&& failure_queue);
102 seastar::future<> send_heartbeats();
103 void heartbeat_check();
104
11fdf7f2
TL
105 // osds we've reported to monior as failed ones, but they are not marked down
106 // yet
f67539c2
TL
107 crimson::common::Gated gate;
108
109 class FailingPeers {
110 public:
111 FailingPeers(Heartbeat& heartbeat) : heartbeat(heartbeat) {}
112 bool add_pending(osd_id_t peer,
113 clock::time_point failed_since,
114 clock::time_point now,
115 std::vector<seastar::future<>>& futures);
116 seastar::future<> cancel_one(osd_id_t peer);
117
118 private:
119 seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&);
120
121 Heartbeat& heartbeat;
122
123 struct failure_info_t {
124 clock::time_point failed_since;
125 entity_addrvec_t addrs;
126 };
127 std::map<osd_id_t, failure_info_t> failure_pending;
128 } failing_peers;
129};
130
131inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) {
132 hb.print(out);
133 return out;
134}
135
136/*
137 * Event driven interface for Heartbeat::Peer to be notified when both hb_front
138 * and hb_back are connected, or connection is lost.
139 */
140class Heartbeat::ConnectionListener {
141 public:
142 ConnectionListener(size_t connections) : connections{connections} {}
143
144 void increase_connected() {
145 assert(connected < connections);
146 ++connected;
147 if (connected == connections) {
148 on_connected();
149 }
150 }
151 void decrease_connected() {
152 assert(connected > 0);
153 if (connected == connections) {
154 on_disconnected();
155 }
156 --connected;
157 }
158 enum class type_t { front, back };
159 virtual entity_addr_t get_peer_addr(type_t) = 0;
160
161 protected:
162 virtual void on_connected() = 0;
163 virtual void on_disconnected() = 0;
164
165 private:
166 const size_t connections;
167 size_t connected = 0;
168};
169
170class Heartbeat::Connection {
171 public:
172 using type_t = ConnectionListener::type_t;
173 Connection(osd_id_t peer, bool is_winner_side, type_t type,
174 crimson::net::Messenger& msgr,
175 ConnectionListener& listener)
176 : peer{peer}, type{type},
177 msgr{msgr}, listener{listener},
178 is_winner_side{is_winner_side} {
179 connect();
180 }
181 Connection(const Connection&) = delete;
182 Connection(Connection&&) = delete;
183 Connection& operator=(const Connection&) = delete;
184 Connection& operator=(Connection&&) = delete;
185
186 ~Connection();
187
188 bool matches(crimson::net::ConnectionRef _conn) const;
189 void connected() {
190 set_connected();
191 }
192 void accepted(crimson::net::ConnectionRef);
193 void replaced();
194 void reset();
20effc67 195 seastar::future<> send(MessageURef msg);
f67539c2
TL
196 void validate();
197 // retry connection if still pending
198 void retry();
199
200 private:
201 void set_connected();
202 void connect();
203
204 const osd_id_t peer;
205 const type_t type;
206 crimson::net::Messenger& msgr;
207 ConnectionListener& listener;
208
209/*
210 * Resolve the following racing when both me and peer are trying to connect
211 * each other symmetrically, under SocketPolicy::lossy_client:
212 *
213 * OSD.A OSD.B
214 * - -
215 * |-[1]----> <----[2]-|
216 * \ /
217 * \ /
218 * delay.. X delay..
219 * / \
220 * |-[1]x> / \ <x[2]-|
221 * |<-[2]--- ---[1]->|
222 * |(reset#1) (reset#2)|
223 * |(reconnectB) (reconnectA)|
224 * |-[2]---> <---[1]-|
225 * delay.. delay..
226 * (remote close populated)
227 * |-[2]x> <x[1]-|
228 * |(reset#2) (reset#1)|
229 * | ... ... |
230 * (dead loop!)
231 *
232 * Our solution is to remember if such racing was happened recently, and
233 * establish connection asymmetrically only from the winner side whose osd-id
234 * is larger.
235 */
236 const bool is_winner_side;
237 bool racing_detected = false;
238
239 crimson::net::ConnectionRef conn;
240 bool is_connected = false;
241
242 friend std::ostream& operator<<(std::ostream& os, const Connection& c) {
243 if (c.type == type_t::front) {
244 return os << "con_front(osd." << c.peer << ")";
245 } else {
246 return os << "con_back(osd." << c.peer << ")";
247 }
248 }
249};
250
1e59de90
TL
// When FMT_VERSION >= 90000, declare a fmt::formatter specialization for
// Heartbeat::Connection that derives from fmt::ostream_formatter, so that
// formatting goes through the operator<< defined in the class.
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<Heartbeat::Connection> : fmt::ostream_formatter {};
#endif
254
f67539c2
TL
255/*
256 * Track the ping history and ping reply (the pong) from the same session, clean up
257 * history once hb_front or hb_back loses connection and restart the session once
258 * both connections are connected again.
259 *
260 * We cannot simply remove the entire Heartbeat::Peer once hb_front or hb_back
261 * loses connection, because we would end up with the following deadloop:
262 *
263 * OSD.A OSD.B
264 * - -
265 * hb_front reset <--(network)--- hb_front close
266 * | ^
267 * | |
268 * remove Peer B (dead loop!) remove Peer A
269 * | |
270 * V |
271 * hb_back close ----(network)---> hb_back reset
272 */
273class Heartbeat::Session {
274 public:
275 Session(osd_id_t peer) : peer{peer} {}
276
1e59de90
TL
277 void set_epoch_added(epoch_t epoch_) { epoch = epoch_; }
278 epoch_t get_epoch_added() const { return epoch; }
279
280 void set_last_epoch_sent(epoch_t epoch_) { last_sent_epoch = epoch_; }
281 epoch_t get_last_epoch_sent() const { return last_sent_epoch; }
282
f67539c2
TL
283 bool is_started() const { return connected; }
284 bool pinged() const {
285 if (clock::is_zero(first_tx)) {
286 // i can never receive a pong without sending any ping message first.
287 assert(clock::is_zero(last_rx_front) &&
288 clock::is_zero(last_rx_back));
289 return false;
290 } else {
291 return true;
292 }
293 }
294
295 enum class health_state {
296 UNKNOWN,
297 UNHEALTHY,
298 HEALTHY,
299 };
300 health_state do_health_screen(clock::time_point now) const {
301 if (!pinged()) {
302 // we are not healty nor unhealty because we haven't sent anything yet
303 return health_state::UNKNOWN;
304 } else if (!ping_history.empty() && ping_history.begin()->second.deadline < now) {
305 return health_state::UNHEALTHY;
306 } else if (!clock::is_zero(last_rx_front) &&
307 !clock::is_zero(last_rx_back)) {
308 // only declare to be healthy until we have received the first
309 // replies from both front/back connections
310 return health_state::HEALTHY;
311 } else {
312 return health_state::UNKNOWN;
313 }
314 }
315
316 clock::time_point failed_since(clock::time_point now) const;
317
318 void set_tx(clock::time_point now) {
319 if (!pinged()) {
320 first_tx = now;
321 }
322 last_tx = now;
323 }
324
325 void on_connected() {
326 assert(!connected);
327 connected = true;
328 ping_history.clear();
329 }
330
331 void on_ping(const utime_t& sent_stamp,
332 const clock::time_point& deadline) {
333 assert(connected);
334 [[maybe_unused]] auto [reply, added] =
335 ping_history.emplace(sent_stamp, reply_t{deadline, 2});
336 }
337
338 bool on_pong(const utime_t& ping_stamp,
339 Connection::type_t type,
340 clock::time_point now) {
341 assert(connected);
342 auto ping = ping_history.find(ping_stamp);
343 if (ping == ping_history.end()) {
344 // old replies, deprecated by newly sent pings.
345 return false;
346 }
347 auto& unacked = ping->second.unacknowledged;
348 assert(unacked);
349 if (type == Connection::type_t::front) {
350 last_rx_front = now;
351 unacked--;
352 } else {
353 last_rx_back = now;
354 unacked--;
355 }
356 if (unacked == 0) {
357 ping_history.erase(ping_history.begin(), ++ping);
358 }
359 return true;
360 }
361
362 void on_disconnected() {
363 assert(connected);
364 connected = false;
365 if (!ping_history.empty()) {
366 // we lost our ping_history of the last session, but still need to keep
367 // the oldest deadline for unhealthy check.
368 auto oldest = ping_history.begin();
369 auto sent_stamp = oldest->first;
370 auto deadline = oldest->second.deadline;
371 ping_history.clear();
372 ping_history.emplace(sent_stamp, reply_t{deadline, 0});
373 }
374 }
375
376 // maintain an entry in ping_history for unhealthy check
377 void set_inactive_history(clock::time_point);
378
379 private:
380 const osd_id_t peer;
381 bool connected = false;
382 // time we sent our first ping request
383 clock::time_point first_tx;
384 // last time we sent a ping request
385 clock::time_point last_tx;
386 // last time we got a ping reply on the front side
387 clock::time_point last_rx_front;
388 // last time we got a ping reply on the back side
389 clock::time_point last_rx_back;
390 // most recent epoch we wanted this peer
1e59de90
TL
391 epoch_t epoch; // rename me to epoch_added
392 // last epoch sent
393 epoch_t last_sent_epoch = 0;
f67539c2
TL
394
395 struct reply_t {
396 clock::time_point deadline;
397 // one sent over front conn, another sent over back conn
398 uint8_t unacknowledged = 0;
399 };
400 // history of inflight pings, arranging by timestamp we sent
401 std::map<utime_t, reply_t> ping_history;
402};
403
404class Heartbeat::Peer final : private Heartbeat::ConnectionListener {
405 public:
406 Peer(Heartbeat&, osd_id_t);
407 ~Peer();
408 Peer(Peer&&) = delete;
409 Peer(const Peer&) = delete;
410 Peer& operator=(Peer&&) = delete;
411 Peer& operator=(const Peer&) = delete;
412
1e59de90
TL
413 // set/get the epoch at which the peer was added
414 void set_epoch_added(epoch_t epoch) { session.set_epoch_added(epoch); }
415 epoch_t get_epoch_added() const { return session.get_epoch_added(); }
416
417 void set_last_epoch_sent(epoch_t epoch) { session.set_last_epoch_sent(epoch); }
418 epoch_t get_last_epoch_sent() const { return session.get_last_epoch_sent(); }
f67539c2
TL
419
420 // if failure, return time_point since last active
421 // else, return clock::zero()
422 clock::time_point failed_since(clock::time_point now) const {
423 return session.failed_since(now);
424 }
425 void send_heartbeat(
426 clock::time_point, ceph::signedspan, std::vector<seastar::future<>>&);
427 seastar::future<> handle_reply(crimson::net::ConnectionRef, Ref<MOSDPing>);
428 void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
429 for_each_conn([&] (auto& _conn) {
430 if (_conn.matches(conn)) {
431 if (is_replace) {
432 _conn.replaced();
433 } else {
434 _conn.reset();
435 }
436 }
437 });
438 }
439 void handle_connect(crimson::net::ConnectionRef conn) {
440 for_each_conn([&] (auto& _conn) {
441 if (_conn.matches(conn)) {
442 _conn.connected();
443 }
444 });
445 }
446 void handle_accept(crimson::net::ConnectionRef conn) {
447 for_each_conn([&] (auto& _conn) {
448 _conn.accepted(conn);
449 });
450 }
451
452 private:
453 entity_addr_t get_peer_addr(type_t type) override;
454 void on_connected() override;
455 void on_disconnected() override;
456 void do_send_heartbeat(
457 clock::time_point, ceph::signedspan, std::vector<seastar::future<>>*);
458
459 template <typename Func>
460 void for_each_conn(Func&& f) {
461 f(con_front);
462 f(con_back);
463 }
464
465 Heartbeat& heartbeat;
466 const osd_id_t peer;
467 Session session;
468 // if need to send heartbeat when session connected
469 bool pending_send = false;
470 Connection con_front;
471 Connection con_back;
11fdf7f2 472};
1e59de90
TL
473
// When FMT_VERSION >= 90000, declare a fmt::formatter specialization for
// Heartbeat that derives from fmt::ostream_formatter, so that formatting goes
// through operator<<(std::ostream&, const Heartbeat&).
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<Heartbeat> : fmt::ostream_formatter {};
#endif