1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
#include <regex>
#include <thread>

#include <boost/range/adaptor/map.hpp>
#include <boost/range/algorithm/copy.hpp>

#include "MetricAggregator.h"
#include "mgr/MgrClient.h"
11 #define dout_context g_ceph_context
12 #define dout_subsys ceph_subsys_mds
14 #define dout_prefix *_dout << "mds.metric.aggregator" << " " << __func__
// Aggregates per-client performance metrics reported by active MDS ranks
// and forwards them to the manager via MgrClient (see init()).
// NOTE(review): only the constructor signature survives in this chunk of
// the file -- the member-initializer list and body are elided here.
MetricAggregator::MetricAggregator(CephContext *cct, MDSRank *mds, MgrClient *mgrc)
23 void MetricAggregator::ping_all_active_ranks() {
24 dout(10) << ": pinging " << active_rank_addrs
.size() << " active mds(s)" << dendl
;
26 for (const auto &[rank
, addr
] : active_rank_addrs
) {
27 dout(20) << ": pinging rank=" << rank
<< " addr=" << addr
<< dendl
;
28 mds_pinger
.send_ping(rank
, addr
);
32 int MetricAggregator::init() {
35 pinger
= std::thread([this]() {
36 std::unique_lock
locker(lock
);
38 ping_all_active_ranks();
40 double timo
= g_conf().get_val
<std::chrono::seconds
>("mds_ping_interval").count();
46 mgrc
->set_perf_metric_query_cb(
47 [this](const ConfigPayload
&config_payload
) {
48 set_perf_queries(config_payload
);
51 return get_perf_reports();
57 void MetricAggregator::shutdown() {
61 std::scoped_lock
locker(lock
);
62 ceph_assert(!stopping
);
66 if (pinger
.joinable()) {
71 bool MetricAggregator::ms_can_fast_dispatch2(const cref_t
<Message
> &m
) const {
72 return m
->get_type() == MSG_MDS_METRICS
;
75 void MetricAggregator::ms_fast_dispatch2(const ref_t
<Message
> &m
) {
76 bool handled
= ms_dispatch2(m
);
80 bool MetricAggregator::ms_dispatch2(const ref_t
<Message
> &m
) {
81 if (m
->get_type() == MSG_MDS_METRICS
&&
82 m
->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS
) {
83 const Message
*msg
= m
.get();
84 const MMDSOp
*op
= dynamic_cast<const MMDSOp
*>(msg
);
86 dout(0) << typeid(*msg
).name() << " is not an MMDSOp type" << dendl
;
88 handle_mds_metrics(ref_cast
<MMDSMetrics
>(m
));
94 void MetricAggregator::refresh_metrics_for_rank(const entity_inst_t
&client
,
95 mds_rank_t rank
, const Metrics
&metrics
) {
96 dout(20) << ": client=" << client
<< ", rank=" << rank
<< ", metrics="
99 auto &p
= clients_by_rank
.at(rank
);
100 bool ins
= p
.insert(client
).second
;
102 dout(20) << ": rank=" << rank
<< " has " << p
.size() << " connected"
103 << " client(s)" << dendl
;
106 auto update_counter_func
= [&metrics
](const MDSPerformanceCounterDescriptor
&d
,
107 PerformanceCounter
*c
) {
108 ceph_assert(d
.is_supported());
110 dout(20) << ": performance_counter_descriptor=" << d
<< dendl
;
113 case MDSPerformanceCounterType::CAP_HIT_METRIC
:
114 c
->first
= metrics
.cap_hit_metric
.hits
;
115 c
->second
= metrics
.cap_hit_metric
.misses
;
117 case MDSPerformanceCounterType::READ_LATENCY_METRIC
:
118 if (metrics
.read_latency_metric
.updated
) {
119 c
->first
= metrics
.read_latency_metric
.lat
.tv
.tv_sec
;
120 c
->second
= metrics
.read_latency_metric
.lat
.tv
.tv_nsec
;
123 case MDSPerformanceCounterType::WRITE_LATENCY_METRIC
:
124 if (metrics
.write_latency_metric
.updated
) {
125 c
->first
= metrics
.write_latency_metric
.lat
.tv
.tv_sec
;
126 c
->second
= metrics
.write_latency_metric
.lat
.tv
.tv_nsec
;
129 case MDSPerformanceCounterType::METADATA_LATENCY_METRIC
:
130 if (metrics
.metadata_latency_metric
.updated
) {
131 c
->first
= metrics
.metadata_latency_metric
.lat
.tv
.tv_sec
;
132 c
->second
= metrics
.metadata_latency_metric
.lat
.tv
.tv_nsec
;
135 case MDSPerformanceCounterType::DENTRY_LEASE_METRIC
:
136 if (metrics
.dentry_lease_metric
.updated
) {
137 c
->first
= metrics
.dentry_lease_metric
.hits
;
138 c
->second
= metrics
.dentry_lease_metric
.misses
;
141 case MDSPerformanceCounterType::OPENED_FILES_METRIC
:
142 if (metrics
.opened_files_metric
.updated
) {
143 c
->first
= metrics
.opened_files_metric
.opened_files
;
144 c
->second
= metrics
.opened_files_metric
.total_inodes
;
147 case MDSPerformanceCounterType::PINNED_ICAPS_METRIC
:
148 if (metrics
.pinned_icaps_metric
.updated
) {
149 c
->first
= metrics
.pinned_icaps_metric
.pinned_icaps
;
150 c
->second
= metrics
.pinned_icaps_metric
.total_inodes
;
153 case MDSPerformanceCounterType::OPENED_INODES_METRIC
:
154 if (metrics
.opened_inodes_metric
.updated
) {
155 c
->first
= metrics
.opened_inodes_metric
.opened_inodes
;
156 c
->second
= metrics
.opened_inodes_metric
.total_inodes
;
159 case MDSPerformanceCounterType::READ_IO_SIZES_METRIC
:
160 if (metrics
.read_io_sizes_metric
.updated
) {
161 c
->first
= metrics
.read_io_sizes_metric
.total_ops
;
162 c
->second
= metrics
.read_io_sizes_metric
.total_size
;
165 case MDSPerformanceCounterType::WRITE_IO_SIZES_METRIC
:
166 if (metrics
.write_io_sizes_metric
.updated
) {
167 c
->first
= metrics
.write_io_sizes_metric
.total_ops
;
168 c
->second
= metrics
.write_io_sizes_metric
.total_size
;
171 case MDSPerformanceCounterType::AVG_READ_LATENCY_METRIC
:
172 if (metrics
.read_latency_metric
.updated
) {
173 c
->first
= metrics
.read_latency_metric
.mean
.tv
.tv_sec
;
174 c
->second
= metrics
.read_latency_metric
.mean
.tv
.tv_nsec
;
177 case MDSPerformanceCounterType::STDEV_READ_LATENCY_METRIC
:
178 if (metrics
.read_latency_metric
.updated
) {
179 c
->first
= metrics
.read_latency_metric
.sq_sum
;
180 c
->second
= metrics
.read_latency_metric
.count
;
183 case MDSPerformanceCounterType::AVG_WRITE_LATENCY_METRIC
:
184 if (metrics
.write_latency_metric
.updated
) {
185 c
->first
= metrics
.write_latency_metric
.mean
.tv
.tv_sec
;
186 c
->second
= metrics
.write_latency_metric
.mean
.tv
.tv_nsec
;
189 case MDSPerformanceCounterType::STDEV_WRITE_LATENCY_METRIC
:
190 if (metrics
.write_latency_metric
.updated
) {
191 c
->first
= metrics
.write_latency_metric
.sq_sum
;
192 c
->second
= metrics
.write_latency_metric
.count
;
195 case MDSPerformanceCounterType::AVG_METADATA_LATENCY_METRIC
:
196 if (metrics
.metadata_latency_metric
.updated
) {
197 c
->first
= metrics
.metadata_latency_metric
.mean
.tv
.tv_sec
;
198 c
->second
= metrics
.metadata_latency_metric
.mean
.tv
.tv_nsec
;
201 case MDSPerformanceCounterType::STDEV_METADATA_LATENCY_METRIC
:
202 if (metrics
.metadata_latency_metric
.updated
) {
203 c
->first
= metrics
.metadata_latency_metric
.sq_sum
;
204 c
->second
= metrics
.metadata_latency_metric
.count
;
208 ceph_abort_msg("unknown counter type");
212 auto sub_key_func
= [client
, rank
](const MDSPerfMetricSubKeyDescriptor
&d
,
213 MDSPerfMetricSubKey
*sub_key
) {
214 ceph_assert(d
.is_supported());
216 dout(20) << ": sub_key_descriptor=" << d
<< dendl
;
218 std::string match_string
;
220 case MDSPerfMetricSubKeyType::MDS_RANK
:
221 match_string
= stringify(rank
);
223 case MDSPerfMetricSubKeyType::CLIENT_ID
:
224 match_string
= stringify(client
);
227 ceph_abort_msg("unknown counter type");
230 dout(20) << ": match_string=" << match_string
<< dendl
;
233 if (!std::regex_search(match_string
, match
, d
.regex
)) {
236 if (match
.size() <= 1) {
239 for (size_t i
= 1; i
< match
.size(); ++i
) {
240 sub_key
->push_back(match
[i
].str());
245 for (auto& [query
, perf_key_map
] : query_metrics_map
) {
246 MDSPerfMetricKey key
;
247 if (query
.get_key(sub_key_func
, &key
)) {
248 query
.update_counters(update_counter_func
, &perf_key_map
[key
]);
253 void MetricAggregator::remove_metrics_for_rank(const entity_inst_t
&client
,
254 mds_rank_t rank
, bool remove
) {
255 dout(20) << ": client=" << client
<< ", rank=" << rank
<< dendl
;
258 auto &p
= clients_by_rank
.at(rank
);
259 bool rm
= p
.erase(client
) != 0;
261 dout(20) << ": rank=" << rank
<< " has " << p
.size() << " connected"
262 << " client(s)" << dendl
;
265 auto sub_key_func
= [client
, rank
](const MDSPerfMetricSubKeyDescriptor
&d
,
266 MDSPerfMetricSubKey
*sub_key
) {
267 ceph_assert(d
.is_supported());
268 dout(20) << ": sub_key_descriptor=" << d
<< dendl
;
270 std::string match_string
;
272 case MDSPerfMetricSubKeyType::MDS_RANK
:
273 match_string
= stringify(rank
);
275 case MDSPerfMetricSubKeyType::CLIENT_ID
:
276 match_string
= stringify(client
);
279 ceph_abort_msg("unknown counter type");
282 dout(20) << ": match_string=" << match_string
<< dendl
;
285 if (!std::regex_search(match_string
, match
, d
.regex
)) {
288 if (match
.size() <= 1) {
291 for (size_t i
= 1; i
< match
.size(); ++i
) {
292 sub_key
->push_back(match
[i
].str());
297 for (auto& [query
, perf_key_map
] : query_metrics_map
) {
298 MDSPerfMetricKey key
;
299 if (query
.get_key(sub_key_func
, &key
)) {
300 if (perf_key_map
.erase(key
)) {
301 dout(10) << ": removed metric for key=" << key
<< dendl
;
307 void MetricAggregator::handle_mds_metrics(const cref_t
<MMDSMetrics
> &m
) {
308 const metrics_message_t
&metrics_message
= m
->metrics_message
;
310 auto seq
= metrics_message
.seq
;
311 auto rank
= metrics_message
.rank
;
312 auto &client_metrics_map
= metrics_message
.client_metrics_map
;
314 dout(20) << ": applying " << client_metrics_map
.size() << " updates for rank="
315 << rank
<< " with sequence number " << seq
<< dendl
;
317 std::scoped_lock
locker(lock
);
318 if (!mds_pinger
.pong_received(rank
, seq
)) {
322 for (auto& [client
, metrics
] : client_metrics_map
) {
323 switch (metrics
.update_type
) {
324 case UpdateType::UPDATE_TYPE_REFRESH
:
325 refresh_metrics_for_rank(client
, rank
, metrics
);
327 case UpdateType::UPDATE_TYPE_REMOVE
:
328 remove_metrics_for_rank(client
, rank
, true);
336 void MetricAggregator::cull_metrics_for_rank(mds_rank_t rank
) {
337 dout(20) << ": rank=" << rank
<< dendl
;
339 auto &p
= clients_by_rank
.at(rank
);
340 for (auto &client
: p
) {
341 remove_metrics_for_rank(client
, rank
, false);
344 dout(10) << ": culled " << p
.size() << " clients" << dendl
;
345 clients_by_rank
.erase(rank
);
348 void MetricAggregator::notify_mdsmap(const MDSMap
&mdsmap
) {
351 std::scoped_lock
locker(lock
);
352 std::set
<mds_rank_t
> current_active
;
353 mdsmap
.get_active_mds_set(current_active
);
355 std::set
<mds_rank_t
> active_set
;
356 boost::copy(active_rank_addrs
| boost::adaptors::map_keys
,
357 std::inserter(active_set
, active_set
.begin()));
359 std::set
<mds_rank_t
> diff
;
360 std::set_difference(active_set
.begin(), active_set
.end(),
361 current_active
.begin(), current_active
.end(),
362 std::inserter(diff
, diff
.end()));
364 for (auto &rank
: diff
) {
365 dout(10) << ": mds rank=" << rank
<< " removed from mdsmap" << dendl
;
366 active_rank_addrs
.erase(rank
);
367 cull_metrics_for_rank(rank
);
368 mds_pinger
.reset_ping(rank
);
372 std::set_difference(current_active
.begin(), current_active
.end(),
373 active_set
.begin(), active_set
.end(),
374 std::inserter(diff
, diff
.end()));
376 for (auto &rank
: diff
) {
377 auto rank_addr
= mdsmap
.get_addrs(rank
);
378 dout(10) << ": active rank=" << rank
<< " (mds." << mdsmap
.get_mds_info(rank
).name
379 << ") has addr=" << rank_addr
<< dendl
;
380 active_rank_addrs
.emplace(rank
, rank_addr
);
381 clients_by_rank
.emplace(rank
, std::unordered_set
<entity_inst_t
>{});
384 dout(10) << ": active set=[" << active_rank_addrs
<< "]" << dendl
;
387 void MetricAggregator::set_perf_queries(const ConfigPayload
&config_payload
) {
388 const MDSConfigPayload
&mds_config_payload
= boost::get
<MDSConfigPayload
>(config_payload
);
389 const std::map
<MDSPerfMetricQuery
, MDSPerfMetricLimits
> &queries
= mds_config_payload
.config
;
391 dout(10) << ": setting " << queries
.size() << " queries" << dendl
;
393 std::scoped_lock
locker(lock
);
394 std::map
<MDSPerfMetricQuery
, std::map
<MDSPerfMetricKey
, PerformanceCounters
>> new_data
;
395 for (auto &p
: queries
) {
396 std::swap(new_data
[p
.first
], query_metrics_map
[p
.first
]);
398 std::swap(query_metrics_map
, new_data
);
// Build the MetricPayload handed back to the mgr: one MDSPerfMetrics report
// per registered query, with each key's counters packed into a bufferlist,
// plus the sets of delayed/failed ranks.
// NOTE(review): this function is truncated at the end of this chunk and
// several interior lines (including loop-closing braces and the
// declaration of `rank` inside the last loop, presumably from p.first) are
// elided; the code tokens below are reproduced as-is.
MetricPayload MetricAggregator::get_perf_reports() {
  MDSMetricPayload payload;
  MDSPerfMetricReport &metric_report = payload.metric_report;
  std::map<MDSPerfMetricQuery, MDSPerfMetrics> &reports = metric_report.reports;

  std::scoped_lock locker(lock);

  for (auto& [query, counters] : query_metrics_map) {
    auto &report = reports[query];
    // copy this query's counter descriptors into its report
    query.get_performance_counter_descriptors(&report.performance_counter_descriptors);
    auto &descriptors = report.performance_counter_descriptors;
    dout(20) << ": descriptors=" << descriptors << dendl;
    for (auto &p : counters) {
      dout(20) << ": packing perf_metric_key=" << p.first << ", perf_counter="
               << p.second << dendl;
      // pack this key's counters into its per-key bufferlist
      auto &bl = report.group_packed_performance_counters[p.first];
      query.pack_counters(p.second, &bl);
  // stash a copy of delayed and failed ranks. mgr culls out metrics
  // for failed ranks and tags metrics for delayed ranks as "stale".
  for (auto &p : active_rank_addrs) {
    if (mds_pinger.is_rank_lagging(rank)) {
      metric_report.rank_metrics_delayed.insert(rank);