]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/MetricAggregator.cc
046e79269868e2b34791e1d1232eaec9e2ebc929
[ceph.git] / ceph / src / mds / MetricAggregator.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include <chrono>
#include <thread>

#include <boost/range/adaptor/map.hpp>
#include <boost/range/algorithm/copy.hpp>

#include "MDSRank.h"
#include "MetricAggregator.h"
#include "mgr/MgrClient.h"
10
11 #define dout_context g_ceph_context
12 #define dout_subsys ceph_subsys_mds
13 #undef dout_prefix
14 #define dout_prefix *_dout << "mds.metric.aggregator" << " " << __func__
15
16 MetricAggregator::MetricAggregator(CephContext *cct, MDSRank *mds, MgrClient *mgrc)
17 : Dispatcher(cct),
18 mds(mds),
19 mgrc(mgrc),
20 mds_pinger(mds) {
21 }
22
23 void MetricAggregator::ping_all_active_ranks() {
24 dout(10) << ": pinging " << active_rank_addrs.size() << " active mds(s)" << dendl;
25
26 for (const auto &[rank, addr] : active_rank_addrs) {
27 dout(20) << ": pinging rank=" << rank << " addr=" << addr << dendl;
28 mds_pinger.send_ping(rank, addr);
29 }
30 }
31
32 int MetricAggregator::init() {
33 dout(10) << dendl;
34
35 pinger = std::thread([this]() {
36 std::unique_lock locker(lock);
37 while (!stopping) {
38 ping_all_active_ranks();
39 locker.unlock();
40 double timo = g_conf().get_val<std::chrono::seconds>("mds_ping_interval").count();
41 sleep(timo);
42 locker.lock();
43 }
44 });
45
46 mgrc->set_perf_metric_query_cb(
47 [this](const ConfigPayload &config_payload) {
48 set_perf_queries(config_payload);
49 },
50 [this]() {
51 return get_perf_reports();
52 });
53
54 return 0;
55 }
56
57 void MetricAggregator::shutdown() {
58 dout(10) << dendl;
59
60 {
61 std::scoped_lock locker(lock);
62 ceph_assert(!stopping);
63 stopping = true;
64 }
65
66 if (pinger.joinable()) {
67 pinger.join();
68 }
69 }
70
71 bool MetricAggregator::ms_can_fast_dispatch2(const cref_t<Message> &m) const {
72 return m->get_type() == MSG_MDS_METRICS;
73 }
74
75 void MetricAggregator::ms_fast_dispatch2(const ref_t<Message> &m) {
76 bool handled = ms_dispatch2(m);
77 ceph_assert(handled);
78 }
79
80 bool MetricAggregator::ms_dispatch2(const ref_t<Message> &m) {
81 if (m->get_type() == MSG_MDS_METRICS &&
82 m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS) {
83 const Message *msg = m.get();
84 const MMDSOp *op = dynamic_cast<const MMDSOp*>(msg);
85 if (!op)
86 dout(0) << typeid(*msg).name() << " is not an MMDSOp type" << dendl;
87 ceph_assert(op);
88 handle_mds_metrics(ref_cast<MMDSMetrics>(m));
89 return true;
90 }
91 return false;
92 }
93
94 void MetricAggregator::refresh_metrics_for_rank(const entity_inst_t &client,
95 mds_rank_t rank, const Metrics &metrics) {
96 dout(20) << ": client=" << client << ", rank=" << rank << ", metrics="
97 << metrics << dendl;
98
99 auto &p = clients_by_rank.at(rank);
100 bool ins = p.insert(client).second;
101 if (ins) {
102 dout(20) << ": rank=" << rank << " has " << p.size() << " connected"
103 << " client(s)" << dendl;
104 }
105
106 auto update_counter_func = [&metrics](const MDSPerformanceCounterDescriptor &d,
107 PerformanceCounter *c) {
108 ceph_assert(d.is_supported());
109
110 dout(20) << ": performance_counter_descriptor=" << d << dendl;
111
112 switch (d.type) {
113 case MDSPerformanceCounterType::CAP_HIT_METRIC:
114 c->first = metrics.cap_hit_metric.hits;
115 c->second = metrics.cap_hit_metric.misses;
116 break;
117 case MDSPerformanceCounterType::READ_LATENCY_METRIC:
118 if (metrics.read_latency_metric.updated) {
119 c->first = metrics.read_latency_metric.lat.tv.tv_sec;
120 c->second = metrics.read_latency_metric.lat.tv.tv_nsec;
121 }
122 break;
123 case MDSPerformanceCounterType::WRITE_LATENCY_METRIC:
124 if (metrics.write_latency_metric.updated) {
125 c->first = metrics.write_latency_metric.lat.tv.tv_sec;
126 c->second = metrics.write_latency_metric.lat.tv.tv_nsec;
127 }
128 break;
129 case MDSPerformanceCounterType::METADATA_LATENCY_METRIC:
130 if (metrics.metadata_latency_metric.updated) {
131 c->first = metrics.metadata_latency_metric.lat.tv.tv_sec;
132 c->second = metrics.metadata_latency_metric.lat.tv.tv_nsec;
133 }
134 break;
135 case MDSPerformanceCounterType::DENTRY_LEASE_METRIC:
136 if (metrics.dentry_lease_metric.updated) {
137 c->first = metrics.dentry_lease_metric.hits;
138 c->second = metrics.dentry_lease_metric.misses;
139 }
140 break;
141 case MDSPerformanceCounterType::OPENED_FILES_METRIC:
142 if (metrics.opened_files_metric.updated) {
143 c->first = metrics.opened_files_metric.opened_files;
144 c->second = metrics.opened_files_metric.total_inodes;
145 }
146 break;
147 case MDSPerformanceCounterType::PINNED_ICAPS_METRIC:
148 if (metrics.pinned_icaps_metric.updated) {
149 c->first = metrics.pinned_icaps_metric.pinned_icaps;
150 c->second = metrics.pinned_icaps_metric.total_inodes;
151 }
152 break;
153 case MDSPerformanceCounterType::OPENED_INODES_METRIC:
154 if (metrics.opened_inodes_metric.updated) {
155 c->first = metrics.opened_inodes_metric.opened_inodes;
156 c->second = metrics.opened_inodes_metric.total_inodes;
157 }
158 break;
159 case MDSPerformanceCounterType::READ_IO_SIZES_METRIC:
160 if (metrics.read_io_sizes_metric.updated) {
161 c->first = metrics.read_io_sizes_metric.total_ops;
162 c->second = metrics.read_io_sizes_metric.total_size;
163 }
164 break;
165 case MDSPerformanceCounterType::WRITE_IO_SIZES_METRIC:
166 if (metrics.write_io_sizes_metric.updated) {
167 c->first = metrics.write_io_sizes_metric.total_ops;
168 c->second = metrics.write_io_sizes_metric.total_size;
169 }
170 break;
171 default:
172 ceph_abort_msg("unknown counter type");
173 }
174 };
175
176 auto sub_key_func = [client, rank](const MDSPerfMetricSubKeyDescriptor &d,
177 MDSPerfMetricSubKey *sub_key) {
178 ceph_assert(d.is_supported());
179
180 dout(20) << ": sub_key_descriptor=" << d << dendl;
181
182 std::string match_string;
183 switch (d.type) {
184 case MDSPerfMetricSubKeyType::MDS_RANK:
185 match_string = stringify(rank);
186 break;
187 case MDSPerfMetricSubKeyType::CLIENT_ID:
188 match_string = stringify(client);
189 break;
190 default:
191 ceph_abort_msg("unknown counter type");
192 }
193
194 dout(20) << ": match_string=" << match_string << dendl;
195
196 std::smatch match;
197 if (!std::regex_search(match_string, match, d.regex)) {
198 return false;
199 }
200 if (match.size() <= 1) {
201 return false;
202 }
203 for (size_t i = 1; i < match.size(); ++i) {
204 sub_key->push_back(match[i].str());
205 }
206 return true;
207 };
208
209 for (auto& [query, perf_key_map] : query_metrics_map) {
210 MDSPerfMetricKey key;
211 if (query.get_key(sub_key_func, &key)) {
212 query.update_counters(update_counter_func, &perf_key_map[key]);
213 }
214 }
215 }
216
217 void MetricAggregator::remove_metrics_for_rank(const entity_inst_t &client,
218 mds_rank_t rank, bool remove) {
219 dout(20) << ": client=" << client << ", rank=" << rank << dendl;
220
221 if (remove) {
222 auto &p = clients_by_rank.at(rank);
223 bool rm = p.erase(client) != 0;
224 ceph_assert(rm);
225 dout(20) << ": rank=" << rank << " has " << p.size() << " connected"
226 << " client(s)" << dendl;
227 }
228
229 auto sub_key_func = [client, rank](const MDSPerfMetricSubKeyDescriptor &d,
230 MDSPerfMetricSubKey *sub_key) {
231 ceph_assert(d.is_supported());
232 dout(20) << ": sub_key_descriptor=" << d << dendl;
233
234 std::string match_string;
235 switch (d.type) {
236 case MDSPerfMetricSubKeyType::MDS_RANK:
237 match_string = stringify(rank);
238 break;
239 case MDSPerfMetricSubKeyType::CLIENT_ID:
240 match_string = stringify(client);
241 break;
242 default:
243 ceph_abort_msg("unknown counter type");
244 }
245
246 dout(20) << ": match_string=" << match_string << dendl;
247
248 std::smatch match;
249 if (!std::regex_search(match_string, match, d.regex)) {
250 return false;
251 }
252 if (match.size() <= 1) {
253 return false;
254 }
255 for (size_t i = 1; i < match.size(); ++i) {
256 sub_key->push_back(match[i].str());
257 }
258 return true;
259 };
260
261 for (auto& [query, perf_key_map] : query_metrics_map) {
262 MDSPerfMetricKey key;
263 if (query.get_key(sub_key_func, &key)) {
264 if (perf_key_map.erase(key)) {
265 dout(10) << ": removed metric for key=" << key << dendl;
266 }
267 }
268 }
269 }
270
271 void MetricAggregator::handle_mds_metrics(const cref_t<MMDSMetrics> &m) {
272 const metrics_message_t &metrics_message = m->metrics_message;
273
274 auto seq = metrics_message.seq;
275 auto rank = metrics_message.rank;
276 auto &client_metrics_map = metrics_message.client_metrics_map;
277
278 dout(20) << ": applying " << client_metrics_map.size() << " updates for rank="
279 << rank << " with sequence number " << seq << dendl;
280
281 std::scoped_lock locker(lock);
282 if (!mds_pinger.pong_received(rank, seq)) {
283 return;
284 }
285
286 for (auto& [client, metrics] : client_metrics_map) {
287 switch (metrics.update_type) {
288 case UpdateType::UPDATE_TYPE_REFRESH:
289 refresh_metrics_for_rank(client, rank, metrics);
290 break;
291 case UpdateType::UPDATE_TYPE_REMOVE:
292 remove_metrics_for_rank(client, rank, true);
293 break;
294 default:
295 ceph_abort();
296 }
297 }
298 }
299
300 void MetricAggregator::cull_metrics_for_rank(mds_rank_t rank) {
301 dout(20) << ": rank=" << rank << dendl;
302
303 auto &p = clients_by_rank.at(rank);
304 for (auto &client : p) {
305 remove_metrics_for_rank(client, rank, false);
306 }
307
308 dout(10) << ": culled " << p.size() << " clients" << dendl;
309 clients_by_rank.erase(rank);
310 }
311
312 void MetricAggregator::notify_mdsmap(const MDSMap &mdsmap) {
313 dout(10) << dendl;
314
315 std::scoped_lock locker(lock);
316 std::set<mds_rank_t> current_active;
317 mdsmap.get_active_mds_set(current_active);
318
319 std::set<mds_rank_t> active_set;
320 boost::copy(active_rank_addrs | boost::adaptors::map_keys,
321 std::inserter(active_set, active_set.begin()));
322
323 std::set<mds_rank_t> diff;
324 std::set_difference(active_set.begin(), active_set.end(),
325 current_active.begin(), current_active.end(),
326 std::inserter(diff, diff.end()));
327
328 for (auto &rank : diff) {
329 dout(10) << ": mds rank=" << rank << " removed from mdsmap" << dendl;
330 active_rank_addrs.erase(rank);
331 cull_metrics_for_rank(rank);
332 mds_pinger.reset_ping(rank);
333 }
334
335 diff.clear();
336 std::set_difference(current_active.begin(), current_active.end(),
337 active_set.begin(), active_set.end(),
338 std::inserter(diff, diff.end()));
339
340 for (auto &rank : diff) {
341 auto rank_addr = mdsmap.get_addrs(rank);
342 dout(10) << ": active rank=" << rank << " (mds." << mdsmap.get_mds_info(rank).name
343 << ") has addr=" << rank_addr << dendl;
344 active_rank_addrs.emplace(rank, rank_addr);
345 clients_by_rank.emplace(rank, std::unordered_set<entity_inst_t>{});
346 }
347
348 dout(10) << ": active set=[" << active_rank_addrs << "]" << dendl;
349 }
350
351 void MetricAggregator::set_perf_queries(const ConfigPayload &config_payload) {
352 const MDSConfigPayload &mds_config_payload = boost::get<MDSConfigPayload>(config_payload);
353 const std::map<MDSPerfMetricQuery, MDSPerfMetricLimits> &queries = mds_config_payload.config;
354
355 dout(10) << ": setting " << queries.size() << " queries" << dendl;
356
357 std::scoped_lock locker(lock);
358 std::map<MDSPerfMetricQuery, std::map<MDSPerfMetricKey, PerformanceCounters>> new_data;
359 for (auto &p : queries) {
360 std::swap(new_data[p.first], query_metrics_map[p.first]);
361 }
362 std::swap(query_metrics_map, new_data);
363 }
364
365 MetricPayload MetricAggregator::get_perf_reports() {
366 MDSMetricPayload payload;
367 MDSPerfMetricReport &metric_report = payload.metric_report;
368 std::map<MDSPerfMetricQuery, MDSPerfMetrics> &reports = metric_report.reports;
369
370 std::scoped_lock locker(lock);
371
372 for (auto& [query, counters] : query_metrics_map) {
373 auto &report = reports[query];
374
375 query.get_performance_counter_descriptors(&report.performance_counter_descriptors);
376
377 auto &descriptors = report.performance_counter_descriptors;
378
379 dout(20) << ": descriptors=" << descriptors << dendl;
380
381 for (auto &p : counters) {
382 dout(20) << ": packing perf_metric_key=" << p.first << ", perf_counter="
383 << p.second << dendl;
384 auto &bl = report.group_packed_performance_counters[p.first];
385 query.pack_counters(p.second, &bl);
386 }
387 }
388
389 // stash a copy of dealyed and failed ranks. mgr culls out metrics
390 // for failed ranks and tags metrics for delayed ranks as "stale".
391 for (auto &p : active_rank_addrs) {
392 auto rank = p.first;
393 if (mds_pinger.is_rank_lagging(rank)) {
394 metric_report.rank_metrics_delayed.insert(rank);
395 }
396 }
397
398 return payload;
399 }