]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Beacon.cc
update sources to 12.2.10
[ceph.git] / ceph / src / mds / Beacon.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "common/dout.h"
17 #include "common/HeartbeatMap.h"
18
19 #include "include/stringify.h"
20 #include "include/util.h"
21
22 #include "messages/MMDSBeacon.h"
23 #include "mon/MonClient.h"
24 #include "mds/MDLog.h"
25 #include "mds/MDSRank.h"
26 #include "mds/MDSMap.h"
27 #include "mds/Locker.h"
28
29 #include "Beacon.h"
30
31 #include <chrono>
32
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_mds
35 #undef dout_prefix
36 #define dout_prefix *_dout << "mds.beacon." << name << ' '
37
38 Beacon::Beacon(CephContext *cct, MonClient *monc, boost::string_view name)
39 :
40 Dispatcher(cct),
41 beacon_interval(g_conf->mds_beacon_interval),
42 monc(monc),
43 name(name)
44 {
45 }
46
47 Beacon::~Beacon()
48 {
49 shutdown();
50 }
51
52 void Beacon::shutdown()
53 {
54 std::unique_lock<std::mutex> lock(mutex);
55 if (!finished) {
56 finished = true;
57 lock.unlock();
58 sender.join();
59 }
60 }
61
62 void Beacon::init(const MDSMap* mdsmap)
63 {
64 std::unique_lock<std::mutex> lock(mutex);
65 assert(mdsmap != NULL);
66
67 _notify_mdsmap(mdsmap);
68 standby_for_rank = mds_rank_t(g_conf->mds_standby_for_rank);
69 standby_for_name = g_conf->mds_standby_for_name;
70 standby_for_fscid = fs_cluster_id_t(g_conf->mds_standby_for_fscid);
71 standby_replay = g_conf->mds_standby_replay;
72
73 sender = std::thread([this]() {
74 std::unique_lock<std::mutex> lock(mutex);
75 std::condition_variable c; // no one wakes us
76 while (!finished) {
77 auto now = clock::now();
78 auto since = std::chrono::duration<double>(now-last_send).count();
79 auto interval = beacon_interval;
80 if (since >= interval*.90) {
81 _send();
82 } else {
83 interval -= since;
84 }
85 dout(20) << "sender thread waiting interval " << interval << "s" << dendl;
86 c.wait_for(lock, interval*std::chrono::seconds(1));
87 }
88 });
89 }
90
91 bool Beacon::ms_can_fast_dispatch(const Message *m) const
92 {
93 return m->get_type() == MSG_MDS_BEACON;
94 }
95
96 void Beacon::ms_fast_dispatch(Message *m)
97 {
98 bool handled = ms_dispatch(m);
99 assert(handled);
100 }
101
102 bool Beacon::ms_dispatch(Message *m)
103 {
104 if (m->get_type() == MSG_MDS_BEACON) {
105 if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
106 handle_mds_beacon(static_cast<MMDSBeacon*>(m));
107 } else {
108 m->put();
109 }
110 return true;
111 }
112
113 return false;
114 }
115
116
117 /**
118 * Update lagginess state based on response from remote MDSMonitor
119 *
120 * This function puts the passed message before returning
121 */
122 void Beacon::handle_mds_beacon(MMDSBeacon *m)
123 {
124 std::unique_lock<std::mutex> lock(mutex);
125 assert(m != NULL);
126
127 version_t seq = m->get_seq();
128
129 // update lab
130 auto it = seq_stamp.find(seq);
131 if (it != seq_stamp.end()) {
132 auto now = clock::now();
133
134 last_acked_stamp = it->second;
135 auto rtt = std::chrono::duration<double>(now - last_acked_stamp).count();
136
137 dout(5) << "received beacon reply " << ceph_mds_state_name(m->get_state()) << " seq " << m->get_seq() << " rtt " << rtt << dendl;
138
139 if (laggy && rtt < g_conf->mds_beacon_grace) {
140 dout(0) << " MDS is no longer laggy" << dendl;
141 laggy = false;
142 last_laggy = now;
143 }
144
145 // clean up seq_stamp map
146 seq_stamp.erase(seq_stamp.begin(), ++it);
147
148 // Wake a waiter up if present
149 cvar.notify_all();
150 } else {
151 dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state())
152 << " seq " << m->get_seq() << " dne" << dendl;
153 }
154 m->put();
155 }
156
157
158 void Beacon::send()
159 {
160 std::unique_lock<std::mutex> lock(mutex);
161 _send();
162 }
163
164
165 void Beacon::send_and_wait(const double duration)
166 {
167 std::unique_lock<std::mutex> lock(mutex);
168 _send();
169 auto awaiting_seq = last_seq;
170 dout(20) << __func__ << ": awaiting " << awaiting_seq
171 << " for up to " << duration << "s" << dendl;
172
173 auto start = clock::now();
174 while (!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq) {
175 auto now = clock::now();
176 auto s = duration*.95-std::chrono::duration<double>(now-start).count();
177 if (s < 0) break;
178 cvar.wait_for(lock, s*std::chrono::seconds(1));
179 }
180 }
181
182
183 /**
184 * Call periodically, or when you have updated the desired state
185 */
186 void Beacon::_send()
187 {
188 auto now = clock::now();
189 auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
190
191 if (!cct->get_heartbeat_map()->is_healthy()) {
192 /* If anything isn't progressing, let avoid sending a beacon so that
193 * the MDS will consider us laggy */
194 dout(0) << "Skipping beacon heartbeat to monitors (last acked " << since << "s ago); MDS internal heartbeat is not healthy!" << dendl;
195 return;
196 }
197
198 ++last_seq;
199 dout(5) << "Sending beacon " << ceph_mds_state_name(want_state) << " seq " << last_seq << dendl;
200
201 seq_stamp[last_seq] = now;
202
203 assert(want_state != MDSMap::STATE_NULL);
204
205 MMDSBeacon *beacon = new MMDSBeacon(
206 monc->get_fsid(), mds_gid_t(monc->get_global_id()),
207 name,
208 epoch,
209 want_state,
210 last_seq,
211 CEPH_FEATURES_SUPPORTED_DEFAULT);
212
213 beacon->set_standby_for_rank(standby_for_rank);
214 beacon->set_standby_for_name(standby_for_name);
215 beacon->set_standby_for_fscid(standby_for_fscid);
216 beacon->set_standby_replay(standby_replay);
217 beacon->set_health(health);
218 beacon->set_compat(compat);
219 // piggyback the sys info on beacon msg
220 if (want_state == MDSMap::STATE_BOOT) {
221 map<string, string> sys_info;
222 collect_sys_info(&sys_info, cct);
223 sys_info["addr"] = stringify(monc->get_myaddr());
224 beacon->set_sys_info(sys_info);
225 }
226 monc->send_mon_message(beacon);
227 last_send = now;
228 }
229
230 /**
231 * Call this when there is a new MDSMap available
232 */
233 void Beacon::notify_mdsmap(MDSMap const *mdsmap)
234 {
235 std::unique_lock<std::mutex> lock(mutex);
236 assert(mdsmap != NULL);
237
238 _notify_mdsmap(mdsmap);
239 }
240
241 void Beacon::_notify_mdsmap(MDSMap const *mdsmap)
242 {
243 assert(mdsmap != NULL);
244 assert(mdsmap->get_epoch() >= epoch);
245
246 if (mdsmap->get_epoch() != epoch) {
247 epoch = mdsmap->get_epoch();
248 compat = MDSMap::get_compat_set_default();
249 compat.merge(mdsmap->compat);
250 }
251 }
252
253
254 bool Beacon::is_laggy()
255 {
256 std::unique_lock<std::mutex> lock(mutex);
257
258 auto now = clock::now();
259 auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
260 if (since > g_conf->mds_beacon_grace) {
261 if (!laggy) {
262 dout(1) << "is_laggy " << since << " > " << g_conf->mds_beacon_grace
263 << " since last acked beacon" << dendl;
264 }
265 laggy = true;
266 auto last_reconnect = std::chrono::duration<double>(now-last_mon_reconnect).count();
267 if (since > (g_conf->mds_beacon_grace*2) && last_reconnect > g_conf->mds_beacon_interval) {
268 // maybe it's not us?
269 dout(1) << "initiating monitor reconnect; maybe we're not the slow one"
270 << dendl;
271 last_mon_reconnect = now;
272 monc->reopen_session();
273 }
274 return true;
275 }
276 return false;
277 }
278
279 void Beacon::set_want_state(const MDSMap* mdsmap, MDSMap::DaemonState const newstate)
280 {
281 std::unique_lock<std::mutex> lock(mutex);
282
283 // Update mdsmap epoch atomically with updating want_state, so that when
284 // we send a beacon with the new want state it has the latest epoch, and
285 // once we have updated to the latest epoch, we are not sending out
286 // a stale want_state (i.e. one from before making it through MDSMap
287 // handling)
288 _notify_mdsmap(mdsmap);
289
290 if (want_state != newstate) {
291 dout(5) << __func__ << ": "
292 << ceph_mds_state_name(want_state) << " -> "
293 << ceph_mds_state_name(newstate) << dendl;
294 want_state = newstate;
295 }
296 }
297
298
299 /**
300 * We are 'shown' an MDS briefly in order to update
301 * some health metrics that we will send in the next
302 * beacon.
303 */
304 void Beacon::notify_health(MDSRank const *mds)
305 {
306 std::unique_lock<std::mutex> lock(mutex);
307 if (!mds) {
308 // No MDS rank held
309 return;
310 }
311
312 // I'm going to touch this MDS, so it must be locked
313 assert(mds->mds_lock.is_locked_by_me());
314
315 health.metrics.clear();
316
317 // Detect presence of entries in DamageTable
318 if (!mds->damage_table.empty()) {
319 MDSHealthMetric m(MDS_HEALTH_DAMAGE, HEALTH_ERR, std::string(
320 "Metadata damage detected"));
321 health.metrics.push_back(m);
322 }
323
324 // Detect MDS_HEALTH_TRIM condition
325 // Arbitrary factor of 2, indicates MDS is not trimming promptly
326 {
327 if (mds->mdlog->get_num_segments() > (size_t)(g_conf->mds_log_max_segments * 2)) {
328 std::ostringstream oss;
329 oss << "Behind on trimming (" << mds->mdlog->get_num_segments()
330 << "/" << g_conf->mds_log_max_segments << ")";
331
332 MDSHealthMetric m(MDS_HEALTH_TRIM, HEALTH_WARN, oss.str());
333 m.metadata["num_segments"] = stringify(mds->mdlog->get_num_segments());
334 m.metadata["max_segments"] = stringify(g_conf->mds_log_max_segments);
335 health.metrics.push_back(m);
336 }
337 }
338
339 // Detect clients failing to respond to modifications to capabilities in
340 // CLIENT_CAPS messages.
341 {
342 std::list<client_t> late_clients;
343 mds->locker->get_late_revoking_clients(&late_clients,
344 mds->mdsmap->get_session_timeout());
345 std::list<MDSHealthMetric> late_cap_metrics;
346
347 for (std::list<client_t>::iterator i = late_clients.begin(); i != late_clients.end(); ++i) {
348
349 // client_t is equivalent to session.info.inst.name.num
350 // Construct an entity_name_t to lookup into SessionMap
351 entity_name_t ename(CEPH_ENTITY_TYPE_CLIENT, i->v);
352 Session const *s = mds->sessionmap.get_session(ename);
353 if (s == NULL) {
354 // Shouldn't happen, but not worth crashing if it does as this is
355 // just health-reporting code.
356 derr << "Client ID without session: " << i->v << dendl;
357 continue;
358 }
359
360 std::ostringstream oss;
361 oss << "Client " << s->get_human_name() << " failing to respond to capability release";
362 MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE, HEALTH_WARN, oss.str());
363 m.metadata["client_id"] = stringify(i->v);
364 late_cap_metrics.push_back(m);
365 }
366
367 if (late_cap_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) {
368 health.metrics.splice(health.metrics.end(), late_cap_metrics);
369 } else {
370 std::ostringstream oss;
371 oss << "Many clients (" << late_cap_metrics.size()
372 << ") failing to respond to capability release";
373 MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE_MANY, HEALTH_WARN, oss.str());
374 m.metadata["client_count"] = stringify(late_cap_metrics.size());
375 health.metrics.push_back(m);
376 late_cap_metrics.clear();
377 }
378 }
379
380 // Detect clients failing to generate cap releases from CEPH_SESSION_RECALL_STATE
381 // messages. May be due to buggy client or resource-hogging application.
382 //
383 // Detect clients failing to advance their old_client_tid
384 {
385 set<Session*> sessions;
386 mds->sessionmap.get_client_session_set(sessions);
387
388 auto mds_recall_state_timeout = g_conf->mds_recall_state_timeout;
389 auto last_recall = mds->mdcache->last_recall_state;
390 auto last_recall_span = std::chrono::duration<double>(clock::now()-last_recall).count();
391 bool recall_state_timedout = last_recall_span > mds_recall_state_timeout;
392
393 std::list<MDSHealthMetric> late_recall_metrics;
394 std::list<MDSHealthMetric> large_completed_requests_metrics;
395 for (auto& session : sessions) {
396 if (session->recalled_at != Session::time::min()) {
397 auto last_recall_sent = session->last_recall_sent;
398 auto recalled_at = session->recalled_at;
399 auto recalled_at_span = std::chrono::duration<double>(clock::now()-recalled_at).count();
400
401 dout(20) << "Session servicing RECALL " << session->info.inst
402 << ": " << recalled_at_span << "s ago " << session->recall_release_count
403 << "/" << session->recall_count << dendl;
404 if (recall_state_timedout || last_recall_sent < last_recall) {
405 dout(20) << " no longer recall" << dendl;
406 session->clear_recalled_at();
407 } else if (recalled_at_span > mds_recall_state_timeout) {
408 dout(20) << " exceeded timeout " << recalled_at_span << " vs. " << mds_recall_state_timeout << dendl;
409 std::ostringstream oss;
410 oss << "Client " << session->get_human_name() << " failing to respond to cache pressure";
411 MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, oss.str());
412 m.metadata["client_id"] = stringify(session->info.inst.name.num());
413 late_recall_metrics.push_back(m);
414 } else {
415 dout(20) << " within timeout " << recalled_at_span << " vs. " << mds_recall_state_timeout << dendl;
416 }
417 }
418 if ((session->get_num_trim_requests_warnings() > 0 &&
419 session->get_num_completed_requests() >= g_conf->mds_max_completed_requests) ||
420 (session->get_num_trim_flushes_warnings() > 0 &&
421 session->get_num_completed_flushes() >= g_conf->mds_max_completed_flushes)) {
422 std::ostringstream oss;
423 oss << "Client " << session->get_human_name() << " failing to advance its oldest client/flush tid";
424 MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID, HEALTH_WARN, oss.str());
425 m.metadata["client_id"] = stringify(session->info.inst.name.num());
426 large_completed_requests_metrics.push_back(m);
427 }
428 }
429
430 if (late_recall_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) {
431 health.metrics.splice(health.metrics.end(), late_recall_metrics);
432 } else {
433 std::ostringstream oss;
434 oss << "Many clients (" << late_recall_metrics.size()
435 << ") failing to respond to cache pressure";
436 MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL_MANY, HEALTH_WARN, oss.str());
437 m.metadata["client_count"] = stringify(late_recall_metrics.size());
438 health.metrics.push_back(m);
439 late_recall_metrics.clear();
440 }
441
442 if (large_completed_requests_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) {
443 health.metrics.splice(health.metrics.end(), large_completed_requests_metrics);
444 } else {
445 std::ostringstream oss;
446 oss << "Many clients (" << large_completed_requests_metrics.size()
447 << ") failing to advance their oldest client/flush tid";
448 MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID_MANY, HEALTH_WARN, oss.str());
449 m.metadata["client_count"] = stringify(large_completed_requests_metrics.size());
450 health.metrics.push_back(m);
451 large_completed_requests_metrics.clear();
452 }
453 }
454
455 // Detect MDS_HEALTH_SLOW_REQUEST condition
456 {
457 int slow = mds->get_mds_slow_req_count();
458 if (slow) {
459 dout(20) << slow << " slow request found" << dendl;
460 std::ostringstream oss;
461 oss << slow << " slow requests are blocked > " << g_conf->mds_op_complaint_time << " sec";
462
463 MDSHealthMetric m(MDS_HEALTH_SLOW_REQUEST, HEALTH_WARN, oss.str());
464 health.metrics.push_back(m);
465 }
466 }
467
468 {
469 auto complaint_time = g_conf->osd_op_complaint_time;
470 auto now = clock::now();
471 auto cutoff = now - ceph::make_timespan(complaint_time);
472
473 std::string count;
474 ceph::coarse_mono_time oldest;
475 if (MDSIOContextBase::check_ios_in_flight(cutoff, count, oldest)) {
476 dout(20) << count << " slow metadata IOs found" << dendl;
477
478 auto oldest_secs = std::chrono::duration<double>(now - oldest).count();
479 std::ostringstream oss;
480 oss << count << " slow metadata IOs are blocked > " << complaint_time
481 << " secs, oldest blocked for " << (int64_t)oldest_secs << " secs";
482
483 MDSHealthMetric m(MDS_HEALTH_SLOW_METADATA_IO, HEALTH_WARN, oss.str());
484 health.metrics.push_back(m);
485 }
486 }
487
488 // Report a health warning if we are readonly
489 if (mds->mdcache->is_readonly()) {
490 MDSHealthMetric m(MDS_HEALTH_READ_ONLY, HEALTH_WARN,
491 "MDS in read-only mode");
492 health.metrics.push_back(m);
493 }
494
495 // Report if we have significantly exceeded our cache size limit
496 if (mds->mdcache->cache_overfull()) {
497 std::ostringstream oss;
498 oss << "MDS cache is too large (" << bytes2str(mds->mdcache->cache_size())
499 << "/" << bytes2str(mds->mdcache->cache_limit_memory()) << "); "
500 << mds->mdcache->num_inodes_with_caps << " inodes in use by clients, "
501 << mds->mdcache->get_num_strays() << " stray files";
502
503 MDSHealthMetric m(MDS_HEALTH_CACHE_OVERSIZED, HEALTH_WARN, oss.str());
504 health.metrics.push_back(m);
505 }
506 }
507
508 MDSMap::DaemonState Beacon::get_want_state() const
509 {
510 std::unique_lock<std::mutex> lock(mutex);
511 return want_state;
512 }
513