]> git.proxmox.com Git - ceph.git/blame_incremental - ceph/src/mds/Beacon.cc
buildsys: switch source download to quincy
[ceph.git] / ceph / src / mds / Beacon.cc
... / ...
CommitLineData
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16#include "common/dout.h"
17#include "common/HeartbeatMap.h"
18
19#include "include/stringify.h"
20#include "include/util.h"
21
22#include "mon/MonClient.h"
23#include "mds/MDLog.h"
24#include "mds/MDSRank.h"
25#include "mds/MDSMap.h"
26#include "mds/Locker.h"
27
28#include "Beacon.h"
29
30#include <chrono>
31
32#define dout_context g_ceph_context
33#define dout_subsys ceph_subsys_mds
34#undef dout_prefix
35#define dout_prefix *_dout << "mds.beacon." << name << ' '
36
37using namespace std::chrono_literals;
38
39Beacon::Beacon(CephContext *cct, MonClient *monc, std::string_view name)
40 :
41 Dispatcher(cct),
42 beacon_interval(g_conf()->mds_beacon_interval),
43 monc(monc),
44 name(name),
45 compat(MDSMap::get_compat_set_all())
46{
47}
48
49Beacon::~Beacon()
50{
51 shutdown();
52}
53
54void Beacon::shutdown()
55{
56 std::unique_lock<std::mutex> lock(mutex);
57 if (!finished) {
58 finished = true;
59 lock.unlock();
60 if (sender.joinable())
61 sender.join();
62 }
63}
64
65void Beacon::init(const MDSMap &mdsmap)
66{
67 std::unique_lock lock(mutex);
68
69 _notify_mdsmap(mdsmap);
70
71 sender = std::thread([this]() {
72 std::unique_lock<std::mutex> lock(mutex);
73 std::condition_variable c; // no one wakes us
74 while (!finished) {
75 auto now = clock::now();
76 auto since = std::chrono::duration<double>(now-last_send).count();
77 auto interval = beacon_interval;
78 if (since >= interval*.90) {
79 if (!_send()) {
80 interval = 0.5; /* 500ms */
81 }
82 } else {
83 interval -= since;
84 }
85 dout(20) << "sender thread waiting interval " << interval << "s" << dendl;
86 c.wait_for(lock, interval*1s);
87 }
88 });
89}
90
91bool Beacon::ms_can_fast_dispatch2(const cref_t<Message>& m) const
92{
93 return m->get_type() == MSG_MDS_BEACON;
94}
95
96void Beacon::ms_fast_dispatch2(const ref_t<Message>& m)
97{
98 bool handled = ms_dispatch2(m);
99 ceph_assert(handled);
100}
101
102bool Beacon::ms_dispatch2(const ref_t<Message>& m)
103{
104 if (m->get_type() == MSG_MDS_BEACON) {
105 if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
106 handle_mds_beacon(ref_cast<MMDSBeacon>(m));
107 }
108 return true;
109 }
110
111 return false;
112}
113
114
115/**
116 * Update lagginess state based on response from remote MDSMonitor
117 *
118 * This function puts the passed message before returning
119 */
120void Beacon::handle_mds_beacon(const cref_t<MMDSBeacon> &m)
121{
122 std::unique_lock lock(mutex);
123
124 version_t seq = m->get_seq();
125
126 // update lab
127 auto it = seq_stamp.find(seq);
128 if (it != seq_stamp.end()) {
129 auto now = clock::now();
130
131 last_acked_stamp = it->second;
132 auto rtt = std::chrono::duration<double>(now - last_acked_stamp).count();
133
134 dout(5) << "received beacon reply " << ceph_mds_state_name(m->get_state()) << " seq " << m->get_seq() << " rtt " << rtt << dendl;
135
136 if (laggy && rtt < g_conf()->mds_beacon_grace) {
137 dout(0) << " MDS is no longer laggy" << dendl;
138 laggy = false;
139 last_laggy = now;
140 }
141
142 // clean up seq_stamp map
143 seq_stamp.erase(seq_stamp.begin(), ++it);
144
145 // Wake a waiter up if present
146 cvar.notify_all();
147 } else {
148 dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state())
149 << " seq " << m->get_seq() << " dne" << dendl;
150 }
151}
152
153
154void Beacon::send()
155{
156 std::unique_lock lock(mutex);
157 _send();
158}
159
160
161void Beacon::send_and_wait(const double duration)
162{
163 std::unique_lock lock(mutex);
164 _send();
165 auto awaiting_seq = last_seq;
166 dout(20) << __func__ << ": awaiting " << awaiting_seq
167 << " for up to " << duration << "s" << dendl;
168
169 auto start = clock::now();
170 while (!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq) {
171 auto now = clock::now();
172 auto s = duration*.95-std::chrono::duration<double>(now-start).count();
173 if (s < 0) break;
174 cvar.wait_for(lock, s*1s);
175 }
176}
177
178
179/**
180 * Call periodically, or when you have updated the desired state
181 */
182bool Beacon::_send()
183{
184 auto now = clock::now();
185 auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
186
187 if (!cct->get_heartbeat_map()->is_healthy()) {
188 /* If anything isn't progressing, let avoid sending a beacon so that
189 * the MDS will consider us laggy */
190 dout(0) << "Skipping beacon heartbeat to monitors (last acked " << since << "s ago); MDS internal heartbeat is not healthy!" << dendl;
191 return false;
192 }
193
194 ++last_seq;
195 dout(5) << "Sending beacon " << ceph_mds_state_name(want_state) << " seq " << last_seq << dendl;
196
197 seq_stamp[last_seq] = now;
198
199 ceph_assert(want_state != MDSMap::STATE_NULL);
200
201 auto beacon = make_message<MMDSBeacon>(
202 monc->get_fsid(), mds_gid_t(monc->get_global_id()),
203 name,
204 epoch,
205 want_state,
206 last_seq,
207 CEPH_FEATURES_SUPPORTED_DEFAULT);
208 beacon->set_health(health);
209 beacon->set_compat(compat);
210 beacon->set_fs(g_conf().get_val<std::string>("mds_join_fs"));
211 // piggyback the sys info on beacon msg
212 if (want_state == MDSMap::STATE_BOOT) {
213 map<string, string> sys_info;
214 collect_sys_info(&sys_info, cct);
215 sys_info["addr"] = stringify(monc->get_myaddrs());
216 beacon->set_sys_info(sys_info);
217 }
218 monc->send_mon_message(beacon.detach());
219 last_send = now;
220 return true;
221}
222
223/**
224 * Call this when there is a new MDSMap available
225 */
226void Beacon::notify_mdsmap(const MDSMap &mdsmap)
227{
228 std::unique_lock lock(mutex);
229
230 _notify_mdsmap(mdsmap);
231}
232
233void Beacon::_notify_mdsmap(const MDSMap &mdsmap)
234{
235 ceph_assert(mdsmap.get_epoch() >= epoch);
236
237 if (mdsmap.get_epoch() >= epoch) {
238 epoch = mdsmap.get_epoch();
239 }
240}
241
242
243bool Beacon::is_laggy()
244{
245 std::unique_lock lock(mutex);
246
247 auto now = clock::now();
248 auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
249 if (since > g_conf()->mds_beacon_grace) {
250 if (!laggy) {
251 dout(1) << "MDS connection to Monitors appears to be laggy; " << since
252 << "s since last acked beacon" << dendl;
253 }
254 laggy = true;
255 return true;
256 }
257 return false;
258}
259
260void Beacon::set_want_state(const MDSMap &mdsmap, MDSMap::DaemonState newstate)
261{
262 std::unique_lock lock(mutex);
263
264 // Update mdsmap epoch atomically with updating want_state, so that when
265 // we send a beacon with the new want state it has the latest epoch, and
266 // once we have updated to the latest epoch, we are not sending out
267 // a stale want_state (i.e. one from before making it through MDSMap
268 // handling)
269 _notify_mdsmap(mdsmap);
270
271 if (want_state != newstate) {
272 dout(5) << __func__ << ": "
273 << ceph_mds_state_name(want_state) << " -> "
274 << ceph_mds_state_name(newstate) << dendl;
275 want_state = newstate;
276 }
277}
278
279
280/**
281 * We are 'shown' an MDS briefly in order to update
282 * some health metrics that we will send in the next
283 * beacon.
284 */
285void Beacon::notify_health(MDSRank const *mds)
286{
287 std::unique_lock lock(mutex);
288 if (!mds) {
289 // No MDS rank held
290 return;
291 }
292
293 // I'm going to touch this MDS, so it must be locked
294 ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
295
296 health.metrics.clear();
297
298 // Detect presence of entries in DamageTable
299 if (!mds->damage_table.empty()) {
300 MDSHealthMetric m(MDS_HEALTH_DAMAGE, HEALTH_ERR, std::string(
301 "Metadata damage detected"));
302 health.metrics.push_back(m);
303 }
304
305 // Detect MDS_HEALTH_TRIM condition
306 // Indicates MDS is not trimming promptly
307 {
308 if (mds->mdlog->get_num_segments() > (size_t)(g_conf()->mds_log_max_segments * g_conf().get_val<double>("mds_log_warn_factor"))) {
309 CachedStackStringStream css;
310 *css << "Behind on trimming (" << mds->mdlog->get_num_segments()
311 << "/" << g_conf()->mds_log_max_segments << ")";
312
313 MDSHealthMetric m(MDS_HEALTH_TRIM, HEALTH_WARN, css->strv());
314 m.metadata["num_segments"] = stringify(mds->mdlog->get_num_segments());
315 m.metadata["max_segments"] = stringify(g_conf()->mds_log_max_segments);
316 health.metrics.push_back(m);
317 }
318 }
319
320 // Detect clients failing to respond to modifications to capabilities in
321 // CLIENT_CAPS messages.
322 {
323 auto&& late_clients = mds->locker->get_late_revoking_clients(mds->mdsmap->get_session_timeout());
324 std::vector<MDSHealthMetric> late_cap_metrics;
325
326 for (const auto& client : late_clients) {
327 // client_t is equivalent to session.info.inst.name.num
328 // Construct an entity_name_t to lookup into SessionMap
329 entity_name_t ename(CEPH_ENTITY_TYPE_CLIENT, client.v);
330 Session const *s = mds->sessionmap.get_session(ename);
331 if (s == NULL) {
332 // Shouldn't happen, but not worth crashing if it does as this is
333 // just health-reporting code.
334 derr << "Client ID without session: " << client.v << dendl;
335 continue;
336 }
337
338 CachedStackStringStream css;
339 *css << "Client " << s->get_human_name() << " failing to respond to capability release";
340 MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE, HEALTH_WARN, css->strv());
341 m.metadata["client_id"] = stringify(client.v);
342 late_cap_metrics.emplace_back(std::move(m));
343 }
344
345 if (late_cap_metrics.size() <= (size_t)g_conf()->mds_health_summarize_threshold) {
346 auto&& m = late_cap_metrics;
347 health.metrics.insert(std::end(health.metrics), std::cbegin(m), std::cend(m));
348 } else {
349 CachedStackStringStream css;
350 *css << "Many clients (" << late_cap_metrics.size()
351 << ") failing to respond to capability release";
352 MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE_MANY, HEALTH_WARN, css->strv());
353 m.metadata["client_count"] = stringify(late_cap_metrics.size());
354 health.metrics.push_back(std::move(m));
355 }
356 }
357
358 // Detect clients failing to generate cap releases from CEPH_SESSION_RECALL_STATE
359 // messages. May be due to buggy client or resource-hogging application.
360 //
361 // Detect clients failing to advance their old_client_tid
362 {
363 set<Session*> sessions;
364 mds->sessionmap.get_client_session_set(sessions);
365
366 const auto min_caps_working_set = g_conf().get_val<uint64_t>("mds_min_caps_working_set");
367 const auto recall_warning_threshold = g_conf().get_val<Option::size_t>("mds_recall_warning_threshold");
368 const auto max_completed_requests = g_conf()->mds_max_completed_requests;
369 const auto max_completed_flushes = g_conf()->mds_max_completed_flushes;
370 std::vector<MDSHealthMetric> late_recall_metrics;
371 std::vector<MDSHealthMetric> large_completed_requests_metrics;
372 for (auto& session : sessions) {
373 const uint64_t num_caps = session->get_num_caps();
374 const uint64_t recall_caps = session->get_recall_caps();
375 if (recall_caps > recall_warning_threshold && num_caps > min_caps_working_set) {
376 dout(2) << "Session " << *session <<
377 " is not releasing caps fast enough. Recalled caps at " << recall_caps
378 << " > " << recall_warning_threshold << " (mds_recall_warning_threshold)." << dendl;
379 CachedStackStringStream css;
380 *css << "Client " << session->get_human_name() << " failing to respond to cache pressure";
381 MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, css->strv());
382 m.metadata["client_id"] = stringify(session->get_client());
383 late_recall_metrics.emplace_back(std::move(m));
384 }
385 if ((session->get_num_trim_requests_warnings() > 0 &&
386 session->get_num_completed_requests() >= max_completed_requests) ||
387 (session->get_num_trim_flushes_warnings() > 0 &&
388 session->get_num_completed_flushes() >= max_completed_flushes)) {
389 CachedStackStringStream css;
390 *css << "Client " << session->get_human_name() << " failing to advance its oldest client/flush tid. ";
391 MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID, HEALTH_WARN, css->strv());
392 m.metadata["client_id"] = stringify(session->get_client());
393 large_completed_requests_metrics.emplace_back(std::move(m));
394 }
395 }
396
397 if (late_recall_metrics.size() <= (size_t)g_conf()->mds_health_summarize_threshold) {
398 auto&& m = late_recall_metrics;
399 health.metrics.insert(std::end(health.metrics), std::cbegin(m), std::cend(m));
400 } else {
401 CachedStackStringStream css;
402 *css << "Many clients (" << late_recall_metrics.size()
403 << ") failing to respond to cache pressure";
404 MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL_MANY, HEALTH_WARN, css->strv());
405 m.metadata["client_count"] = stringify(late_recall_metrics.size());
406 health.metrics.push_back(m);
407 late_recall_metrics.clear();
408 }
409
410 if (large_completed_requests_metrics.size() <= (size_t)g_conf()->mds_health_summarize_threshold) {
411 auto&& m = large_completed_requests_metrics;
412 health.metrics.insert(std::end(health.metrics), std::cbegin(m), std::cend(m));
413 } else {
414 CachedStackStringStream css;
415 *css << "Many clients (" << large_completed_requests_metrics.size()
416 << ") failing to advance their oldest client/flush tid";
417 MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID_MANY, HEALTH_WARN, css->strv());
418 m.metadata["client_count"] = stringify(large_completed_requests_metrics.size());
419 health.metrics.push_back(m);
420 large_completed_requests_metrics.clear();
421 }
422 }
423
424 // Detect MDS_HEALTH_SLOW_REQUEST condition
425 {
426 int slow = mds->get_mds_slow_req_count();
427 if (slow) {
428 dout(20) << slow << " slow request found" << dendl;
429 CachedStackStringStream css;
430 *css << slow << " slow requests are blocked > " << g_conf()->mds_op_complaint_time << " secs";
431
432 MDSHealthMetric m(MDS_HEALTH_SLOW_REQUEST, HEALTH_WARN, css->strv());
433 health.metrics.push_back(m);
434 }
435 }
436
437 {
438 auto complaint_time = g_conf()->osd_op_complaint_time;
439 auto now = clock::now();
440 auto cutoff = now - ceph::make_timespan(complaint_time);
441
442 std::string count;
443 ceph::coarse_mono_time oldest;
444 if (MDSIOContextBase::check_ios_in_flight(cutoff, count, oldest)) {
445 dout(20) << count << " slow metadata IOs found" << dendl;
446
447 auto oldest_secs = std::chrono::duration<double>(now - oldest).count();
448 CachedStackStringStream css;
449 *css << count << " slow metadata IOs are blocked > " << complaint_time
450 << " secs, oldest blocked for " << (int64_t)oldest_secs << " secs";
451
452 MDSHealthMetric m(MDS_HEALTH_SLOW_METADATA_IO, HEALTH_WARN, css->strv());
453 health.metrics.push_back(m);
454 }
455 }
456
457 // Report a health warning if we are readonly
458 if (mds->mdcache->is_readonly()) {
459 MDSHealthMetric m(MDS_HEALTH_READ_ONLY, HEALTH_WARN,
460 "MDS in read-only mode");
461 health.metrics.push_back(m);
462 }
463
464 // Report if we have significantly exceeded our cache size limit
465 if (mds->mdcache->cache_overfull()) {
466 CachedStackStringStream css;
467 *css << "MDS cache is too large (" << bytes2str(mds->mdcache->cache_size())
468 << "/" << bytes2str(mds->mdcache->cache_limit_memory()) << "); "
469 << mds->mdcache->num_inodes_with_caps << " inodes in use by clients, "
470 << mds->mdcache->get_num_strays() << " stray files";
471
472 MDSHealthMetric m(MDS_HEALTH_CACHE_OVERSIZED, HEALTH_WARN, css->strv());
473 health.metrics.push_back(m);
474 }
475}
476
477MDSMap::DaemonState Beacon::get_want_state() const
478{
479 std::unique_lock lock(mutex);
480 return want_state;
481}
482