]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/stats/fs/perf_stats.py
a66bc7bef7173d3e908023c23bbe0fa5b06359f7
[ceph.git] / ceph / src / pybind / mgr / stats / fs / perf_stats.py
1 import re
2 import json
3 import time
4 import uuid
5 import errno
6 import traceback
7 from collections import OrderedDict
8 from typing import List, Dict, Set
9
10 from mgr_module import CommandResult
11
12 from datetime import datetime, timedelta
13 from threading import Lock, Condition, Thread
14
PERF_STATS_VERSION = 1

# keys used inside a user query's bookkeeping dict
QUERY_IDS = "query_ids"
GLOBAL_QUERY_ID = "global_query_id"
QUERY_LAST_REQUEST = "last_time_stamp"
QUERY_RAW_COUNTERS = "query_raw_counters"
QUERY_RAW_COUNTERS_GLOBAL = "query_raw_counters_global"

# sentinel filter values meaning "match everything"
MDS_RANK_ALL = (-1,)
# NOTE: raw strings -- "\d"/"\s" in plain literals are invalid escape
# sequences and raise DeprecationWarning on newer Pythons (values unchanged).
CLIENT_ID_ALL = r"\d*"
CLIENT_IP_ALL = ".*"

MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS = r'^(.*)$'
MDS_PERF_QUERY_REGEX_MATCH_CLIENTS = r'^(client.{0}\s+{1}):.*'
# counter name -> bit position in the client's advertised metric feature bits
MDS_PERF_QUERY_COUNTERS_MAP = OrderedDict({'cap_hit': 0,
                                           'read_latency': 1,
                                           'write_latency': 2,
                                           'metadata_latency': 3,
                                           'dentry_lease': 4,
                                           'opened_files': 5,
                                           'pinned_icaps': 6,
                                           'opened_inodes': 7,
                                           'read_io_sizes': 8,
                                           'write_io_sizes': 9})
MDS_PERF_QUERY_COUNTERS = [] # type: List[str]
MDS_GLOBAL_PERF_QUERY_COUNTERS = ['cap_hit', 'read_latency', 'write_latency', 'metadata_latency', 'dentry_lease', 'opened_files', 'pinned_icaps', 'opened_inodes', 'read_io_sizes', 'write_io_sizes'] # type: List[str]

# queries not refreshed within this interval get unregistered
QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)

CLIENT_METADATA_KEY = "client_metadata"
CLIENT_METADATA_SUBKEYS = ["hostname", "root"]
CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"]

# placeholder for optional metadata subkeys a client did not report
NON_EXISTENT_KEY_STR = "N/A"
49
class FilterSpec(object):
    """
    query filters encapsulated and used as key for query map
    """
    def __init__(self, mds_ranks, client_id, client_ip):
        self.mds_ranks = mds_ranks
        self.client_id = client_id
        self.client_ip = client_ip

    def __hash__(self):
        return hash((self.mds_ranks, self.client_id, self.client_ip))

    def __eq__(self, other):
        # bug fix: the third element previously compared self.client_ip
        # against itself, so two specs differing only in client_ip compared
        # equal (and collided as dict keys despite distinct hashes).
        return (self.mds_ranks, self.client_id, self.client_ip) == \
            (other.mds_ranks, other.client_id, other.client_ip)

    def __ne__(self, other):
        return not(self == other)
67
def extract_mds_ranks_from_spec(mds_rank_spec):
    """Parse a comma-separated rank spec (e.g. "0,1,2") into a tuple of ints.

    An empty/None spec means "all ranks" (MDS_RANK_ALL); a malformed spec
    raises ValueError.
    """
    if not mds_rank_spec:
        return MDS_RANK_ALL
    if re.match(r'^(\d[,\d]*)$', mds_rank_spec) is None:
        raise ValueError("invalid mds filter spec: {}".format(mds_rank_spec))
    return tuple(map(int, mds_rank_spec.split(',')))
75
def extract_client_id_from_spec(client_id_spec):
    """Return the client-id filter string; empty/None means "all clients".

    The spec is used verbatim inside the client filter regex, so no
    normalization happens here.
    """
    return client_id_spec if client_id_spec else CLIENT_ID_ALL
82
def extract_client_ip_from_spec(client_ip_spec):
    """Return the client-ip filter string; empty/None means "all IPs".

    TODO: validate if it is an ip address (or a subset of it).
    The spec is embedded verbatim in the client filter regex.
    """
    return client_ip_spec if client_ip_spec else CLIENT_IP_ALL
90
def extract_mds_ranks_from_report(mds_ranks_str):
    """Parse the comma-separated delayed-ranks string from an MDS perf
    report into a list of ints; an empty string yields an empty list."""
    if not mds_ranks_str:
        return []
    return list(map(int, mds_ranks_str.split(',')))
95
def extract_client_id_and_ip(client):
    """Split a "client.<id> <ip>" key into ("client.<id>", "<ip>").

    Returns (None, None) when the key does not match that shape.
    """
    parts = re.match(r'^(client\.\d+)\s(.*)', client)
    if parts is None:
        return None, None
    return parts.group(1), parts.group(2)
101
class FSPerfStats(object):
    """Collects per-client/per-MDS perf counters and serves 'perf stats' queries.

    NOTE: all mutable state below is class-level, i.e. shared by every
    instance; access is serialized via the locks defined here.
    """
    # guards user_queries and the report-processing loop
    lock = Lock()
    # signalled by register_query() to wake the report processor early
    q_cv = Condition(lock)
    # signalled by the report processor after a processing pass completes
    r_cv = Condition(lock)

    # NOTE(review): keys are FilterSpec instances (hashable), not str --
    # the type comment below predates that; values are query bookkeeping dicts.
    user_queries = {} # type: Dict[str, Dict]

    # guards client_metadata below
    meta_lock = Lock()
    # 'metadata': client_id -> metadata dict fetched via async "client ls"
    # 'to_purge': client ids whose metadata should be dropped once no
    #             "client ls" commands remain in flight (see notify())
    # 'in_progress': command tag -> (rank, CommandResult) for pending fetches
    client_metadata = {
        'metadata' : {},
        'to_purge' : set(),
        'in_progress' : {},
    } # type: Dict

    def __init__(self, module):
        self.module = module
        self.log = module.log
        # report processor thread
        self.report_processor = Thread(target=self.run)
        self.report_processor.start()
122
123 def set_client_metadata(self, client_id, key, meta):
124 result = self.client_metadata['metadata'].setdefault(client_id, {})
125 if not key in result or not result[key] == meta:
126 result[key] = meta
127
128 def notify(self, cmdtag):
129 self.log.debug("cmdtag={0}".format(cmdtag))
130 with self.meta_lock:
131 try:
132 result = self.client_metadata['in_progress'].pop(cmdtag)
133 except KeyError:
134 self.log.warn(f"cmdtag {cmdtag} not found in client metadata")
135 return
136
137 client_meta = result[1].wait()
138 if client_meta[0] != 0:
139 self.log.warn("failed to fetch client metadata from rank {0}, err={1}".format(
140 result[0], client_meta[2]))
141 return
142 self.log.debug("notify: client metadata={0}".format(json.loads(client_meta[1])))
143 for metadata in json.loads(client_meta[1]):
144 client_id = "client.{0}".format(metadata['id'])
145 result = self.client_metadata['metadata'].setdefault(client_id, {})
146 for subkey in CLIENT_METADATA_SUBKEYS:
147 self.set_client_metadata(client_id, subkey, metadata[CLIENT_METADATA_KEY][subkey])
148 for subkey in CLIENT_METADATA_SUBKEYS_OPTIONAL:
149 self.set_client_metadata(client_id, subkey,
150 metadata[CLIENT_METADATA_KEY].get(subkey, NON_EXISTENT_KEY_STR))
151 metric_features = int(metadata[CLIENT_METADATA_KEY]["metric_spec"]["metric_flags"]["feature_bits"], 16)
152 supported_metrics = [metric for metric, bit in MDS_PERF_QUERY_COUNTERS_MAP.items() if metric_features & (1 << bit)]
153 self.set_client_metadata(client_id, "valid_metrics", supported_metrics)
154 kver = metadata[CLIENT_METADATA_KEY].get("kernel_version", None)
155 if kver:
156 self.set_client_metadata(client_id, "kernel_version", kver)
157 # when all async requests are done, purge clients metadata if any.
158 if not self.client_metadata['in_progress']:
159 for client in self.client_metadata['to_purge']:
160 try:
161 self.log.info("purge client metadata for {0}".format(client))
162 self.client_metadata['metadata'].remove(client)
163 except:
164 pass
165 self.client_metadata['to_purge'].clear()
166 self.log.debug("client_metadata={0}, to_purge={1}".format(
167 self.client_metadata['metadata'], self.client_metadata['to_purge']))
168
169 def update_client_meta(self, rank_set):
170 new_updates = {}
171 pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
172 with self.meta_lock:
173 for rank in rank_set:
174 if rank in pending_updates:
175 continue
176 tag = str(uuid.uuid4())
177 result = CommandResult(tag)
178 new_updates[tag] = (rank, result)
179 self.client_metadata['in_progress'].update(new_updates)
180
181 self.log.debug("updating client metadata from {0}".format(new_updates))
182
183 cmd_dict = {'prefix': 'client ls'}
184 for tag,val in new_updates.items():
185 self.module.send_command(val[1], "mds", str(val[0]), json.dumps(cmd_dict), tag)
186
    def run(self):
        """Report processor thread entry point.

        Loops forever: expires stale queries, refreshes raw counters for the
        remaining ones, then signals r_cv so callers blocked in
        register_query() can build their report. Sleeps up to
        "mgr_stats_period" seconds on q_cv, which register_query() notifies
        to trigger an early pass for a newly added query.
        """
        try:
            self.log.info("FSPerfStats::report_processor starting...")
            while True:
                with self.lock:
                    self.scrub_expired_queries()
                    self.process_mds_reports()
                    self.r_cv.notify()

                    # q_cv shares self.lock, so wait() releases it while sleeping
                    stats_period = int(self.module.get_ceph_option("mgr_stats_period"))
                    self.q_cv.wait(stats_period)
                self.log.debug("FSPerfStats::tick")
        except Exception as e:
            # thread must never die silently -- log the full traceback
            self.log.fatal("fatal error: {}".format(traceback.format_exc()))
201
202 def cull_mds_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
203 # this is pretty straight forward -- find what MDSs are missing from
204 # what is tracked vs what we received in incoming report and purge
205 # the whole bunch.
206 tracked_ranks = raw_perf_counters.keys()
207 available_ranks = [int(counter['k'][0][0]) for counter in incoming_metrics]
208 for rank in set(tracked_ranks) - set(available_ranks):
209 culled = raw_perf_counters.pop(rank)
210 self.log.info("culled {0} client entries from rank {1} (laggy: {2})".format(
211 len(culled[1]), rank, "yes" if culled[0] else "no"))
212 missing_clients.update(list(culled[1].keys()))
213
214 def cull_client_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
215 # this is a bit more involed -- for each rank figure out what clients
216 # are missing in incoming report and purge them from our tracked map.
217 # but, if this is invoked _after_ cull_mds_entries(), the rank set
218 # is same, so we can loop based on that assumption.
219 ranks = raw_perf_counters.keys()
220 for rank in ranks:
221 tracked_clients = raw_perf_counters[rank][1].keys()
222 available_clients = [extract_client_id_and_ip(counter['k'][1][0]) for counter in incoming_metrics]
223 for client in set(tracked_clients) - set([c[0] for c in available_clients if c[0] is not None]):
224 raw_perf_counters[rank][1].pop(client)
225 self.log.info("culled {0} from rank {1}".format(client, rank))
226 missing_clients.add(client)
227
    def cull_missing_entries(self, raw_perf_counters, incoming_metrics):
        """Purge tracked ranks/clients not present in the incoming report.

        Metadata for the dropped clients is purged immediately, unless a
        "client ls" fetch is still in flight -- then the purge is deferred
        to notify() via the 'to_purge' set (so a fetch result can't
        resurrect or race with a concurrent purge).
        """
        missing_clients = set() # type: Set[str]
        self.cull_mds_entries(raw_perf_counters, incoming_metrics, missing_clients)
        self.cull_client_entries(raw_perf_counters, incoming_metrics, missing_clients)

        self.log.debug("missing_clients={0}".format(missing_clients))
        with self.meta_lock:
            if self.client_metadata['in_progress']:
                # defer: notify() purges these once all fetches complete
                self.client_metadata['to_purge'].update(missing_clients)
                self.log.info("deferring client metadata purge (now {0} client(s))".format(
                    len(self.client_metadata['to_purge'])))
            else:
                for client in missing_clients:
                    try:
                        self.log.info("purge client metadata for {0}".format(client))
                        self.client_metadata['metadata'].pop(client)
                    except KeyError:
                        # already gone -- nothing to purge
                        pass
                self.log.debug("client_metadata={0}".format(self.client_metadata['metadata']))
247
248 def cull_global_metrics(self, raw_perf_counters, incoming_metrics):
249 tracked_clients = raw_perf_counters.keys()
250 available_clients = [counter['k'][0][0] for counter in incoming_metrics]
251 for client in set(tracked_clients) - set(available_clients):
252 raw_perf_counters.pop(client)
253
    def get_raw_perf_counters(self, query):
        """Refresh the per-MDS raw counters tracked for one user query.

        For each registered per-MDS perf query id: fetch the latest report,
        cull vanished ranks/clients, then overwrite the tracked per-client
        counter lists in place. Ranks the report marks as delayed are
        flagged so generate_report() can tag their metrics as stale.
        Unknown clients trigger an async metadata refresh from their rank.
        """
        raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS, {})

        for query_id in query[QUERY_IDS]:
            result = self.module.get_mds_perf_counters(query_id)
            self.log.debug("raw_perf_counters={}".format(raw_perf_counters))
            self.log.debug("get_raw_perf_counters={}".format(result))

            # extract passed in delayed ranks. metrics for delayed ranks are tagged
            # as stale.
            delayed_ranks = extract_mds_ranks_from_report(result['metrics'][0][0])

            # what's received from MDS
            incoming_metrics = result['metrics'][1]

            # cull missing MDSs and clients
            self.cull_missing_entries(raw_perf_counters, incoming_metrics)

            # iterate over metrics list and update our copy (note that we have
            # already culled the differences).
            meta_refresh_ranks = set()
            for counter in incoming_metrics:
                mds_rank = int(counter['k'][0][0])
                client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
                # NOTE(review): extract_client_id_and_ip() returns either both
                # values or (None, None), so the "or not client_ip" arm only
                # fires for (None, None)/empty-ip keys -- confirm the intent.
                if client_id is not None or not client_ip: # client_id _could_ be 0
                    with self.meta_lock:
                        if not client_id in self.client_metadata['metadata']:
                            # first sighting -- queue a metadata fetch from its rank
                            meta_refresh_ranks.add(mds_rank)
                        self.set_client_metadata(client_id, "IP", client_ip)
                else:
                    self.log.warn("client metadata for client_id={0} might be unavailable".format(client_id))

                # slot 0: "is this rank delayed/laggy"; slot 1: client -> counters
                raw_counters = raw_perf_counters.setdefault(mds_rank, [False, {}])
                raw_counters[0] = True if mds_rank in delayed_ranks else False
                raw_client_counters = raw_counters[1].setdefault(client_id, [])

                # replace list contents in place so existing references stay valid
                del raw_client_counters[:]
                raw_client_counters.extend(counter['c'])
            # send an asynchronous client metadata refresh
            self.update_client_meta(meta_refresh_ranks)
294
295 def get_raw_perf_counters_global(self, query):
296 raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
297 result = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID])
298
299 self.log.debug("raw_perf_counters_global={}".format(raw_perf_counters))
300 self.log.debug("get_raw_perf_counters_global={}".format(result))
301
302 global_metrics = result['metrics'][1]
303 self.cull_global_metrics(raw_perf_counters, global_metrics)
304 for counter in global_metrics:
305 client_id, _ = extract_client_id_and_ip(counter['k'][0][0])
306 raw_client_counters = raw_perf_counters.setdefault(client_id, [])
307 del raw_client_counters[:]
308 raw_client_counters.extend(counter['c'])
309
310 def process_mds_reports(self):
311 for query in self.user_queries.values():
312 self.get_raw_perf_counters(query)
313 self.get_raw_perf_counters_global(query)
314
315 def scrub_expired_queries(self):
316 expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
317 for filter_spec in list(self.user_queries.keys()):
318 user_query = self.user_queries[filter_spec]
319 self.log.debug("scrubbing query={}".format(user_query))
320 if user_query[QUERY_LAST_REQUEST] < expire_time:
321 expired_query_ids = user_query[QUERY_IDS].copy()
322 expired_query_ids.append(user_query[GLOBAL_QUERY_ID])
323 self.log.debug("unregistering query={} ids={}".format(user_query, expired_query_ids))
324 self.unregister_mds_perf_queries(filter_spec, expired_query_ids)
325 del self.user_queries[filter_spec]
326
327 def prepare_mds_perf_query(self, rank, client_id, client_ip):
328 mds_rank_regex = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
329 if not rank == -1:
330 mds_rank_regex = '^({})$'.format(rank)
331 client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
332 return {
333 'key_descriptor' : [
334 {'type' : 'mds_rank', 'regex' : mds_rank_regex},
335 {'type' : 'client_id', 'regex' : client_regex},
336 ],
337 'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS,
338 }
339
340 def prepare_global_perf_query(self, client_id, client_ip):
341 client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
342 return {
343 'key_descriptor' : [
344 {'type' : 'client_id', 'regex' : client_regex},
345 ],
346 'performance_counter_descriptors' : MDS_GLOBAL_PERF_QUERY_COUNTERS,
347 }
348
349 def unregister_mds_perf_queries(self, filter_spec, query_ids):
350 self.log.info("unregister_mds_perf_queries: filter_spec={0}, query_id={1}".format(
351 filter_spec, query_ids))
352 for query_id in query_ids:
353 self.module.remove_mds_perf_query(query_id)
354
355 def register_mds_perf_query(self, filter_spec):
356 mds_ranks = filter_spec.mds_ranks
357 client_id = filter_spec.client_id
358 client_ip = filter_spec.client_ip
359
360 query_ids = []
361 try:
362 # register per-mds perf query
363 for rank in mds_ranks:
364 query = self.prepare_mds_perf_query(rank, client_id, client_ip)
365 self.log.info("register_mds_perf_query: {}".format(query))
366
367 query_id = self.module.add_mds_perf_query(query)
368 if query_id is None: # query id can be 0
369 raise RuntimeError("failed to add MDS perf query: {}".format(query))
370 query_ids.append(query_id)
371 except Exception:
372 for query_id in query_ids:
373 self.module.remove_mds_perf_query(query_id)
374 raise
375 return query_ids
376
377 def register_global_perf_query(self, filter_spec):
378 client_id = filter_spec.client_id
379 client_ip = filter_spec.client_ip
380
381 # register a global perf query for metrics
382 query = self.prepare_global_perf_query(client_id, client_ip)
383 self.log.info("register_global_perf_query: {}".format(query))
384
385 query_id = self.module.add_mds_perf_query(query)
386 if query_id is None: # query id can be 0
387 raise RuntimeError("failed to add global perf query: {}".format(query))
388 return query_id
389
    def register_query(self, filter_spec):
        """Return the query entry for filter_spec, registering it on first use.

        Caller must hold self.lock (q_cv/r_cv are built on it). For a new
        filter spec this registers the per-MDS and global perf queries,
        wakes the report processor (q_cv) and waits up to 5 seconds on r_cv
        for an initial counter pass so the first report isn't empty. For an
        existing spec it only refreshes the last-request timestamp used by
        scrub_expired_queries().
        """
        user_query = self.user_queries.get(filter_spec, None)
        if not user_query:
            user_query = {
                QUERY_IDS : self.register_mds_perf_query(filter_spec),
                GLOBAL_QUERY_ID : self.register_global_perf_query(filter_spec),
                QUERY_LAST_REQUEST : datetime.now(),
            }
            self.user_queries[filter_spec] = user_query

            # kick the report processor and wait (bounded) for the first pass
            self.q_cv.notify()
            self.r_cv.wait(5)
        else:
            user_query[QUERY_LAST_REQUEST] = datetime.now()
        return user_query
405
    def generate_report(self, user_query):
        """Assemble the externally visible perf-stats report for one query.

        Layout: version, counter name lists, per-client metadata, global
        per-client metrics, and per-MDS metrics keyed by "mds.<rank>" plus
        the list of delayed (stale) ranks.
        """
        result = {} # type: Dict
        # start with counter info -- metrics that are global and per mds
        result["version"] = PERF_STATS_VERSION
        result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
        result["counters"] = MDS_PERF_QUERY_COUNTERS

        # fill in client metadata
        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
        with self.meta_lock:
            result_meta = result.setdefault("client_metadata", {})
            for client_id in raw_perfs.keys():
                # only clients whose metadata has already been fetched appear
                if client_id in self.client_metadata["metadata"]:
                    client_meta = result_meta.setdefault(client_id, {})
                    client_meta.update(self.client_metadata["metadata"][client_id])

        # start populating global perf metrics w/ client metadata
        metrics = result.setdefault("global_metrics", {})
        for client_id, counters in raw_perfs.items():
            # replace list contents in place rather than rebinding
            global_client_metrics = metrics.setdefault(client_id, [])
            del global_client_metrics[:]
            global_client_metrics.extend(counters)

        # and, now per-mds metrics keyed by mds rank along with delayed ranks
        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {})
        metrics = result.setdefault("metrics", {})

        # counters[0] is the "laggy" flag set by get_raw_perf_counters()
        metrics["delayed_ranks"] = [rank for rank,counters in raw_perfs.items() if counters[0]]
        for rank, counters in raw_perfs.items():
            mds_key = "mds.{}".format(rank)
            mds_metrics = metrics.setdefault(mds_key, {})
            mds_metrics.update(counters[1])
        return result
439
440 def extract_query_filters(self, cmd):
441 mds_rank_spec = cmd.get('mds_rank', None)
442 client_id_spec = cmd.get('client_id', None)
443 client_ip_spec = cmd.get('client_ip', None)
444
445 self.log.debug("mds_rank_spec={0}, client_id_spec={1}, client_ip_spec={2}".format(
446 mds_rank_spec, client_id_spec, client_ip_spec))
447
448 mds_ranks = extract_mds_ranks_from_spec(mds_rank_spec)
449 client_id = extract_client_id_from_spec(client_id_spec)
450 client_ip = extract_client_ip_from_spec(client_ip_spec)
451
452 return FilterSpec(mds_ranks, client_id, client_ip)
453
454 def get_perf_data(self, cmd):
455 filter_spec = self.extract_query_filters(cmd)
456
457 counters = {}
458 with self.lock:
459 user_query = self.register_query(filter_spec)
460 result = self.generate_report(user_query)
461 return 0, json.dumps(result), ""