// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <cstdint>
#include <seastar/core/future.hh>
#include "common/ceph_time.h"
#include "crimson/common/gated.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Fwd.h"

class MOSDPing;

namespace crimson::osd {
  class ShardServices;
}

namespace crimson::mon {
  class Client;
}

template<typename Message> using Ref = boost::intrusive_ptr<Message>;

class Heartbeat : public crimson::net::Dispatcher {
public:
  using osd_id_t = int;

  Heartbeat(osd_id_t whoami,
            crimson::osd::ShardServices& service,
            crimson::mon::Client& monc,
            crimson::net::Messenger &front_msgr,
            crimson::net::Messenger &back_msgr);

  seastar::future<> start(entity_addrvec_t front,
                          entity_addrvec_t back);
  seastar::future<> stop();

  using osds_t = std::vector<osd_id_t>;
  void add_peer(osd_id_t peer, epoch_t epoch);
  void update_peers(int whoami);
  void remove_peer(osd_id_t peer);
  osds_t get_peers() const;

  const entity_addrvec_t& get_front_addrs() const;
  const entity_addrvec_t& get_back_addrs() const;

  crimson::net::Messenger &get_front_msgr() const;
  crimson::net::Messenger &get_back_msgr() const;

  // Dispatcher methods
  std::optional<seastar::future<>> ms_dispatch(
      crimson::net::ConnectionRef conn, MessageRef m) override;
  void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
  void ms_handle_connect(crimson::net::ConnectionRef conn) override;
  void ms_handle_accept(crimson::net::ConnectionRef conn) override;

  void print(std::ostream&) const;
private:
  seastar::future<> handle_osd_ping(crimson::net::ConnectionRef conn,
                                    Ref<MOSDPing> m);
  seastar::future<> handle_ping(crimson::net::ConnectionRef conn,
                                Ref<MOSDPing> m);
  seastar::future<> handle_reply(crimson::net::ConnectionRef conn,
                                 Ref<MOSDPing> m);
  seastar::future<> handle_you_died();

  /// remove down OSDs
  /// @return peers not added in this epoch
  osds_t remove_down_peers();
  /// add enough reporters for fast failure detection
  void add_reporter_peers(int whoami);

  seastar::future<> start_messenger(crimson::net::Messenger& msgr,
                                    const entity_addrvec_t& addrs);
  seastar::future<> maybe_share_osdmap(crimson::net::ConnectionRef,
                                       Ref<MOSDPing> m);
private:
  const osd_id_t whoami;
  crimson::osd::ShardServices& service;
  crimson::mon::Client& monc;
  crimson::net::Messenger &front_msgr;
  crimson::net::Messenger &back_msgr;

  seastar::timer<seastar::lowres_clock> timer;
  // use real_clock so it can be converted to utime_t
  using clock = ceph::coarse_real_clock;

  class ConnectionListener;
  class Connection;
  class Session;
  class Peer;
  using peers_map_t = std::map<osd_id_t, Peer>;
  peers_map_t peers;

  // osds which are considered failed
  // osd_id => when was the last time that both front and back pings were acked
  //           or sent.
  // used for calculating how long the OSD has been unresponsive
  using failure_queue_t = std::map<osd_id_t, clock::time_point>;
  seastar::future<> send_failures(failure_queue_t&& failure_queue);
  seastar::future<> send_heartbeats();
  void heartbeat_check();

  // osds we've reported to the monitor as failed, but they are not marked down
  // yet
  crimson::common::Gated gate;

  class FailingPeers {
  public:
    FailingPeers(Heartbeat& heartbeat) : heartbeat(heartbeat) {}
    bool add_pending(osd_id_t peer,
                     clock::time_point failed_since,
                     clock::time_point now,
                     std::vector<seastar::future<>>& futures);
    seastar::future<> cancel_one(osd_id_t peer);

  private:
    seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&);

    Heartbeat& heartbeat;

    struct failure_info_t {
      clock::time_point failed_since;
      entity_addrvec_t addrs;
    };
    std::map<osd_id_t, failure_info_t> failure_pending;
  } failing_peers;
};

inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) {
  hb.print(out);
  return out;
}
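
/*
 * Rough usage sketch (hypothetical wiring; the real call sites live in the
 * crimson OSD startup/shutdown code, and names such as shard_services,
 * front_addrs and back_addrs are placeholders):
 *
 *   Heartbeat heartbeat{whoami, shard_services, monc, front_msgr, back_msgr};
 *   heartbeat.start(front_addrs, back_addrs).then([&] {
 *     heartbeat.add_peer(peer_osd, osdmap_epoch);  // once peers are known
 *     return seastar::now();
 *   });
 *   ...
 *   heartbeat.stop();  // returns a future; wait on it before destruction
 */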

/*
 * Event-driven interface for Heartbeat::Peer to be notified when both hb_front
 * and hb_back are connected, or when either connection is lost.
 */
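// For example, with connections == 2 (the hb_front + hb_back pair that
// Heartbeat::Peer is built around), on_connected() fires only when the second
// increase_connected() arrives, and on_disconnected() fires as soon as
// decrease_connected() drops the count back below two.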
class Heartbeat::ConnectionListener {
public:
  ConnectionListener(size_t connections) : connections{connections} {}

  void increase_connected() {
    assert(connected < connections);
    ++connected;
    if (connected == connections) {
      on_connected();
    }
  }
  void decrease_connected() {
    assert(connected > 0);
    if (connected == connections) {
      on_disconnected();
    }
    --connected;
  }
  enum class type_t { front, back };
  virtual entity_addr_t get_peer_addr(type_t) = 0;

protected:
  virtual void on_connected() = 0;
  virtual void on_disconnected() = 0;

private:
  const size_t connections;
  size_t connected = 0;
};

class Heartbeat::Connection {
public:
  using type_t = ConnectionListener::type_t;
  Connection(osd_id_t peer, bool is_winner_side, type_t type,
             crimson::net::Messenger& msgr,
             ConnectionListener& listener)
    : peer{peer}, type{type},
      msgr{msgr}, listener{listener},
      is_winner_side{is_winner_side} {
    connect();
  }
  Connection(const Connection&) = delete;
  Connection(Connection&&) = delete;
  Connection& operator=(const Connection&) = delete;
  Connection& operator=(Connection&&) = delete;

  ~Connection();

  bool matches(crimson::net::ConnectionRef _conn) const;
  void connected() {
    set_connected();
  }
  void accepted(crimson::net::ConnectionRef);
  void replaced();
  void reset();
  seastar::future<> send(MessageURef msg);
  void validate();
  // retry connection if still pending
  void retry();

private:
  void set_connected();
  void connect();

  const osd_id_t peer;
  const type_t type;
  crimson::net::Messenger& msgr;
  ConnectionListener& listener;

  /*
   * Resolve the following race that happens when both we and the peer try to
   * connect to each other symmetrically, under SocketPolicy::lossy_client:
   *
   * OSD.A                              OSD.B
   * -                                  -
   * |-[1]---->                <----[2]-|
   *          \                /
   *           \              /
   *       delay..    X    delay..
   *           /              \
   * |-[1]x>  /                \  <x[2]-|
   * |<-[2]---                 ---[1]->|
   * |(reset#1)               (reset#2)|
   * |(reconnectB)         (reconnectA)|
   * |-[2]--->                 <---[1]-|
   *    delay..               delay..
   *        (remote close propagated)
   * |-[2]x>                    <x[1]-|
   * |(reset#2)               (reset#1)|
   * | ...                        ... |
   *             (dead loop!)
   *
   * Our solution is to remember whether such a race happened recently, and then
   * establish the connection asymmetrically, only from the winner side, i.e.
   * the side whose osd-id is larger.
   */
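  // Reading of the members below: is_winner_side is decided at construction
  // (the side with the larger osd-id), and racing_detected presumably records
  // that the race above has been observed with this peer, so later reconnect
  // attempts stay asymmetric.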
  const bool is_winner_side;
  bool racing_detected = false;

  crimson::net::ConnectionRef conn;
  bool is_connected = false;

  friend std::ostream& operator<<(std::ostream& os, const Connection& c) {
    if (c.type == type_t::front) {
      return os << "con_front(osd." << c.peer << ")";
    } else {
      return os << "con_back(osd." << c.peer << ")";
    }
  }
};

#if FMT_VERSION >= 90000
template <> struct fmt::formatter<Heartbeat::Connection> : fmt::ostream_formatter {};
#endif

/*
 * Track the ping history and the ping replies (the pongs) within the same
 * session: clean up the history once hb_front or hb_back loses its connection,
 * and restart the session once both connections are established again.
 *
 * We cannot simply remove the entire Heartbeat::Peer once hb_front or hb_back
 * loses its connection, because we would end up with the following dead loop:
 *
 *   OSD.A                                 OSD.B
 *   -                                     -
 *   hb_front reset <-----(network)------- hb_front close
 *        |                                     ^
 *        |                                     |
 *   remove Peer B      (dead loop!)       remove Peer A
 *        |                                     |
 *        V                                     |
 *   hb_back close ------(network)-------> hb_back reset
 */
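// Consequently a Session survives reconnects: on_disconnected() below only
// collapses ping_history down to the oldest outstanding deadline (so the
// unhealthy check keeps working), and on_connected() starts the history afresh.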
class Heartbeat::Session {
public:
  Session(osd_id_t peer) : peer{peer} {}

  void set_epoch_added(epoch_t epoch_) { epoch = epoch_; }
  epoch_t get_epoch_added() const { return epoch; }

  void set_last_epoch_sent(epoch_t epoch_) { last_sent_epoch = epoch_; }
  epoch_t get_last_epoch_sent() const { return last_sent_epoch; }

  bool is_started() const { return connected; }
  bool pinged() const {
    if (clock::is_zero(first_tx)) {
      // we can never have received a pong without having sent a ping first.
      assert(clock::is_zero(last_rx_front) &&
             clock::is_zero(last_rx_back));
      return false;
    } else {
      return true;
    }
  }

  enum class health_state {
    UNKNOWN,
    UNHEALTHY,
    HEALTHY,
  };
  health_state do_health_screen(clock::time_point now) const {
    if (!pinged()) {
      // we are neither healthy nor unhealthy because we haven't sent anything yet
      return health_state::UNKNOWN;
    } else if (!ping_history.empty() && ping_history.begin()->second.deadline < now) {
      return health_state::UNHEALTHY;
    } else if (!clock::is_zero(last_rx_front) &&
               !clock::is_zero(last_rx_back)) {
      // only declare healthy once we have received the first replies on both
      // the front and back connections
      return health_state::HEALTHY;
    } else {
      return health_state::UNKNOWN;
    }
  }

  clock::time_point failed_since(clock::time_point now) const;

  void set_tx(clock::time_point now) {
    if (!pinged()) {
      first_tx = now;
    }
    last_tx = now;
  }

  void on_connected() {
    assert(!connected);
    connected = true;
    ping_history.clear();
  }

  void on_ping(const utime_t& sent_stamp,
               const clock::time_point& deadline) {
    assert(connected);
    [[maybe_unused]] auto [reply, added] =
      ping_history.emplace(sent_stamp, reply_t{deadline, 2});
  }

  bool on_pong(const utime_t& ping_stamp,
               Connection::type_t type,
               clock::time_point now) {
    assert(connected);
    auto ping = ping_history.find(ping_stamp);
    if (ping == ping_history.end()) {
      // old replies, deprecated by newly sent pings.
      return false;
    }
    auto& unacked = ping->second.unacknowledged;
    assert(unacked);
    if (type == Connection::type_t::front) {
      last_rx_front = now;
      unacked--;
    } else {
      last_rx_back = now;
      unacked--;
    }
    if (unacked == 0) {
      ping_history.erase(ping_history.begin(), ++ping);
    }
    return true;
  }

  void on_disconnected() {
    assert(connected);
    connected = false;
    if (!ping_history.empty()) {
      // we lost our ping_history of the last session, but still need to keep
      // the oldest deadline for the unhealthy check.
      auto oldest = ping_history.begin();
      auto sent_stamp = oldest->first;
      auto deadline = oldest->second.deadline;
      ping_history.clear();
      ping_history.emplace(sent_stamp, reply_t{deadline, 0});
    }
  }

  // maintain an entry in ping_history for the unhealthy check
  void set_inactive_history(clock::time_point);

private:
  const osd_id_t peer;
  bool connected = false;
  // time we sent our first ping request
  clock::time_point first_tx;
  // last time we sent a ping request
  clock::time_point last_tx;
  // last time we got a ping reply on the front side
  clock::time_point last_rx_front;
  // last time we got a ping reply on the back side
  clock::time_point last_rx_back;
  // most recent epoch we wanted this peer
  epoch_t epoch; // rename me to epoch_added
  // last epoch sent
  epoch_t last_sent_epoch = 0;

  struct reply_t {
    clock::time_point deadline;
    // one sent over front conn, another sent over back conn
    uint8_t unacknowledged = 0;
  };
  // history of in-flight pings, arranged by the timestamp at which we sent them
  std::map<utime_t, reply_t> ping_history;
};

class Heartbeat::Peer final : private Heartbeat::ConnectionListener {
public:
  Peer(Heartbeat&, osd_id_t);
  ~Peer();
  Peer(Peer&&) = delete;
  Peer(const Peer&) = delete;
  Peer& operator=(Peer&&) = delete;
  Peer& operator=(const Peer&) = delete;

  // set/get the epoch at which the peer was added
  void set_epoch_added(epoch_t epoch) { session.set_epoch_added(epoch); }
  epoch_t get_epoch_added() const { return session.get_epoch_added(); }

  void set_last_epoch_sent(epoch_t epoch) { session.set_last_epoch_sent(epoch); }
  epoch_t get_last_epoch_sent() const { return session.get_last_epoch_sent(); }

  // if the peer has failed, return the time point since it was last active;
  // otherwise, return clock::zero()
  clock::time_point failed_since(clock::time_point now) const {
    return session.failed_since(now);
  }
  void send_heartbeat(
      clock::time_point, ceph::signedspan, std::vector<seastar::future<>>&);
  seastar::future<> handle_reply(crimson::net::ConnectionRef, Ref<MOSDPing>);
  void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
    for_each_conn([&] (auto& _conn) {
      if (_conn.matches(conn)) {
        if (is_replace) {
          _conn.replaced();
        } else {
          _conn.reset();
        }
      }
    });
  }
  void handle_connect(crimson::net::ConnectionRef conn) {
    for_each_conn([&] (auto& _conn) {
      if (_conn.matches(conn)) {
        _conn.connected();
      }
    });
  }
  void handle_accept(crimson::net::ConnectionRef conn) {
    for_each_conn([&] (auto& _conn) {
      _conn.accepted(conn);
    });
  }

private:
  entity_addr_t get_peer_addr(type_t type) override;
  void on_connected() override;
  void on_disconnected() override;
  void do_send_heartbeat(
      clock::time_point, ceph::signedspan, std::vector<seastar::future<>>*);

  template <typename Func>
  void for_each_conn(Func&& f) {
    f(con_front);
    f(con_back);
  }

  Heartbeat& heartbeat;
  const osd_id_t peer;
  Session session;
  // whether a heartbeat still needs to be sent once the session is connected
  bool pending_send = false;
  Connection con_front;
  Connection con_back;
};
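
// Rough lifecycle sketch (inferred from the declarations above, not a verbatim
// description of the implementation): Heartbeat::send_heartbeats() asks each
// Peer to send_heartbeat(), which records Session::set_tx()/on_ping(); pong
// replies come back through handle_reply() into Session::on_pong(); and
// heartbeat_check() consults Session::do_health_screen() to decide which peers
// end up in the failure_queue_t handed to send_failures().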

#if FMT_VERSION >= 90000
template <> struct fmt::formatter<Heartbeat> : fmt::ostream_formatter {};
#endif