1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "common/debug.h"
5 #include "common/errno.h"
7 #include "messages/MMDSMetrics.h"
10 #include "SessionMap.h"
11 #include "MetricsHandler.h"
13 #define dout_context g_ceph_context
14 #define dout_subsys ceph_subsys_mds
16 #define dout_prefix *_dout << __func__ << ": mds.metrics"
// Construct a metrics handler from a Ceph context and an MDS rank pointer.
// NOTE(review): the member initializers and body are not visible here.
18 MetricsHandler::MetricsHandler(CephContext
*cct
, MDSRank
*mds
)
// Tell the messenger which messages may be fast-dispatched to this handler:
// true only for CEPH_MSG_CLIENT_METRICS and MSG_MDS_PING message types.
23 bool MetricsHandler::ms_can_fast_dispatch2(const cref_t
<Message
> &m
) const {
24 return m
->get_type() == CEPH_MSG_CLIENT_METRICS
|| m
->get_type() == MSG_MDS_PING
;
// Fast-dispatch entry point: forwards the message to ms_dispatch2() and
// records whether it was handled.
// NOTE(review): what is done with `handled` afterwards is not visible here.
27 void MetricsHandler::ms_fast_dispatch2(const ref_t
<Message
> &m
) {
28 bool handled
= ms_dispatch2(m
);
// Route an incoming message to the matching handler.
//
// Two (message type, peer type) combinations are recognised:
//   - CEPH_MSG_CLIENT_METRICS from a client peer -> handle_client_metrics()
//   - MSG_MDS_PING from an MDS peer              -> handle_mds_ping()
// NOTE(review): the return statements are not visible here; presumably the
// result reports whether the message was consumed -- confirm.
32 bool MetricsHandler::ms_dispatch2(const ref_t
<Message
> &m
) {
// Client metrics are only accepted when the sending peer is a client.
33 if (m
->get_type() == CEPH_MSG_CLIENT_METRICS
&&
34 m
->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT
) {
35 handle_client_metrics(ref_cast
<MClientMetrics
>(m
));
// Pings are only accepted when the sending peer is another MDS.
37 } else if (m
->get_type() == MSG_MDS_PING
&&
38 m
->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS
) {
39 const Message
*msg
= m
.get();
// Check the dynamic type of the message before treating it as a ping.
40 const MMDSOp
*op
= dynamic_cast<const MMDSOp
*>(msg
);
// NOTE(review): the guard for this log line is not visible here; presumably
// it is emitted only when the cast above yields null -- confirm.
42 dout(0) << typeid(*msg
).name() << " is not an MMDSOp type" << dendl
;
44 handle_mds_ping(ref_cast
<MMDSPing
>(m
));
// Start the background `updater` thread.
//
// The thread body acquires `lock` and reads the
// "mds_metrics_update_interval" option (a std::chrono::seconds value) as a
// plain count of seconds.
// NOTE(review): the rest of the thread body (how `after` is used and what
// is run each interval) is not visible here.
50 void MetricsHandler::init() {
// The lambda captures `this`, so the handler must outlive the thread.
53 updater
= std::thread([this]() {
54 std::unique_lock
locker(lock
);
56 double after
= g_conf().get_val
<std::chrono::seconds
>("mds_metrics_update_interval").count();
// Shut the handler down.
//
// Under `lock`, asserts that shutdown has not already been requested
// (`stopping` must be false), then checks whether the updater thread is
// joinable.
// NOTE(review): the assignment to `stopping` and the join itself are not
// visible here.
65 void MetricsHandler::shutdown() {
69 std::scoped_lock
locker(lock
);
70 ceph_assert(!stopping
);
74 if (updater
.joinable()) {
// Begin tracking metrics for a client session.
//
// `session` must be non-null (asserted). The client is identified by
// session->info.inst. Runs with `lock` held from the point it touches
// client_metrics_map.
80 void MetricsHandler::add_session(Session
*session
) {
81 ceph_assert(session
!= nullptr);
83 auto &client
= session
->info
.inst
;
84 dout(10) << ": session=" << session
<< ", client=" << client
<< dendl
;
86 std::scoped_lock
locker(lock
);
// emplace(...).first is the map entry for this client; the pair passed in
// (current last_updated_seq plus default-constructed Metrics) is the value
// used when an entry is created.
88 auto p
= client_metrics_map
.emplace(client
, std::pair(last_updated_seq
, Metrics())).first
;
89 auto &metrics
= p
->second
.second
;
// Mark the entry as a refresh-type update.
90 metrics
.update_type
= UPDATE_TYPE_REFRESH
;
91 dout(20) << ": metrics=" << metrics
<< dendl
;
// Stop tracking metrics for a client session.
//
// `session` must be non-null (asserted). The client is looked up by
// session->info.inst under `lock`. Two outcomes are visible for a tracked
// client:
//   - its stored sequence number equals last_updated_seq: the entry is
//     erased from client_metrics_map;
//   - every per-metric field is reset to a value-initialized state and the
//     entry is marked UPDATE_TYPE_REMOVE.
// NOTE(review): the statements separating these outcomes (and the handling
// of a client that is not in the map) are not visible here -- confirm the
// exact control flow.
94 void MetricsHandler::remove_session(Session
*session
) {
95 ceph_assert(session
!= nullptr);
97 auto &client
= session
->info
.inst
;
98 dout(10) << ": session=" << session
<< ", client=" << client
<< dendl
;
100 std::scoped_lock
locker(lock
);
102 auto it
= client_metrics_map
.find(client
);
103 if (it
== client_metrics_map
.end()) {
107 // if a session got removed before rank 0 saw at least one refresh
108 // update from us or if we will send a remove type update as the
109 // first "real" update (with an incoming sequence number), then
110 // cut short the update as rank 0 has not witnessed this client session
// `lus` is the sequence number stored alongside this client's metrics.
112 auto lus
= it
->second
.first
;
113 if (lus
== last_updated_seq
) {
114 dout(10) << ": metric lus=" << lus
<< ", last_updated_seq=" << last_updated_seq
116 client_metrics_map
.erase(it
);
120 // zero out all metrics
121 auto &metrics
= it
->second
.second
;
122 metrics
.cap_hit_metric
= { };
123 metrics
.read_latency_metric
= { };
124 metrics
.write_latency_metric
= { };
125 metrics
.metadata_latency_metric
= { };
126 metrics
.dentry_lease_metric
= { };
127 metrics
.opened_files_metric
= { };
128 metrics
.pinned_icaps_metric
= { };
129 metrics
.opened_inodes_metric
= { };
130 metrics
.read_io_sizes_metric
= { };
131 metrics
.write_io_sizes_metric
= { };
// Flag the entry as a remove-type update.
132 metrics
.update_type
= UPDATE_TYPE_REMOVE
;
// Set the next sequence number, logging the current value of `next_seq`
// first.
// NOTE(review): the end of the log statement and the assignment itself are
// not visible here.
135 void MetricsHandler::set_next_seq(version_t seq
) {
136 dout(20) << ": current sequence number " << next_seq
<< ", setting next sequence number "
// Reset per-client sequence tracking.
//
// Logs the current last_updated_seq, then stamps every entry in
// client_metrics_map with that value.
// NOTE(review): a statement between the log line and the loop is not
// visible here.
141 void MetricsHandler::reset_seq() {
142 dout(10) << ": last_updated_seq=" << last_updated_seq
<< dendl
;
// metrics_v is the (sequence number, Metrics) pair stored for each client.
145 for (auto &[client
, metrics_v
] : client_metrics_map
) {
146 dout(10) << ": reset last updated seq for client addr=" << client
<< dendl
;
147 metrics_v
.first
= last_updated_seq
;
// Record a cap hit/miss payload reported by a client.
//
// Looks the client up by session->info.inst and stores the reported hit
// and miss counts in its cap_hit_metric, marking the entry as a
// refresh-type update.
// NOTE(review): handling of a client that is not in the map is not visible
// here. Unlike the other payload handlers in this file, no `updated` flag
// is set for this metric -- confirm that is intentional.
151 void MetricsHandler::handle_payload(Session
*session
, const CapInfoPayload
&payload
) {
152 dout(20) << ": type=" << payload
.get_type()
153 << ", session=" << session
<< ", hits=" << payload
.cap_hits
<< ", misses="
154 << payload
.cap_misses
<< dendl
;
156 auto it
= client_metrics_map
.find(session
->info
.inst
);
157 if (it
== client_metrics_map
.end()) {
161 auto &metrics
= it
->second
.second
;
162 metrics
.update_type
= UPDATE_TYPE_REFRESH
;
163 metrics
.cap_hit_metric
.hits
= payload
.cap_hits
;
164 metrics
.cap_hit_metric
.misses
= payload
.cap_misses
;
// Record a read latency payload reported by a client.
//
// Looks the client up by session->info.inst, copies lat, mean, sq_sum and
// count into its read_latency_metric, flags that metric as updated and
// marks the entry as a refresh-type update.
// NOTE(review): handling of a client that is not in the map is not visible
// here.
167 void MetricsHandler::handle_payload(Session
*session
, const ReadLatencyPayload
&payload
) {
168 dout(20) << ": type=" << payload
.get_type()
169 << ", session=" << session
<< ", latency=" << payload
.lat
170 << ", avg=" << payload
.mean
<< ", sq_sum=" << payload
.sq_sum
171 << ", count=" << payload
.count
<< dendl
;
173 auto it
= client_metrics_map
.find(session
->info
.inst
);
174 if (it
== client_metrics_map
.end()) {
178 auto &metrics
= it
->second
.second
;
179 metrics
.update_type
= UPDATE_TYPE_REFRESH
;
180 metrics
.read_latency_metric
.lat
= payload
.lat
;
181 metrics
.read_latency_metric
.mean
= payload
.mean
;
182 metrics
.read_latency_metric
.sq_sum
= payload
.sq_sum
;
183 metrics
.read_latency_metric
.count
= payload
.count
;
184 metrics
.read_latency_metric
.updated
= true;
// Record a write latency payload reported by a client.
//
// Looks the client up by session->info.inst, copies lat, mean, sq_sum and
// count into its write_latency_metric, flags that metric as updated and
// marks the entry as a refresh-type update.
// NOTE(review): handling of a client that is not in the map is not visible
// here.
187 void MetricsHandler::handle_payload(Session
*session
, const WriteLatencyPayload
&payload
) {
188 dout(20) << ": type=" << payload
.get_type()
189 << ", session=" << session
<< ", latency=" << payload
.lat
190 << ", avg=" << payload
.mean
<< ", sq_sum=" << payload
.sq_sum
191 << ", count=" << payload
.count
<< dendl
;
193 auto it
= client_metrics_map
.find(session
->info
.inst
);
194 if (it
== client_metrics_map
.end()) {
198 auto &metrics
= it
->second
.second
;
199 metrics
.update_type
= UPDATE_TYPE_REFRESH
;
200 metrics
.write_latency_metric
.lat
= payload
.lat
;
201 metrics
.write_latency_metric
.mean
= payload
.mean
;
202 metrics
.write_latency_metric
.sq_sum
= payload
.sq_sum
;
203 metrics
.write_latency_metric
.count
= payload
.count
;
204 metrics
.write_latency_metric
.updated
= true;
// Record a metadata latency payload reported by a client.
//
// Looks the client up by session->info.inst, copies lat, mean, sq_sum and
// count into its metadata_latency_metric, flags that metric as updated and
// marks the entry as a refresh-type update.
// NOTE(review): handling of a client that is not in the map is not visible
// here.
207 void MetricsHandler::handle_payload(Session
*session
, const MetadataLatencyPayload
&payload
) {
208 dout(20) << ": type=" << payload
.get_type()
209 << ", session=" << session
<< ", latency=" << payload
.lat
210 << ", avg=" << payload
.mean
<< ", sq_sum=" << payload
.sq_sum
211 << ", count=" << payload
.count
<< dendl
;
213 auto it
= client_metrics_map
.find(session
->info
.inst
);
214 if (it
== client_metrics_map
.end()) {
218 auto &metrics
= it
->second
.second
;
219 metrics
.update_type
= UPDATE_TYPE_REFRESH
;
220 metrics
.metadata_latency_metric
.lat
= payload
.lat
;
221 metrics
.metadata_latency_metric
.mean
= payload
.mean
;
222 metrics
.metadata_latency_metric
.sq_sum
= payload
.sq_sum
;
223 metrics
.metadata_latency_metric
.count
= payload
.count
;
224 metrics
.metadata_latency_metric
.updated
= true;
// Record a dentry lease hit/miss payload reported by a client.
//
// Looks the client up by session->info.inst, stores the reported hit and
// miss counts in its dentry_lease_metric, flags that metric as updated and
// marks the entry as a refresh-type update.
// NOTE(review): handling of a client that is not in the map is not visible
// here.
227 void MetricsHandler::handle_payload(Session
*session
, const DentryLeasePayload
&payload
) {
228 dout(20) << ": type=" << payload
.get_type()
229 << ", session=" << session
<< ", hits=" << payload
.dlease_hits
<< ", misses="
230 << payload
.dlease_misses
<< dendl
;
232 auto it
= client_metrics_map
.find(session
->info
.inst
);
233 if (it
== client_metrics_map
.end()) {
237 auto &metrics
= it
->second
.second
;
238 metrics
.update_type
= UPDATE_TYPE_REFRESH
;
239 metrics
.dentry_lease_metric
.hits
= payload
.dlease_hits
;
240 metrics
.dentry_lease_metric
.misses
= payload
.dlease_misses
;
241 metrics
.dentry_lease_metric
.updated
= true;
// Record an opened-files payload reported by a client.
//
// Looks the client up by session->info.inst, stores opened_files and
// total_inodes in its opened_files_metric, flags that metric as updated and
// marks the entry as a refresh-type update.
// NOTE(review): handling of a client that is not in the map is not visible
// here.
244 void MetricsHandler::handle_payload(Session
*session
, const OpenedFilesPayload
&payload
) {
245 dout(20) << ": type=" << payload
.get_type()
246 << ", session=" << session
<< ", opened_files=" << payload
.opened_files
247 << ", total_inodes=" << payload
.total_inodes
<< dendl
;
249 auto it
= client_metrics_map
.find(session
->info
.inst
);
250 if (it
== client_metrics_map
.end()) {
254 auto &metrics
= it
->second
.second
;
255 metrics
.update_type
= UPDATE_TYPE_REFRESH
;
256 metrics
.opened_files_metric
.opened_files
= payload
.opened_files
;
257 metrics
.opened_files_metric
.total_inodes
= payload
.total_inodes
;
258 metrics
.opened_files_metric
.updated
= true;
// Record a pinned-icaps payload reported by a client.
//
// Looks the client up by session->info.inst, stores pinned_icaps and
// total_inodes in its pinned_icaps_metric, flags that metric as updated and
// marks the entry as a refresh-type update.
// NOTE(review): handling of a client that is not in the map is not visible
// here.
261 void MetricsHandler::handle_payload(Session
*session
, const PinnedIcapsPayload
&payload
) {
262 dout(20) << ": type=" << payload
.get_type()
263 << ", session=" << session
<< ", pinned_icaps=" << payload
.pinned_icaps
264 << ", total_inodes=" << payload
.total_inodes
<< dendl
;
266 auto it
= client_metrics_map
.find(session
->info
.inst
);
267 if (it
== client_metrics_map
.end()) {
271 auto &metrics
= it
->second
.second
;
272 metrics
.update_type
= UPDATE_TYPE_REFRESH
;
273 metrics
.pinned_icaps_metric
.pinned_icaps
= payload
.pinned_icaps
;
274 metrics
.pinned_icaps_metric
.total_inodes
= payload
.total_inodes
;
275 metrics
.pinned_icaps_metric
.updated
= true;
// Record an opened-inodes payload reported by a client.
//
// Looks the client up by session->info.inst, stores opened_inodes and
// total_inodes in its opened_inodes_metric, flags that metric as updated
// and marks the entry as a refresh-type update.
// NOTE(review): handling of a client that is not in the map is not visible
// here.
278 void MetricsHandler::handle_payload(Session
*session
, const OpenedInodesPayload
&payload
) {
279 dout(20) << ": type=" << payload
.get_type()
280 << ", session=" << session
<< ", opened_inodes=" << payload
.opened_inodes
281 << ", total_inodes=" << payload
.total_inodes
<< dendl
;
283 auto it
= client_metrics_map
.find(session
->info
.inst
);
284 if (it
== client_metrics_map
.end()) {
288 auto &metrics
= it
->second
.second
;
289 metrics
.update_type
= UPDATE_TYPE_REFRESH
;
290 metrics
.opened_inodes_metric
.opened_inodes
= payload
.opened_inodes
;
291 metrics
.opened_inodes_metric
.total_inodes
= payload
.total_inodes
;
292 metrics
.opened_inodes_metric
.updated
= true;
// Record a read I/O sizes payload reported by a client.
//
// Looks the client up by session->info.inst, stores total_ops and
// total_size in its read_io_sizes_metric, flags that metric as updated and
// marks the entry as a refresh-type update.
// NOTE(review): handling of a client that is not in the map is not visible
// here.
295 void MetricsHandler::handle_payload(Session
*session
, const ReadIoSizesPayload
&payload
) {
296 dout(20) << ": type=" << payload
.get_type()
297 << ", session=" << session
<< ", total_ops=" << payload
.total_ops
298 << ", total_size=" << payload
.total_size
<< dendl
;
300 auto it
= client_metrics_map
.find(session
->info
.inst
);
301 if (it
== client_metrics_map
.end()) {
305 auto &metrics
= it
->second
.second
;
306 metrics
.update_type
= UPDATE_TYPE_REFRESH
;
307 metrics
.read_io_sizes_metric
.total_ops
= payload
.total_ops
;
308 metrics
.read_io_sizes_metric
.total_size
= payload
.total_size
;
309 metrics
.read_io_sizes_metric
.updated
= true;
// Record a write I/O sizes payload reported by a client.
//
// Looks the client up by session->info.inst, stores total_ops and
// total_size in its write_io_sizes_metric, flags that metric as updated and
// marks the entry as a refresh-type update.
// NOTE(review): handling of a client that is not in the map is not visible
// here.
312 void MetricsHandler::handle_payload(Session
*session
, const WriteIoSizesPayload
&payload
) {
313 dout(20) << ": type=" << payload
.get_type()
314 << ", session=" << session
<< ", total_ops=" << payload
.total_ops
315 << ", total_size=" << payload
.total_size
<< dendl
;
317 auto it
= client_metrics_map
.find(session
->info
.inst
);
318 if (it
== client_metrics_map
.end()) {
322 auto &metrics
= it
->second
.second
;
323 metrics
.update_type
= UPDATE_TYPE_REFRESH
;
324 metrics
.write_io_sizes_metric
.total_ops
= payload
.total_ops
;
325 metrics
.write_io_sizes_metric
.total_size
= payload
.total_size
;
326 metrics
.write_io_sizes_metric
.updated
= true;
// Handle a payload of unknown type: nothing is stored, the event is only
// logged (at debug level 5) together with the session it came from.
329 void MetricsHandler::handle_payload(Session
*session
, const UnknownPayload
&payload
) {
330 dout(5) << ": type=Unknown, session=" << session
<< ", ignoring unknown payload" << dendl
;
// Process a client metrics message.
//
// With `lock` held, resolves the session the message belongs to via
// mds->get_session() and then dispatches every entry in m->updates to the
// handle_payload() overload matching its payload type (through
// HandlePayloadVisitor). Messages without a session are logged as ignored.
// NOTE(review): the early exit after the "ignoring" log is not visible
// here.
333 void MetricsHandler::handle_client_metrics(const cref_t
<MClientMetrics
> &m
) {
334 std::scoped_lock
locker(lock
);
336 Session
*session
= mds
->get_session(m
);
337 dout(20) << ": session=" << session
<< dendl
;
339 if (session
== nullptr) {
340 dout(10) << ": ignoring session less message" << dendl
;
// Each update carries a variant payload; the visitor selects the overload.
344 for (auto &metric
: m
->updates
) {
345 boost::apply_visitor(HandlePayloadVisitor(this, session
), metric
.payload
);
// Process an MDS ping: with `lock` held, pass the sequence number carried
// by the ping (m->seq) to set_next_seq().
349 void MetricsHandler::handle_mds_ping(const cref_t
<MMDSPing
> &m
) {
350 std::scoped_lock
locker(lock
);
351 set_next_seq(m
->seq
);
// React to a new MDS map by refreshing the cached address of rank 0.
//
// With `lock` held, the set of active ranks is read from the map:
//   - if rank 0 is not active, addr_rank0 is cleared (boost::none);
//   - the name of the rank 0 MDS is logged, and addr_rank0 is replaced when
//     the address reported by the map differs from the cached one.
// NOTE(review): the statements between these two cases (and after the
// address update) are not visible here -- confirm the exact control flow
// and where the sequence number reset described below happens.
354 void MetricsHandler::notify_mdsmap(const MDSMap
&mdsmap
) {
357 std::set
<mds_rank_t
> active_set
;
359 std::scoped_lock
locker(lock
);
361 // reset sequence number when rank0 is unavailable or a new
362 // rank0 mds is chosen -- new rank0 will assign a starting
363 // sequence number when it is ready to process metric updates.
364 // this also allows to cut-short metric remove operations to
365 // be satisfied locally in many cases.
367 // update new rank0 address
368 mdsmap
.get_active_mds_set(active_set
);
369 if (!active_set
.count((mds_rank_t
)0)) {
370 dout(10) << ": rank0 is unavailable" << dendl
;
371 addr_rank0
= boost::none
;
376 dout(10) << ": rank0 is mds." << mdsmap
.get_mds_info((mds_rank_t
)0).name
<< dendl
;
378 auto new_rank0_addr
= mdsmap
.get_addrs((mds_rank_t
)0);
379 if (addr_rank0
!= new_rank0_addr
) {
380 dout(10) << ": rank0 addr is now " << new_rank0_addr
<< dendl
;
381 addr_rank0
= new_rank0_addr
;
// Send the locally collected client metrics to rank 0.
//
// Builds a metrics_message_t stamped with next_seq and this MDS's rank
// (mds->get_nodeid()), copies every entry of client_metrics_map into it,
// and sends it as an MMDSMetrics message to *addr_rank0.
// NOTE(review): several statements are not visible here -- the condition
// guarding the "not yet notified" log, what is done to refresh-type entries
// inside the loop, and how last_updated_seq / next_seq are advanced before
// sending. The visible erase presumably applies to non-refresh entries
// only; confirm.
386 void MetricsHandler::update_rank0() {
390 dout(20) << ": not yet notified with rank0 address, ignoring" << dendl
;
394 metrics_message_t metrics_message
;
395 auto &update_client_metrics_map
= metrics_message
.client_metrics_map
;
397 metrics_message
.seq
= next_seq
;
398 metrics_message
.rank
= mds
->get_nodeid();
// The iterator is advanced inside the loop body (note the empty increment
// clause), since entries may be erased while iterating.
400 for (auto p
= client_metrics_map
.begin(); p
!= client_metrics_map
.end();) {
401 // copy metrics and update local metrics map as required
402 auto &metrics
= p
->second
.second
;
403 update_client_metrics_map
.emplace(p
->first
, metrics
);
404 if (metrics
.update_type
== UPDATE_TYPE_REFRESH
) {
408 p
= client_metrics_map
.erase(p
);
412 // only start incrementing when it's kicked via set_next_seq()
417 dout(20) << ": sending metric updates for " << update_client_metrics_map
.size()
418 << " clients to rank 0 (address: " << *addr_rank0
<< ") with sequence number "
419 << next_seq
<< ", last updated sequence number " << last_updated_seq
<< dendl
;
// metrics_message is moved into the outgoing message; it is not used again.
421 mds
->send_message_mds(make_message
<MMDSMetrics
>(std::move(metrics_message
)), *addr_rank0
);