1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2012 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "common/dout.h"
17 #include "common/HeartbeatMap.h"
19 #include "include/stringify.h"
20 #include "include/util.h"
22 #include "messages/MMDSBeacon.h"
23 #include "mon/MonClient.h"
24 #include "mds/MDLog.h"
25 #include "mds/MDSRank.h"
26 #include "mds/MDSMap.h"
27 #include "mds/Locker.h"
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_mds
36 #define dout_prefix *_dout << "mds.beacon." << name << ' '
38 Beacon::Beacon(CephContext
*cct
, MonClient
*monc
, boost::string_view name
)
41 beacon_interval(g_conf
->mds_beacon_interval
),
52 void Beacon::shutdown()
54 std::unique_lock
<std::mutex
> lock(mutex
);
62 void Beacon::init(const MDSMap
* mdsmap
)
64 std::unique_lock
<std::mutex
> lock(mutex
);
65 assert(mdsmap
!= NULL
);
67 _notify_mdsmap(mdsmap
);
68 standby_for_rank
= mds_rank_t(g_conf
->mds_standby_for_rank
);
69 standby_for_name
= g_conf
->mds_standby_for_name
;
70 standby_for_fscid
= fs_cluster_id_t(g_conf
->mds_standby_for_fscid
);
71 standby_replay
= g_conf
->mds_standby_replay
;
73 sender
= std::thread([this]() {
74 std::unique_lock
<std::mutex
> lock(mutex
);
75 std::condition_variable c
; // no one wakes us
77 auto now
= clock::now();
78 auto since
= std::chrono::duration
<double>(now
-last_send
).count();
79 auto interval
= beacon_interval
;
80 if (since
>= interval
*.90) {
85 dout(20) << "sender thread waiting interval " << interval
<< "s" << dendl
;
86 c
.wait_for(lock
, interval
*std::chrono::seconds(1));
91 bool Beacon::ms_can_fast_dispatch(const Message
*m
) const
93 return m
->get_type() == MSG_MDS_BEACON
;
96 void Beacon::ms_fast_dispatch(Message
*m
)
98 bool handled
= ms_dispatch(m
);
102 bool Beacon::ms_dispatch(Message
*m
)
104 if (m
->get_type() == MSG_MDS_BEACON
) {
105 if (m
->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
106 handle_mds_beacon(static_cast<MMDSBeacon
*>(m
));
118 * Update lagginess state based on response from remote MDSMonitor
120 * This function puts the passed message before returning
122 void Beacon::handle_mds_beacon(MMDSBeacon
*m
)
124 std::unique_lock
<std::mutex
> lock(mutex
);
127 version_t seq
= m
->get_seq();
130 auto it
= seq_stamp
.find(seq
);
131 if (it
!= seq_stamp
.end()) {
132 auto now
= clock::now();
134 last_acked_stamp
= it
->second
;
135 auto rtt
= std::chrono::duration
<double>(now
- last_acked_stamp
).count();
137 dout(5) << "received beacon reply " << ceph_mds_state_name(m
->get_state()) << " seq " << m
->get_seq() << " rtt " << rtt
<< dendl
;
139 if (laggy
&& rtt
< g_conf
->mds_beacon_grace
) {
140 dout(0) << " MDS is no longer laggy" << dendl
;
145 // clean up seq_stamp map
146 seq_stamp
.erase(seq_stamp
.begin(), ++it
);
148 // Wake a waiter up if present
151 dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m
->get_state())
152 << " seq " << m
->get_seq() << " dne" << dendl
;
160 std::unique_lock
<std::mutex
> lock(mutex
);
165 void Beacon::send_and_wait(const double duration
)
167 std::unique_lock
<std::mutex
> lock(mutex
);
169 auto awaiting_seq
= last_seq
;
170 dout(20) << __func__
<< ": awaiting " << awaiting_seq
171 << " for up to " << duration
<< "s" << dendl
;
173 auto start
= clock::now();
174 while (!seq_stamp
.empty() && seq_stamp
.begin()->first
<= awaiting_seq
) {
175 auto now
= clock::now();
176 auto s
= duration
*.95-std::chrono::duration
<double>(now
-start
).count();
178 cvar
.wait_for(lock
, s
*std::chrono::seconds(1));
184 * Call periodically, or when you have updated the desired state
188 auto now
= clock::now();
189 auto since
= std::chrono::duration
<double>(now
-last_acked_stamp
).count();
191 if (!cct
->get_heartbeat_map()->is_healthy()) {
192 /* If anything isn't progressing, let's avoid sending a beacon so that
193 * the monitors will consider us laggy */
194 dout(0) << "Skipping beacon heartbeat to monitors (last acked " << since
<< "s ago); MDS internal heartbeat is not healthy!" << dendl
;
199 dout(5) << "Sending beacon " << ceph_mds_state_name(want_state
) << " seq " << last_seq
<< dendl
;
201 seq_stamp
[last_seq
] = now
;
203 assert(want_state
!= MDSMap::STATE_NULL
);
205 MMDSBeacon
*beacon
= new MMDSBeacon(
206 monc
->get_fsid(), mds_gid_t(monc
->get_global_id()),
211 CEPH_FEATURES_SUPPORTED_DEFAULT
);
213 beacon
->set_standby_for_rank(standby_for_rank
);
214 beacon
->set_standby_for_name(standby_for_name
);
215 beacon
->set_standby_for_fscid(standby_for_fscid
);
216 beacon
->set_standby_replay(standby_replay
);
217 beacon
->set_health(health
);
218 beacon
->set_compat(compat
);
219 // piggyback the sys info on beacon msg
220 if (want_state
== MDSMap::STATE_BOOT
) {
221 map
<string
, string
> sys_info
;
222 collect_sys_info(&sys_info
, cct
);
223 sys_info
["addr"] = stringify(monc
->get_myaddr());
224 beacon
->set_sys_info(sys_info
);
226 monc
->send_mon_message(beacon
);
231 * Call this when there is a new MDSMap available
233 void Beacon::notify_mdsmap(MDSMap
const *mdsmap
)
235 std::unique_lock
<std::mutex
> lock(mutex
);
236 assert(mdsmap
!= NULL
);
238 _notify_mdsmap(mdsmap
);
241 void Beacon::_notify_mdsmap(MDSMap
const *mdsmap
)
243 assert(mdsmap
!= NULL
);
244 assert(mdsmap
->get_epoch() >= epoch
);
246 if (mdsmap
->get_epoch() != epoch
) {
247 epoch
= mdsmap
->get_epoch();
248 compat
= MDSMap::get_compat_set_default();
249 compat
.merge(mdsmap
->compat
);
254 bool Beacon::is_laggy()
256 std::unique_lock
<std::mutex
> lock(mutex
);
258 auto now
= clock::now();
259 auto since
= std::chrono::duration
<double>(now
-last_acked_stamp
).count();
260 if (since
> g_conf
->mds_beacon_grace
) {
262 dout(1) << "is_laggy " << since
<< " > " << g_conf
->mds_beacon_grace
263 << " since last acked beacon" << dendl
;
266 auto last_reconnect
= std::chrono::duration
<double>(now
-last_mon_reconnect
).count();
267 if (since
> (g_conf
->mds_beacon_grace
*2) && last_reconnect
> g_conf
->mds_beacon_interval
) {
268 // maybe it's not us?
269 dout(1) << "initiating monitor reconnect; maybe we're not the slow one"
271 last_mon_reconnect
= now
;
272 monc
->reopen_session();
279 void Beacon::set_want_state(const MDSMap
* mdsmap
, MDSMap::DaemonState
const newstate
)
281 std::unique_lock
<std::mutex
> lock(mutex
);
283 // Update mdsmap epoch atomically with updating want_state, so that when
284 // we send a beacon with the new want state it has the latest epoch, and
285 // once we have updated to the latest epoch, we are not sending out
286 // a stale want_state (i.e. one from before making it through MDSMap
288 _notify_mdsmap(mdsmap
);
290 if (want_state
!= newstate
) {
291 dout(5) << __func__
<< ": "
292 << ceph_mds_state_name(want_state
) << " -> "
293 << ceph_mds_state_name(newstate
) << dendl
;
294 want_state
= newstate
;
300 * We are 'shown' an MDS briefly in order to update
301 * some health metrics that we will send in the next
304 void Beacon::notify_health(MDSRank
const *mds
)
306 std::unique_lock
<std::mutex
> lock(mutex
);
312 // I'm going to touch this MDS, so it must be locked
313 assert(mds
->mds_lock
.is_locked_by_me());
315 health
.metrics
.clear();
317 // Detect presence of entries in DamageTable
318 if (!mds
->damage_table
.empty()) {
319 MDSHealthMetric
m(MDS_HEALTH_DAMAGE
, HEALTH_ERR
, std::string(
320 "Metadata damage detected"));
321 health
.metrics
.push_back(m
);
324 // Detect MDS_HEALTH_TRIM condition
325 // Arbitrary factor of 2, indicates MDS is not trimming promptly
327 if (mds
->mdlog
->get_num_segments() > (size_t)(g_conf
->mds_log_max_segments
* 2)) {
328 std::ostringstream oss
;
329 oss
<< "Behind on trimming (" << mds
->mdlog
->get_num_segments()
330 << "/" << g_conf
->mds_log_max_segments
<< ")";
332 MDSHealthMetric
m(MDS_HEALTH_TRIM
, HEALTH_WARN
, oss
.str());
333 m
.metadata
["num_segments"] = stringify(mds
->mdlog
->get_num_segments());
334 m
.metadata
["max_segments"] = stringify(g_conf
->mds_log_max_segments
);
335 health
.metrics
.push_back(m
);
339 // Detect clients failing to respond to modifications to capabilities in
340 // CLIENT_CAPS messages.
342 std::list
<client_t
> late_clients
;
343 mds
->locker
->get_late_revoking_clients(&late_clients
,
344 mds
->mdsmap
->get_session_timeout());
345 std::list
<MDSHealthMetric
> late_cap_metrics
;
347 for (std::list
<client_t
>::iterator i
= late_clients
.begin(); i
!= late_clients
.end(); ++i
) {
349 // client_t is equivalent to session.info.inst.name.num
350 // Construct an entity_name_t to lookup into SessionMap
351 entity_name_t
ename(CEPH_ENTITY_TYPE_CLIENT
, i
->v
);
352 Session
const *s
= mds
->sessionmap
.get_session(ename
);
354 // Shouldn't happen, but not worth crashing if it does as this is
355 // just health-reporting code.
356 derr
<< "Client ID without session: " << i
->v
<< dendl
;
360 std::ostringstream oss
;
361 oss
<< "Client " << s
->get_human_name() << " failing to respond to capability release";
362 MDSHealthMetric
m(MDS_HEALTH_CLIENT_LATE_RELEASE
, HEALTH_WARN
, oss
.str());
363 m
.metadata
["client_id"] = stringify(i
->v
);
364 late_cap_metrics
.push_back(m
);
367 if (late_cap_metrics
.size() <= (size_t)g_conf
->mds_health_summarize_threshold
) {
368 health
.metrics
.splice(health
.metrics
.end(), late_cap_metrics
);
370 std::ostringstream oss
;
371 oss
<< "Many clients (" << late_cap_metrics
.size()
372 << ") failing to respond to capability release";
373 MDSHealthMetric
m(MDS_HEALTH_CLIENT_LATE_RELEASE_MANY
, HEALTH_WARN
, oss
.str());
374 m
.metadata
["client_count"] = stringify(late_cap_metrics
.size());
375 health
.metrics
.push_back(m
);
376 late_cap_metrics
.clear();
380 // Detect clients failing to generate cap releases from CEPH_SESSION_RECALL_STATE
381 // messages. May be due to buggy client or resource-hogging application.
383 // Detect clients failing to advance their old_client_tid
385 set
<Session
*> sessions
;
386 mds
->sessionmap
.get_client_session_set(sessions
);
388 auto mds_recall_state_timeout
= g_conf
->mds_recall_state_timeout
;
389 auto last_recall
= mds
->mdcache
->last_recall_state
;
390 auto last_recall_span
= std::chrono::duration
<double>(clock::now()-last_recall
).count();
391 bool recall_state_timedout
= last_recall_span
> mds_recall_state_timeout
;
393 std::list
<MDSHealthMetric
> late_recall_metrics
;
394 std::list
<MDSHealthMetric
> large_completed_requests_metrics
;
395 for (auto& session
: sessions
) {
396 if (session
->recalled_at
!= Session::time::min()) {
397 auto last_recall_sent
= session
->last_recall_sent
;
398 auto recalled_at
= session
->recalled_at
;
399 auto recalled_at_span
= std::chrono::duration
<double>(clock::now()-recalled_at
).count();
401 dout(20) << "Session servicing RECALL " << session
->info
.inst
402 << ": " << recalled_at_span
<< "s ago " << session
->recall_release_count
403 << "/" << session
->recall_count
<< dendl
;
404 if (recall_state_timedout
|| last_recall_sent
< last_recall
) {
405 dout(20) << " no longer recall" << dendl
;
406 session
->clear_recalled_at();
407 } else if (recalled_at_span
> mds_recall_state_timeout
) {
408 dout(20) << " exceeded timeout " << recalled_at_span
<< " vs. " << mds_recall_state_timeout
<< dendl
;
409 std::ostringstream oss
;
410 oss
<< "Client " << session
->get_human_name() << " failing to respond to cache pressure";
411 MDSHealthMetric
m(MDS_HEALTH_CLIENT_RECALL
, HEALTH_WARN
, oss
.str());
412 m
.metadata
["client_id"] = stringify(session
->info
.inst
.name
.num());
413 late_recall_metrics
.push_back(m
);
415 dout(20) << " within timeout " << recalled_at_span
<< " vs. " << mds_recall_state_timeout
<< dendl
;
418 if ((session
->get_num_trim_requests_warnings() > 0 &&
419 session
->get_num_completed_requests() >= g_conf
->mds_max_completed_requests
) ||
420 (session
->get_num_trim_flushes_warnings() > 0 &&
421 session
->get_num_completed_flushes() >= g_conf
->mds_max_completed_flushes
)) {
422 std::ostringstream oss
;
423 oss
<< "Client " << session
->get_human_name() << " failing to advance its oldest client/flush tid";
424 MDSHealthMetric
m(MDS_HEALTH_CLIENT_OLDEST_TID
, HEALTH_WARN
, oss
.str());
425 m
.metadata
["client_id"] = stringify(session
->info
.inst
.name
.num());
426 large_completed_requests_metrics
.push_back(m
);
430 if (late_recall_metrics
.size() <= (size_t)g_conf
->mds_health_summarize_threshold
) {
431 health
.metrics
.splice(health
.metrics
.end(), late_recall_metrics
);
433 std::ostringstream oss
;
434 oss
<< "Many clients (" << late_recall_metrics
.size()
435 << ") failing to respond to cache pressure";
436 MDSHealthMetric
m(MDS_HEALTH_CLIENT_RECALL_MANY
, HEALTH_WARN
, oss
.str());
437 m
.metadata
["client_count"] = stringify(late_recall_metrics
.size());
438 health
.metrics
.push_back(m
);
439 late_recall_metrics
.clear();
442 if (large_completed_requests_metrics
.size() <= (size_t)g_conf
->mds_health_summarize_threshold
) {
443 health
.metrics
.splice(health
.metrics
.end(), large_completed_requests_metrics
);
445 std::ostringstream oss
;
446 oss
<< "Many clients (" << large_completed_requests_metrics
.size()
447 << ") failing to advance their oldest client/flush tid";
448 MDSHealthMetric
m(MDS_HEALTH_CLIENT_OLDEST_TID_MANY
, HEALTH_WARN
, oss
.str());
449 m
.metadata
["client_count"] = stringify(large_completed_requests_metrics
.size());
450 health
.metrics
.push_back(m
);
451 large_completed_requests_metrics
.clear();
455 // Detect MDS_HEALTH_SLOW_REQUEST condition
457 int slow
= mds
->get_mds_slow_req_count();
459 dout(20) << slow
<< " slow request found" << dendl
;
460 std::ostringstream oss
;
461 oss
<< slow
<< " slow requests are blocked > " << g_conf
->mds_op_complaint_time
<< " sec";
463 MDSHealthMetric
m(MDS_HEALTH_SLOW_REQUEST
, HEALTH_WARN
, oss
.str());
464 health
.metrics
.push_back(m
);
469 auto complaint_time
= g_conf
->osd_op_complaint_time
;
470 auto now
= clock::now();
471 auto cutoff
= now
- ceph::make_timespan(complaint_time
);
474 ceph::coarse_mono_time oldest
;
475 if (MDSIOContextBase::check_ios_in_flight(cutoff
, count
, oldest
)) {
476 dout(20) << count
<< " slow metadata IOs found" << dendl
;
478 auto oldest_secs
= std::chrono::duration
<double>(now
- oldest
).count();
479 std::ostringstream oss
;
480 oss
<< count
<< " slow metadata IOs are blocked > " << complaint_time
481 << " secs, oldest blocked for " << (int64_t)oldest_secs
<< " secs";
483 MDSHealthMetric
m(MDS_HEALTH_SLOW_METADATA_IO
, HEALTH_WARN
, oss
.str());
484 health
.metrics
.push_back(m
);
488 // Report a health warning if we are readonly
489 if (mds
->mdcache
->is_readonly()) {
490 MDSHealthMetric
m(MDS_HEALTH_READ_ONLY
, HEALTH_WARN
,
491 "MDS in read-only mode");
492 health
.metrics
.push_back(m
);
495 // Report if we have significantly exceeded our cache size limit
496 if (mds
->mdcache
->cache_overfull()) {
497 std::ostringstream oss
;
498 oss
<< "MDS cache is too large (" << bytes2str(mds
->mdcache
->cache_size())
499 << "/" << bytes2str(mds
->mdcache
->cache_limit_memory()) << "); "
500 << mds
->mdcache
->num_inodes_with_caps
<< " inodes in use by clients, "
501 << mds
->mdcache
->get_num_strays() << " stray files";
503 MDSHealthMetric
m(MDS_HEALTH_CACHE_OVERSIZED
, HEALTH_WARN
, oss
.str());
504 health
.metrics
.push_back(m
);
508 MDSMap::DaemonState
Beacon::get_want_state() const
510 std::unique_lock
<std::mutex
> lock(mutex
);