]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/osd/heartbeat.h
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / osd / heartbeat.h
CommitLineData
11fdf7f2
TL
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <cstdint>
#include <seastar/core/future.hh>
#include "common/ceph_time.h"
#include "crimson/common/gated.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Fwd.h"

class MOSDPing;

namespace crimson::osd {
  class ShardServices;
}

namespace crimson::mon {
  class Client;
}

// Convenience alias for intrusive-refcounted Ceph message pointers
// (boost::intrusive_ptr is pulled in transitively via crimson/net headers).
template<typename Message> using Ref = boost::intrusive_ptr<Message>;
/*
 * Exchanges MOSDPing heartbeats with peer OSDs over the dedicated hb_front
 * and hb_back messengers, tracks per-peer liveness, and reports peers that
 * stop responding to the monitor.
 */
class Heartbeat : public crimson::net::Dispatcher {
public:
  using osd_id_t = int;

  Heartbeat(osd_id_t whoami,
            const crimson::osd::ShardServices& service,
            crimson::mon::Client& monc,
            crimson::net::MessengerRef front_msgr,
            crimson::net::MessengerRef back_msgr);

  // bind the front/back messengers to the given address vectors and begin
  // heartbeating
  seastar::future<> start(entity_addrvec_t front,
                          entity_addrvec_t back);
  seastar::future<> stop();

  using osds_t = std::vector<osd_id_t>;
  // start tracking a peer; @epoch is the osdmap epoch in which we want it
  void add_peer(osd_id_t peer, epoch_t epoch);
  // refresh the tracked peer set: drop down/stale peers and add enough
  // reporters for fast failure detection (see helpers below)
  void update_peers(int whoami);
  void remove_peer(osd_id_t peer);
  osds_t get_peers() const;

  const entity_addrvec_t& get_front_addrs() const;
  const entity_addrvec_t& get_back_addrs() const;

  void set_require_authorizer(bool);

  // Dispatcher methods
  std::optional<seastar::future<>> ms_dispatch(
      crimson::net::ConnectionRef conn, MessageRef m) override;
  void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
  void ms_handle_connect(crimson::net::ConnectionRef conn) override;
  void ms_handle_accept(crimson::net::ConnectionRef conn) override;

  // human-readable dump, used by the free operator<< below
  void print(std::ostream&) const;
private:
  // top-level handler for an incoming MOSDPing; fans out to handle_ping/
  // handle_reply/handle_you_died depending on the message's op
  seastar::future<> handle_osd_ping(crimson::net::ConnectionRef conn,
                                    Ref<MOSDPing> m);
  seastar::future<> handle_ping(crimson::net::ConnectionRef conn,
                                Ref<MOSDPing> m);
  seastar::future<> handle_reply(crimson::net::ConnectionRef conn,
                                 Ref<MOSDPing> m);
  seastar::future<> handle_you_died();

  /// remove down OSDs
  /// @return peers not added in this epoch
  osds_t remove_down_peers();
  /// add enough reporters for fast failure detection
  void add_reporter_peers(int whoami);

  seastar::future<> start_messenger(crimson::net::Messenger& msgr,
                                    const entity_addrvec_t& addrs);
private:
  const osd_id_t whoami;
  const crimson::osd::ShardServices& service;
  crimson::mon::Client& monc;
  crimson::net::MessengerRef front_msgr;
  crimson::net::MessengerRef back_msgr;

  // drives the periodic send_heartbeats()/heartbeat_check() cycle
  seastar::timer<seastar::lowres_clock> timer;
  // use real_clock so it can be converted to utime_t
  using clock = ceph::coarse_real_clock;

  class ConnectionListener;
  class Connection;
  class Session;
  class Peer;
  using peers_map_t = std::map<osd_id_t, Peer>;
  peers_map_t peers;

  // osds which are considered failed
  // osd_id => the last time that both front and back pings were acked or sent.
  // used for calculating how long the OSD has been unresponsive
  using failure_queue_t = std::map<osd_id_t, clock::time_point>;
  seastar::future<> send_failures(failure_queue_t&& failure_queue);
  seastar::future<> send_heartbeats();
  void heartbeat_check();

  // guards in-flight background futures so stop() can drain them
  // (see crimson::common::Gated)
  crimson::common::Gated gate;

  // bookkeeping for osds we've reported to the monitor as failed but which
  // are not marked down yet; supports cancelling the report ("still alive")
  // if the peer comes back
  class FailingPeers {
  public:
    FailingPeers(Heartbeat& heartbeat) : heartbeat(heartbeat) {}
    // queue a failure report for @peer; appends the send future to @futures
    bool add_pending(osd_id_t peer,
                     clock::time_point failed_since,
                     clock::time_point now,
                     std::vector<seastar::future<>>& futures);
    // retract a previously reported failure for @peer
    seastar::future<> cancel_one(osd_id_t peer);

  private:
    seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&);

    Heartbeat& heartbeat;

    struct failure_info_t {
      clock::time_point failed_since;
      entity_addrvec_t addrs;
    };
    // pending (reported-but-not-yet-marked-down) failures, keyed by osd id
    std::map<osd_id_t, failure_info_t> failure_pending;
  } failing_peers;
};
127
// Stream insertion delegates to Heartbeat::print() so instances can be
// logged with the usual operator<< syntax.
inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) {
  hb.print(out);
  return out;
}
132
133/*
134 * Event driven interface for Heartbeat::Peer to be notified when both hb_front
135 * and hb_back are connected, or connection is lost.
136 */
137class Heartbeat::ConnectionListener {
138 public:
139 ConnectionListener(size_t connections) : connections{connections} {}
140
141 void increase_connected() {
142 assert(connected < connections);
143 ++connected;
144 if (connected == connections) {
145 on_connected();
146 }
147 }
148 void decrease_connected() {
149 assert(connected > 0);
150 if (connected == connections) {
151 on_disconnected();
152 }
153 --connected;
154 }
155 enum class type_t { front, back };
156 virtual entity_addr_t get_peer_addr(type_t) = 0;
157
158 protected:
159 virtual void on_connected() = 0;
160 virtual void on_disconnected() = 0;
161
162 private:
163 const size_t connections;
164 size_t connected = 0;
165};
166
/*
 * Wraps one heartbeat connection (either hb_front or hb_back) to a single
 * peer, hiding reconnect/replace/reset handling and notifying the owning
 * listener (Heartbeat::Peer) via increase/decrease_connected().
 */
class Heartbeat::Connection {
 public:
  using type_t = ConnectionListener::type_t;
  // eagerly initiates the connection (see connect() in the ctor body)
  Connection(osd_id_t peer, bool is_winner_side, type_t type,
             crimson::net::Messenger& msgr,
             ConnectionListener& listener)
    : peer{peer}, type{type},
      msgr{msgr}, listener{listener},
      is_winner_side{is_winner_side} {
    connect();
  }
  Connection(const Connection&) = delete;
  Connection(Connection&&) = delete;
  Connection& operator=(const Connection&) = delete;
  Connection& operator=(Connection&&) = delete;

  ~Connection();

  // does @_conn refer to the connection this object is tracking?
  bool matches(crimson::net::ConnectionRef _conn) const;
  void connected() {
    set_connected();
  }
  void accepted(crimson::net::ConnectionRef);
  void replaced();
  void reset();
  seastar::future<> send(MessageRef msg);
  void validate();
  // retry connection if still pending
  void retry();

 private:
  void set_connected();
  void connect();

  const osd_id_t peer;
  const type_t type;
  crimson::net::Messenger& msgr;
  ConnectionListener& listener;

/*
 * Resolve the following race when both me and the peer are trying to connect
 * to each other symmetrically, under SocketPolicy::lossy_client:
 *
 *  OSD.A                               OSD.B
 *   -                                   -
 *  |-[1]---->                 <----[2]-|
 *       \                        /
 *          \                  /
 *         delay..   X   delay..
 *          /                  \
 *  |-[1]x>   /                   \  <x[2]-|
 *  |<-[2]---                 ---[1]->|
 *  |(reset#1)               (reset#2)|
 *  |(reconnectB)         (reconnectA)|
 *  |-[2]--->                 <---[1]-|
 *   delay..                   delay..
 *        (remote close propagated)
 *  |-[2]x>                     <x[1]-|
 *  |(reset#2)               (reset#1)|
 *  | ...                         ... |
 *           (dead loop!)
 *
 * Our solution is to remember if such a race happened recently, and
 * establish the connection asymmetrically, only from the winner side whose
 * osd-id is larger.
 */
  const bool is_winner_side;
  bool racing_detected = false;

  // the underlying messenger connection currently in use (may be replaced)
  crimson::net::ConnectionRef conn;
  bool is_connected = false;

  friend std::ostream& operator<<(std::ostream& os, const Connection& c) {
    if (c.type == type_t::front) {
      return os << "con_front(osd." << c.peer << ")";
    } else {
      return os << "con_back(osd." << c.peer << ")";
    }
  }
};
247
248/*
249 * Track the ping history and ping reply (the pong) from the same session, clean up
250 * history once hb_front or hb_back loses connection and restart the session once
251 * both connections are connected again.
252 *
253 * We cannot simply remove the entire Heartbeat::Peer once hb_front or hb_back
254 * loses connection, because we would end up with the following deadloop:
255 *
256 * OSD.A OSD.B
257 * - -
258 * hb_front reset <--(network)--- hb_front close
259 * | ^
260 * | |
261 * remove Peer B (dead loop!) remove Peer A
262 * | |
263 * V |
264 * hb_back close ----(network)---> hb_back reset
265 */
266class Heartbeat::Session {
267 public:
268 Session(osd_id_t peer) : peer{peer} {}
269
270 void set_epoch(epoch_t epoch_) { epoch = epoch_; }
271 epoch_t get_epoch() const { return epoch; }
272 bool is_started() const { return connected; }
273 bool pinged() const {
274 if (clock::is_zero(first_tx)) {
275 // i can never receive a pong without sending any ping message first.
276 assert(clock::is_zero(last_rx_front) &&
277 clock::is_zero(last_rx_back));
278 return false;
279 } else {
280 return true;
281 }
282 }
283
284 enum class health_state {
285 UNKNOWN,
286 UNHEALTHY,
287 HEALTHY,
288 };
289 health_state do_health_screen(clock::time_point now) const {
290 if (!pinged()) {
291 // we are not healty nor unhealty because we haven't sent anything yet
292 return health_state::UNKNOWN;
293 } else if (!ping_history.empty() && ping_history.begin()->second.deadline < now) {
294 return health_state::UNHEALTHY;
295 } else if (!clock::is_zero(last_rx_front) &&
296 !clock::is_zero(last_rx_back)) {
297 // only declare to be healthy until we have received the first
298 // replies from both front/back connections
299 return health_state::HEALTHY;
300 } else {
301 return health_state::UNKNOWN;
302 }
303 }
304
305 clock::time_point failed_since(clock::time_point now) const;
306
307 void set_tx(clock::time_point now) {
308 if (!pinged()) {
309 first_tx = now;
310 }
311 last_tx = now;
312 }
313
314 void on_connected() {
315 assert(!connected);
316 connected = true;
317 ping_history.clear();
318 }
319
320 void on_ping(const utime_t& sent_stamp,
321 const clock::time_point& deadline) {
322 assert(connected);
323 [[maybe_unused]] auto [reply, added] =
324 ping_history.emplace(sent_stamp, reply_t{deadline, 2});
325 }
326
327 bool on_pong(const utime_t& ping_stamp,
328 Connection::type_t type,
329 clock::time_point now) {
330 assert(connected);
331 auto ping = ping_history.find(ping_stamp);
332 if (ping == ping_history.end()) {
333 // old replies, deprecated by newly sent pings.
334 return false;
335 }
336 auto& unacked = ping->second.unacknowledged;
337 assert(unacked);
338 if (type == Connection::type_t::front) {
339 last_rx_front = now;
340 unacked--;
341 } else {
342 last_rx_back = now;
343 unacked--;
344 }
345 if (unacked == 0) {
346 ping_history.erase(ping_history.begin(), ++ping);
347 }
348 return true;
349 }
350
351 void on_disconnected() {
352 assert(connected);
353 connected = false;
354 if (!ping_history.empty()) {
355 // we lost our ping_history of the last session, but still need to keep
356 // the oldest deadline for unhealthy check.
357 auto oldest = ping_history.begin();
358 auto sent_stamp = oldest->first;
359 auto deadline = oldest->second.deadline;
360 ping_history.clear();
361 ping_history.emplace(sent_stamp, reply_t{deadline, 0});
362 }
363 }
364
365 // maintain an entry in ping_history for unhealthy check
366 void set_inactive_history(clock::time_point);
367
368 private:
369 const osd_id_t peer;
370 bool connected = false;
371 // time we sent our first ping request
372 clock::time_point first_tx;
373 // last time we sent a ping request
374 clock::time_point last_tx;
375 // last time we got a ping reply on the front side
376 clock::time_point last_rx_front;
377 // last time we got a ping reply on the back side
378 clock::time_point last_rx_back;
379 // most recent epoch we wanted this peer
380 epoch_t epoch;
381
382 struct reply_t {
383 clock::time_point deadline;
384 // one sent over front conn, another sent over back conn
385 uint8_t unacknowledged = 0;
386 };
387 // history of inflight pings, arranging by timestamp we sent
388 std::map<utime_t, reply_t> ping_history;
389};
390
391class Heartbeat::Peer final : private Heartbeat::ConnectionListener {
392 public:
393 Peer(Heartbeat&, osd_id_t);
394 ~Peer();
395 Peer(Peer&&) = delete;
396 Peer(const Peer&) = delete;
397 Peer& operator=(Peer&&) = delete;
398 Peer& operator=(const Peer&) = delete;
399
400 void set_epoch(epoch_t epoch) { session.set_epoch(epoch); }
401 epoch_t get_epoch() const { return session.get_epoch(); }
402
403 // if failure, return time_point since last active
404 // else, return clock::zero()
405 clock::time_point failed_since(clock::time_point now) const {
406 return session.failed_since(now);
407 }
408 void send_heartbeat(
409 clock::time_point, ceph::signedspan, std::vector<seastar::future<>>&);
410 seastar::future<> handle_reply(crimson::net::ConnectionRef, Ref<MOSDPing>);
411 void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
412 for_each_conn([&] (auto& _conn) {
413 if (_conn.matches(conn)) {
414 if (is_replace) {
415 _conn.replaced();
416 } else {
417 _conn.reset();
418 }
419 }
420 });
421 }
422 void handle_connect(crimson::net::ConnectionRef conn) {
423 for_each_conn([&] (auto& _conn) {
424 if (_conn.matches(conn)) {
425 _conn.connected();
426 }
427 });
428 }
429 void handle_accept(crimson::net::ConnectionRef conn) {
430 for_each_conn([&] (auto& _conn) {
431 _conn.accepted(conn);
432 });
433 }
434
435 private:
436 entity_addr_t get_peer_addr(type_t type) override;
437 void on_connected() override;
438 void on_disconnected() override;
439 void do_send_heartbeat(
440 clock::time_point, ceph::signedspan, std::vector<seastar::future<>>*);
441
442 template <typename Func>
443 void for_each_conn(Func&& f) {
444 f(con_front);
445 f(con_back);
446 }
447
448 Heartbeat& heartbeat;
449 const osd_id_t peer;
450 Session session;
451 // if need to send heartbeat when session connected
452 bool pending_send = false;
453 Connection con_front;
454 Connection con_back;
11fdf7f2 455};