]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Beacon.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / mds / Beacon.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "common/dout.h"
17 #include "common/HeartbeatMap.h"
18
19 #include "include/stringify.h"
20 #include "include/util.h"
21
22 #include "mon/MonClient.h"
23 #include "mds/MDLog.h"
24 #include "mds/MDSRank.h"
25 #include "mds/MDSMap.h"
26 #include "mds/Locker.h"
27
28 #include "Beacon.h"
29
30 #include <chrono>
31
32 #define dout_context g_ceph_context
33 #define dout_subsys ceph_subsys_mds
34 #undef dout_prefix
35 #define dout_prefix *_dout << "mds.beacon." << name << ' '
36
37 using namespace std::chrono_literals;
38
39 Beacon::Beacon(CephContext *cct, MonClient *monc, std::string_view name)
40 :
41 Dispatcher(cct),
42 beacon_interval(g_conf()->mds_beacon_interval),
43 monc(monc),
44 name(name)
45 {
46 }
47
48 Beacon::~Beacon()
49 {
50 shutdown();
51 }
52
53 void Beacon::shutdown()
54 {
55 std::unique_lock<std::mutex> lock(mutex);
56 if (!finished) {
57 finished = true;
58 lock.unlock();
59 if (sender.joinable())
60 sender.join();
61 }
62 }
63
64 void Beacon::init(const MDSMap &mdsmap)
65 {
66 std::unique_lock lock(mutex);
67
68 _notify_mdsmap(mdsmap);
69
70 sender = std::thread([this]() {
71 std::unique_lock<std::mutex> lock(mutex);
72 std::condition_variable c; // no one wakes us
73 while (!finished) {
74 auto now = clock::now();
75 auto since = std::chrono::duration<double>(now-last_send).count();
76 auto interval = beacon_interval;
77 if (since >= interval*.90) {
78 if (!_send()) {
79 interval = 0.5; /* 500ms */
80 }
81 } else {
82 interval -= since;
83 }
84 dout(20) << "sender thread waiting interval " << interval << "s" << dendl;
85 c.wait_for(lock, interval*1s);
86 }
87 });
88 }
89
90 bool Beacon::ms_can_fast_dispatch2(const cref_t<Message>& m) const
91 {
92 return m->get_type() == MSG_MDS_BEACON;
93 }
94
95 void Beacon::ms_fast_dispatch2(const ref_t<Message>& m)
96 {
97 bool handled = ms_dispatch2(m);
98 ceph_assert(handled);
99 }
100
101 bool Beacon::ms_dispatch2(const ref_t<Message>& m)
102 {
103 if (m->get_type() == MSG_MDS_BEACON) {
104 if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
105 handle_mds_beacon(ref_cast<MMDSBeacon>(m));
106 }
107 return true;
108 }
109
110 return false;
111 }
112
113
114 /**
115 * Update lagginess state based on response from remote MDSMonitor
116 *
117 * This function puts the passed message before returning
118 */
119 void Beacon::handle_mds_beacon(const cref_t<MMDSBeacon> &m)
120 {
121 std::unique_lock lock(mutex);
122
123 version_t seq = m->get_seq();
124
125 // update lab
126 auto it = seq_stamp.find(seq);
127 if (it != seq_stamp.end()) {
128 auto now = clock::now();
129
130 last_acked_stamp = it->second;
131 auto rtt = std::chrono::duration<double>(now - last_acked_stamp).count();
132
133 dout(5) << "received beacon reply " << ceph_mds_state_name(m->get_state()) << " seq " << m->get_seq() << " rtt " << rtt << dendl;
134
135 if (laggy && rtt < g_conf()->mds_beacon_grace) {
136 dout(0) << " MDS is no longer laggy" << dendl;
137 laggy = false;
138 last_laggy = now;
139 }
140
141 // clean up seq_stamp map
142 seq_stamp.erase(seq_stamp.begin(), ++it);
143
144 // Wake a waiter up if present
145 cvar.notify_all();
146 } else {
147 dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state())
148 << " seq " << m->get_seq() << " dne" << dendl;
149 }
150 }
151
152
153 void Beacon::send()
154 {
155 std::unique_lock lock(mutex);
156 _send();
157 }
158
159
160 void Beacon::send_and_wait(const double duration)
161 {
162 std::unique_lock lock(mutex);
163 _send();
164 auto awaiting_seq = last_seq;
165 dout(20) << __func__ << ": awaiting " << awaiting_seq
166 << " for up to " << duration << "s" << dendl;
167
168 auto start = clock::now();
169 while (!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq) {
170 auto now = clock::now();
171 auto s = duration*.95-std::chrono::duration<double>(now-start).count();
172 if (s < 0) break;
173 cvar.wait_for(lock, s*1s);
174 }
175 }
176
177
178 /**
179 * Call periodically, or when you have updated the desired state
180 */
181 bool Beacon::_send()
182 {
183 auto now = clock::now();
184 auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
185
186 if (!cct->get_heartbeat_map()->is_healthy()) {
187 /* If anything isn't progressing, let avoid sending a beacon so that
188 * the MDS will consider us laggy */
189 dout(0) << "Skipping beacon heartbeat to monitors (last acked " << since << "s ago); MDS internal heartbeat is not healthy!" << dendl;
190 return false;
191 }
192
193 ++last_seq;
194 dout(5) << "Sending beacon " << ceph_mds_state_name(want_state) << " seq " << last_seq << dendl;
195
196 seq_stamp[last_seq] = now;
197
198 ceph_assert(want_state != MDSMap::STATE_NULL);
199
200 auto beacon = make_message<MMDSBeacon>(
201 monc->get_fsid(), mds_gid_t(monc->get_global_id()),
202 name,
203 epoch,
204 want_state,
205 last_seq,
206 CEPH_FEATURES_SUPPORTED_DEFAULT);
207 beacon->set_health(health);
208 beacon->set_compat(compat);
209 beacon->set_fs(g_conf().get_val<std::string>("mds_join_fs"));
210 // piggyback the sys info on beacon msg
211 if (want_state == MDSMap::STATE_BOOT) {
212 map<string, string> sys_info;
213 collect_sys_info(&sys_info, cct);
214 sys_info["addr"] = stringify(monc->get_myaddrs());
215 beacon->set_sys_info(sys_info);
216 }
217 monc->send_mon_message(beacon.detach());
218 last_send = now;
219 return true;
220 }
221
222 /**
223 * Call this when there is a new MDSMap available
224 */
225 void Beacon::notify_mdsmap(const MDSMap &mdsmap)
226 {
227 std::unique_lock lock(mutex);
228
229 _notify_mdsmap(mdsmap);
230 }
231
232 void Beacon::_notify_mdsmap(const MDSMap &mdsmap)
233 {
234 ceph_assert(mdsmap.get_epoch() >= epoch);
235
236 if (mdsmap.get_epoch() != epoch) {
237 epoch = mdsmap.get_epoch();
238 compat = MDSMap::get_compat_set_default();
239 compat.merge(mdsmap.compat);
240 }
241 }
242
243
244 bool Beacon::is_laggy()
245 {
246 std::unique_lock lock(mutex);
247
248 auto now = clock::now();
249 auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
250 if (since > g_conf()->mds_beacon_grace) {
251 if (!laggy) {
252 dout(1) << "MDS connection to Monitors appears to be laggy; " << since
253 << "s since last acked beacon" << dendl;
254 }
255 laggy = true;
256 return true;
257 }
258 return false;
259 }
260
261 void Beacon::set_want_state(const MDSMap &mdsmap, MDSMap::DaemonState newstate)
262 {
263 std::unique_lock lock(mutex);
264
265 // Update mdsmap epoch atomically with updating want_state, so that when
266 // we send a beacon with the new want state it has the latest epoch, and
267 // once we have updated to the latest epoch, we are not sending out
268 // a stale want_state (i.e. one from before making it through MDSMap
269 // handling)
270 _notify_mdsmap(mdsmap);
271
272 if (want_state != newstate) {
273 dout(5) << __func__ << ": "
274 << ceph_mds_state_name(want_state) << " -> "
275 << ceph_mds_state_name(newstate) << dendl;
276 want_state = newstate;
277 }
278 }
279
280
281 /**
282 * We are 'shown' an MDS briefly in order to update
283 * some health metrics that we will send in the next
284 * beacon.
285 */
286 void Beacon::notify_health(MDSRank const *mds)
287 {
288 std::unique_lock lock(mutex);
289 if (!mds) {
290 // No MDS rank held
291 return;
292 }
293
294 // I'm going to touch this MDS, so it must be locked
295 ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
296
297 health.metrics.clear();
298
299 // Detect presence of entries in DamageTable
300 if (!mds->damage_table.empty()) {
301 MDSHealthMetric m(MDS_HEALTH_DAMAGE, HEALTH_ERR, std::string(
302 "Metadata damage detected"));
303 health.metrics.push_back(m);
304 }
305
306 // Detect MDS_HEALTH_TRIM condition
307 // Indicates MDS is not trimming promptly
308 {
309 if (mds->mdlog->get_num_segments() > (size_t)(g_conf()->mds_log_max_segments * g_conf().get_val<double>("mds_log_warn_factor"))) {
310 CachedStackStringStream css;
311 *css << "Behind on trimming (" << mds->mdlog->get_num_segments()
312 << "/" << g_conf()->mds_log_max_segments << ")";
313
314 MDSHealthMetric m(MDS_HEALTH_TRIM, HEALTH_WARN, css->strv());
315 m.metadata["num_segments"] = stringify(mds->mdlog->get_num_segments());
316 m.metadata["max_segments"] = stringify(g_conf()->mds_log_max_segments);
317 health.metrics.push_back(m);
318 }
319 }
320
321 // Detect clients failing to respond to modifications to capabilities in
322 // CLIENT_CAPS messages.
323 {
324 auto&& late_clients = mds->locker->get_late_revoking_clients(mds->mdsmap->get_session_timeout());
325 std::vector<MDSHealthMetric> late_cap_metrics;
326
327 for (const auto& client : late_clients) {
328 // client_t is equivalent to session.info.inst.name.num
329 // Construct an entity_name_t to lookup into SessionMap
330 entity_name_t ename(CEPH_ENTITY_TYPE_CLIENT, client.v);
331 Session const *s = mds->sessionmap.get_session(ename);
332 if (s == NULL) {
333 // Shouldn't happen, but not worth crashing if it does as this is
334 // just health-reporting code.
335 derr << "Client ID without session: " << client.v << dendl;
336 continue;
337 }
338
339 CachedStackStringStream css;
340 *css << "Client " << s->get_human_name() << " failing to respond to capability release";
341 MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE, HEALTH_WARN, css->strv());
342 m.metadata["client_id"] = stringify(client.v);
343 late_cap_metrics.emplace_back(std::move(m));
344 }
345
346 if (late_cap_metrics.size() <= (size_t)g_conf()->mds_health_summarize_threshold) {
347 auto&& m = late_cap_metrics;
348 health.metrics.insert(std::end(health.metrics), std::cbegin(m), std::cend(m));
349 } else {
350 CachedStackStringStream css;
351 *css << "Many clients (" << late_cap_metrics.size()
352 << ") failing to respond to capability release";
353 MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE_MANY, HEALTH_WARN, css->strv());
354 m.metadata["client_count"] = stringify(late_cap_metrics.size());
355 health.metrics.push_back(std::move(m));
356 }
357 }
358
359 // Detect clients failing to generate cap releases from CEPH_SESSION_RECALL_STATE
360 // messages. May be due to buggy client or resource-hogging application.
361 //
362 // Detect clients failing to advance their old_client_tid
363 {
364 set<Session*> sessions;
365 mds->sessionmap.get_client_session_set(sessions);
366
367 const auto min_caps_working_set = g_conf().get_val<uint64_t>("mds_min_caps_working_set");
368 const auto recall_warning_threshold = g_conf().get_val<Option::size_t>("mds_recall_warning_threshold");
369 const auto max_completed_requests = g_conf()->mds_max_completed_requests;
370 const auto max_completed_flushes = g_conf()->mds_max_completed_flushes;
371 std::vector<MDSHealthMetric> late_recall_metrics;
372 std::vector<MDSHealthMetric> large_completed_requests_metrics;
373 for (auto& session : sessions) {
374 const uint64_t num_caps = session->get_num_caps();
375 const uint64_t recall_caps = session->get_recall_caps();
376 if (recall_caps > recall_warning_threshold && num_caps > min_caps_working_set) {
377 dout(2) << "Session " << *session <<
378 " is not releasing caps fast enough. Recalled caps at " << recall_caps
379 << " > " << recall_warning_threshold << " (mds_recall_warning_threshold)." << dendl;
380 CachedStackStringStream css;
381 *css << "Client " << session->get_human_name() << " failing to respond to cache pressure";
382 MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, css->strv());
383 m.metadata["client_id"] = stringify(session->get_client());
384 late_recall_metrics.emplace_back(std::move(m));
385 }
386 if ((session->get_num_trim_requests_warnings() > 0 &&
387 session->get_num_completed_requests() >= max_completed_requests) ||
388 (session->get_num_trim_flushes_warnings() > 0 &&
389 session->get_num_completed_flushes() >= max_completed_flushes)) {
390 CachedStackStringStream css;
391 *css << "Client " << session->get_human_name() << " failing to advance its oldest client/flush tid. ";
392 MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID, HEALTH_WARN, css->strv());
393 m.metadata["client_id"] = stringify(session->get_client());
394 large_completed_requests_metrics.emplace_back(std::move(m));
395 }
396 }
397
398 if (late_recall_metrics.size() <= (size_t)g_conf()->mds_health_summarize_threshold) {
399 auto&& m = late_recall_metrics;
400 health.metrics.insert(std::end(health.metrics), std::cbegin(m), std::cend(m));
401 } else {
402 CachedStackStringStream css;
403 *css << "Many clients (" << late_recall_metrics.size()
404 << ") failing to respond to cache pressure";
405 MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL_MANY, HEALTH_WARN, css->strv());
406 m.metadata["client_count"] = stringify(late_recall_metrics.size());
407 health.metrics.push_back(m);
408 late_recall_metrics.clear();
409 }
410
411 if (large_completed_requests_metrics.size() <= (size_t)g_conf()->mds_health_summarize_threshold) {
412 auto&& m = large_completed_requests_metrics;
413 health.metrics.insert(std::end(health.metrics), std::cbegin(m), std::cend(m));
414 } else {
415 CachedStackStringStream css;
416 *css << "Many clients (" << large_completed_requests_metrics.size()
417 << ") failing to advance their oldest client/flush tid";
418 MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID_MANY, HEALTH_WARN, css->strv());
419 m.metadata["client_count"] = stringify(large_completed_requests_metrics.size());
420 health.metrics.push_back(m);
421 large_completed_requests_metrics.clear();
422 }
423 }
424
425 // Detect MDS_HEALTH_SLOW_REQUEST condition
426 {
427 int slow = mds->get_mds_slow_req_count();
428 if (slow) {
429 dout(20) << slow << " slow request found" << dendl;
430 CachedStackStringStream css;
431 *css << slow << " slow requests are blocked > " << g_conf()->mds_op_complaint_time << " secs";
432
433 MDSHealthMetric m(MDS_HEALTH_SLOW_REQUEST, HEALTH_WARN, css->strv());
434 health.metrics.push_back(m);
435 }
436 }
437
438 {
439 auto complaint_time = g_conf()->osd_op_complaint_time;
440 auto now = clock::now();
441 auto cutoff = now - ceph::make_timespan(complaint_time);
442
443 std::string count;
444 ceph::coarse_mono_time oldest;
445 if (MDSIOContextBase::check_ios_in_flight(cutoff, count, oldest)) {
446 dout(20) << count << " slow metadata IOs found" << dendl;
447
448 auto oldest_secs = std::chrono::duration<double>(now - oldest).count();
449 CachedStackStringStream css;
450 *css << count << " slow metadata IOs are blocked > " << complaint_time
451 << " secs, oldest blocked for " << (int64_t)oldest_secs << " secs";
452
453 MDSHealthMetric m(MDS_HEALTH_SLOW_METADATA_IO, HEALTH_WARN, css->strv());
454 health.metrics.push_back(m);
455 }
456 }
457
458 // Report a health warning if we are readonly
459 if (mds->mdcache->is_readonly()) {
460 MDSHealthMetric m(MDS_HEALTH_READ_ONLY, HEALTH_WARN,
461 "MDS in read-only mode");
462 health.metrics.push_back(m);
463 }
464
465 // Report if we have significantly exceeded our cache size limit
466 if (mds->mdcache->cache_overfull()) {
467 CachedStackStringStream css;
468 *css << "MDS cache is too large (" << bytes2str(mds->mdcache->cache_size())
469 << "/" << bytes2str(mds->mdcache->cache_limit_memory()) << "); "
470 << mds->mdcache->num_inodes_with_caps << " inodes in use by clients, "
471 << mds->mdcache->get_num_strays() << " stray files";
472
473 MDSHealthMetric m(MDS_HEALTH_CACHE_OVERSIZED, HEALTH_WARN, css->strv());
474 health.metrics.push_back(m);
475 }
476 }
477
478 MDSMap::DaemonState Beacon::get_want_state() const
479 {
480 std::unique_lock lock(mutex);
481 return want_state;
482 }
483