// ceph/src/crimson/osd/heartbeat.cc (15.2.0 Octopus)
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "heartbeat.h"

#include <boost/range/join.hpp>

#include "messages/MOSDPing.h"
#include "messages/MOSDFailure.h"

#include "crimson/common/config_proxy.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
#include "crimson/osd/shard_services.h"
#include "crimson/mon/MonClient.h"

#include "osd/OSDMap.h"

using crimson::common::local_conf;

namespace {
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}

Heartbeat::Heartbeat(const crimson::osd::ShardServices& service,
                     crimson::mon::Client& monc,
                     crimson::net::MessengerRef front_msgr,
                     crimson::net::MessengerRef back_msgr)
  : service{service},
    monc{monc},
    front_msgr{front_msgr},
    back_msgr{back_msgr},
    // the timer callback cannot wait on a future, so send_heartbeats()
    // runs in the background and its future is deliberately discarded
    timer{[this] { (void)send_heartbeats(); }}
{}
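
// Note on the timer above: seastar timer callbacks are synchronous, so any
// asynchronous work they kick off has to be detached. A minimal standalone
// sketch of the same fire-and-forget pattern (names hypothetical):
//
//   seastar::timer<> t{[] {
//     (void)seastar::sleep(std::chrono::seconds(1));  // future discarded
//   }};
//   t.arm_periodic(std::chrono::seconds(5));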

seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
                                   entity_addrvec_t back_addrs)
{
  logger().info("heartbeat: start");
  // we only care about the address, so any unused port will do
  for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) {
    addr.set_port(0);
  }

  using crimson::net::SocketPolicy;
  front_msgr->set_policy(entity_name_t::TYPE_OSD,
                         SocketPolicy::stateless_server(0));
  back_msgr->set_policy(entity_name_t::TYPE_OSD,
                        SocketPolicy::stateless_server(0));
  return seastar::when_all_succeed(start_messenger(*front_msgr, front_addrs),
                                   start_messenger(*back_msgr, back_addrs))
    .then([this] {
      timer.arm_periodic(
        std::chrono::seconds(local_conf()->osd_heartbeat_interval));
    });
}
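
// Hypothetical caller sketch (not part of this file): the owning OSD would
// construct the heartbeat with two dedicated messengers and start it once
// the heartbeat addresses are known, e.g.
//
//   heartbeat = std::make_unique<Heartbeat>(shard_services, monc,
//                                           hb_front_msgr, hb_back_msgr);
//   return heartbeat->start(hb_front_addrs, hb_back_addrs);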

seastar::future<>
Heartbeat::start_messenger(crimson::net::Messenger& msgr,
                           const entity_addrvec_t& addrs)
{
  return msgr.try_bind(addrs,
                       local_conf()->ms_bind_port_min,
                       local_conf()->ms_bind_port_max).then([&msgr, this] {
    return msgr.start(this);
  });
}

seastar::future<> Heartbeat::stop()
{
  return seastar::when_all_succeed(front_msgr->shutdown(),
                                   back_msgr->shutdown());
}

const entity_addrvec_t& Heartbeat::get_front_addrs() const
{
  return front_msgr->get_myaddrs();
}

const entity_addrvec_t& Heartbeat::get_back_addrs() const
{
  return back_msgr->get_myaddrs();
}

void Heartbeat::set_require_authorizer(bool require_authorizer)
{
  if (front_msgr->get_require_authorizer() != require_authorizer) {
    front_msgr->set_require_authorizer(require_authorizer);
    back_msgr->set_require_authorizer(require_authorizer);
  }
}

void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
{
  auto [peer_info, added] = peers.try_emplace(peer);
  auto& info = peer_info->second;
  info.epoch = epoch;
  if (added) {
    logger().info("add_peer({})", peer);
    auto osdmap = service.get_osdmap_service().get_map();
    // TODO: use addrs
    info.con_front = front_msgr->connect(
      osdmap->get_hb_front_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
    info.con_back = back_msgr->connect(
      osdmap->get_hb_back_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
  }
}
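
// Call pattern (illustrative): add_peer() is idempotent per osd id. Calling
// it again for a known peer only refreshes the stored epoch; the front and
// back connections are opened once, on first insertion:
//
//   heartbeat.add_peer(3, osdmap->get_epoch());  // connects front + back
//   heartbeat.add_peer(3, newer_epoch);          // epoch bump only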

seastar::future<Heartbeat::osds_t> Heartbeat::remove_down_peers()
{
  osds_t osds;
  for (auto& peer : peers) {
    osds.push_back(peer.first);
  }
  return seastar::map_reduce(std::move(osds),
    [this](auto& osd) {
      auto osdmap = service.get_osdmap_service().get_map();
      if (!osdmap->is_up(osd)) {
        return remove_peer(osd).then([] {
          return seastar::make_ready_future<osd_id_t>(-1);
        });
      } else if (peers[osd].epoch < osdmap->get_epoch()) {
        return seastar::make_ready_future<osd_id_t>(osd);
      } else {
        return seastar::make_ready_future<osd_id_t>(-1);
      }
    }, osds_t{},
    [](osds_t&& extras, osd_id_t extra) {
      if (extra >= 0) {
        extras.push_back(extra);
      }
      return std::move(extras);
    });
}
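
// The map_reduce above maps every tracked osd to either -1 (removed, or
// still current) or its id (up, but our record is stale), then folds the
// ids into a vector. A standalone sketch of the same sentinel-and-fold
// shape (values hypothetical):
//
//   seastar::map_reduce(ids,
//     [](int i) { return seastar::make_ready_future<int>(i % 2 ? i : -1); },
//     std::vector<int>{},
//     [](std::vector<int>&& acc, int v) {
//       if (v >= 0) acc.push_back(v);
//       return std::move(acc);
//     });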

void Heartbeat::add_reporter_peers(int whoami)
{
  auto osdmap = service.get_osdmap_service().get_map();
  // include next and previous up osds to ensure we have a fully-connected set
  std::set<int> want;
  if (auto next = osdmap->get_next_up_osd_after(whoami); next >= 0) {
    want.insert(next);
  }
  if (auto prev = osdmap->get_previous_up_osd_before(whoami); prev >= 0) {
    want.insert(prev);
  }
  // make sure we have at least min_down osds coming from different
  // subtrees at the configured level (e.g., host) for fast failure detection
  auto min_down = local_conf().get_val<uint64_t>("mon_osd_min_down_reporters");
  auto subtree = local_conf().get_val<std::string>(
    "mon_osd_reporter_subtree_level");
  osdmap->get_random_up_osds_by_subtree(
    whoami, subtree, min_down, want, &want);
  auto epoch = osdmap->get_epoch();
  for (int osd : want) {
    add_peer(osd, epoch);
  }
}

seastar::future<> Heartbeat::update_peers(int whoami)
{
  const auto min_peers = static_cast<size_t>(
    local_conf().get_val<int64_t>("osd_heartbeat_min_peers"));
  add_reporter_peers(whoami);
  return remove_down_peers().then([=](osds_t&& extra) {
    // too many? note that the state owns the vector, so the iterator into
    // it stays valid across the asynchronous loop below
    struct iteration_state {
      osds_t osds;
      osds_t::const_iterator where;
    };
    return seastar::do_with(iteration_state{std::move(extra)},
      [=](iteration_state& s) {
        s.where = s.osds.begin();
        return seastar::do_until(
          [min_peers, &s, this] {
            return peers.size() <= min_peers || s.where == s.osds.end(); },
          [&s, this] {
            // advance past the removed peer, otherwise the same osd would
            // be removed twice and the assert in remove_peer() would fire
            return remove_peer(*s.where++); }
        );
      });
  }).then([=] {
    // or too few?
    auto osdmap = service.get_osdmap_service().get_map();
    auto epoch = osdmap->get_epoch();
    for (auto next = osdmap->get_next_up_osd_after(whoami);
         peers.size() < min_peers && next >= 0 && next != whoami;
         next = osdmap->get_next_up_osd_after(next)) {
      add_peer(next, epoch);
    }
  });
}
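
// seastar::do_until(stop, action) re-invokes the asynchronous action until
// the stop predicate returns true, which is what gives the sequential
// trimming loop above. A tiny standalone sketch (hypothetical):
//
//   seastar::do_with(size_t{0}, [](size_t& i) {
//     return seastar::do_until([&i] { return i == 3; },
//                              [&i] { ++i; return seastar::now(); });
//   });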

seastar::future<> Heartbeat::remove_peer(osd_id_t peer)
{
  auto found = peers.find(peer);
  assert(found != peers.end());
  logger().info("remove_peer({})", peer);
  return seastar::when_all_succeed(found->second.con_front->close(),
                                   found->second.con_back->close()).then(
    [this, peer] {
      peers.erase(peer);
      return seastar::now();
    });
}

seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn,
                                         MessageRef m)
{
  switch (m->get_type()) {
  case MSG_OSD_PING:
    return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m));
  default:
    return seastar::now();
  }
}

seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn)
{
  auto found = std::find_if(peers.begin(), peers.end(),
                            [conn](const peers_map_t::value_type& peer) {
                              return (peer.second.con_front == conn ||
                                      peer.second.con_back == conn);
                            });
  if (found == peers.end()) {
    return seastar::now();
  }
  const auto peer = found->first;
  const auto epoch = found->second.epoch;
  return remove_peer(peer).then([peer, epoch, this] {
    add_peer(peer, epoch);
  });
}

seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn,
                                             Ref<MOSDPing> m)
{
  switch (m->op) {
  case MOSDPing::PING:
    return handle_ping(conn, m);
  case MOSDPing::PING_REPLY:
    return handle_reply(conn, m);
  case MOSDPing::YOU_DIED:
    return handle_you_died();
  default:
    return seastar::now();
  }
}
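
// Protocol shape (as implemented below): a PING carries the sender's stamps
// and osdmap epoch; the receiver answers with a PING_REPLY echoing the
// ping_stamp so the sender can match it against its ping_history; YOU_DIED
// tells a peer it has already been marked down, which should eventually
// prompt it to fetch a newer osdmap.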

seastar::future<> Heartbeat::handle_ping(crimson::net::Connection* conn,
                                         Ref<MOSDPing> m)
{
  auto min_message = static_cast<uint32_t>(
    local_conf()->osd_heartbeat_min_size);
  auto reply =
    make_message<MOSDPing>(
      m->fsid,
      service.get_osdmap_service().get_map()->get_epoch(),
      MOSDPing::PING_REPLY,
      m->ping_stamp,
      m->mono_ping_stamp,
      service.get_mnow(),
      service.get_osdmap_service().get_up_epoch(),
      min_message);
  return conn->send(reply);
}

seastar::future<> Heartbeat::handle_reply(crimson::net::Connection* conn,
                                          Ref<MOSDPing> m)
{
  const osd_id_t from = m->get_source().num();
  auto found = peers.find(from);
  if (found == peers.end()) {
    // stale reply from a peer we no longer track
    return seastar::now();
  }
  auto& peer = found->second;
  auto ping = peer.ping_history.find(m->ping_stamp);
  if (ping == peer.ping_history.end()) {
    // an old reply whose ping has already been pruned from the history
    return seastar::now();
  }
  const auto now = clock::now();
  auto& unacked = ping->second.unacknowledged;
  if (conn == peer.con_back.get()) {
    peer.last_rx_back = now;
    unacked--;
  } else if (conn == peer.con_front.get()) {
    peer.last_rx_front = now;
    unacked--;
  }
  if (unacked == 0) {
    // fully acknowledged: drop this ping and everything older than it
    peer.ping_history.erase(peer.ping_history.begin(), ++ping);
  }
  if (peer.is_healthy(now)) {
    // cancel false reports
    failure_queue.erase(from);
    if (auto pending = failure_pending.find(from);
        pending != failure_pending.end()) {
      return send_still_alive(from, pending->second.addrs);
    }
  }
  return seastar::now();
}
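
// Accounting example (illustrative): a heartbeat sent on both the front and
// back connections enters ping_history with unacknowledged == 2. The first
// matching reply (say, on the back connection) drops it to 1; the second,
// on the front, drops it to 0, at which point the entry and any older
// outstanding pings are erased, since a newer round trip subsumes them.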

seastar::future<> Heartbeat::handle_you_died()
{
  // TODO: ask for newer osdmap
  return seastar::now();
}

seastar::future<> Heartbeat::send_heartbeats()
{
  using peers_item_t = typename peers_map_t::value_type;
  return seastar::parallel_for_each(peers,
    [this](peers_item_t& item) {
      const auto mnow = service.get_mnow();
      const auto now = clock::now();
      const auto deadline =
        now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
      auto& info = item.second;
      info.last_tx = now;
      if (clock::is_zero(info.first_tx)) {
        info.first_tx = now;
      }
      // record the outstanding ping; replies are matched by sent_stamp
      const utime_t sent_stamp{now};
      [[maybe_unused]] auto [reply, added] =
        info.ping_history.emplace(sent_stamp, reply_t{deadline, 0});
      std::vector<crimson::net::ConnectionRef> conns{info.con_front,
                                                     info.con_back};
      return seastar::parallel_for_each(std::move(conns),
        [sent_stamp, mnow, &reply=reply->second, this] (auto con) {
          if (con) {
            auto min_message = static_cast<uint32_t>(
              local_conf()->osd_heartbeat_min_size);
            auto ping = make_message<MOSDPing>(
              monc.get_fsid(),
              service.get_osdmap_service().get_map()->get_epoch(),
              MOSDPing::PING,
              sent_stamp,
              mnow,
              mnow,
              service.get_osdmap_service().get_up_epoch(),
              min_message);
            return con->send(ping).then([&reply] {
              // one more reply to wait for on this ping
              reply.unacknowledged++;
              return seastar::now();
            });
          } else {
            return seastar::now();
          }
        });
    });
}
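
// seastar::parallel_for_each() starts the per-peer (and, nested, the
// per-connection) work concurrently and resolves once every sub-future has;
// the periodic timer armed in start() drives this whole routine once per
// osd_heartbeat_interval seconds.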

seastar::future<> Heartbeat::send_failures()
{
  using failure_item_t = typename failure_queue_t::value_type;
  return seastar::parallel_for_each(failure_queue,
    [this](failure_item_t& failure_item) {
      auto [osd, failed_since] = failure_item;
      if (failure_pending.count(osd)) {
        return seastar::now();
      }
      auto failed_for = std::chrono::duration_cast<std::chrono::seconds>(
        clock::now() - failed_since).count();
      auto osdmap = service.get_osdmap_service().get_map();
      auto failure_report =
        make_message<MOSDFailure>(monc.get_fsid(),
                                  osd,
                                  osdmap->get_addrs(osd),
                                  static_cast<int>(failed_for),
                                  osdmap->get_epoch());
      failure_pending.emplace(osd, failure_info_t{failed_since,
                                                  osdmap->get_addrs(osd)});
      return monc.send_message(failure_report);
    }).then([this] {
      failure_queue.clear();
      return seastar::now();
    });
}
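
// Failure-report lifecycle (as wired up in this file): an unhealthy peer is
// queued in failure_queue, send_failures() reports it to the monitors and
// parks it in failure_pending, and a later healthy reply retracts the
// report via send_still_alive() below.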

seastar::future<> Heartbeat::send_still_alive(osd_id_t osd,
                                              const entity_addrvec_t& addrs)
{
  auto still_alive = make_message<MOSDFailure>(
    monc.get_fsid(),
    osd,
    addrs,
    0,
    service.get_osdmap_service().get_map()->get_epoch(),
    MOSDFailure::FLAG_ALIVE);
  return monc.send_message(still_alive).then([=] {
    failure_pending.erase(osd);
    return seastar::now();
  });
}

bool Heartbeat::PeerInfo::is_unhealthy(clock::time_point now) const
{
  if (ping_history.empty()) {
    // we haven't sent a ping yet, or we have received all the replies;
    // either way we are healthy for now
    return false;
  } else {
    auto oldest_ping = ping_history.begin();
    return now > oldest_ping->second.deadline;
  }
}

bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const
{
  // don't consider a peer healthy until the first reply has arrived on
  // every connection we maintain to it
  if (con_front && clock::is_zero(last_rx_front)) {
    return false;
  }
  if (con_back && clock::is_zero(last_rx_back)) {
    return false;
  }
  return !is_unhealthy(now);
}
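
// Example (illustrative): a peer with con_front set but last_rx_front still
// zero is reported unhealthy regardless of deadlines, while a peer whose
// oldest entry in ping_history has passed its osd_heartbeat_grace deadline
// is unhealthy even though earlier replies arrived; only a peer that has
// answered on every connection and has no overdue ping counts as healthy.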