]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/stats/fs/perf_stats.py
4e06f1fc64db23702ffc17f4b236ed6dd2d0aaff
[ceph.git] / ceph / src / pybind / mgr / stats / fs / perf_stats.py
1 import re
2 import json
3 import time
4 import uuid
5 import errno
6 import traceback
7 import logging
8 from collections import OrderedDict
9 from typing import List, Dict, Set
10
11 from mgr_module import CommandResult
12
13 from datetime import datetime, timedelta
14 from threading import Lock, Condition, Thread, Timer
15 from ipaddress import ip_address
16
PERF_STATS_VERSION = 1

# keys into a per-filter user query entry (see FSPerfStats.user_queries)
QUERY_IDS = "query_ids"
GLOBAL_QUERY_ID = "global_query_id"
QUERY_LAST_REQUEST = "last_time_stamp"
QUERY_RAW_COUNTERS = "query_raw_counters"
QUERY_RAW_COUNTERS_GLOBAL = "query_raw_counters_global"

# wildcard filter values: all MDS ranks, any client-id, any client-ip
MDS_RANK_ALL = (-1,)
CLIENT_ID_ALL = r"\d*"
CLIENT_IP_ALL = ".*"

# regex fragments used to build perf query key descriptors; raw strings
# since "\s" / "\d" are regex escapes, not Python string escapes (the
# previous non-raw literals triggered invalid-escape warnings, W605).
MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS = r'^(.*)$'
MDS_PERF_QUERY_REGEX_MATCH_CLIENTS = r'^(client.{0}\s+{1}):.*'
# counter name -> client metric feature bit; used in notify_cmd() to
# decode a client's feature_bits into the metrics it supports.
MDS_PERF_QUERY_COUNTERS_MAP = OrderedDict({'cap_hit': 0,
                                           'read_latency': 1,
                                           'write_latency': 2,
                                           'metadata_latency': 3,
                                           'dentry_lease': 4,
                                           'opened_files': 5,
                                           'pinned_icaps': 6,
                                           'opened_inodes': 7,
                                           'read_io_sizes': 8,
                                           'write_io_sizes': 9,
                                           'avg_read_latency': 10,
                                           'stdev_read_latency': 11,
                                           'avg_write_latency': 12,
                                           'stdev_write_latency': 13,
                                           'avg_metadata_latency': 14,
                                           'stdev_metadata_latency': 15})
# per-mds queries request no counters; the global query requests them all
MDS_PERF_QUERY_COUNTERS = []  # type: List[str]
MDS_GLOBAL_PERF_QUERY_COUNTERS = list(MDS_PERF_QUERY_COUNTERS_MAP.keys())

# queries not refreshed (via register_query()) within this interval are
# unregistered by scrub_expired_queries()
QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
# seconds before (re)arming the re-register timer after rank-0 failover
REREGISTER_TIMER_INTERVAL = 1

# client metadata fields mirrored into generated reports
CLIENT_METADATA_KEY = "client_metadata"
CLIENT_METADATA_SUBKEYS = ["hostname", "root"]
CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"]

# placeholder for optional metadata a client did not report
NON_EXISTENT_KEY_STR = "N/A"

logger = logging.getLogger(__name__)
60
class FilterSpec(object):
    """
    query filters encapsulated and used as key for query map
    """
    def __init__(self, mds_ranks, client_id, client_ip):
        self.mds_ranks = mds_ranks
        self.client_id = client_id
        self.client_ip = client_ip

    def __hash__(self):
        return hash((self.mds_ranks, self.client_id, self.client_ip))

    def __eq__(self, other):
        # bug fix: the original compared self.client_ip against itself on
        # the right-hand side, so specs differing only in client_ip compared
        # equal (while hashing differently).
        return (self.mds_ranks, self.client_id, self.client_ip) == \
            (other.mds_ranks, other.client_id, other.client_ip)

    def __ne__(self, other):
        return not(self == other)
78
def extract_mds_ranks_from_spec(mds_rank_spec):
    """Parse a comma-separated rank spec (e.g. "0,1,2") into a tuple of ints.

    An empty/None spec selects all ranks; a malformed spec raises ValueError.
    """
    if not mds_rank_spec:
        return MDS_RANK_ALL
    if re.match(r'^\d+(,\d+)*$', mds_rank_spec) is None:
        raise ValueError("invalid mds filter spec: {}".format(mds_rank_spec))
    return tuple(map(int, mds_rank_spec.split(',')))
86
def extract_client_id_from_spec(client_id_spec):
    """Validate a numeric client-id spec; an empty spec selects all clients."""
    if not client_id_spec:
        return CLIENT_ID_ALL
    # the spec is embedded verbatim in the client filter regex, so it
    # must be purely numeric.
    if client_id_spec.isdigit():
        return client_id_spec
    raise ValueError('invalid client_id filter spec: {}'.format(client_id_spec))
95
def extract_client_ip_from_spec(client_ip_spec):
    """Validate a client-ip spec, optionally prefixed with "v1:" or "v2:".

    Returns the original spec unchanged on success; an empty spec selects
    all clients. Raises ValueError for an unparseable address.
    """
    if not client_ip_spec:
        return CLIENT_IP_ALL

    # strip a messenger-protocol prefix before validating the address
    candidate = client_ip_spec
    for prefix in ('v1:', 'v2:'):
        if candidate.startswith(prefix):
            candidate = candidate.replace(prefix, '')
            break

    try:
        ip_address(candidate)
    except ValueError:
        raise ValueError('invalid client_ip filter spec: {}'.format(client_ip_spec))
    return client_ip_spec
111
def extract_mds_ranks_from_report(mds_ranks_str):
    """Parse the comma-separated (delayed) rank list from an MDS report."""
    if not mds_ranks_str:
        return []
    return list(map(int, mds_ranks_str.split(',')))
116
def extract_client_id_and_ip(client):
    """Split a "client.<id> <ip>" string into ("client.<id>", "<ip>").

    Returns (None, None) when the string does not match that shape.
    """
    parsed = re.match(r'^(client\.\d+)\s(.*)', client)
    if parsed is None:
        return None, None
    return parsed.group(1), parsed.group(2)
122
class FSPerfStats(object):
    """Collect and serve per-client CephFS performance statistics.

    Registers MDS perf queries on behalf of `perf stats` consumers and
    periodically pulls the raw counters via a background thread (run()).
    """
    # guards user_queries and the raw counters hanging off each query;
    # q_cv wakes the collector thread, r_cv wakes blocked requesters.
    # NOTE: these are class-level attributes, shared by all instances.
    lock = Lock()
    q_cv = Condition(lock)
    r_cv = Condition(lock)

    # FilterSpec -> {QUERY_IDS, GLOBAL_QUERY_ID, QUERY_LAST_REQUEST, ...}
    user_queries = {} # type: Dict[str, Dict]

    # meta_lock guards client_metadata; rqtimer re-registers queries
    # after a rank-0 failover (see notify_fsmap()).
    meta_lock = Lock()
    rqtimer = None
    client_metadata = {
        'metadata' : {},        # client-id -> metadata dict
        'to_purge' : set(),     # clients whose metadata purge is deferred
        'in_progress' : {},     # cmdtag -> (mds gid, CommandResult)
    } # type: Dict
137
    def __init__(self, module):
        """Start the background report processor for the given mgr module."""
        self.module = module
        self.log = module.log
        # gid of the last known active rank-0 MDS; compared against the
        # fsmap in notify_fsmap() to detect rank-0 failover
        self.prev_rank0_gid = None
        # report processor thread
        self.report_processor = Thread(target=self.run)
        self.report_processor.start()
145
146 def set_client_metadata(self, client_id, key, meta):
147 result = self.client_metadata['metadata'].setdefault(client_id, {})
148 if not key in result or not result[key] == meta:
149 result[key] = meta
150
151 def notify_cmd(self, cmdtag):
152 self.log.debug("cmdtag={0}".format(cmdtag))
153 with self.meta_lock:
154 try:
155 result = self.client_metadata['in_progress'].pop(cmdtag)
156 except KeyError:
157 self.log.warn(f"cmdtag {cmdtag} not found in client metadata")
158 return
159
160 client_meta = result[1].wait()
161 if client_meta[0] != 0:
162 self.log.warn("failed to fetch client metadata from rank {0}, err={1}".format(
163 result[0], client_meta[2]))
164 return
165 self.log.debug("notify: client metadata={0}".format(json.loads(client_meta[1])))
166 for metadata in json.loads(client_meta[1]):
167 client_id = "client.{0}".format(metadata['id'])
168 result = self.client_metadata['metadata'].setdefault(client_id, {})
169 for subkey in CLIENT_METADATA_SUBKEYS:
170 self.set_client_metadata(client_id, subkey, metadata[CLIENT_METADATA_KEY][subkey])
171 for subkey in CLIENT_METADATA_SUBKEYS_OPTIONAL:
172 self.set_client_metadata(client_id, subkey,
173 metadata[CLIENT_METADATA_KEY].get(subkey, NON_EXISTENT_KEY_STR))
174 metric_features = int(metadata[CLIENT_METADATA_KEY]["metric_spec"]["metric_flags"]["feature_bits"], 16)
175 supported_metrics = [metric for metric, bit in MDS_PERF_QUERY_COUNTERS_MAP.items() if metric_features & (1 << bit)]
176 self.set_client_metadata(client_id, "valid_metrics", supported_metrics)
177 kver = metadata[CLIENT_METADATA_KEY].get("kernel_version", None)
178 if kver:
179 self.set_client_metadata(client_id, "kernel_version", kver)
180 # when all async requests are done, purge clients metadata if any.
181 if not self.client_metadata['in_progress']:
182 for client in self.client_metadata['to_purge']:
183 try:
184 self.log.info("purge client metadata for {0}".format(client))
185 self.client_metadata['metadata'].remove(client)
186 except:
187 pass
188 self.client_metadata['to_purge'].clear()
189 self.log.debug("client_metadata={0}, to_purge={1}".format(
190 self.client_metadata['metadata'], self.client_metadata['to_purge']))
191
    def notify_fsmap(self):
        """Handle an fsmap notification.

        Reregister the user queries when there is a new rank0 mds.
        """
        with self.lock:
            gid_state = FSPerfStats.get_rank0_mds_gid_state(self.module.get('fs_map'))
            if not gid_state:
                return
            rank0_gid, state = gid_state
            if (rank0_gid and rank0_gid != self.prev_rank0_gid and state == 'up:active'):
                #the new rank0 MDS is up:active
                ua_last_updated = time.monotonic()
                # cancel any pending re-register timer so only one is armed
                if (self.rqtimer and self.rqtimer.is_alive()):
                    self.rqtimer.cancel()
                self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
                                     self.re_register_queries, args=(rank0_gid, ua_last_updated,))
                self.rqtimer.start()
207
    def re_register_queries(self, rank0_gid, ua_last_updated):
        """Timer callback: re-register user queries after a rank-0 failover.

        Re-registers only if the metrics are at least as new as the failover
        timestamp; otherwise re-arms the timer and waits for fresh metrics.
        """
        with self.lock:
            # mx_last_updated is refreshed by get_raw_perf_counters() from
            # the report's monotonic update time
            if self.mx_last_updated >= ua_last_updated:
                self.log.debug("reregistering queries...")
                self.module.reregister_mds_perf_queries()
                self.prev_rank0_gid = rank0_gid
            else:
                #reschedule the timer
                self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
                                     self.re_register_queries, args=(rank0_gid, ua_last_updated,))
                self.rqtimer.start()
221
222 @staticmethod
223 def get_rank0_mds_gid_state(fsmap):
224 for fs in fsmap['filesystems']:
225 mds_map = fs['mdsmap']
226 if mds_map is not None:
227 for mds_id, mds_status in mds_map['info'].items():
228 if mds_status['rank'] == 0:
229 return mds_status['gid'], mds_status['state']
230 logger.warn("No rank0 mds in the fsmap")
231
    def update_client_meta(self):
        """Issue an asynchronous `client ls` to each filesystem's rank-0 MDS.

        Results are delivered back via notify_cmd(); gids that already have
        a request in flight are skipped.
        """
        new_updates = {}
        # NOTE(review): in_progress is read here before meta_lock is taken —
        # confirm this is safe against concurrent notify_cmd() callers.
        pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
        with self.meta_lock:
            fsmap = self.module.get('fs_map')
            for fs in fsmap['filesystems']:
                mdsmap = fs['mdsmap']
                # NOTE(review): raises KeyError if "mds_0" is absent from the
                # up map (rank 0 down) — confirm callers tolerate that.
                gid = mdsmap['up']["mds_0"]
                if gid in pending_updates:
                    continue
                tag = str(uuid.uuid4())
                result = CommandResult(tag)
                new_updates[tag] = (gid, result)
            self.client_metadata['in_progress'].update(new_updates)

        self.log.debug(f"updating client metadata from {new_updates}")

        cmd_dict = {'prefix': 'client ls'}
        for tag,val in new_updates.items():
            self.module.send_command(val[1], "mds", str(val[0]), json.dumps(cmd_dict), tag)
252
    def run(self):
        """Report processor main loop (background thread).

        Every mgr_stats_period seconds — or earlier when q_cv is notified by
        register_query() — scrubs expired queries, pulls fresh MDS reports,
        and wakes any requester blocked on r_cv.
        """
        try:
            self.log.info("FSPerfStats::report_processor starting...")
            while True:
                with self.lock:
                    self.scrub_expired_queries()
                    self.process_mds_reports()
                    # wake requesters waiting for a first report
                    self.r_cv.notify()

                    stats_period = int(self.module.get_ceph_option("mgr_stats_period"))
                    self.q_cv.wait(stats_period)
                self.log.debug("FSPerfStats::tick")
        except Exception as e:
            self.log.fatal("fatal error: {}".format(traceback.format_exc()))
267
268 def cull_mds_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
269 # this is pretty straight forward -- find what MDSs are missing from
270 # what is tracked vs what we received in incoming report and purge
271 # the whole bunch.
272 tracked_ranks = raw_perf_counters.keys()
273 available_ranks = [int(counter['k'][0][0]) for counter in incoming_metrics]
274 for rank in set(tracked_ranks) - set(available_ranks):
275 culled = raw_perf_counters.pop(rank)
276 self.log.info("culled {0} client entries from rank {1} (laggy: {2})".format(
277 len(culled[1]), rank, "yes" if culled[0] else "no"))
278 missing_clients.update(list(culled[1].keys()))
279
280 def cull_client_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
281 # this is a bit more involed -- for each rank figure out what clients
282 # are missing in incoming report and purge them from our tracked map.
283 # but, if this is invoked _after_ cull_mds_entries(), the rank set
284 # is same, so we can loop based on that assumption.
285 ranks = raw_perf_counters.keys()
286 for rank in ranks:
287 tracked_clients = raw_perf_counters[rank][1].keys()
288 available_clients = [extract_client_id_and_ip(counter['k'][1][0]) for counter in incoming_metrics]
289 for client in set(tracked_clients) - set([c[0] for c in available_clients if c[0] is not None]):
290 raw_perf_counters[rank][1].pop(client)
291 self.log.info("culled {0} from rank {1}".format(client, rank))
292 missing_clients.add(client)
293
    def cull_missing_entries(self, raw_perf_counters, incoming_metrics):
        """Cull MDS ranks and clients gone from the incoming report.

        Metadata of missing clients is purged immediately, unless async
        metadata fetches are outstanding, in which case the purge is
        deferred to notify_cmd() via the to_purge set.
        """
        missing_clients = set() # type: Set[str]
        self.cull_mds_entries(raw_perf_counters, incoming_metrics, missing_clients)
        self.cull_client_entries(raw_perf_counters, incoming_metrics, missing_clients)

        self.log.debug("missing_clients={0}".format(missing_clients))
        with self.meta_lock:
            if self.client_metadata['in_progress']:
                # defer while `client ls` requests are still in flight
                self.client_metadata['to_purge'].update(missing_clients)
                self.log.info("deferring client metadata purge (now {0} client(s))".format(
                    len(self.client_metadata['to_purge'])))
            else:
                for client in missing_clients:
                    try:
                        self.log.info("purge client metadata for {0}".format(client))
                        self.client_metadata['metadata'].pop(client)
                    except KeyError:
                        pass
                self.log.debug("client_metadata={0}".format(self.client_metadata['metadata']))
313
314 def cull_global_metrics(self, raw_perf_counters, incoming_metrics):
315 tracked_clients = raw_perf_counters.keys()
316 available_clients = [counter['k'][0][0] for counter in incoming_metrics]
317 for client in set(tracked_clients) - set(available_clients):
318 raw_perf_counters.pop(client)
319
    def get_raw_perf_counters(self, query):
        """Refresh the per-MDS raw counters for one registered user query."""
        raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS, {})

        for query_id in query[QUERY_IDS]:
            result = self.module.get_mds_perf_counters(query_id)
            self.log.debug("raw_perf_counters={}".format(raw_perf_counters))
            self.log.debug("get_raw_perf_counters={}".format(result))

            # extract passed in delayed ranks. metrics for delayed ranks are tagged
            # as stale.
            delayed_ranks = extract_mds_ranks_from_report(result['metrics'][0][0])

            # what's received from MDS
            incoming_metrics = result['metrics'][1]

            # metrics updated (monotonic) time; consumed by re_register_queries()
            self.mx_last_updated = result['metrics'][2][0]

            # cull missing MDSs and clients
            self.cull_missing_entries(raw_perf_counters, incoming_metrics)

            # iterate over metrics list and update our copy (note that we have
            # already culled the differences).
            for counter in incoming_metrics:
                mds_rank = int(counter['k'][0][0])
                client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
                # NOTE(review): `or not client_ip` also admits client_id=None
                # when the ip is empty — confirm `and client_ip` wasn't intended.
                if client_id is not None or not client_ip: # client_id _could_ be 0
                    with self.meta_lock:
                        self.set_client_metadata(client_id, "IP", client_ip)
                else:
                    self.log.warn("client metadata for client_id={0} might be unavailable".format(client_id))

                # per rank: [delayed-flag, {client-id -> counter list}]
                raw_counters = raw_perf_counters.setdefault(mds_rank, [False, {}])
                raw_counters[0] = True if mds_rank in delayed_ranks else False
                raw_client_counters = raw_counters[1].setdefault(client_id, [])

                # replace (not merge) the client's counters, in place
                del raw_client_counters[:]
                raw_client_counters.extend(counter['c'])
        # send an asynchronous client metadata refresh
        self.update_client_meta()
360
361 def get_raw_perf_counters_global(self, query):
362 raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
363 result = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID])
364
365 self.log.debug("raw_perf_counters_global={}".format(raw_perf_counters))
366 self.log.debug("get_raw_perf_counters_global={}".format(result))
367
368 global_metrics = result['metrics'][1]
369 self.cull_global_metrics(raw_perf_counters, global_metrics)
370 for counter in global_metrics:
371 client_id, _ = extract_client_id_and_ip(counter['k'][0][0])
372 raw_client_counters = raw_perf_counters.setdefault(client_id, [])
373 del raw_client_counters[:]
374 raw_client_counters.extend(counter['c'])
375
376 def process_mds_reports(self):
377 for query in self.user_queries.values():
378 self.get_raw_perf_counters(query)
379 self.get_raw_perf_counters_global(query)
380
381 def scrub_expired_queries(self):
382 expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
383 for filter_spec in list(self.user_queries.keys()):
384 user_query = self.user_queries[filter_spec]
385 self.log.debug("scrubbing query={}".format(user_query))
386 if user_query[QUERY_LAST_REQUEST] < expire_time:
387 expired_query_ids = user_query[QUERY_IDS].copy()
388 expired_query_ids.append(user_query[GLOBAL_QUERY_ID])
389 self.log.debug("unregistering query={} ids={}".format(user_query, expired_query_ids))
390 self.unregister_mds_perf_queries(filter_spec, expired_query_ids)
391 del self.user_queries[filter_spec]
392
393 def prepare_mds_perf_query(self, rank, client_id, client_ip):
394 mds_rank_regex = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
395 if not rank == -1:
396 mds_rank_regex = '^({})$'.format(rank)
397 client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
398 return {
399 'key_descriptor' : [
400 {'type' : 'mds_rank', 'regex' : mds_rank_regex},
401 {'type' : 'client_id', 'regex' : client_regex},
402 ],
403 'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS,
404 }
405
406 def prepare_global_perf_query(self, client_id, client_ip):
407 client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
408 return {
409 'key_descriptor' : [
410 {'type' : 'client_id', 'regex' : client_regex},
411 ],
412 'performance_counter_descriptors' : MDS_GLOBAL_PERF_QUERY_COUNTERS,
413 }
414
415 def unregister_mds_perf_queries(self, filter_spec, query_ids):
416 self.log.info("unregister_mds_perf_queries: filter_spec={0}, query_id={1}".format(
417 filter_spec, query_ids))
418 for query_id in query_ids:
419 self.module.remove_mds_perf_query(query_id)
420
421 def register_mds_perf_query(self, filter_spec):
422 mds_ranks = filter_spec.mds_ranks
423 client_id = filter_spec.client_id
424 client_ip = filter_spec.client_ip
425
426 query_ids = []
427 try:
428 # register per-mds perf query
429 for rank in mds_ranks:
430 query = self.prepare_mds_perf_query(rank, client_id, client_ip)
431 self.log.info("register_mds_perf_query: {}".format(query))
432
433 query_id = self.module.add_mds_perf_query(query)
434 if query_id is None: # query id can be 0
435 raise RuntimeError("failed to add MDS perf query: {}".format(query))
436 query_ids.append(query_id)
437 except Exception:
438 for query_id in query_ids:
439 self.module.remove_mds_perf_query(query_id)
440 raise
441 return query_ids
442
443 def register_global_perf_query(self, filter_spec):
444 client_id = filter_spec.client_id
445 client_ip = filter_spec.client_ip
446
447 # register a global perf query for metrics
448 query = self.prepare_global_perf_query(client_id, client_ip)
449 self.log.info("register_global_perf_query: {}".format(query))
450
451 query_id = self.module.add_mds_perf_query(query)
452 if query_id is None: # query id can be 0
453 raise RuntimeError("failed to add global perf query: {}".format(query))
454 return query_id
455
    def register_query(self, filter_spec):
        """Return the user query for a filter spec, registering it if new.

        Must be called with self.lock held (q_cv/r_cv are built on it). A
        newly registered query wakes the report processor via q_cv and then
        blocks up to 5 seconds on r_cv so the first report can be filled.
        """
        user_query = self.user_queries.get(filter_spec, None)
        if not user_query:
            user_query = {
                QUERY_IDS : self.register_mds_perf_query(filter_spec),
                GLOBAL_QUERY_ID : self.register_global_perf_query(filter_spec),
                QUERY_LAST_REQUEST : datetime.now(),
            }
            self.user_queries[filter_spec] = user_query

            self.q_cv.notify()
            self.r_cv.wait(5)
        else:
            # refresh the timestamp checked by scrub_expired_queries()
            user_query[QUERY_LAST_REQUEST] = datetime.now()
        return user_query
471
    def generate_report(self, user_query):
        """Build the `perf stats` report dict for a registered user query."""
        result = {} # type: Dict
        # start with counter info -- metrics that are global and per mds
        result["version"] = PERF_STATS_VERSION
        result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
        result["counters"] = MDS_PERF_QUERY_COUNTERS

        # fill in client metadata
        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
        with self.meta_lock:
            result_meta = result.setdefault("client_metadata", {})
            for client_id in raw_perfs.keys():
                if client_id in self.client_metadata["metadata"]:
                    client_meta = result_meta.setdefault(client_id, {})
                    client_meta.update(self.client_metadata["metadata"][client_id])

        # start populating global perf metrics w/ client metadata
        metrics = result.setdefault("global_metrics", {})
        for client_id, counters in raw_perfs.items():
            global_client_metrics = metrics.setdefault(client_id, [])
            del global_client_metrics[:]
            global_client_metrics.extend(counters)

        # and, now per-mds metrics keyed by mds rank along with delayed ranks
        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {})
        metrics = result.setdefault("metrics", {})

        # ranks whose metrics were flagged stale by get_raw_perf_counters()
        metrics["delayed_ranks"] = [rank for rank,counters in raw_perfs.items() if counters[0]]
        for rank, counters in raw_perfs.items():
            mds_key = "mds.{}".format(rank)
            mds_metrics = metrics.setdefault(mds_key, {})
            mds_metrics.update(counters[1])
        return result
505
506 def extract_query_filters(self, cmd):
507 mds_rank_spec = cmd.get('mds_rank', None)
508 client_id_spec = cmd.get('client_id', None)
509 client_ip_spec = cmd.get('client_ip', None)
510
511 self.log.debug("mds_rank_spec={0}, client_id_spec={1}, client_ip_spec={2}".format(
512 mds_rank_spec, client_id_spec, client_ip_spec))
513
514 mds_ranks = extract_mds_ranks_from_spec(mds_rank_spec)
515 client_id = extract_client_id_from_spec(client_id_spec)
516 client_ip = extract_client_ip_from_spec(client_ip_spec)
517
518 return FilterSpec(mds_ranks, client_id, client_ip)
519
520 def get_perf_data(self, cmd):
521 try:
522 filter_spec = self.extract_query_filters(cmd)
523 except ValueError as e:
524 return -errno.EINVAL, "", str(e)
525
526 counters = {}
527 with self.lock:
528 user_query = self.register_query(filter_spec)
529 result = self.generate_report(user_query)
530 return 0, json.dumps(result), ""