]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/stats/fs/perf_stats.py
import ceph quincy 17.2.6
[ceph.git] / ceph / src / pybind / mgr / stats / fs / perf_stats.py
CommitLineData
f67539c2
TL
1import re
2import json
3import time
4import uuid
5import errno
6import traceback
33c7a0ef 7import logging
f67539c2
TL
8from collections import OrderedDict
9from typing import List, Dict, Set
10
11from mgr_module import CommandResult
12
13from datetime import datetime, timedelta
33c7a0ef 14from threading import Lock, Condition, Thread, Timer
20effc67 15from ipaddress import ip_address
f67539c2 16
39ae355f 17PERF_STATS_VERSION = 2
f67539c2
TL
18
19QUERY_IDS = "query_ids"
20GLOBAL_QUERY_ID = "global_query_id"
21QUERY_LAST_REQUEST = "last_time_stamp"
22QUERY_RAW_COUNTERS = "query_raw_counters"
23QUERY_RAW_COUNTERS_GLOBAL = "query_raw_counters_global"
24
25MDS_RANK_ALL = (-1,)
26CLIENT_ID_ALL = "\d*"
27CLIENT_IP_ALL = ".*"
28
39ae355f
TL
29fs_list = [] # type: List[str]
30
f67539c2
TL
31MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS = '^(.*)$'
32MDS_PERF_QUERY_REGEX_MATCH_CLIENTS = '^(client.{0}\s+{1}):.*'
33MDS_PERF_QUERY_COUNTERS_MAP = OrderedDict({'cap_hit': 0,
34 'read_latency': 1,
35 'write_latency': 2,
36 'metadata_latency': 3,
37 'dentry_lease': 4,
38 'opened_files': 5,
39 'pinned_icaps': 6,
a4b75251
TL
40 'opened_inodes': 7,
41 'read_io_sizes': 8,
2a845540
TL
42 'write_io_sizes': 9,
43 'avg_read_latency': 10,
44 'stdev_read_latency': 11,
45 'avg_write_latency': 12,
46 'stdev_write_latency': 13,
47 'avg_metadata_latency': 14,
48 'stdev_metadata_latency': 15})
f67539c2 49MDS_PERF_QUERY_COUNTERS = [] # type: List[str]
2a845540 50MDS_GLOBAL_PERF_QUERY_COUNTERS = list(MDS_PERF_QUERY_COUNTERS_MAP.keys())
f67539c2
TL
51
52QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
33c7a0ef 53REREGISTER_TIMER_INTERVAL = 1
f67539c2
TL
54
55CLIENT_METADATA_KEY = "client_metadata"
56CLIENT_METADATA_SUBKEYS = ["hostname", "root"]
57CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"]
58
59NON_EXISTENT_KEY_STR = "N/A"
60
33c7a0ef
TL
61logger = logging.getLogger(__name__)
62
f67539c2
TL
63class FilterSpec(object):
64 """
65 query filters encapsulated and used as key for query map
66 """
67 def __init__(self, mds_ranks, client_id, client_ip):
68 self.mds_ranks = mds_ranks
69 self.client_id = client_id
70 self.client_ip = client_ip
71
72 def __hash__(self):
73 return hash((self.mds_ranks, self.client_id, self.client_ip))
74
75 def __eq__(self, other):
76 return (self.mds_ranks, self.client_id, self.client_ip) == (other.mds_ranks, other.client_id, self.client_ip)
77
78 def __ne__(self, other):
79 return not(self == other)
80
81def extract_mds_ranks_from_spec(mds_rank_spec):
82 if not mds_rank_spec:
83 return MDS_RANK_ALL
20effc67 84 match = re.match(r'^\d+(,\d+)*$', mds_rank_spec)
f67539c2
TL
85 if not match:
86 raise ValueError("invalid mds filter spec: {}".format(mds_rank_spec))
87 return tuple(int(mds_rank) for mds_rank in match.group(0).split(','))
88
89def extract_client_id_from_spec(client_id_spec):
90 if not client_id_spec:
91 return CLIENT_ID_ALL
92 # the client id is the spec itself since it'll be a part
93 # of client filter regex.
20effc67
TL
94 if not client_id_spec.isdigit():
95 raise ValueError('invalid client_id filter spec: {}'.format(client_id_spec))
f67539c2
TL
96 return client_id_spec
97
98def extract_client_ip_from_spec(client_ip_spec):
99 if not client_ip_spec:
100 return CLIENT_IP_ALL
20effc67
TL
101
102 client_ip = client_ip_spec
103 if client_ip.startswith('v1:'):
104 client_ip = client_ip.replace('v1:', '')
105 elif client_ip.startswith('v2:'):
106 client_ip = client_ip.replace('v2:', '')
107
108 try:
109 ip_address(client_ip)
110 return client_ip_spec
111 except ValueError:
112 raise ValueError('invalid client_ip filter spec: {}'.format(client_ip_spec))
f67539c2
TL
113
114def extract_mds_ranks_from_report(mds_ranks_str):
115 if not mds_ranks_str:
116 return []
117 return [int(x) for x in mds_ranks_str.split(',')]
118
119def extract_client_id_and_ip(client):
120 match = re.match(r'^(client\.\d+)\s(.*)', client)
121 if match:
122 return match.group(1), match.group(2)
123 return None, None
124
125class FSPerfStats(object):
126 lock = Lock()
127 q_cv = Condition(lock)
128 r_cv = Condition(lock)
129
130 user_queries = {} # type: Dict[str, Dict]
131
132 meta_lock = Lock()
33c7a0ef 133 rqtimer = None
f67539c2
TL
134 client_metadata = {
135 'metadata' : {},
136 'to_purge' : set(),
137 'in_progress' : {},
138 } # type: Dict
139
140 def __init__(self, module):
141 self.module = module
142 self.log = module.log
33c7a0ef 143 self.prev_rank0_gid = None
f67539c2
TL
144 # report processor thread
145 self.report_processor = Thread(target=self.run)
146 self.report_processor.start()
147
39ae355f
TL
148 def set_client_metadata(self, fs_name, client_id, key, meta):
149 result = (self.client_metadata['metadata'].setdefault(
150 fs_name, {})).setdefault(client_id, {})
f67539c2
TL
151 if not key in result or not result[key] == meta:
152 result[key] = meta
153
33c7a0ef 154 def notify_cmd(self, cmdtag):
f67539c2
TL
155 self.log.debug("cmdtag={0}".format(cmdtag))
156 with self.meta_lock:
522d829b
TL
157 try:
158 result = self.client_metadata['in_progress'].pop(cmdtag)
159 except KeyError:
160 self.log.warn(f"cmdtag {cmdtag} not found in client metadata")
161 return
39ae355f
TL
162 fs_name = result[0]
163 client_meta = result[2].wait()
f67539c2 164 if client_meta[0] != 0:
39ae355f
TL
165 self.log.warn("failed to fetch client metadata from gid {0}, err={1}".format(
166 result[1], client_meta[2]))
f67539c2
TL
167 return
168 self.log.debug("notify: client metadata={0}".format(json.loads(client_meta[1])))
169 for metadata in json.loads(client_meta[1]):
170 client_id = "client.{0}".format(metadata['id'])
39ae355f 171 result = (self.client_metadata['metadata'].setdefault(fs_name, {})).setdefault(client_id, {})
f67539c2 172 for subkey in CLIENT_METADATA_SUBKEYS:
39ae355f 173 self.set_client_metadata(fs_name, client_id, subkey, metadata[CLIENT_METADATA_KEY][subkey])
f67539c2 174 for subkey in CLIENT_METADATA_SUBKEYS_OPTIONAL:
39ae355f 175 self.set_client_metadata(fs_name, client_id, subkey,
f67539c2
TL
176 metadata[CLIENT_METADATA_KEY].get(subkey, NON_EXISTENT_KEY_STR))
177 metric_features = int(metadata[CLIENT_METADATA_KEY]["metric_spec"]["metric_flags"]["feature_bits"], 16)
178 supported_metrics = [metric for metric, bit in MDS_PERF_QUERY_COUNTERS_MAP.items() if metric_features & (1 << bit)]
39ae355f 179 self.set_client_metadata(fs_name, client_id, "valid_metrics", supported_metrics)
f67539c2
TL
180 kver = metadata[CLIENT_METADATA_KEY].get("kernel_version", None)
181 if kver:
39ae355f 182 self.set_client_metadata(fs_name, client_id, "kernel_version", kver)
f67539c2
TL
183 # when all async requests are done, purge clients metadata if any.
184 if not self.client_metadata['in_progress']:
39ae355f
TL
185 global fs_list
186 for fs_name in fs_list:
187 for client in self.client_metadata['to_purge']:
188 try:
189 if client in self.client_metadata['metadata'][fs_name]:
190 self.log.info("purge client metadata for {0}".format(client))
191 self.client_metadata['metadata'][fs_name].pop(client)
192 except:
193 pass
194 if fs_name in self.client_metadata['metadata'] and not bool(self.client_metadata['metadata'][fs_name]):
195 self.client_metadata['metadata'].pop(fs_name)
f67539c2
TL
196 self.client_metadata['to_purge'].clear()
197 self.log.debug("client_metadata={0}, to_purge={1}".format(
198 self.client_metadata['metadata'], self.client_metadata['to_purge']))
199
33c7a0ef
TL
200 def notify_fsmap(self):
201 #Reregister the user queries when there is a new rank0 mds
202 with self.lock:
203 gid_state = FSPerfStats.get_rank0_mds_gid_state(self.module.get('fs_map'))
204 if not gid_state:
205 return
39ae355f
TL
206 for value in gid_state:
207 rank0_gid, state = value
208 if (rank0_gid and rank0_gid != self.prev_rank0_gid and state == 'up:active'):
209 #the new rank0 MDS is up:active
210 ua_last_updated = time.monotonic()
211 if (self.rqtimer and self.rqtimer.is_alive()):
212 self.rqtimer.cancel()
213 self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
214 self.re_register_queries,
215 args=(rank0_gid, ua_last_updated,))
216 self.rqtimer.start()
33c7a0ef
TL
217
218 def re_register_queries(self, rank0_gid, ua_last_updated):
219 #reregister queries if the metrics are the latest. Otherwise reschedule the timer and
220 #wait for the empty metrics
221 with self.lock:
222 if self.mx_last_updated >= ua_last_updated:
223 self.log.debug("reregistering queries...")
224 self.module.reregister_mds_perf_queries()
225 self.prev_rank0_gid = rank0_gid
226 else:
227 #reschedule the timer
228 self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
229 self.re_register_queries, args=(rank0_gid, ua_last_updated,))
230 self.rqtimer.start()
231
232 @staticmethod
233 def get_rank0_mds_gid_state(fsmap):
39ae355f 234 gid_state = []
33c7a0ef
TL
235 for fs in fsmap['filesystems']:
236 mds_map = fs['mdsmap']
237 if mds_map is not None:
238 for mds_id, mds_status in mds_map['info'].items():
239 if mds_status['rank'] == 0:
39ae355f
TL
240 gid_state.append([mds_status['gid'], mds_status['state']])
241 if gid_state:
242 return gid_state
33c7a0ef
TL
243 logger.warn("No rank0 mds in the fsmap")
244
245 def update_client_meta(self):
f67539c2
TL
246 new_updates = {}
247 pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
39ae355f
TL
248 global fs_list
249 fs_list.clear()
f67539c2 250 with self.meta_lock:
33c7a0ef
TL
251 fsmap = self.module.get('fs_map')
252 for fs in fsmap['filesystems']:
39ae355f
TL
253 mds_map = fs['mdsmap']
254 if mds_map is not None:
255 fsname = mds_map['fs_name']
256 for mds_id, mds_status in mds_map['info'].items():
257 if mds_status['rank'] == 0:
258 fs_list.append(fsname)
259 rank0_gid = mds_status['gid']
260 tag = str(uuid.uuid4())
261 result = CommandResult(tag)
262 new_updates[tag] = (fsname, rank0_gid, result)
263 self.client_metadata['in_progress'].update(new_updates)
f67539c2 264
33c7a0ef 265 self.log.debug(f"updating client metadata from {new_updates}")
f67539c2
TL
266
267 cmd_dict = {'prefix': 'client ls'}
268 for tag,val in new_updates.items():
39ae355f 269 self.module.send_command(val[2], "mds", str(val[1]), json.dumps(cmd_dict), tag)
f67539c2
TL
270
271 def run(self):
272 try:
273 self.log.info("FSPerfStats::report_processor starting...")
274 while True:
275 with self.lock:
276 self.scrub_expired_queries()
277 self.process_mds_reports()
278 self.r_cv.notify()
279
280 stats_period = int(self.module.get_ceph_option("mgr_stats_period"))
281 self.q_cv.wait(stats_period)
282 self.log.debug("FSPerfStats::tick")
283 except Exception as e:
284 self.log.fatal("fatal error: {}".format(traceback.format_exc()))
285
286 def cull_mds_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
287 # this is pretty straight forward -- find what MDSs are missing from
288 # what is tracked vs what we received in incoming report and purge
289 # the whole bunch.
290 tracked_ranks = raw_perf_counters.keys()
291 available_ranks = [int(counter['k'][0][0]) for counter in incoming_metrics]
292 for rank in set(tracked_ranks) - set(available_ranks):
293 culled = raw_perf_counters.pop(rank)
294 self.log.info("culled {0} client entries from rank {1} (laggy: {2})".format(
295 len(culled[1]), rank, "yes" if culled[0] else "no"))
296 missing_clients.update(list(culled[1].keys()))
297
298 def cull_client_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
299 # this is a bit more involed -- for each rank figure out what clients
300 # are missing in incoming report and purge them from our tracked map.
39ae355f 301 # but, if this is invoked after cull_mds_entries(), the rank set
f67539c2
TL
302 # is same, so we can loop based on that assumption.
303 ranks = raw_perf_counters.keys()
304 for rank in ranks:
305 tracked_clients = raw_perf_counters[rank][1].keys()
306 available_clients = [extract_client_id_and_ip(counter['k'][1][0]) for counter in incoming_metrics]
307 for client in set(tracked_clients) - set([c[0] for c in available_clients if c[0] is not None]):
308 raw_perf_counters[rank][1].pop(client)
309 self.log.info("culled {0} from rank {1}".format(client, rank))
310 missing_clients.add(client)
311
312 def cull_missing_entries(self, raw_perf_counters, incoming_metrics):
313 missing_clients = set() # type: Set[str]
314 self.cull_mds_entries(raw_perf_counters, incoming_metrics, missing_clients)
315 self.cull_client_entries(raw_perf_counters, incoming_metrics, missing_clients)
316
317 self.log.debug("missing_clients={0}".format(missing_clients))
318 with self.meta_lock:
319 if self.client_metadata['in_progress']:
320 self.client_metadata['to_purge'].update(missing_clients)
321 self.log.info("deferring client metadata purge (now {0} client(s))".format(
322 len(self.client_metadata['to_purge'])))
323 else:
39ae355f
TL
324 global fs_list
325 for fs_name in fs_list:
326 for client in missing_clients:
327 try:
328 self.log.info("purge client metadata for {0}".format(client))
329 if client in self.client_metadata['metadata'][fs_name]:
330 self.client_metadata['metadata'][fs_name].pop(client)
331 except KeyError:
332 pass
333 self.log.debug("client_metadata={0}".format(self.client_metadata['metadata']))
f67539c2
TL
334
335 def cull_global_metrics(self, raw_perf_counters, incoming_metrics):
336 tracked_clients = raw_perf_counters.keys()
337 available_clients = [counter['k'][0][0] for counter in incoming_metrics]
338 for client in set(tracked_clients) - set(available_clients):
339 raw_perf_counters.pop(client)
340
341 def get_raw_perf_counters(self, query):
342 raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS, {})
343
344 for query_id in query[QUERY_IDS]:
345 result = self.module.get_mds_perf_counters(query_id)
346 self.log.debug("raw_perf_counters={}".format(raw_perf_counters))
347 self.log.debug("get_raw_perf_counters={}".format(result))
348
349 # extract passed in delayed ranks. metrics for delayed ranks are tagged
350 # as stale.
351 delayed_ranks = extract_mds_ranks_from_report(result['metrics'][0][0])
352
353 # what's received from MDS
354 incoming_metrics = result['metrics'][1]
355
33c7a0ef
TL
356 # metrics updated (monotonic) time
357 self.mx_last_updated = result['metrics'][2][0]
358
f67539c2
TL
359 # cull missing MDSs and clients
360 self.cull_missing_entries(raw_perf_counters, incoming_metrics)
361
362 # iterate over metrics list and update our copy (note that we have
363 # already culled the differences).
39ae355f
TL
364 global fs_list
365 for fs_name in fs_list:
366 for counter in incoming_metrics:
367 mds_rank = int(counter['k'][0][0])
368 client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
369 if self.client_metadata['metadata'].get(fs_name):
370 if (client_id is not None or not client_ip) and\
371 self.client_metadata["metadata"][fs_name].get(client_id): # client_id _could_ be 0
372 with self.meta_lock:
373 self.set_client_metadata(fs_name, client_id, "IP", client_ip)
374 else:
375 self.log.warn(f"client metadata for client_id={client_id} might be unavailable")
376 else:
377 self.log.warn(f"client metadata for filesystem={fs_name} might be unavailable")
378
379 raw_counters = raw_perf_counters.setdefault(mds_rank, [False, {}])
380 raw_counters[0] = True if mds_rank in delayed_ranks else False
381 raw_client_counters = raw_counters[1].setdefault(client_id, [])
382
383 del raw_client_counters[:]
384 raw_client_counters.extend(counter['c'])
f67539c2 385 # send an asynchronous client metadata refresh
33c7a0ef 386 self.update_client_meta()
f67539c2
TL
387
388 def get_raw_perf_counters_global(self, query):
389 raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
390 result = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID])
391
392 self.log.debug("raw_perf_counters_global={}".format(raw_perf_counters))
393 self.log.debug("get_raw_perf_counters_global={}".format(result))
394
395 global_metrics = result['metrics'][1]
396 self.cull_global_metrics(raw_perf_counters, global_metrics)
397 for counter in global_metrics:
398 client_id, _ = extract_client_id_and_ip(counter['k'][0][0])
399 raw_client_counters = raw_perf_counters.setdefault(client_id, [])
400 del raw_client_counters[:]
401 raw_client_counters.extend(counter['c'])
402
403 def process_mds_reports(self):
404 for query in self.user_queries.values():
405 self.get_raw_perf_counters(query)
406 self.get_raw_perf_counters_global(query)
407
408 def scrub_expired_queries(self):
409 expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
410 for filter_spec in list(self.user_queries.keys()):
411 user_query = self.user_queries[filter_spec]
412 self.log.debug("scrubbing query={}".format(user_query))
413 if user_query[QUERY_LAST_REQUEST] < expire_time:
414 expired_query_ids = user_query[QUERY_IDS].copy()
415 expired_query_ids.append(user_query[GLOBAL_QUERY_ID])
416 self.log.debug("unregistering query={} ids={}".format(user_query, expired_query_ids))
417 self.unregister_mds_perf_queries(filter_spec, expired_query_ids)
418 del self.user_queries[filter_spec]
419
420 def prepare_mds_perf_query(self, rank, client_id, client_ip):
421 mds_rank_regex = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
422 if not rank == -1:
423 mds_rank_regex = '^({})$'.format(rank)
424 client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
425 return {
426 'key_descriptor' : [
427 {'type' : 'mds_rank', 'regex' : mds_rank_regex},
428 {'type' : 'client_id', 'regex' : client_regex},
429 ],
430 'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS,
431 }
432
433 def prepare_global_perf_query(self, client_id, client_ip):
434 client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
435 return {
436 'key_descriptor' : [
437 {'type' : 'client_id', 'regex' : client_regex},
438 ],
439 'performance_counter_descriptors' : MDS_GLOBAL_PERF_QUERY_COUNTERS,
440 }
441
442 def unregister_mds_perf_queries(self, filter_spec, query_ids):
443 self.log.info("unregister_mds_perf_queries: filter_spec={0}, query_id={1}".format(
444 filter_spec, query_ids))
445 for query_id in query_ids:
446 self.module.remove_mds_perf_query(query_id)
447
448 def register_mds_perf_query(self, filter_spec):
449 mds_ranks = filter_spec.mds_ranks
450 client_id = filter_spec.client_id
451 client_ip = filter_spec.client_ip
452
453 query_ids = []
454 try:
455 # register per-mds perf query
456 for rank in mds_ranks:
457 query = self.prepare_mds_perf_query(rank, client_id, client_ip)
458 self.log.info("register_mds_perf_query: {}".format(query))
459
460 query_id = self.module.add_mds_perf_query(query)
461 if query_id is None: # query id can be 0
462 raise RuntimeError("failed to add MDS perf query: {}".format(query))
463 query_ids.append(query_id)
464 except Exception:
465 for query_id in query_ids:
466 self.module.remove_mds_perf_query(query_id)
467 raise
468 return query_ids
469
470 def register_global_perf_query(self, filter_spec):
471 client_id = filter_spec.client_id
472 client_ip = filter_spec.client_ip
473
474 # register a global perf query for metrics
475 query = self.prepare_global_perf_query(client_id, client_ip)
476 self.log.info("register_global_perf_query: {}".format(query))
477
478 query_id = self.module.add_mds_perf_query(query)
479 if query_id is None: # query id can be 0
480 raise RuntimeError("failed to add global perf query: {}".format(query))
481 return query_id
482
483 def register_query(self, filter_spec):
484 user_query = self.user_queries.get(filter_spec, None)
485 if not user_query:
486 user_query = {
487 QUERY_IDS : self.register_mds_perf_query(filter_spec),
488 GLOBAL_QUERY_ID : self.register_global_perf_query(filter_spec),
489 QUERY_LAST_REQUEST : datetime.now(),
490 }
491 self.user_queries[filter_spec] = user_query
492
493 self.q_cv.notify()
494 self.r_cv.wait(5)
495 else:
496 user_query[QUERY_LAST_REQUEST] = datetime.now()
497 return user_query
498
499 def generate_report(self, user_query):
500 result = {} # type: Dict
39ae355f 501 global fs_list
f67539c2
TL
502 # start with counter info -- metrics that are global and per mds
503 result["version"] = PERF_STATS_VERSION
504 result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
505 result["counters"] = MDS_PERF_QUERY_COUNTERS
506
507 # fill in client metadata
39ae355f
TL
508 raw_perfs_global = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
509 raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {})
f67539c2 510 with self.meta_lock:
39ae355f
TL
511 raw_counters_clients = []
512 for val in raw_perfs.values():
513 raw_counters_clients.extend(list(val[1]))
f67539c2 514 result_meta = result.setdefault("client_metadata", {})
39ae355f
TL
515 for fs_name in fs_list:
516 meta = self.client_metadata["metadata"]
517 if fs_name in meta and len(meta[fs_name]):
518 for client_id in raw_perfs_global.keys():
519 if client_id in meta[fs_name] and client_id in raw_counters_clients:
520 client_meta = (result_meta.setdefault(fs_name, {})).setdefault(client_id, {})
521 client_meta.update(meta[fs_name][client_id])
f67539c2
TL
522
523 # start populating global perf metrics w/ client metadata
524 metrics = result.setdefault("global_metrics", {})
39ae355f
TL
525 for fs_name in fs_list:
526 if fs_name in meta and len(meta[fs_name]):
527 for client_id, counters in raw_perfs_global.items():
528 if client_id in meta[fs_name] and client_id in raw_counters_clients:
529 global_client_metrics = (metrics.setdefault(fs_name, {})).setdefault(client_id, [])
530 del global_client_metrics[:]
531 global_client_metrics.extend(counters)
f67539c2
TL
532
533 # and, now per-mds metrics keyed by mds rank along with delayed ranks
f67539c2
TL
534 metrics = result.setdefault("metrics", {})
535
39ae355f 536 metrics["delayed_ranks"] = [rank for rank, counters in raw_perfs.items() if counters[0]]
f67539c2
TL
537 for rank, counters in raw_perfs.items():
538 mds_key = "mds.{}".format(rank)
539 mds_metrics = metrics.setdefault(mds_key, {})
540 mds_metrics.update(counters[1])
541 return result
542
543 def extract_query_filters(self, cmd):
544 mds_rank_spec = cmd.get('mds_rank', None)
545 client_id_spec = cmd.get('client_id', None)
546 client_ip_spec = cmd.get('client_ip', None)
547
548 self.log.debug("mds_rank_spec={0}, client_id_spec={1}, client_ip_spec={2}".format(
549 mds_rank_spec, client_id_spec, client_ip_spec))
550
551 mds_ranks = extract_mds_ranks_from_spec(mds_rank_spec)
552 client_id = extract_client_id_from_spec(client_id_spec)
553 client_ip = extract_client_ip_from_spec(client_ip_spec)
554
555 return FilterSpec(mds_ranks, client_id, client_ip)
556
557 def get_perf_data(self, cmd):
20effc67
TL
558 try:
559 filter_spec = self.extract_query_filters(cmd)
560 except ValueError as e:
561 return -errno.EINVAL, "", str(e)
f67539c2
TL
562
563 counters = {}
564 with self.lock:
565 user_query = self.register_query(filter_spec)
566 result = self.generate_report(user_query)
567 return 0, json.dumps(result), ""