1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <boost/range/join.hpp>
8 #include "messages/MOSDPing.h"
9 #include "messages/MOSDFailure.h"
11 #include "crimson/common/config_proxy.h"
12 #include "crimson/net/Connection.h"
13 #include "crimson/net/Messenger.h"
14 #include "crimson/osd/shard_services.h"
15 #include "crimson/mon/MonClient.h"
17 #include "osd/OSDMap.h"
19 using crimson::common::local_conf
;
22 seastar::logger
& logger() {
23 return crimson::get_logger(ceph_subsys_osd
);
// Heartbeat constructor.
//   service    - shard services; used elsewhere in this file for osdmap
//                lookups, get_mnow() and the up epoch.
//   monc       - monitor client; used elsewhere in this file to send
//                MOSDFailure reports.
//   front_msgr - messenger for the front heartbeat network.
//   back_msgr  - messenger for the back heartbeat network.
// NOTE(review): only the front_msgr and timer initializers are visible in
// this copy; confirm that service, monc and back_msgr are initialized too.
27 Heartbeat::Heartbeat(const crimson::osd::ShardServices
& service
,
28 crimson::mon::Client
& monc
,
29 crimson::net::MessengerRef front_msgr
,
30 crimson::net::MessengerRef back_msgr
)
33 front_msgr
{front_msgr
},
// The timer callback deliberately discards (via `(void)`) the future
// returned by send_heartbeats(), so nothing here waits for a round to finish.
35 // do this in background
36 timer
{[this] { (void)send_heartbeats(); }}
// Start heartbeating on the given front/back address vectors.
// Both messengers get a stateless-server socket policy for OSD peers. They
// are then bound and started via start_messenger(), and the returned future
// fails if either start fails (when_all_succeed).
//   front_addrs, back_addrs - taken by value; the loop below walks the
//                             entries of both vectors by non-const reference.
// NOTE(review): the body of the address loop is not visible in this copy.
// Given the comment about "any unused port", it presumably resets each
// port. The continuation that consumes osd_heartbeat_interval (presumably
// arming the periodic timer) is also only partly visible. Confirm both.
39 seastar::future
<> Heartbeat::start(entity_addrvec_t front_addrs
,
40 entity_addrvec_t back_addrs
)
42 logger().info("heartbeat: start");
43 // i only care about the address, so any unused port would work
44 for (auto& addr
: boost::join(front_addrs
.v
, back_addrs
.v
)) {
48 using crimson::net::SocketPolicy
;
49 front_msgr
->set_policy(entity_name_t::TYPE_OSD
,
50 SocketPolicy::stateless_server(0));
51 back_msgr
->set_policy(entity_name_t::TYPE_OSD
,
52 SocketPolicy::stateless_server(0));
// Both start_messenger() futures are created before waiting on either.
53 return seastar::when_all_succeed(start_messenger(*front_msgr
, front_addrs
),
54 start_messenger(*back_msgr
, back_addrs
))
57 std::chrono::seconds(local_conf()->osd_heartbeat_interval
));
// Bind `msgr` on `addrs` and then start it.
// try_bind() receives ms_bind_port_min / ms_bind_port_max from the config.
// msgr.start() receives `this`; Heartbeat defines ms_dispatch() and
// ms_handle_reset() elsewhere in this file.
// `msgr` is captured by reference in the continuation, so it must outlive
// the returned future. start() passes *front_msgr / *back_msgr, which are
// held by members.
// NOTE(review): no return type precedes this definition in this copy;
// presumably seastar::future<>, since start() passes the results to
// when_all_succeed().
62 Heartbeat::start_messenger(crimson::net::Messenger
& msgr
,
63 const entity_addrvec_t
& addrs
)
65 return msgr
.try_bind(addrs
,
66 local_conf()->ms_bind_port_min
,
67 local_conf()->ms_bind_port_max
).then([&msgr
, this] {
68 return msgr
.start(this);
72 seastar::future
<> Heartbeat::stop()
74 return seastar::when_all_succeed(front_msgr
->shutdown(),
75 back_msgr
->shutdown());
78 const entity_addrvec_t
& Heartbeat::get_front_addrs() const
80 return front_msgr
->get_myaddrs();
83 const entity_addrvec_t
& Heartbeat::get_back_addrs() const
85 return back_msgr
->get_myaddrs();
88 void Heartbeat::set_require_authorizer(bool require_authorizer
)
90 if (front_msgr
->get_require_authorizer() != require_authorizer
) {
91 front_msgr
->set_require_authorizer(require_authorizer
);
92 back_msgr
->set_require_authorizer(require_authorizer
);
// Make `peer` a heartbeat peer.
// try_emplace() default-constructs an entry in `peers` only when `peer` is
// not already present; `added` tells whether it did.
// The peer's front/back connections are opened to the first address of its
// hb front/back addrvec in the current osdmap.
// NOTE(review): `epoch`, `info` and `added` are not used in the visible
// lines. Presumably the missing lines store `epoch` into info.epoch and
// guard the connect() calls with `if (added)`. Confirm against the full
// source.
96 void Heartbeat::add_peer(osd_id_t peer
, epoch_t epoch
)
98 auto [peer_info
, added
] = peers
.try_emplace(peer
);
99 auto& info
= peer_info
->second
;
102 logger().info("add_peer({})", peer
);
103 auto osdmap
= service
.get_osdmap_service().get_map();
// Only .front() of each addrvec is used.
105 peer_info
->second
.con_front
= front_msgr
->connect(
106 osdmap
->get_hb_front_addrs(peer
).front(), CEPH_ENTITY_TYPE_OSD
);
107 peer_info
->second
.con_back
= back_msgr
->connect(
108 osdmap
->get_hb_back_addrs(peer
).front(), CEPH_ENTITY_TYPE_OSD
);
// Snapshot the ids of all current peers, then map_reduce over them:
//  - peer no longer up in the osdmap: remove_peer() it and map to -1;
//  - peer whose recorded epoch is older than the osdmap's: map to its id;
//  - otherwise: map to -1.
// The reducer accumulates the mapped values into an osds_t. The result is
// meant to be the ids of peers that are still up but stale (see
// update_peers()).
// NOTE(review): the declaration of `osds`, the mapper lambda header and the
// reducer's initial value are not visible in this copy. As shown, the
// reducer pushes every mapped value, including the -1 placeholders.
// Confirm an `extra >= 0` filter exists; otherwise callers receive -1 ids.
112 seastar::future
<Heartbeat::osds_t
> Heartbeat::remove_down_peers()
115 for (auto& peer
: peers
) {
116 osds
.push_back(peer
.first
);
118 return seastar::map_reduce(std::move(osds
),
120 auto osdmap
= service
.get_osdmap_service().get_map();
121 if (!osdmap
->is_up(osd
)) {
122 return remove_peer(osd
).then([] {
123 return seastar::make_ready_future
<osd_id_t
>(-1);
// NOTE(review): operator[] would default-insert an entry if `osd` were no
// longer in `peers`; find() would avoid that.
125 } else if (peers
[osd
].epoch
< osdmap
->get_epoch()) {
126 return seastar::make_ready_future
<osd_id_t
>(osd
);
128 return seastar::make_ready_future
<osd_id_t
>(-1);
131 [](osds_t
&& extras
, osd_id_t extra
) {
133 extras
.push_back(extra
);
// `extras` is a named rvalue reference, so it must be moved explicitly.
135 return std::move(extras
);
// Add the peers needed for failure reporting:
//  - the next and previous up OSDs around `whoami`;
//  - random up OSDs from different subtrees, as selected by
//    get_random_up_osds_by_subtree() with mon_osd_reporter_subtree_level and
//    mon_osd_min_down_reporters.
// Every OSD collected in `want` is added via add_peer() with the current
// osdmap epoch.
// NOTE(review): the declaration of `want` and the bodies of the next/prev
// `if`s are not visible in this copy; presumably they insert next/prev into
// `want`.
139 void Heartbeat::add_reporter_peers(int whoami
)
141 auto osdmap
= service
.get_osdmap_service().get_map();
142 // include next and previous up osds to ensure we have a fully-connected set
144 if (auto next
= osdmap
->get_next_up_osd_after(whoami
); next
>= 0) {
147 if (auto prev
= osdmap
->get_previous_up_osd_before(whoami
); prev
>= 0) {
150 // make sure we have at least **min_down** osds coming from different
151 // subtree level (e.g., hosts) for fast failure detection.
152 auto min_down
= local_conf().get_val
<uint64_t>("mon_osd_min_down_reporters");
153 auto subtree
= local_conf().get_val
<string
>("mon_osd_reporter_subtree_level");
// `want` is passed both as the input set and as the output (&want).
154 osdmap
->get_random_up_osds_by_subtree(
155 whoami
, subtree
, min_down
, want
, &want
);
156 auto epoch
= osdmap
->get_epoch();
157 for (int osd
: want
) {
158 add_peer(osd
, epoch
);
162 seastar::future
<> Heartbeat::update_peers(int whoami
)
164 const auto min_peers
= static_cast<size_t>(
165 local_conf().get_val
<int64_t>("osd_heartbeat_min_peers"));
166 add_reporter_peers(whoami
);
167 return remove_down_peers().then([=](osds_t
&& extra
) {
169 struct iteration_state
{
170 osds_t::const_iterator where
;
171 osds_t::const_iterator end
;
173 return seastar::do_with(iteration_state
{extra
.begin(),extra
.end()},
174 [=](iteration_state
& s
) {
175 return seastar::do_until(
176 [min_peers
, &s
, this] {
177 return peers
.size() <= min_peers
|| s
.where
== s
.end
; },
179 return remove_peer(*s
.where
); }
184 auto osdmap
= service
.get_osdmap_service().get_map();
185 auto epoch
= osdmap
->get_epoch();
186 for (auto next
= osdmap
->get_next_up_osd_after(whoami
);
187 peers
.size() < min_peers
&& next
>= 0 && next
!= whoami
;
188 next
= osdmap
->get_next_up_osd_after(next
)) {
189 add_peer(next
, epoch
);
// Close both heartbeat connections of `peer`. The continuation runs once
// both close() futures have resolved.
// Precondition: `peer` is in `peers` (asserted below).
// NOTE(review): if this is the standard <cassert> assert, an NDEBUG build
// would dereference peers.end() for an unknown peer.
// NOTE(review): the continuation passed to .then() is only partly visible
// here; it presumably erases `peer` from `peers`. Confirm.
194 seastar::future
<> Heartbeat::remove_peer(osd_id_t peer
)
196 auto found
= peers
.find(peer
);
197 assert(found
!= peers
.end());
198 logger().info("remove_peer({})", peer
);
199 return seastar::when_all_succeed(found
->second
.con_front
->close(),
200 found
->second
.con_back
->close()).then(
203 return seastar::now();
// Messenger dispatch hook: switch on the message type.
// One case forwards the message, cast to MOSDPing, to handle_osd_ping().
// The other visible path returns a ready future, i.e. the message is
// ignored.
// NOTE(review): the second parameter (presumably the MessageRef `m`) and the
// case labels are not visible in this copy.
207 seastar::future
<> Heartbeat::ms_dispatch(crimson::net::Connection
* conn
,
210 switch (m
->get_type()) {
212 return handle_osd_ping(conn
, boost::static_pointer_cast
<MOSDPing
>(m
));
214 return seastar::now();
218 seastar::future
<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn
)
220 auto found
= std::find_if(peers
.begin(), peers
.end(),
221 [conn
](const peers_map_t::value_type
& peer
) {
222 return (peer
.second
.con_front
== conn
||
223 peer
.second
.con_back
== conn
);
225 if (found
== peers
.end()) {
226 return seastar::now();
228 const auto peer
= found
->first
;
229 const auto epoch
= found
->second
.epoch
;
230 return remove_peer(peer
).then([peer
, epoch
, this] {
231 add_peer(peer
, epoch
);
// Route an MOSDPing according to its op:
//   PING_REPLY -> handle_reply()
//   YOU_DIED   -> handle_you_died()
//   another case whose label is not visible here (presumably
//   MOSDPing::PING) -> handle_ping()
//   anything else -> ignored (ready future).
// NOTE(review): the message parameter and the switch header are not
// visible in this copy.
235 seastar::future
<> Heartbeat::handle_osd_ping(crimson::net::Connection
* conn
,
240 return handle_ping(conn
, m
);
241 case MOSDPing::PING_REPLY
:
242 return handle_reply(conn
, m
);
243 case MOSDPing::YOU_DIED
:
244 return handle_you_died();
246 return seastar::now();
// Answer a ping with an MOSDPing PING_REPLY sent back on the same
// connection. The visible arguments carry our current osdmap epoch and our
// up epoch.
// min_message is derived from osd_heartbeat_min_size; presumably it is
// passed to the reply as its minimum size.
// NOTE(review): the `reply` declaration and several MOSDPing constructor
// arguments are not visible in this copy.
250 seastar::future
<> Heartbeat::handle_ping(crimson::net::Connection
* conn
,
253 auto min_message
= static_cast<uint32_t>(
254 local_conf()->osd_heartbeat_min_size
);
256 make_message
<MOSDPing
>(
258 service
.get_osdmap_service().get_map()->get_epoch(),
259 MOSDPing::PING_REPLY
,
263 service
.get_osdmap_service().get_up_epoch(),
265 return conn
->send(reply
);
// Process a PING_REPLY from OSD `from`:
//  - ignore replies from unknown peers and replies whose ping_stamp is no
//    longer in ping_history;
//  - record the receive time on the connection (back or front) it arrived on;
//  - erase ping_history from begin() up to and including the acked entry
//    (the earlier pings, assuming ping_history is ordered by send stamp);
//  - if the peer now looks healthy, drop it from failure_queue, and, if a
//    failure report for it is pending, tell the monitor it is still alive.
// NOTE(review): `unacked` is bound but not used in the visible lines.
// Presumably it is decremented per connection, and the erase only happens
// once both connections have acked. Confirm against the full source.
268 seastar::future
<> Heartbeat::handle_reply(crimson::net::Connection
* conn
,
271 const osd_id_t from
= m
->get_source().num();
272 auto found
= peers
.find(from
);
273 if (found
== peers
.end()) {
275 return seastar::now();
277 auto& peer
= found
->second
;
278 auto ping
= peer
.ping_history
.find(m
->ping_stamp
);
279 if (ping
== peer
.ping_history
.end()) {
280 // old replies, deprecated by newly sent pings.
281 return seastar::now();
283 const auto now
= clock::now();
284 auto& unacked
= ping
->second
.unacknowledged
;
285 if (conn
== peer
.con_back
.get()) {
286 peer
.last_rx_back
= now
;
288 } else if (conn
== peer
.con_front
.get()) {
289 peer
.last_rx_front
= now
;
293 peer
.ping_history
.erase(peer
.ping_history
.begin(), ++ping
);
295 if (peer
.is_healthy(now
)) {
296 // cancel false reports
297 failure_queue
.erase(from
);
298 if (auto pending
= failure_pending
.find(from
);
299 pending
!= failure_pending
.end()) {
300 return send_still_alive(from
, pending
->second
.addrs
);
303 return seastar::now();
306 seastar::future
<> Heartbeat::handle_you_died()
308 // TODO: ask for newer osdmap
309 return seastar::now();
// One heartbeat round, run for all peers in parallel. For each peer:
//  - compute deadline = now + osd_heartbeat_grace;
//  - when first_tx is still zero, handle the first transmission (the body of
//    that `if` is not visible here);
//  - emplace a ping_history entry keyed by the send stamp, with that
//    deadline and an unacknowledged count of 0;
//  - send an MOSDPing on each connection in `conns`, bumping the entry's
//    unacknowledged count after each send() resolves.
// NOTE(review): `conns` visibly starts with info.con_front; the rest of its
// initializer (presumably info.con_back) is not visible in this copy.
// NOTE(review): the inner lambda captures `reply` by reference into
// info.ping_history and uses it after send() resolves. If the peer is
// removed, or the entry is erased by handle_reply(), while a send is in
// flight, that reference dangles. Confirm this cannot happen.
312 seastar::future
<> Heartbeat::send_heartbeats()
314 using peers_item_t
= typename
peers_map_t::value_type
;
315 return seastar::parallel_for_each(peers
,
316 [this](peers_item_t
& item
) {
317 const auto mnow
= service
.get_mnow();
318 const auto now
= clock::now();
319 const auto deadline
=
320 now
+ std::chrono::seconds(local_conf()->osd_heartbeat_grace
);
321 auto& info
= item
.second
;
323 if (clock::is_zero(info
.first_tx
)) {
326 const utime_t sent_stamp
{now
};
327 [[maybe_unused
]] auto [reply
, added
] =
328 info
.ping_history
.emplace(sent_stamp
, reply_t
{deadline
, 0});
329 std::vector
<crimson::net::ConnectionRef
> conns
{info
.con_front
,
331 return seastar::parallel_for_each(std::move(conns
),
332 [sent_stamp
, mnow
, &reply
=reply
->second
, this] (auto con
) {
334 auto min_message
= static_cast<uint32_t>(
335 local_conf()->osd_heartbeat_min_size
);
336 auto ping
= make_message
<MOSDPing
>(
338 service
.get_osdmap_service().get_map()->get_epoch(),
343 service
.get_osdmap_service().get_up_epoch(),
345 return con
->send(ping
).then([&reply
] {
346 reply
.unacknowledged
++;
347 return seastar::now();
// NOTE(review): the branch this return belongs to is not visible here;
// presumably it handles a null `con`.
350 return seastar::now();
// Report every queued failure to the monitor, in parallel.
// For each (osd, failed_since) in failure_queue that is not already in
// failure_pending:
//  - build an MOSDFailure carrying the osd's addrs, the seconds it has been
//    failing and the current osdmap epoch;
//  - record it in failure_pending;
//  - send it via monc.
// Entries already pending are skipped. failure_queue is cleared afterwards.
// NOTE(review): the continuation that performs the clear() (then/finally)
// and one MOSDFailure argument (presumably `osd`) are not visible in this
// copy.
356 seastar::future
<> Heartbeat::send_failures()
358 using failure_item_t
= typename
failure_queue_t::value_type
;
359 return seastar::parallel_for_each(failure_queue
,
360 [this](failure_item_t
& failure_item
) {
361 auto [osd
, failed_since
] = failure_item
;
362 if (failure_pending
.count(osd
)) {
363 return seastar::now();
365 auto failed_for
= chrono::duration_cast
<chrono::seconds
>(
366 clock::now() - failed_since
).count();
367 auto osdmap
= service
.get_osdmap_service().get_map();
368 auto failure_report
=
369 make_message
<MOSDFailure
>(monc
.get_fsid(),
371 osdmap
->get_addrs(osd
),
372 static_cast<int>(failed_for
),
373 osdmap
->get_epoch());
374 failure_pending
.emplace(osd
, failure_info_t
{failed_since
,
375 osdmap
->get_addrs(osd
)});
376 return monc
.send_message(failure_report
);
378 failure_queue
.clear();
379 return seastar::now();
// Retract a failure report: send the monitor an MOSDFailure flagged
// FLAG_ALIVE, stamped with the current osdmap epoch. Once the send
// resolves, `osd` is erased from failure_pending.
// NOTE(review): the leading MOSDFailure constructor arguments (presumably
// the fsid, `osd` and `addrs`) are not visible in this copy. The `[=]`
// capture implicitly captures `this`.
383 seastar::future
<> Heartbeat::send_still_alive(osd_id_t osd
,
384 const entity_addrvec_t
& addrs
)
386 auto still_alive
= make_message
<MOSDFailure
>(
391 service
.get_osdmap_service().get_map()->get_epoch(),
392 MOSDFailure::FLAG_ALIVE
);
393 return monc
.send_message(still_alive
).then([=] {
394 failure_pending
.erase(osd
);
395 return seastar::now();
// A peer is unhealthy when `now` is past the deadline of the first entry in
// ping_history (its oldest outstanding ping, assuming ordering by send
// stamp).
// NOTE(review): the early return for an empty ping_history is not visible
// in this copy; per the comment below it presumably returns false.
399 bool Heartbeat::PeerInfo::is_unhealthy(clock::time_point now
) const
401 if (ping_history
.empty()) {
402 // we haven't sent a ping yet or we have got all replies,
403 // in either way we are safe and healthy for now
406 auto oldest_ping
= ping_history
.begin();
407 return now
> oldest_ping
->second
.deadline
;
// A peer counts as healthy only after a reply has been received on each of
// its existing connections (front and back), and only while it is not
// is_unhealthy(now).
// NOTE(review): the bodies of the two early-exit `if`s are not visible in
// this copy; presumably each returns false.
411 bool Heartbeat::PeerInfo::is_healthy(clock::time_point now
) const
413 if (con_front
&& clock::is_zero(last_rx_front
)) {
416 if (con_back
&& clock::is_zero(last_rx_back
)) {
419 // only declare to be healthy until we have received the first
420 // replies from both front/back connections
421 return !is_unhealthy(now
);