]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/MetricsHandler.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / mds / MetricsHandler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "common/debug.h"
5 #include "common/errno.h"
6
7 #include "messages/MMDSMetrics.h"
8
9 #include "MDSRank.h"
10 #include "SessionMap.h"
11 #include "MetricsHandler.h"
12
13 #define dout_context g_ceph_context
14 #define dout_subsys ceph_subsys_mds
15 #undef dout_prefix
16 #define dout_prefix *_dout << __func__ << ": mds.metrics"
17
18 MetricsHandler::MetricsHandler(CephContext *cct, MDSRank *mds)
19 : Dispatcher(cct),
20 mds(mds) {
21 }
22
23 bool MetricsHandler::ms_can_fast_dispatch2(const cref_t<Message> &m) const {
24 return m->get_type() == CEPH_MSG_CLIENT_METRICS || m->get_type() == MSG_MDS_PING;
25 }
26
27 void MetricsHandler::ms_fast_dispatch2(const ref_t<Message> &m) {
28 bool handled = ms_dispatch2(m);
29 ceph_assert(handled);
30 }
31
32 bool MetricsHandler::ms_dispatch2(const ref_t<Message> &m) {
33 if (m->get_type() == CEPH_MSG_CLIENT_METRICS &&
34 m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT) {
35 handle_client_metrics(ref_cast<MClientMetrics>(m));
36 return true;
37 } else if (m->get_type() == MSG_MDS_PING &&
38 m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS) {
39 const Message *msg = m.get();
40 const MMDSOp *op = dynamic_cast<const MMDSOp*>(msg);
41 if (!op)
42 dout(0) << typeid(*msg).name() << " is not an MMDSOp type" << dendl;
43 ceph_assert(op);
44 handle_mds_ping(ref_cast<MMDSPing>(m));
45 return true;
46 }
47 return false;
48 }
49
50 void MetricsHandler::init() {
51 dout(10) << dendl;
52
53 updater = std::thread([this]() {
54 std::unique_lock locker(lock);
55 while (!stopping) {
56 double after = g_conf().get_val<std::chrono::seconds>("mds_metrics_update_interval").count();
57 locker.unlock();
58 sleep(after);
59 locker.lock();
60 update_rank0();
61 }
62 });
63 }
64
65 void MetricsHandler::shutdown() {
66 dout(10) << dendl;
67
68 {
69 std::scoped_lock locker(lock);
70 ceph_assert(!stopping);
71 stopping = true;
72 }
73
74 if (updater.joinable()) {
75 updater.join();
76 }
77 }
78
79
80 void MetricsHandler::add_session(Session *session) {
81 ceph_assert(session != nullptr);
82
83 auto &client = session->info.inst;
84 dout(10) << ": session=" << session << ", client=" << client << dendl;
85
86 std::scoped_lock locker(lock);
87
88 auto p = client_metrics_map.emplace(client, std::pair(last_updated_seq, Metrics())).first;
89 auto &metrics = p->second.second;
90 metrics.update_type = UPDATE_TYPE_REFRESH;
91 dout(20) << ": metrics=" << metrics << dendl;
92 }
93
94 void MetricsHandler::remove_session(Session *session) {
95 ceph_assert(session != nullptr);
96
97 auto &client = session->info.inst;
98 dout(10) << ": session=" << session << ", client=" << client << dendl;
99
100 std::scoped_lock locker(lock);
101
102 auto it = client_metrics_map.find(client);
103 if (it == client_metrics_map.end()) {
104 return;
105 }
106
107 // if a session got removed before rank 0 saw at least one refresh
108 // update from us or if we will send a remove type update as the
109 // the first "real" update (with an incoming sequence number), then
110 // cut short the update as rank 0 has not witnessed this client session
111 // update this rank.
112 auto lus = it->second.first;
113 if (lus == last_updated_seq) {
114 dout(10) << ": metric lus=" << lus << ", last_updated_seq=" << last_updated_seq
115 << dendl;
116 client_metrics_map.erase(it);
117 return;
118 }
119
120 // zero out all metrics
121 auto &metrics = it->second.second;
122 metrics.cap_hit_metric = { };
123 metrics.read_latency_metric = { };
124 metrics.write_latency_metric = { };
125 metrics.metadata_latency_metric = { };
126 metrics.dentry_lease_metric = { };
127 metrics.opened_files_metric = { };
128 metrics.pinned_icaps_metric = { };
129 metrics.opened_inodes_metric = { };
130 metrics.read_io_sizes_metric = { };
131 metrics.write_io_sizes_metric = { };
132 metrics.update_type = UPDATE_TYPE_REMOVE;
133 }
134
135 void MetricsHandler::set_next_seq(version_t seq) {
136 dout(20) << ": current sequence number " << next_seq << ", setting next sequence number "
137 << seq << dendl;
138 next_seq = seq;
139 }
140
141 void MetricsHandler::reset_seq() {
142 dout(10) << ": last_updated_seq=" << last_updated_seq << dendl;
143
144 set_next_seq(0);
145 for (auto &[client, metrics_v] : client_metrics_map) {
146 dout(10) << ": reset last updated seq for client addr=" << client << dendl;
147 metrics_v.first = last_updated_seq;
148 }
149 }
150
151 void MetricsHandler::handle_payload(Session *session, const CapInfoPayload &payload) {
152 dout(20) << ": type=" << payload.get_type()
153 << ", session=" << session << ", hits=" << payload.cap_hits << ", misses="
154 << payload.cap_misses << dendl;
155
156 auto it = client_metrics_map.find(session->info.inst);
157 if (it == client_metrics_map.end()) {
158 return;
159 }
160
161 auto &metrics = it->second.second;
162 metrics.update_type = UPDATE_TYPE_REFRESH;
163 metrics.cap_hit_metric.hits = payload.cap_hits;
164 metrics.cap_hit_metric.misses = payload.cap_misses;
165 }
166
167 void MetricsHandler::handle_payload(Session *session, const ReadLatencyPayload &payload) {
168 dout(20) << ": type=" << payload.get_type()
169 << ", session=" << session << ", latency=" << payload.lat
170 << ", avg=" << payload.mean << ", sq_sum=" << payload.sq_sum
171 << ", count=" << payload.count << dendl;
172
173 auto it = client_metrics_map.find(session->info.inst);
174 if (it == client_metrics_map.end()) {
175 return;
176 }
177
178 auto &metrics = it->second.second;
179 metrics.update_type = UPDATE_TYPE_REFRESH;
180 metrics.read_latency_metric.lat = payload.lat;
181 metrics.read_latency_metric.mean = payload.mean;
182 metrics.read_latency_metric.sq_sum = payload.sq_sum;
183 metrics.read_latency_metric.count = payload.count;
184 metrics.read_latency_metric.updated = true;
185 }
186
187 void MetricsHandler::handle_payload(Session *session, const WriteLatencyPayload &payload) {
188 dout(20) << ": type=" << payload.get_type()
189 << ", session=" << session << ", latency=" << payload.lat
190 << ", avg=" << payload.mean << ", sq_sum=" << payload.sq_sum
191 << ", count=" << payload.count << dendl;
192
193 auto it = client_metrics_map.find(session->info.inst);
194 if (it == client_metrics_map.end()) {
195 return;
196 }
197
198 auto &metrics = it->second.second;
199 metrics.update_type = UPDATE_TYPE_REFRESH;
200 metrics.write_latency_metric.lat = payload.lat;
201 metrics.write_latency_metric.mean = payload.mean;
202 metrics.write_latency_metric.sq_sum = payload.sq_sum;
203 metrics.write_latency_metric.count = payload.count;
204 metrics.write_latency_metric.updated = true;
205 }
206
207 void MetricsHandler::handle_payload(Session *session, const MetadataLatencyPayload &payload) {
208 dout(20) << ": type=" << payload.get_type()
209 << ", session=" << session << ", latency=" << payload.lat
210 << ", avg=" << payload.mean << ", sq_sum=" << payload.sq_sum
211 << ", count=" << payload.count << dendl;
212
213 auto it = client_metrics_map.find(session->info.inst);
214 if (it == client_metrics_map.end()) {
215 return;
216 }
217
218 auto &metrics = it->second.second;
219 metrics.update_type = UPDATE_TYPE_REFRESH;
220 metrics.metadata_latency_metric.lat = payload.lat;
221 metrics.metadata_latency_metric.mean = payload.mean;
222 metrics.metadata_latency_metric.sq_sum = payload.sq_sum;
223 metrics.metadata_latency_metric.count = payload.count;
224 metrics.metadata_latency_metric.updated = true;
225 }
226
227 void MetricsHandler::handle_payload(Session *session, const DentryLeasePayload &payload) {
228 dout(20) << ": type=" << payload.get_type()
229 << ", session=" << session << ", hits=" << payload.dlease_hits << ", misses="
230 << payload.dlease_misses << dendl;
231
232 auto it = client_metrics_map.find(session->info.inst);
233 if (it == client_metrics_map.end()) {
234 return;
235 }
236
237 auto &metrics = it->second.second;
238 metrics.update_type = UPDATE_TYPE_REFRESH;
239 metrics.dentry_lease_metric.hits = payload.dlease_hits;
240 metrics.dentry_lease_metric.misses = payload.dlease_misses;
241 metrics.dentry_lease_metric.updated = true;
242 }
243
244 void MetricsHandler::handle_payload(Session *session, const OpenedFilesPayload &payload) {
245 dout(20) << ": type=" << payload.get_type()
246 << ", session=" << session << ", opened_files=" << payload.opened_files
247 << ", total_inodes=" << payload.total_inodes << dendl;
248
249 auto it = client_metrics_map.find(session->info.inst);
250 if (it == client_metrics_map.end()) {
251 return;
252 }
253
254 auto &metrics = it->second.second;
255 metrics.update_type = UPDATE_TYPE_REFRESH;
256 metrics.opened_files_metric.opened_files = payload.opened_files;
257 metrics.opened_files_metric.total_inodes = payload.total_inodes;
258 metrics.opened_files_metric.updated = true;
259 }
260
261 void MetricsHandler::handle_payload(Session *session, const PinnedIcapsPayload &payload) {
262 dout(20) << ": type=" << payload.get_type()
263 << ", session=" << session << ", pinned_icaps=" << payload.pinned_icaps
264 << ", total_inodes=" << payload.total_inodes << dendl;
265
266 auto it = client_metrics_map.find(session->info.inst);
267 if (it == client_metrics_map.end()) {
268 return;
269 }
270
271 auto &metrics = it->second.second;
272 metrics.update_type = UPDATE_TYPE_REFRESH;
273 metrics.pinned_icaps_metric.pinned_icaps = payload.pinned_icaps;
274 metrics.pinned_icaps_metric.total_inodes = payload.total_inodes;
275 metrics.pinned_icaps_metric.updated = true;
276 }
277
278 void MetricsHandler::handle_payload(Session *session, const OpenedInodesPayload &payload) {
279 dout(20) << ": type=" << payload.get_type()
280 << ", session=" << session << ", opened_inodes=" << payload.opened_inodes
281 << ", total_inodes=" << payload.total_inodes << dendl;
282
283 auto it = client_metrics_map.find(session->info.inst);
284 if (it == client_metrics_map.end()) {
285 return;
286 }
287
288 auto &metrics = it->second.second;
289 metrics.update_type = UPDATE_TYPE_REFRESH;
290 metrics.opened_inodes_metric.opened_inodes = payload.opened_inodes;
291 metrics.opened_inodes_metric.total_inodes = payload.total_inodes;
292 metrics.opened_inodes_metric.updated = true;
293 }
294
295 void MetricsHandler::handle_payload(Session *session, const ReadIoSizesPayload &payload) {
296 dout(20) << ": type=" << payload.get_type()
297 << ", session=" << session << ", total_ops=" << payload.total_ops
298 << ", total_size=" << payload.total_size << dendl;
299
300 auto it = client_metrics_map.find(session->info.inst);
301 if (it == client_metrics_map.end()) {
302 return;
303 }
304
305 auto &metrics = it->second.second;
306 metrics.update_type = UPDATE_TYPE_REFRESH;
307 metrics.read_io_sizes_metric.total_ops = payload.total_ops;
308 metrics.read_io_sizes_metric.total_size = payload.total_size;
309 metrics.read_io_sizes_metric.updated = true;
310 }
311
312 void MetricsHandler::handle_payload(Session *session, const WriteIoSizesPayload &payload) {
313 dout(20) << ": type=" << payload.get_type()
314 << ", session=" << session << ", total_ops=" << payload.total_ops
315 << ", total_size=" << payload.total_size << dendl;
316
317 auto it = client_metrics_map.find(session->info.inst);
318 if (it == client_metrics_map.end()) {
319 return;
320 }
321
322 auto &metrics = it->second.second;
323 metrics.update_type = UPDATE_TYPE_REFRESH;
324 metrics.write_io_sizes_metric.total_ops = payload.total_ops;
325 metrics.write_io_sizes_metric.total_size = payload.total_size;
326 metrics.write_io_sizes_metric.updated = true;
327 }
328
329 void MetricsHandler::handle_payload(Session *session, const UnknownPayload &payload) {
330 dout(5) << ": type=Unknown, session=" << session << ", ignoring unknown payload" << dendl;
331 }
332
333 void MetricsHandler::handle_client_metrics(const cref_t<MClientMetrics> &m) {
334 std::scoped_lock locker(lock);
335
336 Session *session = mds->get_session(m);
337 dout(20) << ": session=" << session << dendl;
338
339 if (session == nullptr) {
340 dout(10) << ": ignoring session less message" << dendl;
341 return;
342 }
343
344 for (auto &metric : m->updates) {
345 boost::apply_visitor(HandlePayloadVisitor(this, session), metric.payload);
346 }
347 }
348
349 void MetricsHandler::handle_mds_ping(const cref_t<MMDSPing> &m) {
350 std::scoped_lock locker(lock);
351 set_next_seq(m->seq);
352 }
353
354 void MetricsHandler::notify_mdsmap(const MDSMap &mdsmap) {
355 dout(10) << dendl;
356
357 std::set<mds_rank_t> active_set;
358
359 std::scoped_lock locker(lock);
360
361 // reset sequence number when rank0 is unavailable or a new
362 // rank0 mds is chosen -- new rank0 will assign a starting
363 // sequence number when it is ready to process metric updates.
364 // this also allows to cut-short metric remove operations to
365 // be satisfied locally in many cases.
366
367 // update new rank0 address
368 mdsmap.get_active_mds_set(active_set);
369 if (!active_set.count((mds_rank_t)0)) {
370 dout(10) << ": rank0 is unavailable" << dendl;
371 addr_rank0 = boost::none;
372 reset_seq();
373 return;
374 }
375
376 dout(10) << ": rank0 is mds." << mdsmap.get_mds_info((mds_rank_t)0).name << dendl;
377
378 auto new_rank0_addr = mdsmap.get_addrs((mds_rank_t)0);
379 if (addr_rank0 != new_rank0_addr) {
380 dout(10) << ": rank0 addr is now " << new_rank0_addr << dendl;
381 addr_rank0 = new_rank0_addr;
382 reset_seq();
383 }
384 }
385
386 void MetricsHandler::update_rank0() {
387 dout(20) << dendl;
388
389 if (!addr_rank0) {
390 dout(20) << ": not yet notified with rank0 address, ignoring" << dendl;
391 return;
392 }
393
394 metrics_message_t metrics_message;
395 auto &update_client_metrics_map = metrics_message.client_metrics_map;
396
397 metrics_message.seq = next_seq;
398 metrics_message.rank = mds->get_nodeid();
399
400 for (auto p = client_metrics_map.begin(); p != client_metrics_map.end();) {
401 // copy metrics and update local metrics map as required
402 auto &metrics = p->second.second;
403 update_client_metrics_map.emplace(p->first, metrics);
404 if (metrics.update_type == UPDATE_TYPE_REFRESH) {
405 metrics = {};
406 ++p;
407 } else {
408 p = client_metrics_map.erase(p);
409 }
410 }
411
412 // only start incrementing when its kicked via set_next_seq()
413 if (next_seq != 0) {
414 ++last_updated_seq;
415 }
416
417 dout(20) << ": sending metric updates for " << update_client_metrics_map.size()
418 << " clients to rank 0 (address: " << *addr_rank0 << ") with sequence number "
419 << next_seq << ", last updated sequence number " << last_updated_seq << dendl;
420
421 mds->send_message_mds(make_message<MMDSMetrics>(std::move(metrics_message)), *addr_rank0);
422 }