]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/MetricsHandler.cc
import ceph 16.2.6
[ceph.git] / ceph / src / mds / MetricsHandler.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "common/debug.h"
5#include "common/errno.h"
6
7#include "messages/MMDSMetrics.h"
8
9#include "MDSRank.h"
10#include "SessionMap.h"
11#include "MetricsHandler.h"
12
13#define dout_context g_ceph_context
14#define dout_subsys ceph_subsys_mds
15#undef dout_prefix
16#define dout_prefix *_dout << __func__ << ": mds.metrics"
17
18MetricsHandler::MetricsHandler(CephContext *cct, MDSRank *mds)
19 : Dispatcher(cct),
20 mds(mds) {
21}
22
23bool MetricsHandler::ms_can_fast_dispatch2(const cref_t<Message> &m) const {
24 return m->get_type() == CEPH_MSG_CLIENT_METRICS || m->get_type() == MSG_MDS_PING;
25}
26
27void MetricsHandler::ms_fast_dispatch2(const ref_t<Message> &m) {
28 bool handled = ms_dispatch2(m);
29 ceph_assert(handled);
30}
31
32bool MetricsHandler::ms_dispatch2(const ref_t<Message> &m) {
33 if (m->get_type() == CEPH_MSG_CLIENT_METRICS &&
34 m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT) {
35 handle_client_metrics(ref_cast<MClientMetrics>(m));
36 return true;
37 } else if (m->get_type() == MSG_MDS_PING &&
38 m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS) {
39 const Message *msg = m.get();
40 const MMDSOp *op = dynamic_cast<const MMDSOp*>(msg);
41 if (!op)
42 dout(0) << typeid(*msg).name() << " is not an MMDSOp type" << dendl;
43 ceph_assert(op);
44 handle_mds_ping(ref_cast<MMDSPing>(m));
45 return true;
46 }
47 return false;
48}
49
50void MetricsHandler::init() {
51 dout(10) << dendl;
52
53 updater = std::thread([this]() {
54 std::unique_lock locker(lock);
55 while (!stopping) {
56 double after = g_conf().get_val<std::chrono::seconds>("mds_metrics_update_interval").count();
57 locker.unlock();
58 sleep(after);
59 locker.lock();
60 update_rank0();
61 }
62 });
63}
64
65void MetricsHandler::shutdown() {
66 dout(10) << dendl;
67
68 {
69 std::scoped_lock locker(lock);
70 ceph_assert(!stopping);
71 stopping = true;
72 }
73
74 if (updater.joinable()) {
75 updater.join();
76 }
77}
78
79
80void MetricsHandler::add_session(Session *session) {
81 ceph_assert(session != nullptr);
82
83 auto &client = session->info.inst;
84 dout(10) << ": session=" << session << ", client=" << client << dendl;
85
86 std::scoped_lock locker(lock);
87
88 auto p = client_metrics_map.emplace(client, std::pair(last_updated_seq, Metrics())).first;
89 auto &metrics = p->second.second;
90 metrics.update_type = UPDATE_TYPE_REFRESH;
91 dout(20) << ": metrics=" << metrics << dendl;
92}
93
94void MetricsHandler::remove_session(Session *session) {
95 ceph_assert(session != nullptr);
96
97 auto &client = session->info.inst;
98 dout(10) << ": session=" << session << ", client=" << client << dendl;
99
100 std::scoped_lock locker(lock);
101
102 auto it = client_metrics_map.find(client);
103 if (it == client_metrics_map.end()) {
104 return;
105 }
106
107 // if a session got removed before rank 0 saw at least one refresh
108 // update from us or if we will send a remove type update as the
109 // the first "real" update (with an incoming sequence number), then
110 // cut short the update as rank 0 has not witnessed this client session
111 // update this rank.
112 auto lus = it->second.first;
113 if (lus == last_updated_seq) {
114 dout(10) << ": metric lus=" << lus << ", last_updated_seq=" << last_updated_seq
115 << dendl;
116 client_metrics_map.erase(it);
117 return;
118 }
119
120 // zero out all metrics
121 auto &metrics = it->second.second;
122 metrics.cap_hit_metric = { };
123 metrics.read_latency_metric = { };
124 metrics.write_latency_metric = { };
125 metrics.metadata_latency_metric = { };
126 metrics.dentry_lease_metric = { };
127 metrics.opened_files_metric = { };
128 metrics.pinned_icaps_metric = { };
129 metrics.opened_inodes_metric = { };
130 metrics.update_type = UPDATE_TYPE_REMOVE;
131}
132
133void MetricsHandler::set_next_seq(version_t seq) {
134 dout(20) << ": current sequence number " << next_seq << ", setting next sequence number "
135 << seq << dendl;
136 next_seq = seq;
137}
138
139void MetricsHandler::reset_seq() {
140 dout(10) << ": last_updated_seq=" << last_updated_seq << dendl;
141
142 set_next_seq(0);
143 for (auto &[client, metrics_v] : client_metrics_map) {
144 dout(10) << ": reset last updated seq for client addr=" << client << dendl;
145 metrics_v.first = last_updated_seq;
146 }
147}
148
149void MetricsHandler::handle_payload(Session *session, const CapInfoPayload &payload) {
522d829b 150 dout(20) << ": type=" << payload.get_type()
f67539c2
TL
151 << ", session=" << session << ", hits=" << payload.cap_hits << ", misses="
152 << payload.cap_misses << dendl;
153
154 auto it = client_metrics_map.find(session->info.inst);
155 if (it == client_metrics_map.end()) {
156 return;
157 }
158
159 auto &metrics = it->second.second;
160 metrics.update_type = UPDATE_TYPE_REFRESH;
161 metrics.cap_hit_metric.hits = payload.cap_hits;
162 metrics.cap_hit_metric.misses = payload.cap_misses;
163}
164
165void MetricsHandler::handle_payload(Session *session, const ReadLatencyPayload &payload) {
522d829b 166 dout(20) << ": type=" << payload.get_type()
f67539c2
TL
167 << ", session=" << session << ", latency=" << payload.lat << dendl;
168
169 auto it = client_metrics_map.find(session->info.inst);
170 if (it == client_metrics_map.end()) {
171 return;
172 }
173
174 auto &metrics = it->second.second;
175 metrics.update_type = UPDATE_TYPE_REFRESH;
176 metrics.read_latency_metric.lat = payload.lat;
177 metrics.read_latency_metric.updated = true;
178}
179
180void MetricsHandler::handle_payload(Session *session, const WriteLatencyPayload &payload) {
522d829b 181 dout(20) << ": type=" << payload.get_type()
f67539c2
TL
182 << ", session=" << session << ", latency=" << payload.lat << dendl;
183
184 auto it = client_metrics_map.find(session->info.inst);
185 if (it == client_metrics_map.end()) {
186 return;
187 }
188
189 auto &metrics = it->second.second;
190 metrics.update_type = UPDATE_TYPE_REFRESH;
191 metrics.write_latency_metric.lat = payload.lat;
192 metrics.write_latency_metric.updated = true;
193}
194
195void MetricsHandler::handle_payload(Session *session, const MetadataLatencyPayload &payload) {
522d829b
TL
196 dout(20) << ": type=" << payload.get_type()
197 << ", session=" << session << ", latency=" << payload.lat << dendl;
f67539c2
TL
198
199 auto it = client_metrics_map.find(session->info.inst);
200 if (it == client_metrics_map.end()) {
201 return;
202 }
203
204 auto &metrics = it->second.second;
205 metrics.update_type = UPDATE_TYPE_REFRESH;
206 metrics.metadata_latency_metric.lat = payload.lat;
207 metrics.metadata_latency_metric.updated = true;
208}
209
210void MetricsHandler::handle_payload(Session *session, const DentryLeasePayload &payload) {
522d829b 211 dout(20) << ": type=" << payload.get_type()
f67539c2
TL
212 << ", session=" << session << ", hits=" << payload.dlease_hits << ", misses="
213 << payload.dlease_misses << dendl;
214
215 auto it = client_metrics_map.find(session->info.inst);
216 if (it == client_metrics_map.end()) {
217 return;
218 }
219
220 auto &metrics = it->second.second;
221 metrics.update_type = UPDATE_TYPE_REFRESH;
222 metrics.dentry_lease_metric.hits = payload.dlease_hits;
223 metrics.dentry_lease_metric.misses = payload.dlease_misses;
224 metrics.dentry_lease_metric.updated = true;
225}
226
227void MetricsHandler::handle_payload(Session *session, const OpenedFilesPayload &payload) {
522d829b 228 dout(20) << ": type=" << payload.get_type()
f67539c2
TL
229 << ", session=" << session << ", opened_files=" << payload.opened_files
230 << ", total_inodes=" << payload.total_inodes << dendl;
231
232 auto it = client_metrics_map.find(session->info.inst);
233 if (it == client_metrics_map.end()) {
234 return;
235 }
236
237 auto &metrics = it->second.second;
238 metrics.update_type = UPDATE_TYPE_REFRESH;
239 metrics.opened_files_metric.opened_files = payload.opened_files;
240 metrics.opened_files_metric.total_inodes = payload.total_inodes;
241 metrics.opened_files_metric.updated = true;
242}
243
244void MetricsHandler::handle_payload(Session *session, const PinnedIcapsPayload &payload) {
522d829b 245 dout(20) << ": type=" << payload.get_type()
f67539c2
TL
246 << ", session=" << session << ", pinned_icaps=" << payload.pinned_icaps
247 << ", total_inodes=" << payload.total_inodes << dendl;
248
249 auto it = client_metrics_map.find(session->info.inst);
250 if (it == client_metrics_map.end()) {
251 return;
252 }
253
254 auto &metrics = it->second.second;
255 metrics.update_type = UPDATE_TYPE_REFRESH;
256 metrics.pinned_icaps_metric.pinned_icaps = payload.pinned_icaps;
257 metrics.pinned_icaps_metric.total_inodes = payload.total_inodes;
258 metrics.pinned_icaps_metric.updated = true;
259}
260
261void MetricsHandler::handle_payload(Session *session, const OpenedInodesPayload &payload) {
522d829b 262 dout(20) << ": type=" << payload.get_type()
f67539c2
TL
263 << ", session=" << session << ", opened_inodes=" << payload.opened_inodes
264 << ", total_inodes=" << payload.total_inodes << dendl;
265
266 auto it = client_metrics_map.find(session->info.inst);
267 if (it == client_metrics_map.end()) {
268 return;
269 }
270
271 auto &metrics = it->second.second;
272 metrics.update_type = UPDATE_TYPE_REFRESH;
273 metrics.opened_inodes_metric.opened_inodes = payload.opened_inodes;
274 metrics.opened_inodes_metric.total_inodes = payload.total_inodes;
275 metrics.opened_inodes_metric.updated = true;
276}
277
278void MetricsHandler::handle_payload(Session *session, const UnknownPayload &payload) {
279 dout(5) << ": type=Unknown, session=" << session << ", ignoring unknown payload" << dendl;
280}
281
282void MetricsHandler::handle_client_metrics(const cref_t<MClientMetrics> &m) {
283 std::scoped_lock locker(lock);
284
285 Session *session = mds->get_session(m);
286 dout(20) << ": session=" << session << dendl;
287
288 if (session == nullptr) {
289 dout(10) << ": ignoring session less message" << dendl;
290 return;
291 }
292
293 for (auto &metric : m->updates) {
294 boost::apply_visitor(HandlePayloadVisitor(this, session), metric.payload);
295 }
296}
297
298void MetricsHandler::handle_mds_ping(const cref_t<MMDSPing> &m) {
299 std::scoped_lock locker(lock);
300 set_next_seq(m->seq);
301}
302
303void MetricsHandler::notify_mdsmap(const MDSMap &mdsmap) {
304 dout(10) << dendl;
305
306 std::set<mds_rank_t> active_set;
307
308 std::scoped_lock locker(lock);
309
310 // reset sequence number when rank0 is unavailable or a new
311 // rank0 mds is chosen -- new rank0 will assign a starting
312 // sequence number when it is ready to process metric updates.
313 // this also allows to cut-short metric remove operations to
314 // be satisfied locally in many cases.
315
316 // update new rank0 address
317 mdsmap.get_active_mds_set(active_set);
318 if (!active_set.count((mds_rank_t)0)) {
319 dout(10) << ": rank0 is unavailable" << dendl;
320 addr_rank0 = boost::none;
321 reset_seq();
322 return;
323 }
324
325 dout(10) << ": rank0 is mds." << mdsmap.get_mds_info((mds_rank_t)0).name << dendl;
326
327 auto new_rank0_addr = mdsmap.get_addrs((mds_rank_t)0);
328 if (addr_rank0 != new_rank0_addr) {
329 dout(10) << ": rank0 addr is now " << new_rank0_addr << dendl;
330 addr_rank0 = new_rank0_addr;
331 reset_seq();
332 }
333}
334
335void MetricsHandler::update_rank0() {
336 dout(20) << dendl;
337
338 if (!addr_rank0) {
339 dout(20) << ": not yet notified with rank0 address, ignoring" << dendl;
340 return;
341 }
342
343 metrics_message_t metrics_message;
344 auto &update_client_metrics_map = metrics_message.client_metrics_map;
345
346 metrics_message.seq = next_seq;
347 metrics_message.rank = mds->get_nodeid();
348
349 for (auto p = client_metrics_map.begin(); p != client_metrics_map.end();) {
350 // copy metrics and update local metrics map as required
351 auto &metrics = p->second.second;
352 update_client_metrics_map.emplace(p->first, metrics);
353 if (metrics.update_type == UPDATE_TYPE_REFRESH) {
354 metrics = {};
355 ++p;
356 } else {
357 p = client_metrics_map.erase(p);
358 }
359 }
360
361 // only start incrementing when its kicked via set_next_seq()
362 if (next_seq != 0) {
363 ++last_updated_seq;
364 }
365
366 dout(20) << ": sending metric updates for " << update_client_metrics_map.size()
367 << " clients to rank 0 (address: " << *addr_rank0 << ") with sequence number "
368 << next_seq << ", last updated sequence number " << last_updated_seq << dendl;
369
370 mds->send_message_mds(make_message<MMDSMetrics>(std::move(metrics_message)), *addr_rank0);
371}