]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Beacon.cc
766e4c5d26711e7a55b00eafebea1dc223e91bd0
[ceph.git] / ceph / src / mds / Beacon.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "common/dout.h"
17 #include "common/HeartbeatMap.h"
18
19 #include "include/stringify.h"
20 #include "include/util.h"
21
22 #include "mon/MonClient.h"
23 #include "mds/MDLog.h"
24 #include "mds/MDSRank.h"
25 #include "mds/MDSMap.h"
26 #include "mds/Locker.h"
27
28 #include "Beacon.h"
29
30 #include <chrono>
31
32 #define dout_context g_ceph_context
33 #define dout_subsys ceph_subsys_mds
34 #undef dout_prefix
35 #define dout_prefix *_dout << "mds.beacon." << name << ' '
36
37 using std::map;
38 using std::string;
39
40 using namespace std::chrono_literals;
41
42 Beacon::Beacon(CephContext *cct, MonClient *monc, std::string_view name)
43 :
44 Dispatcher(cct),
45 beacon_interval(g_conf()->mds_beacon_interval),
46 monc(monc),
47 name(name),
48 compat(MDSMap::get_compat_set_all())
49 {
50 }
51
52 Beacon::~Beacon()
53 {
54 shutdown();
55 }
56
57 void Beacon::shutdown()
58 {
59 std::unique_lock<std::mutex> lock(mutex);
60 if (!finished) {
61 finished = true;
62 lock.unlock();
63 if (sender.joinable())
64 sender.join();
65 }
66 }
67
68 void Beacon::init(const MDSMap &mdsmap)
69 {
70 std::unique_lock lock(mutex);
71
72 _notify_mdsmap(mdsmap);
73
74 sender = std::thread([this]() {
75 std::unique_lock<std::mutex> lock(mutex);
76 std::condition_variable c; // no one wakes us
77 while (!finished) {
78 auto now = clock::now();
79 auto since = std::chrono::duration<double>(now-last_send).count();
80 auto interval = beacon_interval;
81 if (since >= interval*.90) {
82 if (!_send()) {
83 interval = 0.5; /* 500ms */
84 }
85 } else {
86 interval -= since;
87 }
88 dout(20) << "sender thread waiting interval " << interval << "s" << dendl;
89 c.wait_for(lock, interval*1s);
90 }
91 });
92 }
93
94 bool Beacon::ms_can_fast_dispatch2(const cref_t<Message>& m) const
95 {
96 return m->get_type() == MSG_MDS_BEACON;
97 }
98
99 void Beacon::ms_fast_dispatch2(const ref_t<Message>& m)
100 {
101 bool handled = ms_dispatch2(m);
102 ceph_assert(handled);
103 }
104
105 bool Beacon::ms_dispatch2(const ref_t<Message>& m)
106 {
107 if (m->get_type() == MSG_MDS_BEACON) {
108 if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
109 handle_mds_beacon(ref_cast<MMDSBeacon>(m));
110 }
111 return true;
112 }
113
114 return false;
115 }
116
117
118 /**
119 * Update lagginess state based on response from remote MDSMonitor
120 *
121 * This function puts the passed message before returning
122 */
123 void Beacon::handle_mds_beacon(const cref_t<MMDSBeacon> &m)
124 {
125 std::unique_lock lock(mutex);
126
127 version_t seq = m->get_seq();
128
129 // update lab
130 auto it = seq_stamp.find(seq);
131 if (it != seq_stamp.end()) {
132 auto now = clock::now();
133
134 last_acked_stamp = it->second;
135 auto rtt = std::chrono::duration<double>(now - last_acked_stamp).count();
136
137 dout(5) << "received beacon reply " << ceph_mds_state_name(m->get_state()) << " seq " << m->get_seq() << " rtt " << rtt << dendl;
138
139 if (laggy && rtt < g_conf()->mds_beacon_grace) {
140 dout(0) << " MDS is no longer laggy" << dendl;
141 laggy = false;
142 last_laggy = now;
143 }
144
145 // clean up seq_stamp map
146 seq_stamp.erase(seq_stamp.begin(), ++it);
147
148 // Wake a waiter up if present
149 cvar.notify_all();
150 } else {
151 dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state())
152 << " seq " << m->get_seq() << " dne" << dendl;
153 }
154 }
155
156
157 void Beacon::send()
158 {
159 std::unique_lock lock(mutex);
160 _send();
161 }
162
163
164 void Beacon::send_and_wait(const double duration)
165 {
166 std::unique_lock lock(mutex);
167 _send();
168 auto awaiting_seq = last_seq;
169 dout(20) << __func__ << ": awaiting " << awaiting_seq
170 << " for up to " << duration << "s" << dendl;
171
172 auto start = clock::now();
173 while (!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq) {
174 auto now = clock::now();
175 auto s = duration*.95-std::chrono::duration<double>(now-start).count();
176 if (s < 0) break;
177 cvar.wait_for(lock, s*1s);
178 }
179 }
180
181
182 /**
183 * Call periodically, or when you have updated the desired state
184 */
185 bool Beacon::_send()
186 {
187 auto now = clock::now();
188 auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
189
190 if (!cct->get_heartbeat_map()->is_healthy()) {
191 /* If anything isn't progressing, let avoid sending a beacon so that
192 * the MDS will consider us laggy */
193 dout(0) << "Skipping beacon heartbeat to monitors (last acked " << since << "s ago); MDS internal heartbeat is not healthy!" << dendl;
194 return false;
195 }
196
197 ++last_seq;
198 dout(5) << "Sending beacon " << ceph_mds_state_name(want_state) << " seq " << last_seq << dendl;
199
200 seq_stamp[last_seq] = now;
201
202 ceph_assert(want_state != MDSMap::STATE_NULL);
203
204 auto beacon = make_message<MMDSBeacon>(
205 monc->get_fsid(), mds_gid_t(monc->get_global_id()),
206 name,
207 epoch,
208 want_state,
209 last_seq,
210 CEPH_FEATURES_SUPPORTED_DEFAULT);
211 beacon->set_health(health);
212 beacon->set_compat(compat);
213 beacon->set_fs(g_conf().get_val<std::string>("mds_join_fs"));
214 // piggyback the sys info on beacon msg
215 if (want_state == MDSMap::STATE_BOOT) {
216 map<string, string> sys_info;
217 collect_sys_info(&sys_info, cct);
218 sys_info["addr"] = stringify(monc->get_myaddrs());
219 beacon->set_sys_info(sys_info);
220 }
221 monc->send_mon_message(beacon.detach());
222 last_send = now;
223 return true;
224 }
225
226 /**
227 * Call this when there is a new MDSMap available
228 */
229 void Beacon::notify_mdsmap(const MDSMap &mdsmap)
230 {
231 std::unique_lock lock(mutex);
232
233 _notify_mdsmap(mdsmap);
234 }
235
236 void Beacon::_notify_mdsmap(const MDSMap &mdsmap)
237 {
238 ceph_assert(mdsmap.get_epoch() >= epoch);
239
240 if (mdsmap.get_epoch() >= epoch) {
241 epoch = mdsmap.get_epoch();
242 }
243 }
244
245
246 bool Beacon::is_laggy()
247 {
248 std::unique_lock lock(mutex);
249
250 auto now = clock::now();
251 auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
252 if (since > g_conf()->mds_beacon_grace) {
253 if (!laggy) {
254 dout(1) << "MDS connection to Monitors appears to be laggy; " << since
255 << "s since last acked beacon" << dendl;
256 }
257 laggy = true;
258 return true;
259 }
260 return false;
261 }
262
263 void Beacon::set_want_state(const MDSMap &mdsmap, MDSMap::DaemonState newstate)
264 {
265 std::unique_lock lock(mutex);
266
267 // Update mdsmap epoch atomically with updating want_state, so that when
268 // we send a beacon with the new want state it has the latest epoch, and
269 // once we have updated to the latest epoch, we are not sending out
270 // a stale want_state (i.e. one from before making it through MDSMap
271 // handling)
272 _notify_mdsmap(mdsmap);
273
274 if (want_state != newstate) {
275 dout(5) << __func__ << ": "
276 << ceph_mds_state_name(want_state) << " -> "
277 << ceph_mds_state_name(newstate) << dendl;
278 want_state = newstate;
279 }
280 }
281
282
283 /**
284 * We are 'shown' an MDS briefly in order to update
285 * some health metrics that we will send in the next
286 * beacon.
287 */
288 void Beacon::notify_health(MDSRank const *mds)
289 {
290 std::unique_lock lock(mutex);
291 if (!mds) {
292 // No MDS rank held
293 return;
294 }
295
296 // I'm going to touch this MDS, so it must be locked
297 ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
298
299 health.metrics.clear();
300
301 // Detect presence of entries in DamageTable
302 if (!mds->damage_table.empty()) {
303 MDSHealthMetric m(MDS_HEALTH_DAMAGE, HEALTH_ERR, std::string(
304 "Metadata damage detected"));
305 health.metrics.push_back(m);
306 }
307
308 // Detect MDS_HEALTH_TRIM condition
309 // Indicates MDS is not trimming promptly
310 {
311 if (mds->mdlog->get_num_segments() > (size_t)(g_conf()->mds_log_max_segments * g_conf().get_val<double>("mds_log_warn_factor"))) {
312 CachedStackStringStream css;
313 *css << "Behind on trimming (" << mds->mdlog->get_num_segments()
314 << "/" << g_conf()->mds_log_max_segments << ")";
315
316 MDSHealthMetric m(MDS_HEALTH_TRIM, HEALTH_WARN, css->strv());
317 m.metadata["num_segments"] = stringify(mds->mdlog->get_num_segments());
318 m.metadata["max_segments"] = stringify(g_conf()->mds_log_max_segments);
319 health.metrics.push_back(m);
320 }
321 }
322
323 // Detect clients failing to respond to modifications to capabilities in
324 // CLIENT_CAPS messages.
325 {
326 auto&& late_clients = mds->locker->get_late_revoking_clients(mds->mdsmap->get_session_timeout());
327 std::vector<MDSHealthMetric> late_cap_metrics;
328
329 for (const auto& client : late_clients) {
330 // client_t is equivalent to session.info.inst.name.num
331 // Construct an entity_name_t to lookup into SessionMap
332 entity_name_t ename(CEPH_ENTITY_TYPE_CLIENT, client.v);
333 Session const *s = mds->sessionmap.get_session(ename);
334 if (s == NULL) {
335 // Shouldn't happen, but not worth crashing if it does as this is
336 // just health-reporting code.
337 derr << "Client ID without session: " << client.v << dendl;
338 continue;
339 }
340
341 CachedStackStringStream css;
342 *css << "Client " << s->get_human_name() << " failing to respond to capability release";
343 MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE, HEALTH_WARN, css->strv());
344 m.metadata["client_id"] = stringify(client.v);
345 late_cap_metrics.emplace_back(std::move(m));
346 }
347
348 if (late_cap_metrics.size() <= (size_t)g_conf()->mds_health_summarize_threshold) {
349 auto&& m = late_cap_metrics;
350 health.metrics.insert(std::end(health.metrics), std::cbegin(m), std::cend(m));
351 } else {
352 CachedStackStringStream css;
353 *css << "Many clients (" << late_cap_metrics.size()
354 << ") failing to respond to capability release";
355 MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE_MANY, HEALTH_WARN, css->strv());
356 m.metadata["client_count"] = stringify(late_cap_metrics.size());
357 health.metrics.push_back(std::move(m));
358 }
359 }
360
361 // Detect clients failing to generate cap releases from CEPH_SESSION_RECALL_STATE
362 // messages. May be due to buggy client or resource-hogging application.
363 //
364 // Detect clients failing to advance their old_client_tid
365 {
366 std::set<Session*> sessions;
367 mds->sessionmap.get_client_session_set(sessions);
368
369 const auto min_caps_working_set = g_conf().get_val<uint64_t>("mds_min_caps_working_set");
370 const auto recall_warning_threshold = g_conf().get_val<Option::size_t>("mds_recall_warning_threshold");
371 const auto max_completed_requests = g_conf()->mds_max_completed_requests;
372 const auto max_completed_flushes = g_conf()->mds_max_completed_flushes;
373 std::vector<MDSHealthMetric> late_recall_metrics;
374 std::vector<MDSHealthMetric> large_completed_requests_metrics;
375 for (auto& session : sessions) {
376 const uint64_t num_caps = session->get_num_caps();
377 const uint64_t recall_caps = session->get_recall_caps();
378 if (recall_caps > recall_warning_threshold && num_caps > min_caps_working_set) {
379 dout(2) << "Session " << *session <<
380 " is not releasing caps fast enough. Recalled caps at " << recall_caps
381 << " > " << recall_warning_threshold << " (mds_recall_warning_threshold)." << dendl;
382 CachedStackStringStream css;
383 *css << "Client " << session->get_human_name() << " failing to respond to cache pressure";
384 MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, css->strv());
385 m.metadata["client_id"] = stringify(session->get_client());
386 late_recall_metrics.emplace_back(std::move(m));
387 }
388 if ((session->get_num_trim_requests_warnings() > 0 &&
389 session->get_num_completed_requests() >= max_completed_requests) ||
390 (session->get_num_trim_flushes_warnings() > 0 &&
391 session->get_num_completed_flushes() >= max_completed_flushes)) {
392 CachedStackStringStream css;
393 *css << "Client " << session->get_human_name() << " failing to advance its oldest client/flush tid. ";
394 MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID, HEALTH_WARN, css->strv());
395 m.metadata["client_id"] = stringify(session->get_client());
396 large_completed_requests_metrics.emplace_back(std::move(m));
397 }
398 }
399
400 if (late_recall_metrics.size() <= (size_t)g_conf()->mds_health_summarize_threshold) {
401 auto&& m = late_recall_metrics;
402 health.metrics.insert(std::end(health.metrics), std::cbegin(m), std::cend(m));
403 } else {
404 CachedStackStringStream css;
405 *css << "Many clients (" << late_recall_metrics.size()
406 << ") failing to respond to cache pressure";
407 MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL_MANY, HEALTH_WARN, css->strv());
408 m.metadata["client_count"] = stringify(late_recall_metrics.size());
409 health.metrics.push_back(m);
410 late_recall_metrics.clear();
411 }
412
413 if (large_completed_requests_metrics.size() <= (size_t)g_conf()->mds_health_summarize_threshold) {
414 auto&& m = large_completed_requests_metrics;
415 health.metrics.insert(std::end(health.metrics), std::cbegin(m), std::cend(m));
416 } else {
417 CachedStackStringStream css;
418 *css << "Many clients (" << large_completed_requests_metrics.size()
419 << ") failing to advance their oldest client/flush tid";
420 MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID_MANY, HEALTH_WARN, css->strv());
421 m.metadata["client_count"] = stringify(large_completed_requests_metrics.size());
422 health.metrics.push_back(m);
423 large_completed_requests_metrics.clear();
424 }
425 }
426
427 // Detect MDS_HEALTH_SLOW_REQUEST condition
428 {
429 int slow = mds->get_mds_slow_req_count();
430 if (slow) {
431 dout(20) << slow << " slow request found" << dendl;
432 CachedStackStringStream css;
433 *css << slow << " slow requests are blocked > " << g_conf()->mds_op_complaint_time << " secs";
434
435 MDSHealthMetric m(MDS_HEALTH_SLOW_REQUEST, HEALTH_WARN, css->strv());
436 health.metrics.push_back(m);
437 }
438 }
439
440 {
441 auto complaint_time = g_conf()->osd_op_complaint_time;
442 auto now = clock::now();
443 auto cutoff = now - ceph::make_timespan(complaint_time);
444
445 std::string count;
446 ceph::coarse_mono_time oldest;
447 if (MDSIOContextBase::check_ios_in_flight(cutoff, count, oldest)) {
448 dout(20) << count << " slow metadata IOs found" << dendl;
449
450 auto oldest_secs = std::chrono::duration<double>(now - oldest).count();
451 CachedStackStringStream css;
452 *css << count << " slow metadata IOs are blocked > " << complaint_time
453 << " secs, oldest blocked for " << (int64_t)oldest_secs << " secs";
454
455 MDSHealthMetric m(MDS_HEALTH_SLOW_METADATA_IO, HEALTH_WARN, css->strv());
456 health.metrics.push_back(m);
457 }
458 }
459
460 // Report a health warning if we are readonly
461 if (mds->mdcache->is_readonly()) {
462 MDSHealthMetric m(MDS_HEALTH_READ_ONLY, HEALTH_WARN,
463 "MDS in read-only mode");
464 health.metrics.push_back(m);
465 }
466
467 // Report if we have significantly exceeded our cache size limit
468 if (mds->mdcache->cache_overfull()) {
469 CachedStackStringStream css;
470 *css << "MDS cache is too large (" << bytes2str(mds->mdcache->cache_size())
471 << "/" << bytes2str(mds->mdcache->cache_limit_memory()) << "); "
472 << mds->mdcache->num_inodes_with_caps << " inodes in use by clients, "
473 << mds->mdcache->get_num_strays() << " stray files";
474
475 MDSHealthMetric m(MDS_HEALTH_CACHE_OVERSIZED, HEALTH_WARN, css->strv());
476 health.metrics.push_back(m);
477 }
478 }
479
480 MDSMap::DaemonState Beacon::get_want_state() const
481 {
482 std::unique_lock lock(mutex);
483 return want_state;
484 }
485