]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #pragma once | |
5 | ||
6 | #include <cstdint> | |
7 | #include <seastar/core/future.hh> | |
8 | #include "common/ceph_time.h" | |
f67539c2 | 9 | #include "crimson/common/gated.h" |
11fdf7f2 TL |
10 | #include "crimson/net/Dispatcher.h" |
11 | #include "crimson/net/Fwd.h" | |
12 | ||
13 | class MOSDPing; | |
11fdf7f2 | 14 | |
9f95a23c TL |
15 | namespace crimson::osd { |
16 | class ShardServices; | |
17 | } | |
18 | ||
19 | namespace crimson::mon { | |
11fdf7f2 TL |
20 | class Client; |
21 | } | |
22 | ||
23 | template<typename Message> using Ref = boost::intrusive_ptr<Message>; | |
24 | ||
/**
 * OSD heartbeat subsystem for crimson.
 *
 * Owns the hb_front/hb_back messengers, tracks a Peer per monitored osd,
 * periodically sends MOSDPing messages, and reports unresponsive peers to
 * the monitor.  Implements crimson::net::Dispatcher to receive ping traffic.
 */
class Heartbeat : public crimson::net::Dispatcher {
public:
  using osd_id_t = int;

  Heartbeat(osd_id_t whoami,
            const crimson::osd::ShardServices& service,
            crimson::mon::Client& monc,
            crimson::net::MessengerRef front_msgr,
            crimson::net::MessengerRef back_msgr);

  /// bind both messengers to the given addresses and start the timer
  seastar::future<> start(entity_addrvec_t front,
                          entity_addrvec_t back);
  seastar::future<> stop();

  using osds_t = std::vector<osd_id_t>;
  /// start (or refresh) heartbeating with a peer wanted as of @c epoch
  void add_peer(osd_id_t peer, epoch_t epoch);
  /// recompute the peer set (neighbors + extra reporters) for the osdmap
  void update_peers(int whoami);
  void remove_peer(osd_id_t peer);
  osds_t get_peers() const;

  const entity_addrvec_t& get_front_addrs() const;
  const entity_addrvec_t& get_back_addrs() const;

  void set_require_authorizer(bool);

  // Dispatcher methods
  std::optional<seastar::future<>> ms_dispatch(
      crimson::net::ConnectionRef conn, MessageRef m) override;
  void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
  void ms_handle_connect(crimson::net::ConnectionRef conn) override;
  void ms_handle_accept(crimson::net::ConnectionRef conn) override;

  void print(std::ostream&) const;
private:
  // MOSDPing dispatch: handle_osd_ping fans out to handle_ping /
  // handle_reply / handle_you_died based on the ping op
  seastar::future<> handle_osd_ping(crimson::net::ConnectionRef conn,
                                    Ref<MOSDPing> m);
  seastar::future<> handle_ping(crimson::net::ConnectionRef conn,
                                Ref<MOSDPing> m);
  seastar::future<> handle_reply(crimson::net::ConnectionRef conn,
                                 Ref<MOSDPing> m);
  seastar::future<> handle_you_died();

  /// remove down OSDs
  /// @return peers not added in this epoch
  osds_t remove_down_peers();
  /// add enough reporters for fast failure detection
  void add_reporter_peers(int whoami);

  seastar::future<> start_messenger(crimson::net::Messenger& msgr,
                                    const entity_addrvec_t& addrs);
private:
  const osd_id_t whoami;
  const crimson::osd::ShardServices& service;
  crimson::mon::Client& monc;
  crimson::net::MessengerRef front_msgr;
  crimson::net::MessengerRef back_msgr;

  // periodic driver for send_heartbeats()/heartbeat_check()
  seastar::timer<seastar::lowres_clock> timer;
  // use real_clock so it can be converted to utime_t
  using clock = ceph::coarse_real_clock;

  class ConnectionListener;
  class Connection;
  class Session;
  class Peer;
  using peers_map_t = std::map<osd_id_t, Peer>;
  peers_map_t peers;

  // osds which are considered failed
  // osd_id => when was the last time that both front and back pings were acked
  //   or sent.
  // use for calculating how long the OSD has been unresponsive
  using failure_queue_t = std::map<osd_id_t, clock::time_point>;
  seastar::future<> send_failures(failure_queue_t&& failure_queue);
  seastar::future<> send_heartbeats();
  void heartbeat_check();

  // keeps in-flight sends/reports alive across stop()
  crimson::common::Gated gate;

  // osds we've reported to the monitor as failed, but which are not marked
  // down yet
  class FailingPeers {
  public:
    FailingPeers(Heartbeat& heartbeat) : heartbeat(heartbeat) {}
    /// queue a failure report for @c peer; returns false if already pending
    bool add_pending(osd_id_t peer,
                     clock::time_point failed_since,
                     clock::time_point now,
                     std::vector<seastar::future<>>& futures);
    /// retract a pending failure report (peer became responsive again)
    seastar::future<> cancel_one(osd_id_t peer);

  private:
    seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&);

    Heartbeat& heartbeat;

    struct failure_info_t {
      clock::time_point failed_since;
      entity_addrvec_t addrs;
    };
    std::map<osd_id_t, failure_info_t> failure_pending;
  } failing_peers;
};
127 | ||
128 | inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) { | |
129 | hb.print(out); | |
130 | return out; | |
131 | } | |
132 | ||
133 | /* | |
134 | * Event driven interface for Heartbeat::Peer to be notified when both hb_front | |
135 | * and hb_back are connected, or connection is lost. | |
136 | */ | |
137 | class Heartbeat::ConnectionListener { | |
138 | public: | |
139 | ConnectionListener(size_t connections) : connections{connections} {} | |
140 | ||
141 | void increase_connected() { | |
142 | assert(connected < connections); | |
143 | ++connected; | |
144 | if (connected == connections) { | |
145 | on_connected(); | |
146 | } | |
147 | } | |
148 | void decrease_connected() { | |
149 | assert(connected > 0); | |
150 | if (connected == connections) { | |
151 | on_disconnected(); | |
152 | } | |
153 | --connected; | |
154 | } | |
155 | enum class type_t { front, back }; | |
156 | virtual entity_addr_t get_peer_addr(type_t) = 0; | |
157 | ||
158 | protected: | |
159 | virtual void on_connected() = 0; | |
160 | virtual void on_disconnected() = 0; | |
161 | ||
162 | private: | |
163 | const size_t connections; | |
164 | size_t connected = 0; | |
165 | }; | |
166 | ||
/*
 * One heartbeat connection (either hb_front or hb_back) to a single peer.
 * Reports connect/disconnect transitions to its ConnectionListener so the
 * owning Peer can track when both sides are up.
 */
class Heartbeat::Connection {
public:
  using type_t = ConnectionListener::type_t;
  // NOTE: eagerly initiates the messenger connection from the constructor
  Connection(osd_id_t peer, bool is_winner_side, type_t type,
             crimson::net::Messenger& msgr,
             ConnectionListener& listener)
    : peer{peer}, type{type},
      msgr{msgr}, listener{listener},
      is_winner_side{is_winner_side} {
    connect();
  }
  // non-copyable/non-movable: listener and msgr hold references to us
  Connection(const Connection&) = delete;
  Connection(Connection&&) = delete;
  Connection& operator=(const Connection&) = delete;
  Connection& operator=(Connection&&) = delete;

  ~Connection();

  /// whether @c _conn is the messenger connection this object tracks
  bool matches(crimson::net::ConnectionRef _conn) const;
  /// called when the outgoing connection completes
  void connected() {
    set_connected();
  }
  /// called when the peer accepted a connection from us (defined out-of-line)
  void accepted(crimson::net::ConnectionRef);
  /// connection was replaced by a newly accepted one
  void replaced();
  /// connection was reset; may trigger a reconnect
  void reset();
  seastar::future<> send(MessageRef msg);
  void validate();
  // retry connection if still pending
  void retry();

private:
  void set_connected();
  void connect();

  const osd_id_t peer;
  const type_t type;
  crimson::net::Messenger& msgr;
  ConnectionListener& listener;

  /*
   * Resolve the following racing when both me and peer are trying to connect
   * each other symmetrically, under SocketPolicy::lossy_client:
   *
   * OSD.A OSD.B
   * - -
   * |-[1]----> <----[2]-|
   * \ /
   * \ /
   * delay.. X delay..
   * / \
   * |-[1]x> / \ <x[2]-|
   * |<-[2]--- ---[1]->|
   * |(reset#1) (reset#2)|
   * |(reconnectB) (reconnectA)|
   * |-[2]---> <---[1]-|
   * delay.. delay..
   * (remote close populated)
   * |-[2]x> <x[1]-|
   * |(reset#2) (reset#1)|
   * | ... ... |
   * (dead loop!)
   *
   * Our solution is to remember if such racing was happened recently, and
   * establish connection asymmetrically only from the winner side whose osd-id
   * is larger.
   */
  const bool is_winner_side;
  bool racing_detected = false;

  crimson::net::ConnectionRef conn;
  bool is_connected = false;

  friend std::ostream& operator<<(std::ostream& os, const Connection& c) {
    if (c.type == type_t::front) {
      return os << "con_front(osd." << c.peer << ")";
    } else {
      return os << "con_back(osd." << c.peer << ")";
    }
  }
};
247 | ||
248 | /* | |
249 | * Track the ping history and ping reply (the pong) from the same session, clean up | |
250 | * history once hb_front or hb_back loses connection and restart the session once | |
251 | * both connections are connected again. | |
252 | * | |
253 | * We cannot simply remove the entire Heartbeat::Peer once hb_front or hb_back | |
254 | * loses connection, because we would end up with the following deadloop: | |
255 | * | |
256 | * OSD.A OSD.B | |
257 | * - - | |
258 | * hb_front reset <--(network)--- hb_front close | |
259 | * | ^ | |
260 | * | | | |
261 | * remove Peer B (dead loop!) remove Peer A | |
262 | * | | | |
263 | * V | | |
264 | * hb_back close ----(network)---> hb_back reset | |
265 | */ | |
266 | class Heartbeat::Session { | |
267 | public: | |
268 | Session(osd_id_t peer) : peer{peer} {} | |
269 | ||
270 | void set_epoch(epoch_t epoch_) { epoch = epoch_; } | |
271 | epoch_t get_epoch() const { return epoch; } | |
272 | bool is_started() const { return connected; } | |
273 | bool pinged() const { | |
274 | if (clock::is_zero(first_tx)) { | |
275 | // i can never receive a pong without sending any ping message first. | |
276 | assert(clock::is_zero(last_rx_front) && | |
277 | clock::is_zero(last_rx_back)); | |
278 | return false; | |
279 | } else { | |
280 | return true; | |
281 | } | |
282 | } | |
283 | ||
284 | enum class health_state { | |
285 | UNKNOWN, | |
286 | UNHEALTHY, | |
287 | HEALTHY, | |
288 | }; | |
289 | health_state do_health_screen(clock::time_point now) const { | |
290 | if (!pinged()) { | |
291 | // we are not healty nor unhealty because we haven't sent anything yet | |
292 | return health_state::UNKNOWN; | |
293 | } else if (!ping_history.empty() && ping_history.begin()->second.deadline < now) { | |
294 | return health_state::UNHEALTHY; | |
295 | } else if (!clock::is_zero(last_rx_front) && | |
296 | !clock::is_zero(last_rx_back)) { | |
297 | // only declare to be healthy until we have received the first | |
298 | // replies from both front/back connections | |
299 | return health_state::HEALTHY; | |
300 | } else { | |
301 | return health_state::UNKNOWN; | |
302 | } | |
303 | } | |
304 | ||
305 | clock::time_point failed_since(clock::time_point now) const; | |
306 | ||
307 | void set_tx(clock::time_point now) { | |
308 | if (!pinged()) { | |
309 | first_tx = now; | |
310 | } | |
311 | last_tx = now; | |
312 | } | |
313 | ||
314 | void on_connected() { | |
315 | assert(!connected); | |
316 | connected = true; | |
317 | ping_history.clear(); | |
318 | } | |
319 | ||
320 | void on_ping(const utime_t& sent_stamp, | |
321 | const clock::time_point& deadline) { | |
322 | assert(connected); | |
323 | [[maybe_unused]] auto [reply, added] = | |
324 | ping_history.emplace(sent_stamp, reply_t{deadline, 2}); | |
325 | } | |
326 | ||
327 | bool on_pong(const utime_t& ping_stamp, | |
328 | Connection::type_t type, | |
329 | clock::time_point now) { | |
330 | assert(connected); | |
331 | auto ping = ping_history.find(ping_stamp); | |
332 | if (ping == ping_history.end()) { | |
333 | // old replies, deprecated by newly sent pings. | |
334 | return false; | |
335 | } | |
336 | auto& unacked = ping->second.unacknowledged; | |
337 | assert(unacked); | |
338 | if (type == Connection::type_t::front) { | |
339 | last_rx_front = now; | |
340 | unacked--; | |
341 | } else { | |
342 | last_rx_back = now; | |
343 | unacked--; | |
344 | } | |
345 | if (unacked == 0) { | |
346 | ping_history.erase(ping_history.begin(), ++ping); | |
347 | } | |
348 | return true; | |
349 | } | |
350 | ||
351 | void on_disconnected() { | |
352 | assert(connected); | |
353 | connected = false; | |
354 | if (!ping_history.empty()) { | |
355 | // we lost our ping_history of the last session, but still need to keep | |
356 | // the oldest deadline for unhealthy check. | |
357 | auto oldest = ping_history.begin(); | |
358 | auto sent_stamp = oldest->first; | |
359 | auto deadline = oldest->second.deadline; | |
360 | ping_history.clear(); | |
361 | ping_history.emplace(sent_stamp, reply_t{deadline, 0}); | |
362 | } | |
363 | } | |
364 | ||
365 | // maintain an entry in ping_history for unhealthy check | |
366 | void set_inactive_history(clock::time_point); | |
367 | ||
368 | private: | |
369 | const osd_id_t peer; | |
370 | bool connected = false; | |
371 | // time we sent our first ping request | |
372 | clock::time_point first_tx; | |
373 | // last time we sent a ping request | |
374 | clock::time_point last_tx; | |
375 | // last time we got a ping reply on the front side | |
376 | clock::time_point last_rx_front; | |
377 | // last time we got a ping reply on the back side | |
378 | clock::time_point last_rx_back; | |
379 | // most recent epoch we wanted this peer | |
380 | epoch_t epoch; | |
381 | ||
382 | struct reply_t { | |
383 | clock::time_point deadline; | |
384 | // one sent over front conn, another sent over back conn | |
385 | uint8_t unacknowledged = 0; | |
386 | }; | |
387 | // history of inflight pings, arranging by timestamp we sent | |
388 | std::map<utime_t, reply_t> ping_history; | |
389 | }; | |
390 | ||
391 | class Heartbeat::Peer final : private Heartbeat::ConnectionListener { | |
392 | public: | |
393 | Peer(Heartbeat&, osd_id_t); | |
394 | ~Peer(); | |
395 | Peer(Peer&&) = delete; | |
396 | Peer(const Peer&) = delete; | |
397 | Peer& operator=(Peer&&) = delete; | |
398 | Peer& operator=(const Peer&) = delete; | |
399 | ||
400 | void set_epoch(epoch_t epoch) { session.set_epoch(epoch); } | |
401 | epoch_t get_epoch() const { return session.get_epoch(); } | |
402 | ||
403 | // if failure, return time_point since last active | |
404 | // else, return clock::zero() | |
405 | clock::time_point failed_since(clock::time_point now) const { | |
406 | return session.failed_since(now); | |
407 | } | |
408 | void send_heartbeat( | |
409 | clock::time_point, ceph::signedspan, std::vector<seastar::future<>>&); | |
410 | seastar::future<> handle_reply(crimson::net::ConnectionRef, Ref<MOSDPing>); | |
411 | void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { | |
412 | for_each_conn([&] (auto& _conn) { | |
413 | if (_conn.matches(conn)) { | |
414 | if (is_replace) { | |
415 | _conn.replaced(); | |
416 | } else { | |
417 | _conn.reset(); | |
418 | } | |
419 | } | |
420 | }); | |
421 | } | |
422 | void handle_connect(crimson::net::ConnectionRef conn) { | |
423 | for_each_conn([&] (auto& _conn) { | |
424 | if (_conn.matches(conn)) { | |
425 | _conn.connected(); | |
426 | } | |
427 | }); | |
428 | } | |
429 | void handle_accept(crimson::net::ConnectionRef conn) { | |
430 | for_each_conn([&] (auto& _conn) { | |
431 | _conn.accepted(conn); | |
432 | }); | |
433 | } | |
434 | ||
435 | private: | |
436 | entity_addr_t get_peer_addr(type_t type) override; | |
437 | void on_connected() override; | |
438 | void on_disconnected() override; | |
439 | void do_send_heartbeat( | |
440 | clock::time_point, ceph::signedspan, std::vector<seastar::future<>>*); | |
441 | ||
442 | template <typename Func> | |
443 | void for_each_conn(Func&& f) { | |
444 | f(con_front); | |
445 | f(con_back); | |
446 | } | |
447 | ||
448 | Heartbeat& heartbeat; | |
449 | const osd_id_t peer; | |
450 | Session session; | |
451 | // if need to send heartbeat when session connected | |
452 | bool pending_send = false; | |
453 | Connection con_front; | |
454 | Connection con_back; | |
11fdf7f2 | 455 | }; |