]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2012 Red Hat | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | ||
16 | #include "common/dout.h" | |
17 | #include "common/HeartbeatMap.h" | |
181888fb | 18 | |
7c673cae FG |
19 | #include "include/stringify.h" |
20 | #include "include/util.h" | |
21 | ||
22 | #include "messages/MMDSBeacon.h" | |
23 | #include "mon/MonClient.h" | |
24 | #include "mds/MDLog.h" | |
25 | #include "mds/MDSRank.h" | |
26 | #include "mds/MDSMap.h" | |
27 | #include "mds/Locker.h" | |
28 | ||
29 | #include "Beacon.h" | |
30 | ||
31 | #define dout_context g_ceph_context | |
32 | #define dout_subsys ceph_subsys_mds | |
33 | #undef dout_prefix | |
34 | #define dout_prefix *_dout << "mds.beacon." << name << ' ' | |
35 | ||
36 | ||
94b18763 | 37 | Beacon::Beacon(CephContext *cct_, MonClient *monc_, boost::string_view name_) : |
7c673cae FG |
38 | Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock), |
39 | name(name_), standby_for_rank(MDS_RANK_NONE), | |
40 | standby_for_fscid(FS_CLUSTER_ID_NONE), want_state(MDSMap::STATE_BOOT), | |
41 | awaiting_seq(-1) | |
42 | { | |
43 | last_seq = 0; | |
7c673cae FG |
44 | was_laggy = false; |
45 | ||
46 | epoch = 0; | |
47 | } | |
48 | ||
49 | ||
50 | Beacon::~Beacon() | |
51 | { | |
52 | } | |
53 | ||
54 | ||
55 | void Beacon::init(MDSMap const *mdsmap) | |
56 | { | |
57 | Mutex::Locker l(lock); | |
58 | assert(mdsmap != NULL); | |
59 | ||
60 | _notify_mdsmap(mdsmap); | |
61 | standby_for_rank = mds_rank_t(g_conf->mds_standby_for_rank); | |
62 | standby_for_name = g_conf->mds_standby_for_name; | |
63 | standby_for_fscid = fs_cluster_id_t(g_conf->mds_standby_for_fscid); | |
64 | standby_replay = g_conf->mds_standby_replay; | |
65 | ||
66 | // Spawn threads and start messaging | |
67 | timer.init(); | |
68 | _send(); | |
69 | } | |
70 | ||
71 | ||
72 | void Beacon::shutdown() | |
73 | { | |
74 | Mutex::Locker l(lock); | |
75 | if (sender) { | |
76 | timer.cancel_event(sender); | |
77 | sender = NULL; | |
78 | } | |
79 | timer.shutdown(); | |
80 | } | |
81 | ||
82 | ||
83 | bool Beacon::ms_dispatch(Message *m) | |
84 | { | |
85 | if (m->get_type() == MSG_MDS_BEACON) { | |
86 | if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) { | |
87 | handle_mds_beacon(static_cast<MMDSBeacon*>(m)); | |
88 | } | |
89 | return true; | |
90 | } | |
91 | ||
92 | return false; | |
93 | } | |
94 | ||
95 | ||
96 | /** | |
97 | * Update lagginess state based on response from remote MDSMonitor | |
98 | * | |
99 | * This function puts the passed message before returning | |
100 | */ | |
101 | void Beacon::handle_mds_beacon(MMDSBeacon *m) | |
102 | { | |
103 | Mutex::Locker l(lock); | |
104 | assert(m != NULL); | |
105 | ||
106 | version_t seq = m->get_seq(); | |
107 | ||
108 | // update lab | |
109 | if (seq_stamp.count(seq)) { | |
110 | utime_t now = ceph_clock_now(); | |
111 | if (seq_stamp[seq] > last_acked_stamp) { | |
112 | last_acked_stamp = seq_stamp[seq]; | |
113 | utime_t rtt = now - last_acked_stamp; | |
114 | ||
115 | dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state()) | |
116 | << " seq " << m->get_seq() << " rtt " << rtt << dendl; | |
117 | ||
118 | if (was_laggy && rtt < g_conf->mds_beacon_grace) { | |
119 | dout(0) << "handle_mds_beacon no longer laggy" << dendl; | |
120 | was_laggy = false; | |
121 | laggy_until = now; | |
122 | } | |
123 | } else { | |
124 | // Mark myself laggy if system clock goes backwards. Hopping | |
125 | // later beacons will clear it. | |
126 | dout(1) << "handle_mds_beacon system clock goes backwards, " | |
127 | << "mark myself laggy" << dendl; | |
128 | last_acked_stamp = now - utime_t(g_conf->mds_beacon_grace + 1, 0); | |
129 | was_laggy = true; | |
130 | } | |
131 | ||
132 | // clean up seq_stamp map | |
133 | while (!seq_stamp.empty() && | |
134 | seq_stamp.begin()->first <= seq) | |
135 | seq_stamp.erase(seq_stamp.begin()); | |
136 | ||
137 | // Wake a waiter up if present | |
138 | if (awaiting_seq == seq) { | |
139 | waiting_cond.Signal(); | |
140 | } | |
141 | } else { | |
142 | dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state()) | |
143 | << " seq " << m->get_seq() << " dne" << dendl; | |
144 | } | |
28e407b8 | 145 | m->put(); |
7c673cae FG |
146 | } |
147 | ||
148 | ||
149 | void Beacon::send() | |
150 | { | |
151 | Mutex::Locker l(lock); | |
152 | _send(); | |
153 | } | |
154 | ||
155 | ||
156 | void Beacon::send_and_wait(const double duration) | |
157 | { | |
158 | Mutex::Locker l(lock); | |
159 | _send(); | |
160 | awaiting_seq = last_seq; | |
161 | dout(20) << __func__ << ": awaiting " << awaiting_seq | |
162 | << " for up to " << duration << "s" << dendl; | |
163 | ||
164 | utime_t timeout; | |
165 | timeout.set_from_double(ceph_clock_now() + duration); | |
166 | while ((!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq) | |
167 | && ceph_clock_now() < timeout) { | |
168 | waiting_cond.WaitUntil(lock, timeout); | |
169 | } | |
170 | ||
171 | awaiting_seq = -1; | |
172 | } | |
173 | ||
174 | ||
175 | /** | |
176 | * Call periodically, or when you have updated the desired state | |
177 | */ | |
178 | void Beacon::_send() | |
179 | { | |
180 | if (sender) { | |
181 | timer.cancel_event(sender); | |
182 | } | |
3efd9988 FG |
183 | sender = timer.add_event_after( |
184 | g_conf->mds_beacon_interval, | |
185 | new FunctionContext([this](int) { | |
186 | assert(lock.is_locked_by_me()); | |
187 | sender = nullptr; | |
188 | _send(); | |
189 | })); | |
7c673cae FG |
190 | |
191 | if (!cct->get_heartbeat_map()->is_healthy()) { | |
192 | /* If anything isn't progressing, let avoid sending a beacon so that | |
193 | * the MDS will consider us laggy */ | |
194 | dout(1) << __func__ << " skipping beacon, heartbeat map not healthy" << dendl; | |
195 | return; | |
196 | } | |
197 | ||
198 | ++last_seq; | |
199 | dout(10) << __func__ << " " << ceph_mds_state_name(want_state) | |
200 | << " seq " << last_seq | |
201 | << dendl; | |
202 | ||
203 | seq_stamp[last_seq] = ceph_clock_now(); | |
204 | ||
205 | assert(want_state != MDSMap::STATE_NULL); | |
206 | ||
207 | MMDSBeacon *beacon = new MMDSBeacon( | |
208 | monc->get_fsid(), mds_gid_t(monc->get_global_id()), | |
209 | name, | |
210 | epoch, | |
211 | want_state, | |
212 | last_seq, | |
213 | CEPH_FEATURES_SUPPORTED_DEFAULT); | |
214 | ||
215 | beacon->set_standby_for_rank(standby_for_rank); | |
216 | beacon->set_standby_for_name(standby_for_name); | |
217 | beacon->set_standby_for_fscid(standby_for_fscid); | |
218 | beacon->set_standby_replay(standby_replay); | |
219 | beacon->set_health(health); | |
220 | beacon->set_compat(compat); | |
221 | // piggyback the sys info on beacon msg | |
222 | if (want_state == MDSMap::STATE_BOOT) { | |
223 | map<string, string> sys_info; | |
224 | collect_sys_info(&sys_info, cct); | |
225 | sys_info["addr"] = stringify(monc->get_myaddr()); | |
226 | beacon->set_sys_info(sys_info); | |
227 | } | |
228 | monc->send_mon_message(beacon); | |
229 | } | |
230 | ||
231 | /** | |
232 | * Call this when there is a new MDSMap available | |
233 | */ | |
234 | void Beacon::notify_mdsmap(MDSMap const *mdsmap) | |
235 | { | |
236 | Mutex::Locker l(lock); | |
237 | assert(mdsmap != NULL); | |
238 | ||
239 | _notify_mdsmap(mdsmap); | |
240 | } | |
241 | ||
242 | void Beacon::_notify_mdsmap(MDSMap const *mdsmap) | |
243 | { | |
244 | assert(mdsmap != NULL); | |
245 | assert(mdsmap->get_epoch() >= epoch); | |
246 | ||
247 | if (mdsmap->get_epoch() != epoch) { | |
248 | epoch = mdsmap->get_epoch(); | |
249 | compat = get_mdsmap_compat_set_default(); | |
250 | compat.merge(mdsmap->compat); | |
251 | } | |
252 | } | |
253 | ||
254 | ||
255 | bool Beacon::is_laggy() | |
256 | { | |
257 | Mutex::Locker l(lock); | |
258 | ||
259 | if (last_acked_stamp == utime_t()) | |
260 | return false; | |
261 | ||
262 | utime_t now = ceph_clock_now(); | |
263 | utime_t since = now - last_acked_stamp; | |
264 | if (since > g_conf->mds_beacon_grace) { | |
265 | dout(5) << "is_laggy " << since << " > " << g_conf->mds_beacon_grace | |
266 | << " since last acked beacon" << dendl; | |
267 | was_laggy = true; | |
268 | if (since > (g_conf->mds_beacon_grace*2) && | |
269 | now > last_mon_reconnect + g_conf->mds_beacon_interval) { | |
270 | // maybe it's not us? | |
271 | dout(5) << "initiating monitor reconnect; maybe we're not the slow one" | |
272 | << dendl; | |
273 | last_mon_reconnect = now; | |
274 | monc->reopen_session(); | |
275 | } | |
276 | return true; | |
277 | } | |
278 | return false; | |
279 | } | |
280 | ||
281 | utime_t Beacon::get_laggy_until() const | |
282 | { | |
283 | Mutex::Locker l(lock); | |
284 | ||
285 | return laggy_until; | |
286 | } | |
287 | ||
288 | void Beacon::set_want_state(MDSMap const *mdsmap, MDSMap::DaemonState const newstate) | |
289 | { | |
290 | Mutex::Locker l(lock); | |
291 | ||
292 | // Update mdsmap epoch atomically with updating want_state, so that when | |
293 | // we send a beacon with the new want state it has the latest epoch, and | |
294 | // once we have updated to the latest epoch, we are not sending out | |
295 | // a stale want_state (i.e. one from before making it through MDSMap | |
296 | // handling) | |
297 | _notify_mdsmap(mdsmap); | |
298 | ||
299 | if (want_state != newstate) { | |
300 | dout(10) << __func__ << ": " | |
301 | << ceph_mds_state_name(want_state) << " -> " | |
302 | << ceph_mds_state_name(newstate) << dendl; | |
303 | want_state = newstate; | |
304 | } | |
305 | } | |
306 | ||
307 | ||
308 | /** | |
309 | * We are 'shown' an MDS briefly in order to update | |
310 | * some health metrics that we will send in the next | |
311 | * beacon. | |
312 | */ | |
313 | void Beacon::notify_health(MDSRank const *mds) | |
314 | { | |
315 | Mutex::Locker l(lock); | |
316 | if (!mds) { | |
317 | // No MDS rank held | |
318 | return; | |
319 | } | |
320 | ||
321 | // I'm going to touch this MDS, so it must be locked | |
322 | assert(mds->mds_lock.is_locked_by_me()); | |
323 | ||
324 | health.metrics.clear(); | |
325 | ||
326 | // Detect presence of entries in DamageTable | |
327 | if (!mds->damage_table.empty()) { | |
328 | MDSHealthMetric m(MDS_HEALTH_DAMAGE, HEALTH_ERR, std::string( | |
329 | "Metadata damage detected")); | |
330 | health.metrics.push_back(m); | |
331 | } | |
332 | ||
333 | // Detect MDS_HEALTH_TRIM condition | |
334 | // Arbitrary factor of 2, indicates MDS is not trimming promptly | |
335 | { | |
336 | if (mds->mdlog->get_num_segments() > (size_t)(g_conf->mds_log_max_segments * 2)) { | |
337 | std::ostringstream oss; | |
338 | oss << "Behind on trimming (" << mds->mdlog->get_num_segments() | |
339 | << "/" << g_conf->mds_log_max_segments << ")"; | |
340 | ||
341 | MDSHealthMetric m(MDS_HEALTH_TRIM, HEALTH_WARN, oss.str()); | |
342 | m.metadata["num_segments"] = stringify(mds->mdlog->get_num_segments()); | |
343 | m.metadata["max_segments"] = stringify(g_conf->mds_log_max_segments); | |
344 | health.metrics.push_back(m); | |
345 | } | |
346 | } | |
347 | ||
348 | // Detect clients failing to respond to modifications to capabilities in | |
349 | // CLIENT_CAPS messages. | |
350 | { | |
351 | std::list<client_t> late_clients; | |
352 | mds->locker->get_late_revoking_clients(&late_clients); | |
353 | std::list<MDSHealthMetric> late_cap_metrics; | |
354 | ||
355 | for (std::list<client_t>::iterator i = late_clients.begin(); i != late_clients.end(); ++i) { | |
356 | ||
357 | // client_t is equivalent to session.info.inst.name.num | |
358 | // Construct an entity_name_t to lookup into SessionMap | |
359 | entity_name_t ename(CEPH_ENTITY_TYPE_CLIENT, i->v); | |
360 | Session const *s = mds->sessionmap.get_session(ename); | |
361 | if (s == NULL) { | |
362 | // Shouldn't happen, but not worth crashing if it does as this is | |
363 | // just health-reporting code. | |
364 | derr << "Client ID without session: " << i->v << dendl; | |
365 | continue; | |
366 | } | |
367 | ||
368 | std::ostringstream oss; | |
369 | oss << "Client " << s->get_human_name() << " failing to respond to capability release"; | |
370 | MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE, HEALTH_WARN, oss.str()); | |
371 | m.metadata["client_id"] = stringify(i->v); | |
372 | late_cap_metrics.push_back(m); | |
373 | } | |
374 | ||
375 | if (late_cap_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) { | |
376 | health.metrics.splice(health.metrics.end(), late_cap_metrics); | |
377 | } else { | |
378 | std::ostringstream oss; | |
379 | oss << "Many clients (" << late_cap_metrics.size() | |
380 | << ") failing to respond to capability release"; | |
381 | MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE_MANY, HEALTH_WARN, oss.str()); | |
382 | m.metadata["client_count"] = stringify(late_cap_metrics.size()); | |
383 | health.metrics.push_back(m); | |
384 | late_cap_metrics.clear(); | |
385 | } | |
386 | } | |
387 | ||
388 | // Detect clients failing to generate cap releases from CEPH_SESSION_RECALL_STATE | |
389 | // messages. May be due to buggy client or resource-hogging application. | |
390 | // | |
391 | // Detect clients failing to advance their old_client_tid | |
392 | { | |
393 | set<Session*> sessions; | |
394 | mds->sessionmap.get_client_session_set(sessions); | |
395 | ||
396 | utime_t cutoff = ceph_clock_now(); | |
397 | cutoff -= g_conf->mds_recall_state_timeout; | |
398 | utime_t last_recall = mds->mdcache->last_recall_state; | |
399 | ||
400 | std::list<MDSHealthMetric> late_recall_metrics; | |
401 | std::list<MDSHealthMetric> large_completed_requests_metrics; | |
402 | for (set<Session*>::iterator i = sessions.begin(); i != sessions.end(); ++i) { | |
403 | Session *session = *i; | |
404 | if (!session->recalled_at.is_zero()) { | |
405 | dout(20) << "Session servicing RECALL " << session->info.inst | |
406 | << ": " << session->recalled_at << " " << session->recall_release_count | |
407 | << "/" << session->recall_count << dendl; | |
408 | if (last_recall < cutoff || session->last_recall_sent < last_recall) { | |
409 | dout(20) << " no longer recall" << dendl; | |
410 | session->clear_recalled_at(); | |
411 | } else if (session->recalled_at < cutoff) { | |
412 | dout(20) << " exceeded timeout " << session->recalled_at << " vs. " << cutoff << dendl; | |
413 | std::ostringstream oss; | |
414 | oss << "Client " << session->get_human_name() << " failing to respond to cache pressure"; | |
415 | MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, oss.str()); | |
416 | m.metadata["client_id"] = stringify(session->info.inst.name.num()); | |
417 | late_recall_metrics.push_back(m); | |
418 | } else { | |
419 | dout(20) << " within timeout " << session->recalled_at << " vs. " << cutoff << dendl; | |
420 | } | |
421 | } | |
422 | if ((session->get_num_trim_requests_warnings() > 0 && | |
423 | session->get_num_completed_requests() >= g_conf->mds_max_completed_requests) || | |
424 | (session->get_num_trim_flushes_warnings() > 0 && | |
425 | session->get_num_completed_flushes() >= g_conf->mds_max_completed_flushes)) { | |
426 | std::ostringstream oss; | |
427 | oss << "Client " << session->get_human_name() << " failing to advance its oldest client/flush tid"; | |
428 | MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID, HEALTH_WARN, oss.str()); | |
429 | m.metadata["client_id"] = stringify(session->info.inst.name.num()); | |
430 | large_completed_requests_metrics.push_back(m); | |
431 | } | |
432 | } | |
433 | ||
434 | if (late_recall_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) { | |
435 | health.metrics.splice(health.metrics.end(), late_recall_metrics); | |
436 | } else { | |
437 | std::ostringstream oss; | |
438 | oss << "Many clients (" << late_recall_metrics.size() | |
439 | << ") failing to respond to cache pressure"; | |
440 | MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL_MANY, HEALTH_WARN, oss.str()); | |
441 | m.metadata["client_count"] = stringify(late_recall_metrics.size()); | |
442 | health.metrics.push_back(m); | |
443 | late_recall_metrics.clear(); | |
444 | } | |
445 | ||
446 | if (large_completed_requests_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) { | |
447 | health.metrics.splice(health.metrics.end(), large_completed_requests_metrics); | |
448 | } else { | |
449 | std::ostringstream oss; | |
450 | oss << "Many clients (" << large_completed_requests_metrics.size() | |
451 | << ") failing to advance their oldest client/flush tid"; | |
452 | MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID_MANY, HEALTH_WARN, oss.str()); | |
453 | m.metadata["client_count"] = stringify(large_completed_requests_metrics.size()); | |
454 | health.metrics.push_back(m); | |
455 | large_completed_requests_metrics.clear(); | |
456 | } | |
457 | } | |
458 | ||
459 | // Detect MDS_HEALTH_SLOW_REQUEST condition | |
460 | { | |
461 | int slow = mds->get_mds_slow_req_count(); | |
462 | dout(20) << slow << " slow request found" << dendl; | |
463 | if (slow) { | |
464 | std::ostringstream oss; | |
465 | oss << slow << " slow requests are blocked > " << g_conf->mds_op_complaint_time << " sec"; | |
466 | ||
467 | MDSHealthMetric m(MDS_HEALTH_SLOW_REQUEST, HEALTH_WARN, oss.str()); | |
468 | health.metrics.push_back(m); | |
469 | } | |
470 | } | |
471 | ||
472 | // Report a health warning if we are readonly | |
473 | if (mds->mdcache->is_readonly()) { | |
474 | MDSHealthMetric m(MDS_HEALTH_READ_ONLY, HEALTH_WARN, | |
475 | "MDS in read-only mode"); | |
476 | health.metrics.push_back(m); | |
477 | } | |
478 | ||
479 | // Report if we have significantly exceeded our cache size limit | |
181888fb | 480 | if (mds->mdcache->cache_overfull()) { |
7c673cae | 481 | std::ostringstream oss; |
181888fb FG |
482 | oss << "MDS cache is too large (" << bytes2str(mds->mdcache->cache_size()) |
483 | << "/" << bytes2str(mds->mdcache->cache_limit_memory()) << "); " | |
7c673cae FG |
484 | << mds->mdcache->num_inodes_with_caps << " inodes in use by clients, " |
485 | << mds->mdcache->get_num_strays() << " stray files"; | |
486 | ||
487 | MDSHealthMetric m(MDS_HEALTH_CACHE_OVERSIZED, HEALTH_WARN, oss.str()); | |
488 | health.metrics.push_back(m); | |
489 | } | |
490 | } | |
491 | ||
492 | MDSMap::DaemonState Beacon::get_want_state() const | |
493 | { | |
494 | Mutex::Locker l(lock); | |
495 | return want_state; | |
496 | } | |
497 |