import re
import json
import time
import uuid
import errno
import traceback
from collections import OrderedDict
from typing import List, Dict, Set

from mgr_module import CommandResult

from datetime import datetime, timedelta
from threading import Lock, Condition, Thread

PERF_STATS_VERSION = 1

QUERY_IDS = "query_ids"
GLOBAL_QUERY_ID = "global_query_id"
QUERY_LAST_REQUEST = "last_time_stamp"
QUERY_RAW_COUNTERS = "query_raw_counters"
QUERY_RAW_COUNTERS_GLOBAL = "query_raw_counters_global"

MDS_RANK_ALL = (-1,)
CLIENT_ID_ALL = r"\d*"
CLIENT_IP_ALL = ".*"

MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS = '^(.*)$'
MDS_PERF_QUERY_REGEX_MATCH_CLIENTS = r'^(client.{0}\s+{1}):.*'
MDS_PERF_QUERY_COUNTERS_MAP = OrderedDict({'cap_hit': 0,
                                           'read_latency': 1,
                                           'write_latency': 2,
                                           'metadata_latency': 3,
                                           'dentry_lease': 4,
                                           'opened_files': 5,
                                           'pinned_icaps': 6,
                                           'opened_inodes': 7})
MDS_PERF_QUERY_COUNTERS = [] # type: List[str]
MDS_GLOBAL_PERF_QUERY_COUNTERS = ['cap_hit', 'read_latency', 'write_latency', 'metadata_latency', 'dentry_lease', 'opened_files', 'pinned_icaps', 'opened_inodes'] # type: List[str]

QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)

CLIENT_METADATA_KEY = "client_metadata"
CLIENT_METADATA_SUBKEYS = ["hostname", "root"]
CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"]

NON_EXISTENT_KEY_STR = "N/A"

class FilterSpec(object):
    """
    query filters encapsulated and used as key for query map
    """
    def __init__(self, mds_ranks, client_id, client_ip):
        self.mds_ranks = mds_ranks
        self.client_id = client_id
        self.client_ip = client_ip

    def __hash__(self):
        return hash((self.mds_ranks, self.client_id, self.client_ip))

    def __eq__(self, other):
        return (self.mds_ranks, self.client_id, self.client_ip) == (other.mds_ranks, other.client_id, other.client_ip)

    def __ne__(self, other):
        return not(self == other)

def extract_mds_ranks_from_spec(mds_rank_spec):
    if not mds_rank_spec:
        return MDS_RANK_ALL
    match = re.match(r'^(\d[,\d]*)$', mds_rank_spec)
    if not match:
        raise ValueError("invalid mds filter spec: {}".format(mds_rank_spec))
    return tuple(int(mds_rank) for mds_rank in match.group(0).split(','))
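# A quick sketch of how extract_mds_ranks_from_spec() parses the rank spec
# (the spec strings below are hypothetical examples, not values used
# elsewhere in this module):
#
#   extract_mds_ranks_from_spec(None)    -> MDS_RANK_ALL, i.e. (-1,)
#   extract_mds_ranks_from_spec("0")     -> (0,)
#   extract_mds_ranks_from_spec("0,1,3") -> (0, 1, 3)
#   extract_mds_ranks_from_spec("0, 1")  -> ValueError (whitespace is rejected)
#   extract_mds_ranks_from_spec("a,b")   -> ValueError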

def extract_client_id_from_spec(client_id_spec):
    if not client_id_spec:
        return CLIENT_ID_ALL
    # the client id is the spec itself since it'll be part of
    # the client filter regex.
    return client_id_spec

def extract_client_ip_from_spec(client_ip_spec):
    if not client_ip_spec:
        return CLIENT_IP_ALL
    # TODO: validate if it is an ip address (or a subset of it).
    # the client ip is the spec itself since it'll be part of
    # the client filter regex.
    return client_ip_spec

def extract_mds_ranks_from_report(mds_ranks_str):
    if not mds_ranks_str:
        return []
    return [int(x) for x in mds_ranks_str.split(',')]

def extract_client_id_and_ip(client):
    match = re.match(r'^(client\.\d+)\s(.*)', client)
    if match:
        return match.group(1), match.group(2)
    return None, None
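# extract_client_id_and_ip() splits a per-client key of the form
# "client.<id> <address>"; the address below is a made-up illustration:
#
#   extract_client_id_and_ip("client.4305 192.168.1.2:0/1234")
#       -> ("client.4305", "192.168.1.2:0/1234")
#   extract_client_id_and_ip("garbage") -> (None, None)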

class FSPerfStats(object):
    lock = Lock()
    q_cv = Condition(lock)
    r_cv = Condition(lock)

    user_queries = {} # type: Dict[str, Dict]

    meta_lock = Lock()
    client_metadata = {
        'metadata' : {},
        'to_purge' : set(),
        'in_progress' : {},
    } # type: Dict

    def __init__(self, module):
        self.module = module
        self.log = module.log
        # report processor thread
        self.report_processor = Thread(target=self.run)
        self.report_processor.start()

    def set_client_metadata(self, client_id, key, meta):
        result = self.client_metadata['metadata'].setdefault(client_id, {})
        if key not in result or result[key] != meta:
            result[key] = meta

    def notify(self, cmdtag):
        self.log.debug("cmdtag={0}".format(cmdtag))
        with self.meta_lock:
            result = self.client_metadata['in_progress'].pop(cmdtag)
            client_meta = result[1].wait()
            if client_meta[0] != 0:
                self.log.warn("failed to fetch client metadata from rank {0}, err={1}".format(
                    result[0], client_meta[2]))
                return
            self.log.debug("notify: client metadata={0}".format(json.loads(client_meta[1])))
            for metadata in json.loads(client_meta[1]):
                client_id = "client.{0}".format(metadata['id'])
                result = self.client_metadata['metadata'].setdefault(client_id, {})
                for subkey in CLIENT_METADATA_SUBKEYS:
                    self.set_client_metadata(client_id, subkey, metadata[CLIENT_METADATA_KEY][subkey])
                for subkey in CLIENT_METADATA_SUBKEYS_OPTIONAL:
                    self.set_client_metadata(client_id, subkey,
                                             metadata[CLIENT_METADATA_KEY].get(subkey, NON_EXISTENT_KEY_STR))
                metric_features = int(metadata[CLIENT_METADATA_KEY]["metric_spec"]["metric_flags"]["feature_bits"], 16)
                supported_metrics = [metric for metric, bit in MDS_PERF_QUERY_COUNTERS_MAP.items() if metric_features & (1 << bit)]
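                # e.g. (hypothetical values): feature_bits "0xff" sets bits 0-7,
                # so every counter in MDS_PERF_QUERY_COUNTERS_MAP is reported as
                # supported; "0x07" would cover only cap_hit, read_latency and
                # write_latency (bits 0, 1 and 2).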
                self.set_client_metadata(client_id, "valid_metrics", supported_metrics)
                kver = metadata[CLIENT_METADATA_KEY].get("kernel_version", None)
                if kver:
                    self.set_client_metadata(client_id, "kernel_version", kver)
            # when all async requests are done, purge client metadata if any.
            if not self.client_metadata['in_progress']:
                for client in self.client_metadata['to_purge']:
                    try:
                        self.log.info("purge client metadata for {0}".format(client))
                        self.client_metadata['metadata'].pop(client)
                    except KeyError:
                        pass
                self.client_metadata['to_purge'].clear()
            self.log.debug("client_metadata={0}, to_purge={1}".format(
                self.client_metadata['metadata'], self.client_metadata['to_purge']))

    def update_client_meta(self, rank_set):
        new_updates = {}
        pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
        with self.meta_lock:
            for rank in rank_set:
                if rank in pending_updates:
                    continue
                tag = str(uuid.uuid4())
                result = CommandResult(tag)
                new_updates[tag] = (rank, result)
            self.client_metadata['in_progress'].update(new_updates)

        self.log.debug("updating client metadata from {0}".format(new_updates))

        cmd_dict = {'prefix': 'client ls'}
        for tag,val in new_updates.items():
            self.module.send_command(val[1], "mds", str(val[0]), json.dumps(cmd_dict), tag)

    def run(self):
        try:
            self.log.info("FSPerfStats::report_processor starting...")
            while True:
                with self.lock:
                    self.scrub_expired_queries()
                    self.process_mds_reports()
                    self.r_cv.notify()

                    stats_period = int(self.module.get_ceph_option("mgr_stats_period"))
                    self.q_cv.wait(stats_period)
                self.log.debug("FSPerfStats::tick")
        except Exception as e:
            self.log.fatal("fatal error: {}".format(traceback.format_exc()))

    def cull_mds_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
        # this is pretty straightforward -- find which MDS ranks are tracked
        # but missing from the incoming report, and purge them (along with
        # their client entries) wholesale.
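        # For instance (rank numbers are purely illustrative): if we currently
        # track ranks {0, 1, 2} but the incoming metrics only carry ranks
        # {0, 2}, rank 1 is popped below and its clients are queued up in
        # missing_clients for a later metadata purge.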
        tracked_ranks = raw_perf_counters.keys()
        available_ranks = [int(counter['k'][0][0]) for counter in incoming_metrics]
        for rank in set(tracked_ranks) - set(available_ranks):
            culled = raw_perf_counters.pop(rank)
            self.log.info("culled {0} client entries from rank {1} (laggy: {2})".format(
                len(culled[1]), rank, "yes" if culled[0] else "no"))
            missing_clients.update(list(culled[1].keys()))

    def cull_client_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
        # this is a bit more involved -- for each rank, figure out which clients
        # are missing from the incoming report and purge them from our tracked
        # map. but, if this is invoked _after_ cull_mds_entries(), the rank set
        # is the same, so we can loop based on that assumption.
        ranks = raw_perf_counters.keys()
        for rank in ranks:
            tracked_clients = raw_perf_counters[rank][1].keys()
            available_clients = [extract_client_id_and_ip(counter['k'][1][0]) for counter in incoming_metrics]
            for client in set(tracked_clients) - set([c[0] for c in available_clients if c[0] is not None]):
                raw_perf_counters[rank][1].pop(client)
                self.log.info("culled {0} from rank {1}".format(client, rank))
                missing_clients.add(client)

    def cull_missing_entries(self, raw_perf_counters, incoming_metrics):
        missing_clients = set() # type: Set[str]
        self.cull_mds_entries(raw_perf_counters, incoming_metrics, missing_clients)
        self.cull_client_entries(raw_perf_counters, incoming_metrics, missing_clients)

        self.log.debug("missing_clients={0}".format(missing_clients))
        with self.meta_lock:
            if self.client_metadata['in_progress']:
                self.client_metadata['to_purge'].update(missing_clients)
                self.log.info("deferring client metadata purge (now {0} client(s))".format(
                    len(self.client_metadata['to_purge'])))
            else:
                for client in missing_clients:
                    try:
                        self.log.info("purge client metadata for {0}".format(client))
                        self.client_metadata['metadata'].pop(client)
                    except KeyError:
                        pass
                self.log.debug("client_metadata={0}".format(self.client_metadata['metadata']))

    def cull_global_metrics(self, raw_perf_counters, incoming_metrics):
        tracked_clients = raw_perf_counters.keys()
        available_clients = [counter['k'][0][0] for counter in incoming_metrics]
        for client in set(tracked_clients) - set(available_clients):
            raw_perf_counters.pop(client)

    def get_raw_perf_counters(self, query):
        raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS, {})

        for query_id in query[QUERY_IDS]:
            result = self.module.get_mds_perf_counters(query_id)
            self.log.debug("raw_perf_counters={}".format(raw_perf_counters))
            self.log.debug("get_raw_perf_counters={}".format(result))

            # extract passed in delayed ranks. metrics for delayed ranks are tagged
            # as stale.
            delayed_ranks = extract_mds_ranks_from_report(result['metrics'][0][0])

            # what's received from MDS
            incoming_metrics = result['metrics'][1]

            # cull missing MDSs and clients
            self.cull_missing_entries(raw_perf_counters, incoming_metrics)

            # iterate over metrics list and update our copy (note that we have
            # already culled the differences).
            meta_refresh_ranks = set()
            for counter in incoming_metrics:
                mds_rank = int(counter['k'][0][0])
                client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
                if client_id is not None or not client_ip: # client_id _could_ be 0
                    with self.meta_lock:
                        if client_id not in self.client_metadata['metadata']:
                            meta_refresh_ranks.add(mds_rank)
                        self.set_client_metadata(client_id, "IP", client_ip)
                else:
                    self.log.warn("client metadata for client_id={0} might be unavailable".format(client_id))

                raw_counters = raw_perf_counters.setdefault(mds_rank, [False, {}])
                raw_counters[0] = True if mds_rank in delayed_ranks else False
                raw_client_counters = raw_counters[1].setdefault(client_id, [])

                del raw_client_counters[:]
                raw_client_counters.extend(counter['c'])
            # send an asynchronous client metadata refresh
            self.update_client_meta(meta_refresh_ranks)

    def get_raw_perf_counters_global(self, query):
        raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
        result = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID])

        self.log.debug("raw_perf_counters_global={}".format(raw_perf_counters))
        self.log.debug("get_raw_perf_counters_global={}".format(result))

        global_metrics = result['metrics'][1]
        self.cull_global_metrics(raw_perf_counters, global_metrics)
        for counter in global_metrics:
            client_id, _ = extract_client_id_and_ip(counter['k'][0][0])
            raw_client_counters = raw_perf_counters.setdefault(client_id, [])
            del raw_client_counters[:]
            raw_client_counters.extend(counter['c'])

    def process_mds_reports(self):
        for query in self.user_queries.values():
            self.get_raw_perf_counters(query)
            self.get_raw_perf_counters_global(query)

    def scrub_expired_queries(self):
        expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
        for filter_spec in list(self.user_queries.keys()):
            user_query = self.user_queries[filter_spec]
            self.log.debug("scrubbing query={}".format(user_query))
            if user_query[QUERY_LAST_REQUEST] < expire_time:
                expired_query_ids = user_query[QUERY_IDS].copy()
                expired_query_ids.append(user_query[GLOBAL_QUERY_ID])
                self.log.debug("unregistering query={} ids={}".format(user_query, expired_query_ids))
                self.unregister_mds_perf_queries(filter_spec, expired_query_ids)
                del self.user_queries[filter_spec]

    def prepare_mds_perf_query(self, rank, client_id, client_ip):
        mds_rank_regex = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
        if rank != -1:
            mds_rank_regex = '^({})$'.format(rank)
        client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
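        # With the default filters (client_id=r"\d*", client_ip=".*") and, say,
        # rank 0 -- values shown purely for illustration -- the regexes built
        # above come out as:
        #   mds_rank_regex = '^(0)$'
        #   client_regex   = '^(client.\d*\s+.*):.*'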
        return {
            'key_descriptor' : [
                {'type' : 'mds_rank', 'regex' : mds_rank_regex},
                {'type' : 'client_id', 'regex' : client_regex},
            ],
            'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS,
        }

    def prepare_global_perf_query(self, client_id, client_ip):
        client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
        return {
            'key_descriptor' : [
                {'type' : 'client_id', 'regex' : client_regex},
            ],
            'performance_counter_descriptors' : MDS_GLOBAL_PERF_QUERY_COUNTERS,
        }

    def unregister_mds_perf_queries(self, filter_spec, query_ids):
        self.log.info("unregister_mds_perf_queries: filter_spec={0}, query_id={1}".format(
            filter_spec, query_ids))
        for query_id in query_ids:
            self.module.remove_mds_perf_query(query_id)

    def register_mds_perf_query(self, filter_spec):
        mds_ranks = filter_spec.mds_ranks
        client_id = filter_spec.client_id
        client_ip = filter_spec.client_ip

        query_ids = []
        try:
            # register per-mds perf query
            for rank in mds_ranks:
                query = self.prepare_mds_perf_query(rank, client_id, client_ip)
                self.log.info("register_mds_perf_query: {}".format(query))

                query_id = self.module.add_mds_perf_query(query)
                if query_id is None: # query id can be 0
                    raise RuntimeError("failed to add MDS perf query: {}".format(query))
                query_ids.append(query_id)
        except Exception:
            for query_id in query_ids:
                self.module.remove_mds_perf_query(query_id)
            raise
        return query_ids

    def register_global_perf_query(self, filter_spec):
        client_id = filter_spec.client_id
        client_ip = filter_spec.client_ip

        # register a global perf query for metrics
        query = self.prepare_global_perf_query(client_id, client_ip)
        self.log.info("register_global_perf_query: {}".format(query))

        query_id = self.module.add_mds_perf_query(query)
        if query_id is None: # query id can be 0
            raise RuntimeError("failed to add global perf query: {}".format(query))
        return query_id

    def register_query(self, filter_spec):
        user_query = self.user_queries.get(filter_spec, None)
        if not user_query:
            user_query = {
                QUERY_IDS : self.register_mds_perf_query(filter_spec),
                GLOBAL_QUERY_ID : self.register_global_perf_query(filter_spec),
                QUERY_LAST_REQUEST : datetime.now(),
            }
            self.user_queries[filter_spec] = user_query

            self.q_cv.notify()
            self.r_cv.wait(5)
        else:
            user_query[QUERY_LAST_REQUEST] = datetime.now()
        return user_query

    def generate_report(self, user_query):
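        # The report is assembled roughly as below (client/rank names are
        # illustrative; counter values are copied verbatim from the MDS):
        #   {
        #     "version": 1,
        #     "global_counters": [...],   # MDS_GLOBAL_PERF_QUERY_COUNTERS
        #     "counters": [],             # MDS_PERF_QUERY_COUNTERS
        #     "client_metadata": {"client.4305": {"IP": ..., "hostname": ..., ...}},
        #     "global_metrics": {"client.4305": [...]},
        #     "metrics": {"delayed_ranks": [...], "mds.0": {"client.4305": [...]}},
        #   }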
        result = {} # type: Dict
        # start with counter info -- metrics that are global and per mds
        result["version"] = PERF_STATS_VERSION
        result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
        result["counters"] = MDS_PERF_QUERY_COUNTERS

        # fill in client metadata
        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
        with self.meta_lock:
            result_meta = result.setdefault("client_metadata", {})
            for client_id in raw_perfs.keys():
                if client_id in self.client_metadata["metadata"]:
                    client_meta = result_meta.setdefault(client_id, {})
                    client_meta.update(self.client_metadata["metadata"][client_id])

        # start populating global perf metrics w/ client metadata
        metrics = result.setdefault("global_metrics", {})
        for client_id, counters in raw_perfs.items():
            global_client_metrics = metrics.setdefault(client_id, [])
            del global_client_metrics[:]
            global_client_metrics.extend(counters)

        # and, now per-mds metrics keyed by mds rank along with delayed ranks
        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {})
        metrics = result.setdefault("metrics", {})

        metrics["delayed_ranks"] = [rank for rank,counters in raw_perfs.items() if counters[0]]
        for rank, counters in raw_perfs.items():
            mds_key = "mds.{}".format(rank)
            mds_metrics = metrics.setdefault(mds_key, {})
            mds_metrics.update(counters[1])
        return result

    def extract_query_filters(self, cmd):
        mds_rank_spec = cmd.get('mds_rank', None)
        client_id_spec = cmd.get('client_id', None)
        client_ip_spec = cmd.get('client_ip', None)

        self.log.debug("mds_rank_spec={0}, client_id_spec={1}, client_ip_spec={2}".format(
            mds_rank_spec, client_id_spec, client_ip_spec))

        mds_ranks = extract_mds_ranks_from_spec(mds_rank_spec)
        client_id = extract_client_id_from_spec(client_id_spec)
        client_ip = extract_client_ip_from_spec(client_ip_spec)

        return FilterSpec(mds_ranks, client_id, client_ip)

    def get_perf_data(self, cmd):
        filter_spec = self.extract_query_filters(cmd)

        counters = {}
        with self.lock:
            user_query = self.register_query(filter_spec)
            result = self.generate_report(user_query)
        return 0, json.dumps(result), ""