1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <boost/range/adaptor/map.hpp>
5 #include <boost/range/algorithm/copy.hpp>
8 #include "MetricAggregator.h"
9 #include "mgr/MgrClient.h"
11 #define dout_context g_ceph_context
12 #define dout_subsys ceph_subsys_mds
14 #define dout_prefix *_dout << "mds.metric.aggregator" << " " << __func__
// Constructor: takes the Ceph context, an MDSRank pointer and a MgrClient
// pointer.
// NOTE(review): the member-initializer list and the body are not part of this
// excerpt, so how the three arguments are stored cannot be confirmed here.
16 MetricAggregator::MetricAggregator(CephContext
*cct
, MDSRank
*mds
, MgrClient
*mgrc
)
// Logs how many entries active_rank_addrs holds, then calls
// mds_pinger.send_ping(rank, addr) once for every (rank, addr) pair in it.
23 void MetricAggregator::ping_all_active_ranks() {
24 dout(10) << ": pinging " << active_rank_addrs
.size() << " active mds(s)" << dendl
;
// One ping per tracked rank; the pair is bound by const reference, so the
// address map itself is not modified by this loop.
26 for (const auto &[rank
, addr
] : active_rank_addrs
) {
27 dout(20) << ": pinging rank=" << rank
<< " addr=" << addr
<< dendl
;
28 mds_pinger
.send_ping(rank
, addr
);
// Creates the `pinger` thread and registers callbacks with the mgr client via
// set_perf_metric_query_cb().
// NOTE(review): several original lines are absent from this excerpt (the
// embedded line numbers skip), so any loop, sleep and the returned value are
// not shown here.
32 int MetricAggregator::init() {
// The thread body captures `this`, takes `lock` through a unique_lock and
// calls ping_all_active_ranks().
35 pinger
= std::thread([this]() {
36 std::unique_lock
locker(lock
);
38 ping_all_active_ranks();
// Reads the "mds_ping_interval" option (typed std::chrono::seconds) and keeps
// its count as a double. Presumably the pause between ping rounds -- confirm
// against the lines missing from this excerpt.
40 double timo
= g_conf().get_val
<std::chrono::seconds
>("mds_ping_interval").count();
// Callback taking a ConfigPayload: forwards it to set_perf_queries().
46 mgrc
->set_perf_metric_query_cb(
47 [this](const ConfigPayload
&config_payload
) {
48 set_perf_queries(config_payload
);
// Hands back whatever get_perf_reports() builds. Presumably the body of a
// second callback -- its header is not visible here.
51 return get_perf_reports();
// Shutdown path: while holding `lock`, asserts that `stopping` is not already
// set, then tests whether the `pinger` thread is joinable.
// NOTE(review): what happens after the assertion and inside the joinable()
// branch is not visible in this excerpt.
57 void MetricAggregator::shutdown() {
61 std::scoped_lock
locker(lock
);
62 ceph_assert(!stopping
);
66 if (pinger
.joinable()) {
// Returns true only when the message type is MSG_MDS_METRICS; every other
// message type yields false.
71 bool MetricAggregator::ms_can_fast_dispatch2(const cref_t
<Message
> &m
) const {
72 return m
->get_type() == MSG_MDS_METRICS
;
// Fast-dispatch entry point: passes the message to ms_dispatch2() and stores
// the boolean result in `handled`.
// NOTE(review): how `handled` is used afterwards is not visible here.
75 void MetricAggregator::ms_fast_dispatch2(const ref_t
<Message
> &m
) {
76 bool handled
= ms_dispatch2(m
);
// Dispatches a message to handle_mds_metrics() when it is of type
// MSG_MDS_METRICS and its connection's peer type is CEPH_ENTITY_TYPE_MDS.
// NOTE(review): the return statements and the condition guarding the
// "is not an MMDSOp type" log line are not visible in this excerpt.
80 bool MetricAggregator::ms_dispatch2(const ref_t
<Message
> &m
) {
81 if (m
->get_type() == MSG_MDS_METRICS
&&
82 m
->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS
) {
// Sanity check on the dynamic type: dynamic_cast yields nullptr when the
// message is not an MMDSOp.
83 const Message
*msg
= m
.get();
84 const MMDSOp
*op
= dynamic_cast<const MMDSOp
*>(msg
);
86 dout(0) << typeid(*msg
).name() << " is not an MMDSOp type" << dendl
;
// Re-type the reference as MMDSMetrics and process it.
88 handle_mds_metrics(ref_cast
<MMDSMetrics
>(m
));
// Records `client` in the client set kept for `rank`, then, for every query in
// query_metrics_map for which query.get_key() succeeds, copies the values
// carried in `metrics` into that query's counters for the computed key.
//
// client:  the client the metrics belong to
// rank:    the MDS rank the metrics are recorded under
// metrics: the per-client metric values to copy from
//
// NOTE(review): the `switch` headers, `break`s and lambda return statements
// are not part of this excerpt (the embedded line numbers skip them).
94 void MetricAggregator::refresh_metrics_for_rank(const entity_inst_t
&client
,
95 mds_rank_t rank
, const Metrics
&metrics
) {
96 dout(20) << ": client=" << client
<< ", rank=" << rank
<< ", metrics="
// Looked up with .at(): presumably fails loudly if `rank` has no entry in
// clients_by_rank -- the container type is declared elsewhere, confirm.
// `ins` is true only when the client was not already in the set.
99 auto &p
= clients_by_rank
.at(rank
);
100 bool ins
= p
.insert(client
).second
;
102 dout(20) << ": rank=" << rank
<< " has " << p
.size() << " connected"
103 << " client(s)" << dendl
;
// Counter updater: for one descriptor, writes the matching metric from
// `metrics` into the counter pair `c` (c->first / c->second). In the visible
// lines every metric except CAP_HIT_METRIC is written only when its `updated`
// flag is set.
106 auto update_counter_func
= [&metrics
](const MDSPerformanceCounterDescriptor
&d
,
107 PerformanceCounter
*c
) {
108 ceph_assert(d
.is_supported());
110 dout(20) << ": performance_counter_descriptor=" << d
<< dendl
;
// first = hits, second = misses
113 case MDSPerformanceCounterType::CAP_HIT_METRIC
:
114 c
->first
= metrics
.cap_hit_metric
.hits
;
115 c
->second
= metrics
.cap_hit_metric
.misses
;
// Latency metrics: first = tv_sec, second = tv_nsec of the latency value.
117 case MDSPerformanceCounterType::READ_LATENCY_METRIC
:
118 if (metrics
.read_latency_metric
.updated
) {
119 c
->first
= metrics
.read_latency_metric
.lat
.tv
.tv_sec
;
120 c
->second
= metrics
.read_latency_metric
.lat
.tv
.tv_nsec
;
123 case MDSPerformanceCounterType::WRITE_LATENCY_METRIC
:
124 if (metrics
.write_latency_metric
.updated
) {
125 c
->first
= metrics
.write_latency_metric
.lat
.tv
.tv_sec
;
126 c
->second
= metrics
.write_latency_metric
.lat
.tv
.tv_nsec
;
129 case MDSPerformanceCounterType::METADATA_LATENCY_METRIC
:
130 if (metrics
.metadata_latency_metric
.updated
) {
131 c
->first
= metrics
.metadata_latency_metric
.lat
.tv
.tv_sec
;
132 c
->second
= metrics
.metadata_latency_metric
.lat
.tv
.tv_nsec
;
// first = hits, second = misses
135 case MDSPerformanceCounterType::DENTRY_LEASE_METRIC
:
136 if (metrics
.dentry_lease_metric
.updated
) {
137 c
->first
= metrics
.dentry_lease_metric
.hits
;
138 c
->second
= metrics
.dentry_lease_metric
.misses
;
// The next three metrics store their own count in first and total_inodes in
// second.
141 case MDSPerformanceCounterType::OPENED_FILES_METRIC
:
142 if (metrics
.opened_files_metric
.updated
) {
143 c
->first
= metrics
.opened_files_metric
.opened_files
;
144 c
->second
= metrics
.opened_files_metric
.total_inodes
;
147 case MDSPerformanceCounterType::PINNED_ICAPS_METRIC
:
148 if (metrics
.pinned_icaps_metric
.updated
) {
149 c
->first
= metrics
.pinned_icaps_metric
.pinned_icaps
;
150 c
->second
= metrics
.pinned_icaps_metric
.total_inodes
;
153 case MDSPerformanceCounterType::OPENED_INODES_METRIC
:
154 if (metrics
.opened_inodes_metric
.updated
) {
155 c
->first
= metrics
.opened_inodes_metric
.opened_inodes
;
156 c
->second
= metrics
.opened_inodes_metric
.total_inodes
;
// I/O size metrics: first = total_ops, second = total_size.
159 case MDSPerformanceCounterType::READ_IO_SIZES_METRIC
:
160 if (metrics
.read_io_sizes_metric
.updated
) {
161 c
->first
= metrics
.read_io_sizes_metric
.total_ops
;
162 c
->second
= metrics
.read_io_sizes_metric
.total_size
;
165 case MDSPerformanceCounterType::WRITE_IO_SIZES_METRIC
:
166 if (metrics
.write_io_sizes_metric
.updated
) {
167 c
->first
= metrics
.write_io_sizes_metric
.total_ops
;
168 c
->second
= metrics
.write_io_sizes_metric
.total_size
;
// Presumably the default branch for an unrecognised counter type -- the
// `default:` label is not visible here.
172 ceph_abort_msg("unknown counter type");
// Sub-key builder: for one sub-key descriptor, stringifies either the rank or
// the client (captured by value), runs the descriptor's regex over that string
// and appends every capture group (index 1 and up) to `sub_key`.
// NOTE(review): what the lambda returns on a failed search or when there are
// no capture groups is not visible in this excerpt.
176 auto sub_key_func
= [client
, rank
](const MDSPerfMetricSubKeyDescriptor
&d
,
177 MDSPerfMetricSubKey
*sub_key
) {
178 ceph_assert(d
.is_supported());
180 dout(20) << ": sub_key_descriptor=" << d
<< dendl
;
182 std::string match_string
;
184 case MDSPerfMetricSubKeyType::MDS_RANK
:
185 match_string
= stringify(rank
);
187 case MDSPerfMetricSubKeyType::CLIENT_ID
:
188 match_string
= stringify(client
);
191 ceph_abort_msg("unknown counter type");
194 dout(20) << ": match_string=" << match_string
<< dendl
;
197 if (!std::regex_search(match_string
, match
, d
.regex
)) {
// match[0] is the whole match, so size() <= 1 means the regex has no capture
// groups.
200 if (match
.size() <= 1) {
// Start at 1 to skip the whole-match entry and keep only the capture groups.
203 for (size_t i
= 1; i
< match
.size(); ++i
) {
204 sub_key
->push_back(match
[i
].str());
// Apply the update to every registered query whose key can be built for this
// client/rank. perf_key_map[key] creates the counter entry when it is absent.
209 for (auto& [query
, perf_key_map
] : query_metrics_map
) {
210 MDSPerfMetricKey key
;
211 if (query
.get_key(sub_key_func
, &key
)) {
212 query
.update_counters(update_counter_func
, &perf_key_map
[key
]);
// Erases `client` from the client set kept for `rank` and removes, from every
// query in query_metrics_map, the counter entry whose key is built for this
// client/rank.
//
// client: the client whose metrics are dropped
// rank:   the MDS rank the metrics were recorded under
// remove: NOTE(review): no use of this flag is visible in this excerpt;
//         presumably it guards the erase from clients_by_rank -- confirm
//         against the full source.
217 void MetricAggregator::remove_metrics_for_rank(const entity_inst_t
&client
,
218 mds_rank_t rank
, bool remove
) {
219 dout(20) << ": client=" << client
<< ", rank=" << rank
<< dendl
;
// `rm` is true when the client was actually present in the rank's set.
222 auto &p
= clients_by_rank
.at(rank
);
223 bool rm
= p
.erase(client
) != 0;
225 dout(20) << ": rank=" << rank
<< " has " << p
.size() << " connected"
226 << " client(s)" << dendl
;
// Sub-key builder: stringifies either the rank or the client (captured by
// value), runs the descriptor's regex over that string and appends every
// capture group (index 1 and up) to `sub_key`.
// NOTE(review): the lambda's return statements are not visible here.
229 auto sub_key_func
= [client
, rank
](const MDSPerfMetricSubKeyDescriptor
&d
,
230 MDSPerfMetricSubKey
*sub_key
) {
231 ceph_assert(d
.is_supported());
232 dout(20) << ": sub_key_descriptor=" << d
<< dendl
;
234 std::string match_string
;
236 case MDSPerfMetricSubKeyType::MDS_RANK
:
237 match_string
= stringify(rank
);
239 case MDSPerfMetricSubKeyType::CLIENT_ID
:
240 match_string
= stringify(client
);
243 ceph_abort_msg("unknown counter type");
246 dout(20) << ": match_string=" << match_string
<< dendl
;
249 if (!std::regex_search(match_string
, match
, d
.regex
)) {
// match[0] is the whole match, so size() <= 1 means the regex has no capture
// groups.
252 if (match
.size() <= 1) {
// Start at 1 to skip the whole-match entry and keep only the capture groups.
255 for (size_t i
= 1; i
< match
.size(); ++i
) {
256 sub_key
->push_back(match
[i
].str());
// Drop the counter entry for this key from every query that can build one;
// the removal is logged only when an entry was actually erased.
261 for (auto& [query
, perf_key_map
] : query_metrics_map
) {
262 MDSPerfMetricKey key
;
263 if (query
.get_key(sub_key_func
, &key
)) {
264 if (perf_key_map
.erase(key
)) {
265 dout(10) << ": removed metric for key=" << key
<< dendl
;
// Applies a metrics message: reads the sequence number, rank and per-client
// metrics map from it, then, holding `lock`, consults
// mds_pinger.pong_received(rank, seq) and dispatches every client entry on its
// update_type.
// NOTE(review): the body of the `!pong_received` branch and the end of the
// switch are not visible in this excerpt.
271 void MetricAggregator::handle_mds_metrics(const cref_t
<MMDSMetrics
> &m
) {
272 const metrics_message_t
&metrics_message
= m
->metrics_message
;
274 auto seq
= metrics_message
.seq
;
275 auto rank
= metrics_message
.rank
;
276 auto &client_metrics_map
= metrics_message
.client_metrics_map
;
278 dout(20) << ": applying " << client_metrics_map
.size() << " updates for rank="
279 << rank
<< " with sequence number " << seq
<< dendl
;
// The lock is taken only after the log line above; everything below runs
// with it held.
281 std::scoped_lock
locker(lock
);
282 if (!mds_pinger
.pong_received(rank
, seq
)) {
// REFRESH entries update counters; REMOVE entries drop them (remove=true).
286 for (auto& [client
, metrics
] : client_metrics_map
) {
287 switch (metrics
.update_type
) {
288 case UpdateType::UPDATE_TYPE_REFRESH
:
289 refresh_metrics_for_rank(client
, rank
, metrics
);
291 case UpdateType::UPDATE_TYPE_REMOVE
:
292 remove_metrics_for_rank(client
, rank
, true);
// Removes the metrics of every client recorded for `rank`, logs how many
// clients were in the set, and finally erases the rank's entry from
// clients_by_rank.
300 void MetricAggregator::cull_metrics_for_rank(mds_rank_t rank
) {
301 dout(20) << ": rank=" << rank
<< dendl
;
// remove=false: presumably so that the set being iterated here is not modified
// by the callee -- confirm against remove_metrics_for_rank. The whole set is
// discarded by the erase(rank) below.
303 auto &p
= clients_by_rank
.at(rank
);
304 for (auto &client
: p
) {
305 remove_metrics_for_rank(client
, rank
, false);
308 dout(10) << ": culled " << p
.size() << " clients" << dendl
;
309 clients_by_rank
.erase(rank
);
// Reconciles the tracked ranks with the active set reported by `mdsmap`, under
// `lock`. Ranks that are tracked but no longer active are erased from
// active_rank_addrs, have their metrics culled and their ping state reset.
// Newly active ranks get their address recorded and an empty client set
// created.
312 void MetricAggregator::notify_mdsmap(const MDSMap
&mdsmap
) {
315 std::scoped_lock
locker(lock
);
// current_active: ranks active according to the new map.
316 std::set
<mds_rank_t
> current_active
;
317 mdsmap
.get_active_mds_set(current_active
);
// active_set: the ranks currently tracked (the keys of active_rank_addrs).
319 std::set
<mds_rank_t
> active_set
;
320 boost::copy(active_rank_addrs
| boost::adaptors::map_keys
,
321 std::inserter(active_set
, active_set
.begin()));
// diff = active_set minus current_active: tracked ranks that are no longer
// active.
323 std::set
<mds_rank_t
> diff
;
324 std::set_difference(active_set
.begin(), active_set
.end(),
325 current_active
.begin(), current_active
.end(),
326 std::inserter(diff
, diff
.end()));
328 for (auto &rank
: diff
) {
329 dout(10) << ": mds rank=" << rank
<< " removed from mdsmap" << dendl
;
330 active_rank_addrs
.erase(rank
);
331 cull_metrics_for_rank(rank
);
332 mds_pinger
.reset_ping(rank
);
// Second pass: current_active minus active_set, i.e. ranks that just became
// active.
// NOTE(review): `diff` is reused as the output set, and no reset of `diff`
// between the two passes is visible in this excerpt -- confirm one exists,
// otherwise the removed ranks would be processed again below.
336 std::set_difference(current_active
.begin(), current_active
.end(),
337 active_set
.begin(), active_set
.end(),
338 std::inserter(diff
, diff
.end()));
340 for (auto &rank
: diff
) {
341 auto rank_addr
= mdsmap
.get_addrs(rank
);
342 dout(10) << ": active rank=" << rank
<< " (mds." << mdsmap
.get_mds_info(rank
).name
343 << ") has addr=" << rank_addr
<< dendl
;
// emplace() leaves an existing entry for the same rank untouched.
344 active_rank_addrs
.emplace(rank
, rank_addr
);
345 clients_by_rank
.emplace(rank
, std::unordered_set
<entity_inst_t
>{});
348 dout(10) << ": active set=[" << active_rank_addrs
<< "]" << dendl
;
// Replaces query_metrics_map with a map holding exactly the queries listed in
// the payload. Counter data already collected for a query that stays
// registered is carried over; data for queries absent from the payload is
// dropped with the old map.
351 void MetricAggregator::set_perf_queries(const ConfigPayload
&config_payload
) {
// Presumably ConfigPayload is a boost::variant and this throws if it does not
// hold an MDSConfigPayload -- the type is declared elsewhere, confirm.
352 const MDSConfigPayload
&mds_config_payload
= boost::get
<MDSConfigPayload
>(config_payload
);
353 const std::map
<MDSPerfMetricQuery
, MDSPerfMetricLimits
> &queries
= mds_config_payload
.config
;
355 dout(10) << ": setting " << queries
.size() << " queries" << dendl
;
357 std::scoped_lock
locker(lock
);
358 std::map
<MDSPerfMetricQuery
, std::map
<MDSPerfMetricKey
, PerformanceCounters
>> new_data
;
// For each requested query, move its existing counters (or a freshly created
// empty map, since operator[] inserts) into new_data.
359 for (auto &p
: queries
) {
360 std::swap(new_data
[p
.first
], query_metrics_map
[p
.first
]);
// Install the new map; the previous contents are destroyed with new_data.
362 std::swap(query_metrics_map
, new_data
);
365 MetricPayload
MetricAggregator::get_perf_reports() {
366 MDSMetricPayload payload
;
367 MDSPerfMetricReport
&metric_report
= payload
.metric_report
;
368 std::map
<MDSPerfMetricQuery
, MDSPerfMetrics
> &reports
= metric_report
.reports
;
370 std::scoped_lock
locker(lock
);
372 for (auto& [query
, counters
] : query_metrics_map
) {
373 auto &report
= reports
[query
];
375 query
.get_performance_counter_descriptors(&report
.performance_counter_descriptors
);
377 auto &descriptors
= report
.performance_counter_descriptors
;
379 dout(20) << ": descriptors=" << descriptors
<< dendl
;
381 for (auto &p
: counters
) {
382 dout(20) << ": packing perf_metric_key=" << p
.first
<< ", perf_counter="
383 << p
.second
<< dendl
;
384 auto &bl
= report
.group_packed_performance_counters
[p
.first
];
385 query
.pack_counters(p
.second
, &bl
);
389 // stash a copy of delayed and failed ranks. mgr culls out metrics
390 // for failed ranks and tags metrics for delayed ranks as "stale".
391 for (auto &p
: active_rank_addrs
) {
393 if (mds_pinger
.is_rank_lagging(rank
)) {
394 metric_report
.rank_metrics_delayed
.insert(rank
);