// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/range/adaptor/map.hpp>
#include <boost/range/algorithm/copy.hpp>

#include "MDSRank.h"
#include "MetricAggregator.h"
#include "mgr/MgrClient.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix *_dout << "mds.metric.aggregator" << " " << __func__

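// MetricAggregator collects per-client performance metrics reported by
// the active MDS ranks and publishes the aggregated counters to the
// manager via MgrClient. It also pings the active ranks; an incoming
// metrics update doubles as the pong for an earlier ping.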
MetricAggregator::MetricAggregator(CephContext *cct, MDSRank *mds, MgrClient *mgrc)
  : Dispatcher(cct),
    mds(mds),
    mgrc(mgrc),
    mds_pinger(mds) {
}

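// Send a ping to every active rank. Invoked periodically by the pinger
// thread with `lock` held.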
void MetricAggregator::ping_all_active_ranks() {
  dout(10) << ": pinging " << active_rank_addrs.size() << " active mds(s)" << dendl;

  for (const auto &[rank, addr] : active_rank_addrs) {
    dout(20) << ": pinging rank=" << rank << " addr=" << addr << dendl;
    mds_pinger.send_ping(rank, addr);
  }
}

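// Spawn the pinger thread and register the perf query/report callbacks
// with the mgr client. The thread drops `lock` while sleeping for
// mds_ping_interval seconds between rounds of pings.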
int MetricAggregator::init() {
  dout(10) << dendl;

  pinger = std::thread([this]() {
    std::unique_lock locker(lock);
    while (!stopping) {
      ping_all_active_ranks();
      locker.unlock();
      double timo = g_conf().get_val<std::chrono::seconds>("mds_ping_interval").count();
      sleep(timo);
      locker.lock();
    }
  });

  mgrc->set_perf_metric_query_cb(
    [this](const ConfigPayload &config_payload) {
      set_perf_queries(config_payload);
    },
    [this]() {
      return get_perf_reports();
    });

  return 0;
}

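// Flag the pinger thread to stop (under lock) and wait for it to exit.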
void MetricAggregator::shutdown() {
  dout(10) << dendl;

  {
    std::scoped_lock locker(lock);
    ceph_assert(!stopping);
    stopping = true;
  }

  if (pinger.joinable()) {
    pinger.join();
  }
}

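// Messenger hooks: only MSG_MDS_METRICS is handled here, and only when
// it was sent by another MDS.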
bool MetricAggregator::ms_can_fast_dispatch2(const cref_t<Message> &m) const {
  return m->get_type() == MSG_MDS_METRICS;
}

void MetricAggregator::ms_fast_dispatch2(const ref_t<Message> &m) {
  bool handled = ms_dispatch2(m);
  ceph_assert(handled);
}

bool MetricAggregator::ms_dispatch2(const ref_t<Message> &m) {
  if (m->get_type() == MSG_MDS_METRICS &&
      m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS) {
    const Message *msg = m.get();
    const MMDSOp *op = dynamic_cast<const MMDSOp*>(msg);
    if (!op)
      dout(0) << typeid(*msg).name() << " is not an MMDSOp type" << dendl;
    ceph_assert(op);
    handle_mds_metrics(ref_cast<MMDSMetrics>(m));
    return true;
  }
  return false;
}

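// Apply a metrics update for one client of `rank`: record the client as
// connected, then refresh the counters of every registered query whose
// sub-keys match this (client, rank) pair.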
void MetricAggregator::refresh_metrics_for_rank(const entity_inst_t &client,
                                                mds_rank_t rank, const Metrics &metrics) {
  dout(20) << ": client=" << client << ", rank=" << rank << ", metrics="
           << metrics << dendl;

  auto &p = clients_by_rank.at(rank);
  bool ins = p.insert(client).second;
  if (ins) {
    dout(20) << ": rank=" << rank << " has " << p.size() << " connected"
             << " client(s)" << dendl;
  }

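  // Copy the client's latest values into the counter pair requested by
  // the descriptor: latencies as (seconds, nanoseconds), the remaining
  // metrics as their two constituent values (e.g. hits/misses,
  // ops/size, sq_sum/count).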
  auto update_counter_func = [&metrics](const MDSPerformanceCounterDescriptor &d,
                                        PerformanceCounter *c) {
    ceph_assert(d.is_supported());

    dout(20) << ": performance_counter_descriptor=" << d << dendl;

    switch (d.type) {
    case MDSPerformanceCounterType::CAP_HIT_METRIC:
      c->first = metrics.cap_hit_metric.hits;
      c->second = metrics.cap_hit_metric.misses;
      break;
    case MDSPerformanceCounterType::READ_LATENCY_METRIC:
      if (metrics.read_latency_metric.updated) {
        c->first = metrics.read_latency_metric.lat.tv.tv_sec;
        c->second = metrics.read_latency_metric.lat.tv.tv_nsec;
      }
      break;
    case MDSPerformanceCounterType::WRITE_LATENCY_METRIC:
      if (metrics.write_latency_metric.updated) {
        c->first = metrics.write_latency_metric.lat.tv.tv_sec;
        c->second = metrics.write_latency_metric.lat.tv.tv_nsec;
      }
      break;
    case MDSPerformanceCounterType::METADATA_LATENCY_METRIC:
      if (metrics.metadata_latency_metric.updated) {
        c->first = metrics.metadata_latency_metric.lat.tv.tv_sec;
        c->second = metrics.metadata_latency_metric.lat.tv.tv_nsec;
      }
      break;
    case MDSPerformanceCounterType::DENTRY_LEASE_METRIC:
      if (metrics.dentry_lease_metric.updated) {
        c->first = metrics.dentry_lease_metric.hits;
        c->second = metrics.dentry_lease_metric.misses;
      }
      break;
    case MDSPerformanceCounterType::OPENED_FILES_METRIC:
      if (metrics.opened_files_metric.updated) {
        c->first = metrics.opened_files_metric.opened_files;
        c->second = metrics.opened_files_metric.total_inodes;
      }
      break;
    case MDSPerformanceCounterType::PINNED_ICAPS_METRIC:
      if (metrics.pinned_icaps_metric.updated) {
        c->first = metrics.pinned_icaps_metric.pinned_icaps;
        c->second = metrics.pinned_icaps_metric.total_inodes;
      }
      break;
    case MDSPerformanceCounterType::OPENED_INODES_METRIC:
      if (metrics.opened_inodes_metric.updated) {
        c->first = metrics.opened_inodes_metric.opened_inodes;
        c->second = metrics.opened_inodes_metric.total_inodes;
      }
      break;
    case MDSPerformanceCounterType::READ_IO_SIZES_METRIC:
      if (metrics.read_io_sizes_metric.updated) {
        c->first = metrics.read_io_sizes_metric.total_ops;
        c->second = metrics.read_io_sizes_metric.total_size;
      }
      break;
    case MDSPerformanceCounterType::WRITE_IO_SIZES_METRIC:
      if (metrics.write_io_sizes_metric.updated) {
        c->first = metrics.write_io_sizes_metric.total_ops;
        c->second = metrics.write_io_sizes_metric.total_size;
      }
      break;
    case MDSPerformanceCounterType::AVG_READ_LATENCY_METRIC:
      if (metrics.read_latency_metric.updated) {
        c->first = metrics.read_latency_metric.mean.tv.tv_sec;
        c->second = metrics.read_latency_metric.mean.tv.tv_nsec;
      }
      break;
    case MDSPerformanceCounterType::STDEV_READ_LATENCY_METRIC:
      if (metrics.read_latency_metric.updated) {
        c->first = metrics.read_latency_metric.sq_sum;
        c->second = metrics.read_latency_metric.count;
      }
      break;
    case MDSPerformanceCounterType::AVG_WRITE_LATENCY_METRIC:
      if (metrics.write_latency_metric.updated) {
        c->first = metrics.write_latency_metric.mean.tv.tv_sec;
        c->second = metrics.write_latency_metric.mean.tv.tv_nsec;
      }
      break;
    case MDSPerformanceCounterType::STDEV_WRITE_LATENCY_METRIC:
      if (metrics.write_latency_metric.updated) {
        c->first = metrics.write_latency_metric.sq_sum;
        c->second = metrics.write_latency_metric.count;
      }
      break;
    case MDSPerformanceCounterType::AVG_METADATA_LATENCY_METRIC:
      if (metrics.metadata_latency_metric.updated) {
        c->first = metrics.metadata_latency_metric.mean.tv.tv_sec;
        c->second = metrics.metadata_latency_metric.mean.tv.tv_nsec;
      }
      break;
    case MDSPerformanceCounterType::STDEV_METADATA_LATENCY_METRIC:
      if (metrics.metadata_latency_metric.updated) {
        c->first = metrics.metadata_latency_metric.sq_sum;
        c->second = metrics.metadata_latency_metric.count;
      }
      break;
    default:
      ceph_abort_msg("unknown counter type");
    }
  };

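  // Map this (client, rank) pair to a perf metric sub-key: stringify
  // the attribute named by the descriptor and match it against the
  // descriptor's regex; each capture group becomes one sub-key element.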
  auto sub_key_func = [client, rank](const MDSPerfMetricSubKeyDescriptor &d,
                                     MDSPerfMetricSubKey *sub_key) {
    ceph_assert(d.is_supported());

    dout(20) << ": sub_key_descriptor=" << d << dendl;

    std::string match_string;
    switch (d.type) {
    case MDSPerfMetricSubKeyType::MDS_RANK:
      match_string = stringify(rank);
      break;
    case MDSPerfMetricSubKeyType::CLIENT_ID:
      match_string = stringify(client);
      break;
    default:
      ceph_abort_msg("unknown sub key type");
    }

    dout(20) << ": match_string=" << match_string << dendl;

    std::smatch match;
    if (!std::regex_search(match_string, match, d.regex)) {
      return false;
    }
    if (match.size() <= 1) {
      return false;
    }
    for (size_t i = 1; i < match.size(); ++i) {
      sub_key->push_back(match[i].str());
    }
    return true;
  };

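  // Update the matching key of every registered query.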
  for (auto& [query, perf_key_map] : query_metrics_map) {
    MDSPerfMetricKey key;
    if (query.get_key(sub_key_func, &key)) {
      query.update_counters(update_counter_func, &perf_key_map[key]);
    }
  }
}

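// Drop a client's metrics from all registered queries. `remove` is
// false when the caller is culling an entire rank and will erase the
// per-rank client set itself.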
void MetricAggregator::remove_metrics_for_rank(const entity_inst_t &client,
                                               mds_rank_t rank, bool remove) {
  dout(20) << ": client=" << client << ", rank=" << rank << dendl;

  if (remove) {
    auto &p = clients_by_rank.at(rank);
    bool rm = p.erase(client) != 0;
    ceph_assert(rm);
    dout(20) << ": rank=" << rank << " has " << p.size() << " connected"
             << " client(s)" << dendl;
  }

  auto sub_key_func = [client, rank](const MDSPerfMetricSubKeyDescriptor &d,
                                     MDSPerfMetricSubKey *sub_key) {
    ceph_assert(d.is_supported());
    dout(20) << ": sub_key_descriptor=" << d << dendl;

    std::string match_string;
    switch (d.type) {
    case MDSPerfMetricSubKeyType::MDS_RANK:
      match_string = stringify(rank);
      break;
    case MDSPerfMetricSubKeyType::CLIENT_ID:
      match_string = stringify(client);
      break;
    default:
      ceph_abort_msg("unknown sub key type");
    }

    dout(20) << ": match_string=" << match_string << dendl;

    std::smatch match;
    if (!std::regex_search(match_string, match, d.regex)) {
      return false;
    }
    if (match.size() <= 1) {
      return false;
    }
    for (size_t i = 1; i < match.size(); ++i) {
      sub_key->push_back(match[i].str());
    }
    return true;
  };

  for (auto& [query, perf_key_map] : query_metrics_map) {
    MDSPerfMetricKey key;
    if (query.get_key(sub_key_func, &key)) {
      if (perf_key_map.erase(key)) {
        dout(10) << ": removed metric for key=" << key << dendl;
      }
    }
  }
}

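// Handle a metrics message from an active rank. The sequence number is
// handed to the pinger first; the update is ignored if the pinger does
// not accept it as the pong for an outstanding ping.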
void MetricAggregator::handle_mds_metrics(const cref_t<MMDSMetrics> &m) {
  const metrics_message_t &metrics_message = m->metrics_message;

  auto seq = metrics_message.seq;
  auto rank = metrics_message.rank;
  auto &client_metrics_map = metrics_message.client_metrics_map;

  dout(20) << ": applying " << client_metrics_map.size() << " updates for rank="
           << rank << " with sequence number " << seq << dendl;

  std::scoped_lock locker(lock);
  if (!mds_pinger.pong_received(rank, seq)) {
    return;
  }

  for (auto& [client, metrics] : client_metrics_map) {
    switch (metrics.update_type) {
    case UpdateType::UPDATE_TYPE_REFRESH:
      refresh_metrics_for_rank(client, rank, metrics);
      break;
    case UpdateType::UPDATE_TYPE_REMOVE:
      remove_metrics_for_rank(client, rank, true);
      break;
    default:
      ceph_abort();
    }
  }
}

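// Purge the metrics of every client of a rank that has left the active
// set, then forget the rank entirely.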
void MetricAggregator::cull_metrics_for_rank(mds_rank_t rank) {
  dout(20) << ": rank=" << rank << dendl;

  auto &p = clients_by_rank.at(rank);
  for (auto &client : p) {
    remove_metrics_for_rank(client, rank, false);
  }

  dout(10) << ": culled " << p.size() << " clients" << dendl;
  clients_by_rank.erase(rank);
}

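// Reconcile the tracked active set against a new MDSMap: ranks that
// dropped out are culled and their ping state reset; newly active ranks
// are tracked with an empty client set.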
void MetricAggregator::notify_mdsmap(const MDSMap &mdsmap) {
  dout(10) << dendl;

  std::scoped_lock locker(lock);
  std::set<mds_rank_t> current_active;
  mdsmap.get_active_mds_set(current_active);

  std::set<mds_rank_t> active_set;
  boost::copy(active_rank_addrs | boost::adaptors::map_keys,
              std::inserter(active_set, active_set.begin()));

  std::set<mds_rank_t> diff;
  std::set_difference(active_set.begin(), active_set.end(),
                      current_active.begin(), current_active.end(),
                      std::inserter(diff, diff.end()));

  for (auto &rank : diff) {
    dout(10) << ": mds rank=" << rank << " removed from mdsmap" << dendl;
    active_rank_addrs.erase(rank);
    cull_metrics_for_rank(rank);
    mds_pinger.reset_ping(rank);
  }

  diff.clear();
  std::set_difference(current_active.begin(), current_active.end(),
                      active_set.begin(), active_set.end(),
                      std::inserter(diff, diff.end()));

  for (auto &rank : diff) {
    auto rank_addr = mdsmap.get_addrs(rank);
    dout(10) << ": active rank=" << rank << " (mds." << mdsmap.get_mds_info(rank).name
             << ") has addr=" << rank_addr << dendl;
    active_rank_addrs.emplace(rank, rank_addr);
    clients_by_rank.emplace(rank, std::unordered_set<entity_inst_t>{});
  }

  dout(10) << ": active set=[" << active_rank_addrs << "]" << dendl;
}

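// Install a new query set from the mgr. The swap carries existing
// counters over for queries present in both the old and new sets;
// counters for queries that were removed are discarded.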
void MetricAggregator::set_perf_queries(const ConfigPayload &config_payload) {
  const MDSConfigPayload &mds_config_payload = boost::get<MDSConfigPayload>(config_payload);
  const std::map<MDSPerfMetricQuery, MDSPerfMetricLimits> &queries = mds_config_payload.config;

  dout(10) << ": setting " << queries.size() << " queries" << dendl;

  std::scoped_lock locker(lock);
  std::map<MDSPerfMetricQuery, std::map<MDSPerfMetricKey, PerformanceCounters>> new_data;
  for (auto &p : queries) {
    std::swap(new_data[p.first], query_metrics_map[p.first]);
  }
  std::swap(query_metrics_map, new_data);
}

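// Build the report payload for the mgr: per-query counter descriptors,
// the packed counters for each metric key, and the set of delayed ranks.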
MetricPayload MetricAggregator::get_perf_reports() {
  MDSMetricPayload payload;
  MDSPerfMetricReport &metric_report = payload.metric_report;
  std::map<MDSPerfMetricQuery, MDSPerfMetrics> &reports = metric_report.reports;

  std::scoped_lock locker(lock);

  for (auto& [query, counters] : query_metrics_map) {
    auto &report = reports[query];

    query.get_performance_counter_descriptors(&report.performance_counter_descriptors);

    auto &descriptors = report.performance_counter_descriptors;

    dout(20) << ": descriptors=" << descriptors << dendl;

    for (auto &p : counters) {
      dout(20) << ": packing perf_metric_key=" << p.first << ", perf_counter="
               << p.second << dendl;
      auto &bl = report.group_packed_performance_counters[p.first];
      query.pack_counters(p.second, &bl);
    }
  }

  // stash a copy of delayed and failed ranks. mgr culls out metrics
  // for failed ranks and tags metrics for delayed ranks as "stale".
  for (auto &p : active_rank_addrs) {
    auto rank = p.first;
    if (mds_pinger.is_rank_lagging(rank)) {
      metric_report.rank_metrics_delayed.insert(rank);
    }
  }

  return payload;
}