1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2012 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "common/dout.h"
17 #include "common/HeartbeatMap.h"
19 #include "include/stringify.h"
20 #include "include/util.h"
22 #include "messages/MMDSBeacon.h"
23 #include "mon/MonClient.h"
24 #include "mds/MDLog.h"
25 #include "mds/MDSRank.h"
26 #include "mds/MDSMap.h"
27 #include "mds/Locker.h"
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_mds
34 #define dout_prefix *_dout << "mds.beacon." << name << ' '
37 Beacon::Beacon(CephContext
*cct_
, MonClient
*monc_
, boost::string_view name_
) :
38 Dispatcher(cct_
), lock("Beacon"), monc(monc_
), timer(g_ceph_context
, lock
),
39 name(name_
), standby_for_rank(MDS_RANK_NONE
),
40 standby_for_fscid(FS_CLUSTER_ID_NONE
), want_state(MDSMap::STATE_BOOT
),
55 void Beacon::init(MDSMap
const *mdsmap
)
57 Mutex::Locker
l(lock
);
58 assert(mdsmap
!= NULL
);
60 _notify_mdsmap(mdsmap
);
61 standby_for_rank
= mds_rank_t(g_conf
->mds_standby_for_rank
);
62 standby_for_name
= g_conf
->mds_standby_for_name
;
63 standby_for_fscid
= fs_cluster_id_t(g_conf
->mds_standby_for_fscid
);
64 standby_replay
= g_conf
->mds_standby_replay
;
66 // Spawn threads and start messaging
72 void Beacon::shutdown()
74 Mutex::Locker
l(lock
);
76 timer
.cancel_event(sender
);
83 bool Beacon::ms_dispatch(Message
*m
)
85 if (m
->get_type() == MSG_MDS_BEACON
) {
86 if (m
->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
87 handle_mds_beacon(static_cast<MMDSBeacon
*>(m
));
97 * Update lagginess state based on response from remote MDSMonitor
99 * This function puts the passed message before returning
101 void Beacon::handle_mds_beacon(MMDSBeacon
*m
)
103 Mutex::Locker
l(lock
);
106 version_t seq
= m
->get_seq();
109 if (seq_stamp
.count(seq
)) {
110 utime_t now
= ceph_clock_now();
111 if (seq_stamp
[seq
] > last_acked_stamp
) {
112 last_acked_stamp
= seq_stamp
[seq
];
113 utime_t rtt
= now
- last_acked_stamp
;
115 dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m
->get_state())
116 << " seq " << m
->get_seq() << " rtt " << rtt
<< dendl
;
118 if (was_laggy
&& rtt
< g_conf
->mds_beacon_grace
) {
119 dout(0) << "handle_mds_beacon no longer laggy" << dendl
;
124 // Mark myself laggy if system clock goes backwards. Hopping
125 // later beacons will clear it.
126 dout(1) << "handle_mds_beacon system clock goes backwards, "
127 << "mark myself laggy" << dendl
;
128 last_acked_stamp
= now
- utime_t(g_conf
->mds_beacon_grace
+ 1, 0);
132 // clean up seq_stamp map
133 while (!seq_stamp
.empty() &&
134 seq_stamp
.begin()->first
<= seq
)
135 seq_stamp
.erase(seq_stamp
.begin());
137 // Wake a waiter up if present
138 if (awaiting_seq
== seq
) {
139 waiting_cond
.Signal();
142 dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m
->get_state())
143 << " seq " << m
->get_seq() << " dne" << dendl
;
151 Mutex::Locker
l(lock
);
156 void Beacon::send_and_wait(const double duration
)
158 Mutex::Locker
l(lock
);
160 awaiting_seq
= last_seq
;
161 dout(20) << __func__
<< ": awaiting " << awaiting_seq
162 << " for up to " << duration
<< "s" << dendl
;
165 timeout
.set_from_double(ceph_clock_now() + duration
);
166 while ((!seq_stamp
.empty() && seq_stamp
.begin()->first
<= awaiting_seq
)
167 && ceph_clock_now() < timeout
) {
168 waiting_cond
.WaitUntil(lock
, timeout
);
176 * Call periodically, or when you have updated the desired state
181 timer
.cancel_event(sender
);
183 sender
= timer
.add_event_after(
184 g_conf
->mds_beacon_interval
,
185 new FunctionContext([this](int) {
186 assert(lock
.is_locked_by_me());
191 if (!cct
->get_heartbeat_map()->is_healthy()) {
192 /* If anything isn't progressing, let avoid sending a beacon so that
193 * the MDS will consider us laggy */
194 dout(1) << __func__
<< " skipping beacon, heartbeat map not healthy" << dendl
;
199 dout(10) << __func__
<< " " << ceph_mds_state_name(want_state
)
200 << " seq " << last_seq
203 seq_stamp
[last_seq
] = ceph_clock_now();
205 assert(want_state
!= MDSMap::STATE_NULL
);
207 MMDSBeacon
*beacon
= new MMDSBeacon(
208 monc
->get_fsid(), mds_gid_t(monc
->get_global_id()),
213 CEPH_FEATURES_SUPPORTED_DEFAULT
);
215 beacon
->set_standby_for_rank(standby_for_rank
);
216 beacon
->set_standby_for_name(standby_for_name
);
217 beacon
->set_standby_for_fscid(standby_for_fscid
);
218 beacon
->set_standby_replay(standby_replay
);
219 beacon
->set_health(health
);
220 beacon
->set_compat(compat
);
221 // piggyback the sys info on beacon msg
222 if (want_state
== MDSMap::STATE_BOOT
) {
223 map
<string
, string
> sys_info
;
224 collect_sys_info(&sys_info
, cct
);
225 sys_info
["addr"] = stringify(monc
->get_myaddr());
226 beacon
->set_sys_info(sys_info
);
228 monc
->send_mon_message(beacon
);
232 * Call this when there is a new MDSMap available
234 void Beacon::notify_mdsmap(MDSMap
const *mdsmap
)
236 Mutex::Locker
l(lock
);
237 assert(mdsmap
!= NULL
);
239 _notify_mdsmap(mdsmap
);
242 void Beacon::_notify_mdsmap(MDSMap
const *mdsmap
)
244 assert(mdsmap
!= NULL
);
245 assert(mdsmap
->get_epoch() >= epoch
);
247 if (mdsmap
->get_epoch() != epoch
) {
248 epoch
= mdsmap
->get_epoch();
249 compat
= MDSMap::get_compat_set_default();
250 compat
.merge(mdsmap
->compat
);
255 bool Beacon::is_laggy()
257 Mutex::Locker
l(lock
);
259 if (last_acked_stamp
== utime_t())
262 utime_t now
= ceph_clock_now();
263 utime_t since
= now
- last_acked_stamp
;
264 if (since
> g_conf
->mds_beacon_grace
) {
265 dout(5) << "is_laggy " << since
<< " > " << g_conf
->mds_beacon_grace
266 << " since last acked beacon" << dendl
;
268 if (since
> (g_conf
->mds_beacon_grace
*2) &&
269 now
> last_mon_reconnect
+ g_conf
->mds_beacon_interval
) {
270 // maybe it's not us?
271 dout(5) << "initiating monitor reconnect; maybe we're not the slow one"
273 last_mon_reconnect
= now
;
274 monc
->reopen_session();
281 utime_t
Beacon::get_laggy_until() const
283 Mutex::Locker
l(lock
);
288 void Beacon::set_want_state(MDSMap
const *mdsmap
, MDSMap::DaemonState
const newstate
)
290 Mutex::Locker
l(lock
);
292 // Update mdsmap epoch atomically with updating want_state, so that when
293 // we send a beacon with the new want state it has the latest epoch, and
294 // once we have updated to the latest epoch, we are not sending out
295 // a stale want_state (i.e. one from before making it through MDSMap
297 _notify_mdsmap(mdsmap
);
299 if (want_state
!= newstate
) {
300 dout(10) << __func__
<< ": "
301 << ceph_mds_state_name(want_state
) << " -> "
302 << ceph_mds_state_name(newstate
) << dendl
;
303 want_state
= newstate
;
309 * We are 'shown' an MDS briefly in order to update
310 * some health metrics that we will send in the next
313 void Beacon::notify_health(MDSRank
const *mds
)
315 Mutex::Locker
l(lock
);
321 // I'm going to touch this MDS, so it must be locked
322 assert(mds
->mds_lock
.is_locked_by_me());
324 health
.metrics
.clear();
326 // Detect presence of entries in DamageTable
327 if (!mds
->damage_table
.empty()) {
328 MDSHealthMetric
m(MDS_HEALTH_DAMAGE
, HEALTH_ERR
, std::string(
329 "Metadata damage detected"));
330 health
.metrics
.push_back(m
);
333 // Detect MDS_HEALTH_TRIM condition
334 // Arbitrary factor of 2, indicates MDS is not trimming promptly
336 if (mds
->mdlog
->get_num_segments() > (size_t)(g_conf
->mds_log_max_segments
* 2)) {
337 std::ostringstream oss
;
338 oss
<< "Behind on trimming (" << mds
->mdlog
->get_num_segments()
339 << "/" << g_conf
->mds_log_max_segments
<< ")";
341 MDSHealthMetric
m(MDS_HEALTH_TRIM
, HEALTH_WARN
, oss
.str());
342 m
.metadata
["num_segments"] = stringify(mds
->mdlog
->get_num_segments());
343 m
.metadata
["max_segments"] = stringify(g_conf
->mds_log_max_segments
);
344 health
.metrics
.push_back(m
);
348 // Detect clients failing to respond to modifications to capabilities in
349 // CLIENT_CAPS messages.
351 std::list
<client_t
> late_clients
;
352 mds
->locker
->get_late_revoking_clients(&late_clients
);
353 std::list
<MDSHealthMetric
> late_cap_metrics
;
355 for (std::list
<client_t
>::iterator i
= late_clients
.begin(); i
!= late_clients
.end(); ++i
) {
357 // client_t is equivalent to session.info.inst.name.num
358 // Construct an entity_name_t to lookup into SessionMap
359 entity_name_t
ename(CEPH_ENTITY_TYPE_CLIENT
, i
->v
);
360 Session
const *s
= mds
->sessionmap
.get_session(ename
);
362 // Shouldn't happen, but not worth crashing if it does as this is
363 // just health-reporting code.
364 derr
<< "Client ID without session: " << i
->v
<< dendl
;
368 std::ostringstream oss
;
369 oss
<< "Client " << s
->get_human_name() << " failing to respond to capability release";
370 MDSHealthMetric
m(MDS_HEALTH_CLIENT_LATE_RELEASE
, HEALTH_WARN
, oss
.str());
371 m
.metadata
["client_id"] = stringify(i
->v
);
372 late_cap_metrics
.push_back(m
);
375 if (late_cap_metrics
.size() <= (size_t)g_conf
->mds_health_summarize_threshold
) {
376 health
.metrics
.splice(health
.metrics
.end(), late_cap_metrics
);
378 std::ostringstream oss
;
379 oss
<< "Many clients (" << late_cap_metrics
.size()
380 << ") failing to respond to capability release";
381 MDSHealthMetric
m(MDS_HEALTH_CLIENT_LATE_RELEASE_MANY
, HEALTH_WARN
, oss
.str());
382 m
.metadata
["client_count"] = stringify(late_cap_metrics
.size());
383 health
.metrics
.push_back(m
);
384 late_cap_metrics
.clear();
388 // Detect clients failing to generate cap releases from CEPH_SESSION_RECALL_STATE
389 // messages. May be due to buggy client or resource-hogging application.
391 // Detect clients failing to advance their old_client_tid
393 set
<Session
*> sessions
;
394 mds
->sessionmap
.get_client_session_set(sessions
);
396 utime_t cutoff
= ceph_clock_now();
397 cutoff
-= g_conf
->mds_recall_state_timeout
;
398 utime_t last_recall
= mds
->mdcache
->last_recall_state
;
400 std::list
<MDSHealthMetric
> late_recall_metrics
;
401 std::list
<MDSHealthMetric
> large_completed_requests_metrics
;
402 for (set
<Session
*>::iterator i
= sessions
.begin(); i
!= sessions
.end(); ++i
) {
403 Session
*session
= *i
;
404 if (!session
->recalled_at
.is_zero()) {
405 dout(20) << "Session servicing RECALL " << session
->info
.inst
406 << ": " << session
->recalled_at
<< " " << session
->recall_release_count
407 << "/" << session
->recall_count
<< dendl
;
408 if (last_recall
< cutoff
|| session
->last_recall_sent
< last_recall
) {
409 dout(20) << " no longer recall" << dendl
;
410 session
->clear_recalled_at();
411 } else if (session
->recalled_at
< cutoff
) {
412 dout(20) << " exceeded timeout " << session
->recalled_at
<< " vs. " << cutoff
<< dendl
;
413 std::ostringstream oss
;
414 oss
<< "Client " << session
->get_human_name() << " failing to respond to cache pressure";
415 MDSHealthMetric
m(MDS_HEALTH_CLIENT_RECALL
, HEALTH_WARN
, oss
.str());
416 m
.metadata
["client_id"] = stringify(session
->info
.inst
.name
.num());
417 late_recall_metrics
.push_back(m
);
419 dout(20) << " within timeout " << session
->recalled_at
<< " vs. " << cutoff
<< dendl
;
422 if ((session
->get_num_trim_requests_warnings() > 0 &&
423 session
->get_num_completed_requests() >= g_conf
->mds_max_completed_requests
) ||
424 (session
->get_num_trim_flushes_warnings() > 0 &&
425 session
->get_num_completed_flushes() >= g_conf
->mds_max_completed_flushes
)) {
426 std::ostringstream oss
;
427 oss
<< "Client " << session
->get_human_name() << " failing to advance its oldest client/flush tid";
428 MDSHealthMetric
m(MDS_HEALTH_CLIENT_OLDEST_TID
, HEALTH_WARN
, oss
.str());
429 m
.metadata
["client_id"] = stringify(session
->info
.inst
.name
.num());
430 large_completed_requests_metrics
.push_back(m
);
434 if (late_recall_metrics
.size() <= (size_t)g_conf
->mds_health_summarize_threshold
) {
435 health
.metrics
.splice(health
.metrics
.end(), late_recall_metrics
);
437 std::ostringstream oss
;
438 oss
<< "Many clients (" << late_recall_metrics
.size()
439 << ") failing to respond to cache pressure";
440 MDSHealthMetric
m(MDS_HEALTH_CLIENT_RECALL_MANY
, HEALTH_WARN
, oss
.str());
441 m
.metadata
["client_count"] = stringify(late_recall_metrics
.size());
442 health
.metrics
.push_back(m
);
443 late_recall_metrics
.clear();
446 if (large_completed_requests_metrics
.size() <= (size_t)g_conf
->mds_health_summarize_threshold
) {
447 health
.metrics
.splice(health
.metrics
.end(), large_completed_requests_metrics
);
449 std::ostringstream oss
;
450 oss
<< "Many clients (" << large_completed_requests_metrics
.size()
451 << ") failing to advance their oldest client/flush tid";
452 MDSHealthMetric
m(MDS_HEALTH_CLIENT_OLDEST_TID_MANY
, HEALTH_WARN
, oss
.str());
453 m
.metadata
["client_count"] = stringify(large_completed_requests_metrics
.size());
454 health
.metrics
.push_back(m
);
455 large_completed_requests_metrics
.clear();
459 // Detect MDS_HEALTH_SLOW_REQUEST condition
461 int slow
= mds
->get_mds_slow_req_count();
462 dout(20) << slow
<< " slow request found" << dendl
;
464 std::ostringstream oss
;
465 oss
<< slow
<< " slow requests are blocked > " << g_conf
->mds_op_complaint_time
<< " sec";
467 MDSHealthMetric
m(MDS_HEALTH_SLOW_REQUEST
, HEALTH_WARN
, oss
.str());
468 health
.metrics
.push_back(m
);
472 // Report a health warning if we are readonly
473 if (mds
->mdcache
->is_readonly()) {
474 MDSHealthMetric
m(MDS_HEALTH_READ_ONLY
, HEALTH_WARN
,
475 "MDS in read-only mode");
476 health
.metrics
.push_back(m
);
479 // Report if we have significantly exceeded our cache size limit
480 if (mds
->mdcache
->cache_overfull()) {
481 std::ostringstream oss
;
482 oss
<< "MDS cache is too large (" << bytes2str(mds
->mdcache
->cache_size())
483 << "/" << bytes2str(mds
->mdcache
->cache_limit_memory()) << "); "
484 << mds
->mdcache
->num_inodes_with_caps
<< " inodes in use by clients, "
485 << mds
->mdcache
->get_num_strays() << " stray files";
487 MDSHealthMetric
m(MDS_HEALTH_CACHE_OVERSIZED
, HEALTH_WARN
, oss
.str());
488 health
.metrics
.push_back(m
);
492 MDSMap::DaemonState
Beacon::get_want_state() const
494 Mutex::Locker
l(lock
);