]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Beacon.cc
update sources to v12.2.1
[ceph.git] / ceph / src / mds / Beacon.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "common/dout.h"
17 #include "common/HeartbeatMap.h"
18
19 #include "include/stringify.h"
20 #include "include/util.h"
21
22 #include "messages/MMDSBeacon.h"
23 #include "mon/MonClient.h"
24 #include "mds/MDLog.h"
25 #include "mds/MDSRank.h"
26 #include "mds/MDSMap.h"
27 #include "mds/Locker.h"
28
29 #include "Beacon.h"
30
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_mds
33 #undef dout_prefix
34 #define dout_prefix *_dout << "mds.beacon." << name << ' '
35
36
37 class Beacon::C_MDS_BeaconSender : public Context {
38 public:
39 explicit C_MDS_BeaconSender(Beacon *beacon_) : beacon(beacon_) {}
40 void finish(int r) override {
41 assert(beacon->lock.is_locked_by_me());
42 beacon->sender = NULL;
43 beacon->_send();
44 }
45 private:
46 Beacon *beacon;
47 };
48
49 Beacon::Beacon(CephContext *cct_, MonClient *monc_, std::string name_) :
50 Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
51 name(name_), standby_for_rank(MDS_RANK_NONE),
52 standby_for_fscid(FS_CLUSTER_ID_NONE), want_state(MDSMap::STATE_BOOT),
53 awaiting_seq(-1)
54 {
55 last_seq = 0;
56 sender = NULL;
57 was_laggy = false;
58
59 epoch = 0;
60 }
61
62
63 Beacon::~Beacon()
64 {
65 }
66
67
68 void Beacon::init(MDSMap const *mdsmap)
69 {
70 Mutex::Locker l(lock);
71 assert(mdsmap != NULL);
72
73 _notify_mdsmap(mdsmap);
74 standby_for_rank = mds_rank_t(g_conf->mds_standby_for_rank);
75 standby_for_name = g_conf->mds_standby_for_name;
76 standby_for_fscid = fs_cluster_id_t(g_conf->mds_standby_for_fscid);
77 standby_replay = g_conf->mds_standby_replay;
78
79 // Spawn threads and start messaging
80 timer.init();
81 _send();
82 }
83
84
85 void Beacon::shutdown()
86 {
87 Mutex::Locker l(lock);
88 if (sender) {
89 timer.cancel_event(sender);
90 sender = NULL;
91 }
92 timer.shutdown();
93 }
94
95
96 bool Beacon::ms_dispatch(Message *m)
97 {
98 if (m->get_type() == MSG_MDS_BEACON) {
99 if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
100 handle_mds_beacon(static_cast<MMDSBeacon*>(m));
101 }
102 return true;
103 }
104
105 return false;
106 }
107
108
109 /**
110 * Update lagginess state based on response from remote MDSMonitor
111 *
112 * This function puts the passed message before returning
113 */
114 void Beacon::handle_mds_beacon(MMDSBeacon *m)
115 {
116 Mutex::Locker l(lock);
117 assert(m != NULL);
118
119 version_t seq = m->get_seq();
120
121 // update lab
122 if (seq_stamp.count(seq)) {
123 utime_t now = ceph_clock_now();
124 if (seq_stamp[seq] > last_acked_stamp) {
125 last_acked_stamp = seq_stamp[seq];
126 utime_t rtt = now - last_acked_stamp;
127
128 dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
129 << " seq " << m->get_seq() << " rtt " << rtt << dendl;
130
131 if (was_laggy && rtt < g_conf->mds_beacon_grace) {
132 dout(0) << "handle_mds_beacon no longer laggy" << dendl;
133 was_laggy = false;
134 laggy_until = now;
135 }
136 } else {
137 // Mark myself laggy if system clock goes backwards. Hopping
138 // later beacons will clear it.
139 dout(1) << "handle_mds_beacon system clock goes backwards, "
140 << "mark myself laggy" << dendl;
141 last_acked_stamp = now - utime_t(g_conf->mds_beacon_grace + 1, 0);
142 was_laggy = true;
143 }
144
145 // clean up seq_stamp map
146 while (!seq_stamp.empty() &&
147 seq_stamp.begin()->first <= seq)
148 seq_stamp.erase(seq_stamp.begin());
149
150 // Wake a waiter up if present
151 if (awaiting_seq == seq) {
152 waiting_cond.Signal();
153 }
154 } else {
155 dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
156 << " seq " << m->get_seq() << " dne" << dendl;
157 }
158 }
159
160
161 void Beacon::send()
162 {
163 Mutex::Locker l(lock);
164 _send();
165 }
166
167
168 void Beacon::send_and_wait(const double duration)
169 {
170 Mutex::Locker l(lock);
171 _send();
172 awaiting_seq = last_seq;
173 dout(20) << __func__ << ": awaiting " << awaiting_seq
174 << " for up to " << duration << "s" << dendl;
175
176 utime_t timeout;
177 timeout.set_from_double(ceph_clock_now() + duration);
178 while ((!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq)
179 && ceph_clock_now() < timeout) {
180 waiting_cond.WaitUntil(lock, timeout);
181 }
182
183 awaiting_seq = -1;
184 }
185
186
187 /**
188 * Call periodically, or when you have updated the desired state
189 */
190 void Beacon::_send()
191 {
192 if (sender) {
193 timer.cancel_event(sender);
194 }
195 sender = new C_MDS_BeaconSender(this);
196 timer.add_event_after(g_conf->mds_beacon_interval, sender);
197
198 if (!cct->get_heartbeat_map()->is_healthy()) {
199 /* If anything isn't progressing, let avoid sending a beacon so that
200 * the MDS will consider us laggy */
201 dout(1) << __func__ << " skipping beacon, heartbeat map not healthy" << dendl;
202 return;
203 }
204
205 ++last_seq;
206 dout(10) << __func__ << " " << ceph_mds_state_name(want_state)
207 << " seq " << last_seq
208 << dendl;
209
210 seq_stamp[last_seq] = ceph_clock_now();
211
212 assert(want_state != MDSMap::STATE_NULL);
213
214 MMDSBeacon *beacon = new MMDSBeacon(
215 monc->get_fsid(), mds_gid_t(monc->get_global_id()),
216 name,
217 epoch,
218 want_state,
219 last_seq,
220 CEPH_FEATURES_SUPPORTED_DEFAULT);
221
222 beacon->set_standby_for_rank(standby_for_rank);
223 beacon->set_standby_for_name(standby_for_name);
224 beacon->set_standby_for_fscid(standby_for_fscid);
225 beacon->set_standby_replay(standby_replay);
226 beacon->set_health(health);
227 beacon->set_compat(compat);
228 // piggyback the sys info on beacon msg
229 if (want_state == MDSMap::STATE_BOOT) {
230 map<string, string> sys_info;
231 collect_sys_info(&sys_info, cct);
232 sys_info["addr"] = stringify(monc->get_myaddr());
233 beacon->set_sys_info(sys_info);
234 }
235 monc->send_mon_message(beacon);
236 }
237
238 /**
239 * Call this when there is a new MDSMap available
240 */
241 void Beacon::notify_mdsmap(MDSMap const *mdsmap)
242 {
243 Mutex::Locker l(lock);
244 assert(mdsmap != NULL);
245
246 _notify_mdsmap(mdsmap);
247 }
248
249 void Beacon::_notify_mdsmap(MDSMap const *mdsmap)
250 {
251 assert(mdsmap != NULL);
252 assert(mdsmap->get_epoch() >= epoch);
253
254 if (mdsmap->get_epoch() != epoch) {
255 epoch = mdsmap->get_epoch();
256 compat = get_mdsmap_compat_set_default();
257 compat.merge(mdsmap->compat);
258 }
259 }
260
261
262 bool Beacon::is_laggy()
263 {
264 Mutex::Locker l(lock);
265
266 if (last_acked_stamp == utime_t())
267 return false;
268
269 utime_t now = ceph_clock_now();
270 utime_t since = now - last_acked_stamp;
271 if (since > g_conf->mds_beacon_grace) {
272 dout(5) << "is_laggy " << since << " > " << g_conf->mds_beacon_grace
273 << " since last acked beacon" << dendl;
274 was_laggy = true;
275 if (since > (g_conf->mds_beacon_grace*2) &&
276 now > last_mon_reconnect + g_conf->mds_beacon_interval) {
277 // maybe it's not us?
278 dout(5) << "initiating monitor reconnect; maybe we're not the slow one"
279 << dendl;
280 last_mon_reconnect = now;
281 monc->reopen_session();
282 }
283 return true;
284 }
285 return false;
286 }
287
288 utime_t Beacon::get_laggy_until() const
289 {
290 Mutex::Locker l(lock);
291
292 return laggy_until;
293 }
294
295 void Beacon::set_want_state(MDSMap const *mdsmap, MDSMap::DaemonState const newstate)
296 {
297 Mutex::Locker l(lock);
298
299 // Update mdsmap epoch atomically with updating want_state, so that when
300 // we send a beacon with the new want state it has the latest epoch, and
301 // once we have updated to the latest epoch, we are not sending out
302 // a stale want_state (i.e. one from before making it through MDSMap
303 // handling)
304 _notify_mdsmap(mdsmap);
305
306 if (want_state != newstate) {
307 dout(10) << __func__ << ": "
308 << ceph_mds_state_name(want_state) << " -> "
309 << ceph_mds_state_name(newstate) << dendl;
310 want_state = newstate;
311 }
312 }
313
314
315 /**
316 * We are 'shown' an MDS briefly in order to update
317 * some health metrics that we will send in the next
318 * beacon.
319 */
320 void Beacon::notify_health(MDSRank const *mds)
321 {
322 Mutex::Locker l(lock);
323 if (!mds) {
324 // No MDS rank held
325 return;
326 }
327
328 // I'm going to touch this MDS, so it must be locked
329 assert(mds->mds_lock.is_locked_by_me());
330
331 health.metrics.clear();
332
333 // Detect presence of entries in DamageTable
334 if (!mds->damage_table.empty()) {
335 MDSHealthMetric m(MDS_HEALTH_DAMAGE, HEALTH_ERR, std::string(
336 "Metadata damage detected"));
337 health.metrics.push_back(m);
338 }
339
340 // Detect MDS_HEALTH_TRIM condition
341 // Arbitrary factor of 2, indicates MDS is not trimming promptly
342 {
343 if (mds->mdlog->get_num_segments() > (size_t)(g_conf->mds_log_max_segments * 2)) {
344 std::ostringstream oss;
345 oss << "Behind on trimming (" << mds->mdlog->get_num_segments()
346 << "/" << g_conf->mds_log_max_segments << ")";
347
348 MDSHealthMetric m(MDS_HEALTH_TRIM, HEALTH_WARN, oss.str());
349 m.metadata["num_segments"] = stringify(mds->mdlog->get_num_segments());
350 m.metadata["max_segments"] = stringify(g_conf->mds_log_max_segments);
351 health.metrics.push_back(m);
352 }
353 }
354
355 // Detect clients failing to respond to modifications to capabilities in
356 // CLIENT_CAPS messages.
357 {
358 std::list<client_t> late_clients;
359 mds->locker->get_late_revoking_clients(&late_clients);
360 std::list<MDSHealthMetric> late_cap_metrics;
361
362 for (std::list<client_t>::iterator i = late_clients.begin(); i != late_clients.end(); ++i) {
363
364 // client_t is equivalent to session.info.inst.name.num
365 // Construct an entity_name_t to lookup into SessionMap
366 entity_name_t ename(CEPH_ENTITY_TYPE_CLIENT, i->v);
367 Session const *s = mds->sessionmap.get_session(ename);
368 if (s == NULL) {
369 // Shouldn't happen, but not worth crashing if it does as this is
370 // just health-reporting code.
371 derr << "Client ID without session: " << i->v << dendl;
372 continue;
373 }
374
375 std::ostringstream oss;
376 oss << "Client " << s->get_human_name() << " failing to respond to capability release";
377 MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE, HEALTH_WARN, oss.str());
378 m.metadata["client_id"] = stringify(i->v);
379 late_cap_metrics.push_back(m);
380 }
381
382 if (late_cap_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) {
383 health.metrics.splice(health.metrics.end(), late_cap_metrics);
384 } else {
385 std::ostringstream oss;
386 oss << "Many clients (" << late_cap_metrics.size()
387 << ") failing to respond to capability release";
388 MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE_MANY, HEALTH_WARN, oss.str());
389 m.metadata["client_count"] = stringify(late_cap_metrics.size());
390 health.metrics.push_back(m);
391 late_cap_metrics.clear();
392 }
393 }
394
395 // Detect clients failing to generate cap releases from CEPH_SESSION_RECALL_STATE
396 // messages. May be due to buggy client or resource-hogging application.
397 //
398 // Detect clients failing to advance their old_client_tid
399 {
400 set<Session*> sessions;
401 mds->sessionmap.get_client_session_set(sessions);
402
403 utime_t cutoff = ceph_clock_now();
404 cutoff -= g_conf->mds_recall_state_timeout;
405 utime_t last_recall = mds->mdcache->last_recall_state;
406
407 std::list<MDSHealthMetric> late_recall_metrics;
408 std::list<MDSHealthMetric> large_completed_requests_metrics;
409 for (set<Session*>::iterator i = sessions.begin(); i != sessions.end(); ++i) {
410 Session *session = *i;
411 if (!session->recalled_at.is_zero()) {
412 dout(20) << "Session servicing RECALL " << session->info.inst
413 << ": " << session->recalled_at << " " << session->recall_release_count
414 << "/" << session->recall_count << dendl;
415 if (last_recall < cutoff || session->last_recall_sent < last_recall) {
416 dout(20) << " no longer recall" << dendl;
417 session->clear_recalled_at();
418 } else if (session->recalled_at < cutoff) {
419 dout(20) << " exceeded timeout " << session->recalled_at << " vs. " << cutoff << dendl;
420 std::ostringstream oss;
421 oss << "Client " << session->get_human_name() << " failing to respond to cache pressure";
422 MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, oss.str());
423 m.metadata["client_id"] = stringify(session->info.inst.name.num());
424 late_recall_metrics.push_back(m);
425 } else {
426 dout(20) << " within timeout " << session->recalled_at << " vs. " << cutoff << dendl;
427 }
428 }
429 if ((session->get_num_trim_requests_warnings() > 0 &&
430 session->get_num_completed_requests() >= g_conf->mds_max_completed_requests) ||
431 (session->get_num_trim_flushes_warnings() > 0 &&
432 session->get_num_completed_flushes() >= g_conf->mds_max_completed_flushes)) {
433 std::ostringstream oss;
434 oss << "Client " << session->get_human_name() << " failing to advance its oldest client/flush tid";
435 MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID, HEALTH_WARN, oss.str());
436 m.metadata["client_id"] = stringify(session->info.inst.name.num());
437 large_completed_requests_metrics.push_back(m);
438 }
439 }
440
441 if (late_recall_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) {
442 health.metrics.splice(health.metrics.end(), late_recall_metrics);
443 } else {
444 std::ostringstream oss;
445 oss << "Many clients (" << late_recall_metrics.size()
446 << ") failing to respond to cache pressure";
447 MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL_MANY, HEALTH_WARN, oss.str());
448 m.metadata["client_count"] = stringify(late_recall_metrics.size());
449 health.metrics.push_back(m);
450 late_recall_metrics.clear();
451 }
452
453 if (large_completed_requests_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) {
454 health.metrics.splice(health.metrics.end(), large_completed_requests_metrics);
455 } else {
456 std::ostringstream oss;
457 oss << "Many clients (" << large_completed_requests_metrics.size()
458 << ") failing to advance their oldest client/flush tid";
459 MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID_MANY, HEALTH_WARN, oss.str());
460 m.metadata["client_count"] = stringify(large_completed_requests_metrics.size());
461 health.metrics.push_back(m);
462 large_completed_requests_metrics.clear();
463 }
464 }
465
466 // Detect MDS_HEALTH_SLOW_REQUEST condition
467 {
468 int slow = mds->get_mds_slow_req_count();
469 dout(20) << slow << " slow request found" << dendl;
470 if (slow) {
471 std::ostringstream oss;
472 oss << slow << " slow requests are blocked > " << g_conf->mds_op_complaint_time << " sec";
473
474 MDSHealthMetric m(MDS_HEALTH_SLOW_REQUEST, HEALTH_WARN, oss.str());
475 health.metrics.push_back(m);
476 }
477 }
478
479 // Report a health warning if we are readonly
480 if (mds->mdcache->is_readonly()) {
481 MDSHealthMetric m(MDS_HEALTH_READ_ONLY, HEALTH_WARN,
482 "MDS in read-only mode");
483 health.metrics.push_back(m);
484 }
485
486 // Report if we have significantly exceeded our cache size limit
487 if (mds->mdcache->cache_overfull()) {
488 std::ostringstream oss;
489 oss << "MDS cache is too large (" << bytes2str(mds->mdcache->cache_size())
490 << "/" << bytes2str(mds->mdcache->cache_limit_memory()) << "); "
491 << mds->mdcache->num_inodes_with_caps << " inodes in use by clients, "
492 << mds->mdcache->get_num_strays() << " stray files";
493
494 MDSHealthMetric m(MDS_HEALTH_CACHE_OVERSIZED, HEALTH_WARN, oss.str());
495 health.metrics.push_back(m);
496 }
497 }
498
499 MDSMap::DaemonState Beacon::get_want_state() const
500 {
501 Mutex::Locker l(lock);
502 return want_state;
503 }
504