// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <cstdint>
#include <seastar/core/future.hh>
#include "common/ceph_time.h"
#include "crimson/common/gated.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Fwd.h"

class MOSDPing;

namespace crimson::osd {
  class ShardServices;
}

namespace crimson::mon {
  class Client;
}

template<typename Message> using Ref = boost::intrusive_ptr<Message>;

class Heartbeat : public crimson::net::Dispatcher {
public:
  using osd_id_t = int;

  Heartbeat(osd_id_t whoami,
            const crimson::osd::ShardServices& service,
            crimson::mon::Client& monc,
            crimson::net::MessengerRef front_msgr,
            crimson::net::MessengerRef back_msgr);

  seastar::future<> start(entity_addrvec_t front,
                          entity_addrvec_t back);
  seastar::future<> stop();

  using osds_t = std::vector<osd_id_t>;
  void add_peer(osd_id_t peer, epoch_t epoch);
  void update_peers(int whoami);
  void remove_peer(osd_id_t peer);
  osds_t get_peers() const;

  const entity_addrvec_t& get_front_addrs() const;
  const entity_addrvec_t& get_back_addrs() const;

  crimson::net::MessengerRef get_front_msgr() const;
  crimson::net::MessengerRef get_back_msgr() const;
  void set_require_authorizer(bool);

  // Dispatcher methods
  std::optional<seastar::future<>> ms_dispatch(
      crimson::net::ConnectionRef conn, MessageRef m) override;
  void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
  void ms_handle_connect(crimson::net::ConnectionRef conn) override;
  void ms_handle_accept(crimson::net::ConnectionRef conn) override;

  void print(std::ostream&) const;
private:
  seastar::future<> handle_osd_ping(crimson::net::ConnectionRef conn,
                                    Ref<MOSDPing> m);
  seastar::future<> handle_ping(crimson::net::ConnectionRef conn,
                                Ref<MOSDPing> m);
  seastar::future<> handle_reply(crimson::net::ConnectionRef conn,
                                 Ref<MOSDPing> m);
  seastar::future<> handle_you_died();

  /// remove down OSDs
  /// @return peers not added in this epoch
  osds_t remove_down_peers();
  /// add enough reporters for fast failure detection
  void add_reporter_peers(int whoami);

  seastar::future<> start_messenger(crimson::net::Messenger& msgr,
                                    const entity_addrvec_t& addrs);
private:
  const osd_id_t whoami;
  const crimson::osd::ShardServices& service;
  crimson::mon::Client& monc;
  crimson::net::MessengerRef front_msgr;
  crimson::net::MessengerRef back_msgr;

  seastar::timer<seastar::lowres_clock> timer;
  // use real_clock so it can be converted to utime_t
  using clock = ceph::coarse_real_clock;

  class ConnectionListener;
  class Connection;
  class Session;
  class Peer;
  using peers_map_t = std::map<osd_id_t, Peer>;
  peers_map_t peers;
  // osds which are considered failed
  // osd_id => the last time that both front and back pings were acked
  //           or sent
  // used for calculating how long an OSD has been unresponsive
  using failure_queue_t = std::map<osd_id_t, clock::time_point>;
  seastar::future<> send_failures(failure_queue_t&& failure_queue);
  seastar::future<> send_heartbeats();
  void heartbeat_check();

  // osds we've reported to the monitor as failed, but which are not marked
  // down yet
  crimson::common::Gated gate;
  class FailingPeers {
  public:
    FailingPeers(Heartbeat& heartbeat) : heartbeat(heartbeat) {}
    bool add_pending(osd_id_t peer,
                     clock::time_point failed_since,
                     clock::time_point now,
                     std::vector<seastar::future<>>& futures);
    seastar::future<> cancel_one(osd_id_t peer);

  private:
    seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&);

    Heartbeat& heartbeat;

    struct failure_info_t {
      clock::time_point failed_since;
      entity_addrvec_t addrs;
    };
    std::map<osd_id_t, failure_info_t> failure_pending;
  } failing_peers;
};

inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) {
  hb.print(out);
  return out;
}
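
/*
 * A minimal lifecycle sketch, e.g. from a seastar coroutine (not part of
 * this header; `shard_services`, `monc`, the messengers and the address
 * vectors are hypothetical objects owned by the caller):
 *
 *   Heartbeat hb{whoami, shard_services, monc, front_msgr, back_msgr};
 *   co_await hb.start(front_addrs, back_addrs);
 *   hb.add_peer(peer_osd, osdmap_epoch);  // begin pinging this peer
 *   // ... the timer-driven send_heartbeats()/heartbeat_check() run here ...
 *   co_await hb.stop();
 */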

/*
 * Event-driven interface for Heartbeat::Peer to be notified when both
 * hb_front and hb_back are connected, or when either connection is lost.
 */
class Heartbeat::ConnectionListener {
public:
  ConnectionListener(size_t connections) : connections{connections} {}

  void increase_connected() {
    assert(connected < connections);
    ++connected;
    if (connected == connections) {
      on_connected();
    }
  }
  void decrease_connected() {
    assert(connected > 0);
    if (connected == connections) {
      on_disconnected();
    }
    --connected;
  }
  enum class type_t { front, back };
  virtual entity_addr_t get_peer_addr(type_t) = 0;

protected:
  virtual void on_connected() = 0;
  virtual void on_disconnected() = 0;

private:
  const size_t connections;
  size_t connected = 0;
};
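
/*
 * A minimal sketch of the counting contract above (Heartbeat::Peer below is
 * the real subscriber; `SessionListener` is a hypothetical subclass for
 * illustration): with connections == 2, on_connected() fires only once both
 * the front and back connections are up, and on_disconnected() fires as
 * soon as either one drops.
 *
 *   class SessionListener : public Heartbeat::ConnectionListener {
 *   public:
 *     SessionListener() : ConnectionListener(2) {}
 *     entity_addr_t get_peer_addr(type_t) override { return {}; }
 *   protected:
 *     void on_connected() override {}     // both front and back are up
 *     void on_disconnected() override {}  // one of them went down
 *   };
 *
 *   SessionListener l;
 *   l.increase_connected();  // front up: 1 of 2, no callback yet
 *   l.increase_connected();  // back up: 2 of 2, on_connected()
 *   l.decrease_connected();  // either drops: on_disconnected()
 */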

class Heartbeat::Connection {
public:
  using type_t = ConnectionListener::type_t;
  Connection(osd_id_t peer, bool is_winner_side, type_t type,
             crimson::net::Messenger& msgr,
             ConnectionListener& listener)
    : peer{peer}, type{type},
      msgr{msgr}, listener{listener},
      is_winner_side{is_winner_side} {
    connect();
  }
  Connection(const Connection&) = delete;
  Connection(Connection&&) = delete;
  Connection& operator=(const Connection&) = delete;
  Connection& operator=(Connection&&) = delete;

  ~Connection();

  bool matches(crimson::net::ConnectionRef _conn) const;
  void connected() {
    set_connected();
  }
  void accepted(crimson::net::ConnectionRef);
  void replaced();
  void reset();
  seastar::future<> send(MessageURef msg);
  void validate();
  // retry connection if still pending
  void retry();

private:
  void set_connected();
  void connect();

  const osd_id_t peer;
  const type_t type;
  crimson::net::Messenger& msgr;
  ConnectionListener& listener;

  /*
   * Resolve the following race when both my side and the peer are trying to
   * connect to each other symmetrically, under SocketPolicy::lossy_client:
   *
   *  OSD.A                               OSD.B
   *  -                                   -
   *  |-[1]---->               <----[2]-|
   *    \                         /
   *     \                       /
   *   delay..       X       delay..
   *     /                       \
   *  |-[1]x>      /     \      <x[2]-|
   *  |<-[2]---             ---[1]->|
   *  |(reset#1)           (reset#2)|
   *  |(reconnectB)     (reconnectA)|
   *  |-[2]--->             <---[1]-|
   *   delay..               delay..
   *      (remote close propagated)
   *  |-[2]x>                 <x[1]-|
   *  |(reset#2)           (reset#1)|
   *  | ...                     ... |
   *          (dead loop!)
   *
   * Our solution is to remember whether such a race happened recently, and
   * to establish the connection asymmetrically, only from the winner side,
   * whose osd-id is larger.
   */
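  /*
   * A minimal sketch of the tie-break above (a hypothetical helper, not
   * part of this header): once a race has been detected, only the winner
   * side, i.e. the OSD with the larger id, keeps re-establishing the
   * connection, while the loser side waits to be connected.
   *
   *   bool may_connect(osd_id_t whoami, osd_id_t peer,
   *                    bool racing_detected) {
   *     bool is_winner_side = whoami > peer;
   *     return !racing_detected || is_winner_side;
   *   }
   */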
  const bool is_winner_side;
  bool racing_detected = false;

  crimson::net::ConnectionRef conn;
  bool is_connected = false;

  friend std::ostream& operator<<(std::ostream& os, const Connection& c) {
    if (c.type == type_t::front) {
      return os << "con_front(osd." << c.peer << ")";
    } else {
      return os << "con_back(osd." << c.peer << ")";
    }
  }
};

/*
 * Track the ping history and the ping replies (the pongs) within the same
 * session, clean up the history once hb_front or hb_back loses its
 * connection, and restart the session once both connections are established
 * again.
 *
 * We cannot simply remove the entire Heartbeat::Peer once hb_front or
 * hb_back loses its connection, because we would end up in the following
 * dead loop:
 *
 *  OSD.A                                OSD.B
 *  -                                    -
 *  hb_front reset <--(network)--- hb_front close
 *        |                              ^
 *        |                              |
 *  remove Peer B    (dead loop!)   remove Peer A
 *        |                              |
 *        V                              |
 *  hb_back close ----(network)---> hb_back reset
 */
class Heartbeat::Session {
public:
  Session(osd_id_t peer) : peer{peer} {}

  void set_epoch(epoch_t epoch_) { epoch = epoch_; }
  epoch_t get_epoch() const { return epoch; }
  bool is_started() const { return connected; }
  bool pinged() const {
    if (clock::is_zero(first_tx)) {
      // we can never receive a pong without sending a ping first
      assert(clock::is_zero(last_rx_front) &&
             clock::is_zero(last_rx_back));
      return false;
    } else {
      return true;
    }
  }

  enum class health_state {
    UNKNOWN,
    UNHEALTHY,
    HEALTHY,
  };
  health_state do_health_screen(clock::time_point now) const {
    if (!pinged()) {
      // we are neither healthy nor unhealthy because we haven't sent
      // anything yet
      return health_state::UNKNOWN;
    } else if (!ping_history.empty() && ping_history.begin()->second.deadline < now) {
      return health_state::UNHEALTHY;
    } else if (!clock::is_zero(last_rx_front) &&
               !clock::is_zero(last_rx_back)) {
      // only declare healthy once we have received the first replies from
      // both the front and back connections
      return health_state::HEALTHY;
    } else {
      return health_state::UNKNOWN;
    }
  }

  clock::time_point failed_since(clock::time_point now) const;

  void set_tx(clock::time_point now) {
    if (!pinged()) {
      first_tx = now;
    }
    last_tx = now;
  }

  void on_connected() {
    assert(!connected);
    connected = true;
    ping_history.clear();
  }

  void on_ping(const utime_t& sent_stamp,
               const clock::time_point& deadline) {
    assert(connected);
    [[maybe_unused]] auto [reply, added] =
      ping_history.emplace(sent_stamp, reply_t{deadline, 2});
  }

  bool on_pong(const utime_t& ping_stamp,
               Connection::type_t type,
               clock::time_point now) {
    assert(connected);
    auto ping = ping_history.find(ping_stamp);
    if (ping == ping_history.end()) {
      // an old reply, superseded by newly sent pings
      return false;
    }
    auto& unacked = ping->second.unacknowledged;
    assert(unacked);
    if (type == Connection::type_t::front) {
      last_rx_front = now;
      unacked--;
    } else {
      last_rx_back = now;
      unacked--;
    }
    if (unacked == 0) {
      ping_history.erase(ping_history.begin(), ++ping);
    }
    return true;
  }

  void on_disconnected() {
    assert(connected);
    connected = false;
    if (!ping_history.empty()) {
      // we lose the ping_history of the last session, but still need to
      // keep the oldest deadline for the unhealthy check
      auto oldest = ping_history.begin();
      auto sent_stamp = oldest->first;
      auto deadline = oldest->second.deadline;
      ping_history.clear();
      ping_history.emplace(sent_stamp, reply_t{deadline, 0});
    }
  }

  // maintain an entry in ping_history for the unhealthy check
  void set_inactive_history(clock::time_point);

private:
  const osd_id_t peer;
  bool connected = false;
  // time we sent our first ping request
  clock::time_point first_tx;
  // last time we sent a ping request
  clock::time_point last_tx;
  // last time we got a ping reply on the front side
  clock::time_point last_rx_front;
  // last time we got a ping reply on the back side
  clock::time_point last_rx_back;
  // most recent epoch we wanted this peer
  epoch_t epoch;

  struct reply_t {
    clock::time_point deadline;
    // one sent over the front conn, another sent over the back conn
    uint8_t unacknowledged = 0;
  };
  // history of inflight pings, ordered by the timestamp at which we sent them
  std::map<utime_t, reply_t> ping_history;
};
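
/*
 * A minimal sketch of the accounting above (`peer`, `sent_stamp`,
 * `deadline` and `now` are hypothetical values): each ping is sent on both
 * the front and back connections, so its ping_history entry starts with 2
 * unacknowledged replies; once both pongs have arrived, that entry and all
 * older entries are erased.
 *
 *   Heartbeat::Session session{peer};
 *   session.on_connected();
 *   session.set_tx(now);
 *   session.on_ping(sent_stamp, deadline);  // unacknowledged = 2
 *   session.on_pong(sent_stamp,
 *                   Heartbeat::Connection::type_t::front, now);  // = 1
 *   session.on_pong(sent_stamp,
 *                   Heartbeat::Connection::type_t::back, now);   // erased,
 *                                                                // true
 *   session.on_pong(sent_stamp,
 *                   Heartbeat::Connection::type_t::back, now);   // stale,
 *                                                                // false
 */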

class Heartbeat::Peer final : private Heartbeat::ConnectionListener {
public:
  Peer(Heartbeat&, osd_id_t);
  ~Peer();
  Peer(Peer&&) = delete;
  Peer(const Peer&) = delete;
  Peer& operator=(Peer&&) = delete;
  Peer& operator=(const Peer&) = delete;

  void set_epoch(epoch_t epoch) { session.set_epoch(epoch); }
  epoch_t get_epoch() const { return session.get_epoch(); }

  // on failure, return the time_point since the peer was last active;
  // otherwise, return clock::zero()
  clock::time_point failed_since(clock::time_point now) const {
    return session.failed_since(now);
  }
  void send_heartbeat(
      clock::time_point, ceph::signedspan, std::vector<seastar::future<>>&);
  seastar::future<> handle_reply(crimson::net::ConnectionRef, Ref<MOSDPing>);
  void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
    for_each_conn([&] (auto& _conn) {
      if (_conn.matches(conn)) {
        if (is_replace) {
          _conn.replaced();
        } else {
          _conn.reset();
        }
      }
    });
  }
  void handle_connect(crimson::net::ConnectionRef conn) {
    for_each_conn([&] (auto& _conn) {
      if (_conn.matches(conn)) {
        _conn.connected();
      }
    });
  }
  void handle_accept(crimson::net::ConnectionRef conn) {
    for_each_conn([&] (auto& _conn) {
      _conn.accepted(conn);
    });
  }

private:
  entity_addr_t get_peer_addr(type_t type) override;
  void on_connected() override;
  void on_disconnected() override;
  void do_send_heartbeat(
      clock::time_point, ceph::signedspan, std::vector<seastar::future<>>*);

  template <typename Func>
  void for_each_conn(Func&& f) {
    f(con_front);
    f(con_back);
  }

  Heartbeat& heartbeat;
  const osd_id_t peer;
  Session session;
  // whether a heartbeat needs to be sent once the session is connected
  bool pending_send = false;
  Connection con_front;
  Connection con_back;
};