]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | import re |
2 | import json | |
3 | import time | |
4 | import uuid | |
5 | import errno | |
6 | import traceback | |
7 | from collections import OrderedDict | |
8 | from typing import List, Dict, Set | |
9 | ||
10 | from mgr_module import CommandResult | |
11 | ||
12 | from datetime import datetime, timedelta | |
13 | from threading import Lock, Condition, Thread | |
14 | ||
PERF_STATS_VERSION = 1

# keys into a per-user query's state dictionary
QUERY_IDS = "query_ids"
GLOBAL_QUERY_ID = "global_query_id"
QUERY_LAST_REQUEST = "last_time_stamp"
QUERY_RAW_COUNTERS = "query_raw_counters"
QUERY_RAW_COUNTERS_GLOBAL = "query_raw_counters_global"

# sentinel filter values meaning "match everything"
MDS_RANK_ALL = (-1,)
CLIENT_ID_ALL = r"\d*"
CLIENT_IP_ALL = ".*"

# raw strings so regex escapes (\s, \d) are not treated as (invalid)
# string escape sequences -- non-raw forms raise SyntaxWarning on
# modern Python.
MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS = r'^(.*)$'
MDS_PERF_QUERY_REGEX_MATCH_CLIENTS = r'^(client.{0}\s+{1}):.*'
# counter name -> bit position in the client's advertised metric
# feature bits (see FSPerfStats.notify()).
MDS_PERF_QUERY_COUNTERS_MAP = OrderedDict({'cap_hit': 0,
                                           'read_latency': 1,
                                           'write_latency': 2,
                                           'metadata_latency': 3,
                                           'dentry_lease': 4,
                                           'opened_files': 5,
                                           'pinned_icaps': 6,
                                           'opened_inodes': 7})
MDS_PERF_QUERY_COUNTERS = []  # type: List[str]
MDS_GLOBAL_PERF_QUERY_COUNTERS = ['cap_hit', 'read_latency', 'write_latency',
                                  'metadata_latency', 'dentry_lease',
                                  'opened_files', 'pinned_icaps',
                                  'opened_inodes']  # type: List[str]

# queries not refreshed within this interval get unregistered
QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)

CLIENT_METADATA_KEY = "client_metadata"
CLIENT_METADATA_SUBKEYS = ["hostname", "root"]
CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"]

# placeholder for optional metadata a client did not report
NON_EXISTENT_KEY_STR = "N/A"
47 | ||
class FilterSpec(object):
    """
    query filters encapsulated and used as key for query map
    """
    def __init__(self, mds_ranks, client_id, client_ip):
        # tuple of mds ranks (hashable; MDS_RANK_ALL for "all ranks")
        self.mds_ranks = mds_ranks
        self.client_id = client_id
        self.client_ip = client_ip

    def __hash__(self):
        return hash((self.mds_ranks, self.client_id, self.client_ip))

    def __eq__(self, other):
        # bug fix: compare against *other*'s client_ip -- previously this
        # compared self.client_ip to itself, so two specs differing only
        # by client ip were "equal" while hashing differently, breaking
        # dict-key semantics.
        return (self.mds_ranks, self.client_id, self.client_ip) == \
               (other.mds_ranks, other.client_id, other.client_ip)

    def __ne__(self, other):
        return not(self == other)
65 | ||
def extract_mds_ranks_from_spec(mds_rank_spec):
    """Parse a comma-separated mds rank spec into a tuple of ints.

    An empty/missing spec selects all ranks; a malformed spec raises
    ValueError.
    """
    if not mds_rank_spec:
        return MDS_RANK_ALL
    if re.match(r'^(\d[,\d]*)$', mds_rank_spec) is None:
        raise ValueError("invalid mds filter spec: {}".format(mds_rank_spec))
    return tuple(map(int, mds_rank_spec.split(',')))
73 | ||
def extract_client_id_from_spec(client_id_spec):
    """Return the client-id filter component.

    A missing spec matches every client; otherwise the spec string is
    used verbatim as part of the client filter regex.
    """
    return client_id_spec if client_id_spec else CLIENT_ID_ALL
80 | ||
def extract_client_ip_from_spec(client_ip_spec):
    """Return the client-ip filter component.

    A missing spec matches every client ip; otherwise the spec string
    is used verbatim as part of the client filter regex.
    """
    # TODO: validate if it is an ip address (or a subset of it).
    return client_ip_spec if client_ip_spec else CLIENT_IP_ALL
88 | ||
def extract_mds_ranks_from_report(mds_ranks_str):
    """Turn a comma separated rank string from an MDS report into a
    list of ints (empty list for an empty string)."""
    if mds_ranks_str:
        return list(map(int, mds_ranks_str.split(',')))
    return []
93 | ||
def extract_client_id_and_ip(client):
    """Split a "client.<id> <ip>" string into its (id, ip) parts.

    Returns (None, None) when the string does not match that shape.
    """
    parts = re.match(r'^(client\.\d+)\s(.*)', client)
    if parts is None:
        return None, None
    return parts.group(1), parts.group(2)
99 | ||
class FSPerfStats(object):
    """
    Track user-registered performance stats queries, drive the per-MDS
    and global perf queries backing them, and assemble query reports.

    A dedicated thread (report_processor) periodically scrubs expired
    queries and refreshes the raw counters for the active ones.
    """
    lock = Lock()
    # signalled when a new query is registered (wakes the report thread)
    q_cv = Condition(lock)
    # signalled after a round of reports has been processed
    r_cv = Condition(lock)

    # FilterSpec -> query state (query ids, raw counters, last request time)
    user_queries = {}  # type: Dict[str, Dict]

    # guards client_metadata below
    meta_lock = Lock()
    # 'metadata': client_id -> metadata gathered via async "client ls"
    # 'to_purge': client ids whose metadata is dropped once no metadata
    #             fetches are in flight
    # 'in_progress': command tag -> (mds rank, CommandResult) for each
    #                outstanding async "client ls"
    client_metadata = {
        'metadata' : {},
        'to_purge' : set(),
        'in_progress' : {},
    }  # type: Dict

    def __init__(self, module):
        self.module = module
        self.log = module.log
        # report processor thread
        self.report_processor = Thread(target=self.run)
        self.report_processor.start()

    def set_client_metadata(self, client_id, key, meta):
        """Store (or refresh) one metadata item for a client."""
        result = self.client_metadata['metadata'].setdefault(client_id, {})
        if not key in result or not result[key] == meta:
            result[key] = meta

    def notify(self, cmdtag):
        """Handle completion of an async "client ls" command (keyed by
        cmdtag); ingest the returned client metadata and, once no fetch
        is in flight, purge metadata of clients queued for removal."""
        self.log.debug("cmdtag={0}".format(cmdtag))
        with self.meta_lock:
            result = self.client_metadata['in_progress'].pop(cmdtag)
            client_meta = result[1].wait()
            if client_meta[0] != 0:
                self.log.warning("failed to fetch client metadata from rank {0}, err={1}".format(
                    result[0], client_meta[2]))
                return
            self.log.debug("notify: client metadata={0}".format(json.loads(client_meta[1])))
            for metadata in json.loads(client_meta[1]):
                client_id = "client.{0}".format(metadata['id'])
                result = self.client_metadata['metadata'].setdefault(client_id, {})
                for subkey in CLIENT_METADATA_SUBKEYS:
                    self.set_client_metadata(client_id, subkey, metadata[CLIENT_METADATA_KEY][subkey])
                for subkey in CLIENT_METADATA_SUBKEYS_OPTIONAL:
                    self.set_client_metadata(client_id, subkey,
                                             metadata[CLIENT_METADATA_KEY].get(subkey, NON_EXISTENT_KEY_STR))
                # decode the client's advertised metric feature bits into
                # the list of counters it can report.
                metric_features = int(metadata[CLIENT_METADATA_KEY]["metric_spec"]["metric_flags"]["feature_bits"], 16)
                supported_metrics = [metric for metric, bit in MDS_PERF_QUERY_COUNTERS_MAP.items()
                                     if metric_features & (1 << bit)]
                self.set_client_metadata(client_id, "valid_metrics", supported_metrics)
                kver = metadata[CLIENT_METADATA_KEY].get("kernel_version", None)
                if kver:
                    self.set_client_metadata(client_id, "kernel_version", kver)
            # when all async requests are done, purge clients metadata if any.
            if not self.client_metadata['in_progress']:
                for client in self.client_metadata['to_purge']:
                    try:
                        self.log.info("purge client metadata for {0}".format(client))
                        # bug fix: dict has no remove() -- the previous
                        # .remove() call always raised AttributeError and
                        # was silently swallowed by a bare except, so the
                        # purge never happened.
                        self.client_metadata['metadata'].pop(client)
                    except KeyError:
                        pass
                self.client_metadata['to_purge'].clear()
            self.log.debug("client_metadata={0}, to_purge={1}".format(
                self.client_metadata['metadata'], self.client_metadata['to_purge']))

    def update_client_meta(self, rank_set):
        """Kick off an async "client ls" fetch for every rank in rank_set
        that does not already have one in flight."""
        new_updates = {}
        with self.meta_lock:
            # bug fix: read the in-flight set under meta_lock -- it is
            # concurrently mutated (popped) from notify().
            pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
            for rank in rank_set:
                if rank in pending_updates:
                    continue
                tag = str(uuid.uuid4())
                result = CommandResult(tag)
                new_updates[tag] = (rank, result)
            self.client_metadata['in_progress'].update(new_updates)

        self.log.debug("updating client metadata from {0}".format(new_updates))

        cmd_dict = {'prefix': 'client ls'}
        for tag,val in new_updates.items():
            self.module.send_command(val[1], "mds", str(val[0]), json.dumps(cmd_dict), tag)

    def run(self):
        """Report processor main loop: scrub expired queries, refresh raw
        counters for the rest, then sleep for one stats period (or until
        a new query registration wakes us via q_cv)."""
        try:
            self.log.info("FSPerfStats::report_processor starting...")
            while True:
                with self.lock:
                    self.scrub_expired_queries()
                    self.process_mds_reports()
                    self.r_cv.notify()

                    stats_period = int(self.module.get_ceph_option("mgr_stats_period"))
                    self.q_cv.wait(stats_period)
                self.log.debug("FSPerfStats::tick")
        except Exception:
            self.log.fatal("fatal error: {}".format(traceback.format_exc()))

    def cull_mds_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
        """Drop tracked MDS ranks absent from the incoming report; their
        clients are added to missing_clients."""
        # this is pretty straight forward -- find what MDSs are missing from
        # what is tracked vs what we received in incoming report and purge
        # the whole bunch.
        tracked_ranks = raw_perf_counters.keys()
        available_ranks = [int(counter['k'][0][0]) for counter in incoming_metrics]
        for rank in set(tracked_ranks) - set(available_ranks):
            culled = raw_perf_counters.pop(rank)
            self.log.info("culled {0} client entries from rank {1} (laggy: {2})".format(
                len(culled[1]), rank, "yes" if culled[0] else "no"))
            missing_clients.update(list(culled[1].keys()))

    def cull_client_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
        """Drop tracked clients absent from the incoming report, per rank."""
        # this is a bit more involed -- for each rank figure out what clients
        # are missing in incoming report and purge them from our tracked map.
        # but, if this is invoked _after_ cull_mds_entries(), the rank set
        # is same, so we can loop based on that assumption.
        ranks = raw_perf_counters.keys()
        for rank in ranks:
            tracked_clients = raw_perf_counters[rank][1].keys()
            available_clients = [extract_client_id_and_ip(counter['k'][1][0]) for counter in incoming_metrics]
            for client in set(tracked_clients) - set([c[0] for c in available_clients if c[0] is not None]):
                raw_perf_counters[rank][1].pop(client)
                self.log.info("culled {0} from rank {1}".format(client, rank))
                missing_clients.add(client)

    def cull_missing_entries(self, raw_perf_counters, incoming_metrics):
        """Reconcile tracked counters with an incoming report and purge
        (or defer purging of) metadata for clients no longer present."""
        missing_clients = set()  # type: Set[str]
        self.cull_mds_entries(raw_perf_counters, incoming_metrics, missing_clients)
        self.cull_client_entries(raw_perf_counters, incoming_metrics, missing_clients)

        self.log.debug("missing_clients={0}".format(missing_clients))
        with self.meta_lock:
            if self.client_metadata['in_progress']:
                # metadata fetches in flight could re-add entries; defer
                # purging until notify() drains them.
                self.client_metadata['to_purge'].update(missing_clients)
                self.log.info("deferring client metadata purge (now {0} client(s))".format(
                    len(self.client_metadata['to_purge'])))
            else:
                for client in missing_clients:
                    try:
                        self.log.info("purge client metadata for {0}".format(client))
                        self.client_metadata['metadata'].pop(client)
                    except KeyError:
                        pass
                self.log.debug("client_metadata={0}".format(self.client_metadata['metadata']))

    def cull_global_metrics(self, raw_perf_counters, incoming_metrics):
        """Drop tracked global (per-client) counters for clients absent
        from the incoming report."""
        tracked_clients = raw_perf_counters.keys()
        available_clients = [counter['k'][0][0] for counter in incoming_metrics]
        for client in set(tracked_clients) - set(available_clients):
            raw_perf_counters.pop(client)

    def get_raw_perf_counters(self, query):
        """Refresh a query's per-MDS raw counters from the MDS reports and
        schedule metadata fetches for newly seen clients."""
        raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS, {})

        for query_id in query[QUERY_IDS]:
            result = self.module.get_mds_perf_counters(query_id)
            self.log.debug("raw_perf_counters={}".format(raw_perf_counters))
            self.log.debug("get_raw_perf_counters={}".format(result))

            # extract passed in delayed ranks. metrics for delayed ranks are tagged
            # as stale.
            delayed_ranks = extract_mds_ranks_from_report(result['metrics'][0][0])

            # what's received from MDS
            incoming_metrics = result['metrics'][1]

            # cull missing MDSs and clients
            self.cull_missing_entries(raw_perf_counters, incoming_metrics)

            # iterate over metrics list and update our copy (note that we have
            # already culled the differences).
            meta_refresh_ranks = set()
            for counter in incoming_metrics:
                mds_rank = int(counter['k'][0][0])
                client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
                # NOTE(review): `or not client_ip` lets a (None, "") parse
                # through to the metadata path -- looks intentional per the
                # comment below, but confirm against MDS report format.
                if client_id is not None or not client_ip: # client_id _could_ be 0
                    with self.meta_lock:
                        if not client_id in self.client_metadata['metadata']:
                            meta_refresh_ranks.add(mds_rank)
                    self.set_client_metadata(client_id, "IP", client_ip)
                else:
                    self.log.warning("client metadata for client_id={0} might be unavailable".format(client_id))

                # raw_counters: [is_delayed, {client_id: [counter values]}]
                raw_counters = raw_perf_counters.setdefault(mds_rank, [False, {}])
                raw_counters[0] = True if mds_rank in delayed_ranks else False
                raw_client_counters = raw_counters[1].setdefault(client_id, [])

                del raw_client_counters[:]
                raw_client_counters.extend(counter['c'])
            # send an asynchronous client metadata refresh
            self.update_client_meta(meta_refresh_ranks)

    def get_raw_perf_counters_global(self, query):
        """Refresh a query's global per-client raw counters."""
        raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
        result = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID])

        self.log.debug("raw_perf_counters_global={}".format(raw_perf_counters))
        self.log.debug("get_raw_perf_counters_global={}".format(result))

        global_metrics = result['metrics'][1]
        self.cull_global_metrics(raw_perf_counters, global_metrics)
        for counter in global_metrics:
            client_id, _ = extract_client_id_and_ip(counter['k'][0][0])
            raw_client_counters = raw_perf_counters.setdefault(client_id, [])
            del raw_client_counters[:]
            raw_client_counters.extend(counter['c'])

    def process_mds_reports(self):
        """Refresh raw counters (per-MDS and global) for every user query."""
        for query in self.user_queries.values():
            self.get_raw_perf_counters(query)
            self.get_raw_perf_counters_global(query)

    def scrub_expired_queries(self):
        """Unregister queries not requested within QUERY_EXPIRE_INTERVAL."""
        expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
        for filter_spec in list(self.user_queries.keys()):
            user_query = self.user_queries[filter_spec]
            self.log.debug("scrubbing query={}".format(user_query))
            if user_query[QUERY_LAST_REQUEST] < expire_time:
                expired_query_ids = user_query[QUERY_IDS].copy()
                expired_query_ids.append(user_query[GLOBAL_QUERY_ID])
                self.log.debug("unregistering query={} ids={}".format(user_query, expired_query_ids))
                self.unregister_mds_perf_queries(filter_spec, expired_query_ids)
                del self.user_queries[filter_spec]

    def prepare_mds_perf_query(self, rank, client_id, client_ip):
        """Build the per-MDS perf query descriptor for one rank + client filter."""
        mds_rank_regex = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
        if not rank == -1:
            mds_rank_regex = '^({})$'.format(rank)
        client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
        return {
            'key_descriptor' : [
                {'type' : 'mds_rank', 'regex' : mds_rank_regex},
                {'type' : 'client_id', 'regex' : client_regex},
            ],
            'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS,
        }

    def prepare_global_perf_query(self, client_id, client_ip):
        """Build the global (rank-independent) perf query descriptor."""
        client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
        return {
            'key_descriptor' : [
                {'type' : 'client_id', 'regex' : client_regex},
            ],
            'performance_counter_descriptors' : MDS_GLOBAL_PERF_QUERY_COUNTERS,
        }

    def unregister_mds_perf_queries(self, filter_spec, query_ids):
        """Remove all registered perf queries for a filter spec."""
        self.log.info("unregister_mds_perf_queries: filter_spec={0}, query_id={1}".format(
            filter_spec, query_ids))
        for query_id in query_ids:
            self.module.remove_mds_perf_query(query_id)

    def register_mds_perf_query(self, filter_spec):
        """Register one per-MDS perf query per requested rank.

        Returns the list of query ids; on failure, already registered
        queries are rolled back and the exception is re-raised.
        """
        mds_ranks = filter_spec.mds_ranks
        client_id = filter_spec.client_id
        client_ip = filter_spec.client_ip

        query_ids = []
        try:
            # register per-mds perf query
            for rank in mds_ranks:
                query = self.prepare_mds_perf_query(rank, client_id, client_ip)
                self.log.info("register_mds_perf_query: {}".format(query))

                query_id = self.module.add_mds_perf_query(query)
                if query_id is None: # query id can be 0
                    raise RuntimeError("failed to add MDS perf query: {}".format(query))
                query_ids.append(query_id)
        except Exception:
            for query_id in query_ids:
                self.module.remove_mds_perf_query(query_id)
            raise
        return query_ids

    def register_global_perf_query(self, filter_spec):
        """Register the global perf query; returns its query id."""
        client_id = filter_spec.client_id
        client_ip = filter_spec.client_ip

        # register a global perf query for metrics
        query = self.prepare_global_perf_query(client_id, client_ip)
        self.log.info("register_global_perf_query: {}".format(query))

        query_id = self.module.add_mds_perf_query(query)
        if query_id is None: # query id can be 0
            raise RuntimeError("failed to add global perf query: {}".format(query))
        return query_id

    def register_query(self, filter_spec):
        """Return the query state for filter_spec, registering it (and
        waiting briefly for the first report round) if new; refreshes the
        last-request timestamp otherwise.  Caller must hold self.lock."""
        user_query = self.user_queries.get(filter_spec, None)
        if not user_query:
            user_query = {
                QUERY_IDS : self.register_mds_perf_query(filter_spec),
                GLOBAL_QUERY_ID : self.register_global_perf_query(filter_spec),
                QUERY_LAST_REQUEST : datetime.now(),
            }
            self.user_queries[filter_spec] = user_query

            # wake the report thread and wait (bounded) for first results
            self.q_cv.notify()
            self.r_cv.wait(5)
        else:
            user_query[QUERY_LAST_REQUEST] = datetime.now()
        return user_query

    def generate_report(self, user_query):
        """Assemble the externally visible report for one user query."""
        result = {}  # type: Dict
        # start with counter info -- metrics that are global and per mds
        result["version"] = PERF_STATS_VERSION
        result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
        result["counters"] = MDS_PERF_QUERY_COUNTERS

        # fill in client metadata
        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
        with self.meta_lock:
            result_meta = result.setdefault("client_metadata", {})
            for client_id in raw_perfs.keys():
                if client_id in self.client_metadata["metadata"]:
                    client_meta = result_meta.setdefault(client_id, {})
                    client_meta.update(self.client_metadata["metadata"][client_id])

        # start populating global perf metrics w/ client metadata
        metrics = result.setdefault("global_metrics", {})
        for client_id, counters in raw_perfs.items():
            global_client_metrics = metrics.setdefault(client_id, [])
            del global_client_metrics[:]
            global_client_metrics.extend(counters)

        # and, now per-mds metrics keyed by mds rank along with delayed ranks
        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {})
        metrics = result.setdefault("metrics", {})

        metrics["delayed_ranks"] = [rank for rank,counters in raw_perfs.items() if counters[0]]
        for rank, counters in raw_perfs.items():
            mds_key = "mds.{}".format(rank)
            mds_metrics = metrics.setdefault(mds_key, {})
            mds_metrics.update(counters[1])
        return result

    def extract_query_filters(self, cmd):
        """Build a FilterSpec from the (optional) mds_rank / client_id /
        client_ip fields of a command dict."""
        mds_rank_spec = cmd.get('mds_rank', None)
        client_id_spec = cmd.get('client_id', None)
        client_ip_spec = cmd.get('client_ip', None)

        self.log.debug("mds_rank_spec={0}, client_id_spec={1}, client_ip_spec={2}".format(
            mds_rank_spec, client_id_spec, client_ip_spec))

        mds_ranks = extract_mds_ranks_from_spec(mds_rank_spec)
        client_id = extract_client_id_from_spec(client_id_spec)
        client_ip = extract_client_ip_from_spec(client_ip_spec)

        return FilterSpec(mds_ranks, client_id, client_ip)

    def get_perf_data(self, cmd):
        """Entry point for the perf-stats command: register/refresh the
        query and return (retval, json report, error string)."""
        filter_spec = self.extract_query_filters(cmd)

        with self.lock:
            user_query = self.register_query(filter_spec)
            result = self.generate_report(user_query)
        return 0, json.dumps(result), ""