]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | import re |
2 | import json | |
3 | import time | |
4 | import uuid | |
5 | import errno | |
6 | import traceback | |
33c7a0ef | 7 | import logging |
f67539c2 TL |
8 | from collections import OrderedDict |
9 | from typing import List, Dict, Set | |
10 | ||
11 | from mgr_module import CommandResult | |
12 | ||
13 | from datetime import datetime, timedelta | |
33c7a0ef | 14 | from threading import Lock, Condition, Thread, Timer |
20effc67 | 15 | from ipaddress import ip_address |
f67539c2 | 16 | |
PERF_STATS_VERSION = 2

# keys of the per user query bookkeeping dict built by FSPerfStats.register_query()
QUERY_IDS = "query_ids"
GLOBAL_QUERY_ID = "global_query_id"
QUERY_LAST_REQUEST = "last_time_stamp"
QUERY_RAW_COUNTERS = "query_raw_counters"
QUERY_RAW_COUNTERS_GLOBAL = "query_raw_counters_global"

# "match everything" filter values. The client id / ip values are regular
# expression fragments, hence raw strings: the backslash must reach the regex
# without relying on Python's deprecated handling of unknown escapes.
MDS_RANK_ALL = (-1,)
CLIENT_ID_ALL = r"\d*"
CLIENT_IP_ALL = ".*"

# names of filesystems that currently have a rank 0 MDS; mutated in place
# (cleared and re-filled) by FSPerfStats.update_client_meta(), never rebound.
fs_list = [] # type: List[str]

MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS = '^(.*)$'
# formatted with (client_id, client_ip) regex fragments
MDS_PERF_QUERY_REGEX_MATCH_CLIENTS = r'^(client.{0}\s+{1}):.*'
# metric name -> bit position in a client's metric feature bits
MDS_PERF_QUERY_COUNTERS_MAP = OrderedDict({'cap_hit': 0,
                                           'read_latency': 1,
                                           'write_latency': 2,
                                           'metadata_latency': 3,
                                           'dentry_lease': 4,
                                           'opened_files': 5,
                                           'pinned_icaps': 6,
                                           'opened_inodes': 7,
                                           'read_io_sizes': 8,
                                           'write_io_sizes': 9,
                                           'avg_read_latency': 10,
                                           'stdev_read_latency': 11,
                                           'avg_write_latency': 12,
                                           'stdev_write_latency': 13,
                                           'avg_metadata_latency': 14,
                                           'stdev_metadata_latency': 15})
MDS_PERF_QUERY_COUNTERS = [] # type: List[str]
MDS_GLOBAL_PERF_QUERY_COUNTERS = list(MDS_PERF_QUERY_COUNTERS_MAP.keys())

# user queries not requested for this long are unregistered
QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
# seconds between attempts to re-register queries after a rank 0 MDS change
REREGISTER_TIMER_INTERVAL = 1

CLIENT_METADATA_KEY = "client_metadata"
CLIENT_METADATA_SUBKEYS = ["hostname", "root"]
CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"]

# placeholder stored for optional client metadata the client did not report
NON_EXISTENT_KEY_STR = "N/A"

logger = logging.getLogger(__name__)
62 | ||
f67539c2 TL |
class FilterSpec(object):
    """
    query filters encapsulated and used as key for query map

    Two specs are equal (and hash equally) when their mds ranks, client id
    and client ip all match.
    """
    def __init__(self, mds_ranks, client_id, client_ip):
        self.mds_ranks = mds_ranks
        self.client_id = client_id
        self.client_ip = client_ip

    def _key(self):
        # single source of truth for both hashing and equality
        return (self.mds_ranks, self.client_id, self.client_ip)

    def __hash__(self):
        return hash(self._key())

    def __eq__(self, other):
        if not isinstance(other, FilterSpec):
            return NotImplemented
        # all three fields participate; client_ip used to be compared
        # against itself, so specs differing only by ip collided
        return self._key() == other._key()

    def __ne__(self, other):
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def __repr__(self):
        return "{0}(mds_ranks={1!r}, client_id={2!r}, client_ip={3!r})".format(
            type(self).__name__, self.mds_ranks, self.client_id, self.client_ip)
80 | ||
def extract_mds_ranks_from_spec(mds_rank_spec):
    """
    Parse an mds rank filter such as "0,2" into a tuple of ints.

    An empty or missing spec selects every rank (MDS_RANK_ALL).
    Raises ValueError unless the spec is digits separated by single commas.
    """
    if mds_rank_spec:
        ranks_match = re.match(r'^\d+(,\d+)*$', mds_rank_spec)
        if ranks_match is None:
            raise ValueError("invalid mds filter spec: {}".format(mds_rank_spec))
        return tuple(map(int, ranks_match.group(0).split(',')))
    return MDS_RANK_ALL
88 | ||
def extract_client_id_from_spec(client_id_spec):
    """
    Validate a numeric client id filter.

    The id is returned unchanged because it is spliced verbatim into the
    client filter regex; an empty or missing spec selects every client
    (CLIENT_ID_ALL). Raises ValueError for non-digit input.
    """
    if client_id_spec:
        if client_id_spec.isdigit():
            return client_id_spec
        raise ValueError('invalid client_id filter spec: {}'.format(client_id_spec))
    return CLIENT_ID_ALL
97 | ||
def extract_client_ip_from_spec(client_ip_spec):
    """
    Validate a client ip filter, optionally prefixed with "v1:" or "v2:".

    Only the address part is validated; the spec is returned exactly as
    given. An empty or missing spec selects every ip (CLIENT_IP_ALL).
    Raises ValueError when the address is not a valid IPv4/IPv6 address.
    """
    if not client_ip_spec:
        return CLIENT_IP_ALL

    bare_ip = client_ip_spec
    for proto_prefix in ('v1:', 'v2:'):
        if bare_ip.startswith(proto_prefix):
            # replace() (not slicing) on purpose: every occurrence is removed
            bare_ip = bare_ip.replace(proto_prefix, '')
            break

    try:
        ip_address(bare_ip)
    except ValueError:
        raise ValueError('invalid client_ip filter spec: {}'.format(client_ip_spec))
    return client_ip_spec
f67539c2 TL |
113 | |
def extract_mds_ranks_from_report(mds_ranks_str):
    """Turn a comma separated rank list from a perf report into ints
    (an empty or None value yields an empty list)."""
    return [int(rank) for rank in mds_ranks_str.split(',')] if mds_ranks_str else []
118 | ||
def extract_client_id_and_ip(client):
    """
    Split a client key of the form "client.<digits><whitespace><rest>".

    E.g. "client.4305 v1:10.0.0.1:0/123" -> ("client.4305", "v1:10.0.0.1:0/123").
    Returns (None, None) when the string does not have that form.
    """
    parsed = re.match(r'^(client\.\d+)\s(.*)', client)
    if parsed is None:
        return None, None
    return parsed.group(1), parsed.group(2)
124 | ||
class FSPerfStats(object):
    """
    Serve "fs perf stats" user queries from MDS performance counters.

    Every user query (keyed by FilterSpec) is backed by one MDS perf query
    per requested rank plus one global query. A background thread (run())
    scrubs expired user queries and refreshes the raw counters of the others
    roughly every mgr_stats_period seconds, or sooner when a new query is
    registered. Client metadata is cached per filesystem from asynchronous
    "client ls" commands sent to each filesystem's rank 0 MDS and completed
    in notify_cmd().
    """
    # `lock` is held while user queries and their raw counters are processed;
    # q_cv wakes the report processor early, r_cv is notified after every
    # round of report processing.
    lock = Lock()
    q_cv = Condition(lock)
    r_cv = Condition(lock)

    user_queries = {} # type: Dict[FilterSpec, Dict]

    # meta_lock guards client_metadata and the rebuild of fs_list
    meta_lock = Lock()
    rqtimer = None
    client_metadata = {
        'metadata' : {},     # fs_name -> client_id -> {metadata key: value}
        'to_purge' : set(),  # clients whose metadata purge was deferred
        'in_progress' : {},  # command tag -> (fs_name, rank0 gid, CommandResult)
    } # type: Dict

    def __init__(self, module):
        self.module = module
        self.log = module.log
        # rank 0 MDS gids the queries were last (re-)registered against
        self.prev_rank0_gids = set() # type: Set[int]
        # monotonic time of the latest MDS perf report; re_register_queries()
        # may read it before any report was processed
        self.mx_last_updated = 0.0
        # report processor thread
        self.report_processor = Thread(target=self.run)
        self.report_processor.start()

    def set_client_metadata(self, fs_name, client_id, key, meta):
        """Store `meta` under `key` for client_id of fs_name (callers hold meta_lock)."""
        client = self.client_metadata['metadata'].setdefault(fs_name, {}).setdefault(client_id, {})
        if key not in client or client[key] != meta:
            client[key] = meta

    def _purge_client_metadata(self, clients):
        """
        Drop the cached metadata of `clients` from every filesystem in fs_list
        and forget filesystems left without any client (callers hold meta_lock).
        """
        all_meta = self.client_metadata['metadata']
        for fs_name in fs_list:
            fs_meta = all_meta.get(fs_name)
            if fs_meta is None:
                continue
            for client in clients:
                if client in fs_meta:
                    self.log.info("purge client metadata for {0}".format(client))
                    fs_meta.pop(client)
            if not fs_meta:
                all_meta.pop(fs_name)

    def notify_cmd(self, cmdtag):
        """
        Completion handler for the "client ls" commands sent by
        update_client_meta(): cache the returned client metadata for the
        command's filesystem and, once no command is outstanding anymore,
        apply any deferred metadata purge.
        """
        self.log.debug("cmdtag={0}".format(cmdtag))
        with self.meta_lock:
            try:
                fs_name, rank0_gid, cmd_result = self.client_metadata['in_progress'].pop(cmdtag)
            except KeyError:
                self.log.warning(f"cmdtag {cmdtag} not found in client metadata")
                return
            client_meta = cmd_result.wait()
            if client_meta[0] != 0:
                self.log.warning("failed to fetch client metadata from gid {0}, err={1}".format(
                    rank0_gid, client_meta[2]))
                return
            clients = json.loads(client_meta[1])
            self.log.debug("notify: client metadata={0}".format(clients))
            for metadata in clients:
                client_id = "client.{0}".format(metadata['id'])
                client_md = metadata[CLIENT_METADATA_KEY]
                for subkey in CLIENT_METADATA_SUBKEYS:
                    self.set_client_metadata(fs_name, client_id, subkey, client_md[subkey])
                for subkey in CLIENT_METADATA_SUBKEYS_OPTIONAL:
                    self.set_client_metadata(fs_name, client_id, subkey,
                                             client_md.get(subkey, NON_EXISTENT_KEY_STR))
                metric_features = int(client_md["metric_spec"]["metric_flags"]["feature_bits"], 16)
                supported_metrics = [metric for metric, bit in MDS_PERF_QUERY_COUNTERS_MAP.items()
                                     if metric_features & (1 << bit)]
                self.set_client_metadata(fs_name, client_id, "valid_metrics", supported_metrics)
                kver = client_md.get("kernel_version", None)
                if kver:
                    self.set_client_metadata(fs_name, client_id, "kernel_version", kver)
            # when all async requests are done, purge clients metadata if any.
            if not self.client_metadata['in_progress']:
                self._purge_client_metadata(self.client_metadata['to_purge'])
                self.client_metadata['to_purge'].clear()
            self.log.debug("client_metadata={0}, to_purge={1}".format(
                self.client_metadata['metadata'], self.client_metadata['to_purge']))

    def notify_fsmap(self):
        """
        Schedule re-registration of the user queries when some filesystem has
        an up:active rank 0 MDS the queries were not registered against.
        """
        with self.lock:
            gid_state = FSPerfStats.get_rank0_mds_gid_state(self.module.get('fs_map'))
            if not gid_state:
                return
            active_gids = {gid for gid, state in gid_state if gid and state == 'up:active'}
            # compare against the whole set of rank 0 gids: a single "previous
            # gid" made every fsmap update re-register on multi-fs clusters
            if not active_gids - self.prev_rank0_gids:
                return
            ua_last_updated = time.monotonic()
            if self.rqtimer and self.rqtimer.is_alive():
                self.rqtimer.cancel()
            self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
                                 self.re_register_queries,
                                 args=(frozenset(active_gids), ua_last_updated,))
            self.rqtimer.start()

    def re_register_queries(self, rank0_gid, ua_last_updated):
        """
        Timer callback: re-register the MDS perf queries once a report newer
        than ua_last_updated has been processed, otherwise re-arm the timer.

        rank0_gid is the collection of rank 0 gids to remember as registered
        against (a single int gid is accepted as well).
        """
        with self.lock:
            # with no user query, no report will ever advance mx_last_updated,
            # so there is nothing to wait for
            if not self.user_queries or self.mx_last_updated >= ua_last_updated:
                self.log.debug("reregistering queries...")
                self.module.reregister_mds_perf_queries()
                self.prev_rank0_gids = ({rank0_gid} if isinstance(rank0_gid, int)
                                        else set(rank0_gid))
            else:
                # reschedule the timer and wait for fresh metrics
                self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
                                     self.re_register_queries,
                                     args=(rank0_gid, ua_last_updated,))
                self.rqtimer.start()

    @staticmethod
    def get_rank0_mds_gid_state(fsmap):
        """
        Return [[gid, state], ...] for every rank 0 MDS entry in fsmap, or
        None (after logging a warning) when there is none.
        """
        gid_state = []
        for fs in fsmap['filesystems']:
            mds_map = fs['mdsmap']
            if mds_map is None:
                continue
            for mds_status in mds_map['info'].values():
                if mds_status['rank'] == 0:
                    gid_state.append([mds_status['gid'], mds_status['state']])
        if gid_state:
            return gid_state
        logger.warning("No rank0 mds in the fsmap")
        return None

    def update_client_meta(self):
        """
        Rebuild fs_list from the fsmap and send an asynchronous "client ls"
        to every rank 0 MDS; replies are handled by notify_cmd().
        """
        new_updates = {}
        with self.meta_lock:
            # rebuilt under meta_lock, which every reader of fs_list holds
            fs_list.clear()
            fsmap = self.module.get('fs_map')
            for fs in fsmap['filesystems']:
                mds_map = fs['mdsmap']
                if mds_map is None:
                    continue
                fsname = mds_map['fs_name']
                for mds_status in mds_map['info'].values():
                    if mds_status['rank'] == 0:
                        fs_list.append(fsname)
                        tag = str(uuid.uuid4())
                        new_updates[tag] = (fsname, mds_status['gid'], CommandResult(tag))
            self.client_metadata['in_progress'].update(new_updates)

        self.log.debug(f"updating client metadata from {new_updates}")

        cmd_dict = {'prefix': 'client ls'}
        for tag, (_, rank0_gid, cmd_result) in new_updates.items():
            self.module.send_command(cmd_result, "mds", str(rank0_gid), json.dumps(cmd_dict), tag)

    def run(self):
        """Report processor loop; any exception is logged and ends the thread."""
        try:
            self.log.info("FSPerfStats::report_processor starting...")
            while True:
                with self.lock:
                    self.scrub_expired_queries()
                    self.process_mds_reports()
                    self.r_cv.notify()

                    stats_period = int(self.module.get_ceph_option("mgr_stats_period"))
                    self.q_cv.wait(stats_period)
                self.log.debug("FSPerfStats::tick")
        except Exception:
            self.log.fatal("fatal error: {}".format(traceback.format_exc()))

    def cull_mds_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
        """Drop tracked ranks absent from incoming_metrics; their clients are
        added to missing_clients."""
        available_ranks = {int(counter['k'][0][0]) for counter in incoming_metrics}
        for rank in set(raw_perf_counters) - available_ranks:
            laggy, culled = raw_perf_counters.pop(rank)
            self.log.info("culled {0} client entries from rank {1} (laggy: {2})".format(
                len(culled), rank, "yes" if laggy else "no"))
            missing_clients.update(culled.keys())

    def cull_client_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
        """
        For every tracked rank, drop the clients the incoming report no longer
        lists for that rank; they are added to missing_clients. Meant to run
        after cull_mds_entries(), so every tracked rank is in the report.
        """
        available = {} # type: Dict[int, Set[str]]
        for counter in incoming_metrics:
            client_id, _ = extract_client_id_and_ip(counter['k'][1][0])
            if client_id is not None:
                available.setdefault(int(counter['k'][0][0]), set()).add(client_id)
        for rank, (_, clients) in raw_perf_counters.items():
            for client in set(clients) - available.get(rank, set()):
                clients.pop(client)
                self.log.info("culled {0} from rank {1}".format(client, rank))
                missing_clients.add(client)

    def cull_missing_entries(self, raw_perf_counters, incoming_metrics):
        """Cull vanished ranks/clients and purge (or defer purging) the
        metadata of clients absent from the whole report."""
        missing_clients = set() # type: Set[str]
        self.cull_mds_entries(raw_perf_counters, incoming_metrics, missing_clients)
        self.cull_client_entries(raw_perf_counters, incoming_metrics, missing_clients)

        # a client dropped from one rank may still be reported by another one
        missing_clients -= {extract_client_id_and_ip(counter['k'][1][0])[0]
                            for counter in incoming_metrics}

        self.log.debug("missing_clients={0}".format(missing_clients))
        with self.meta_lock:
            if self.client_metadata['in_progress']:
                self.client_metadata['to_purge'].update(missing_clients)
                self.log.info("deferring client metadata purge (now {0} client(s))".format(
                    len(self.client_metadata['to_purge'])))
            else:
                self._purge_client_metadata(missing_clients)
                self.log.debug("client_metadata={0}".format(self.client_metadata['metadata']))

    def cull_global_metrics(self, raw_perf_counters, incoming_metrics):
        """Drop tracked clients missing from the incoming global report."""
        # tracked keys are client ids ("client.N") while report keys are full
        # client strings, so extract the id before comparing
        available_clients = {extract_client_id_and_ip(counter['k'][0][0])[0]
                             for counter in incoming_metrics}
        for client in set(raw_perf_counters) - available_clients:
            raw_perf_counters.pop(client)

    def _update_client_ip(self, client_id, client_ip):
        """Record client_ip as the "IP" metadata of client_id in every
        filesystem in fs_list whose cached metadata knows that client."""
        updated = False
        with self.meta_lock:
            # check and update under meta_lock: other holders of the lock
            # (e.g. notify_cmd) may drop a filesystem's metadata meanwhile
            have_filesystems = bool(fs_list)
            all_meta = self.client_metadata['metadata']
            for fs_name in fs_list:
                fs_meta = all_meta.get(fs_name)
                if client_id is not None and fs_meta and fs_meta.get(client_id):
                    self.set_client_metadata(fs_name, client_id, "IP", client_ip)
                    updated = True
        if have_filesystems and not updated:
            self.log.warning(f"client metadata for client_id={client_id} might be unavailable")

    def get_raw_perf_counters(self, query):
        """
        Refresh a user query's per-rank raw counters, stored as
        {rank: [delayed, {client_id: counters}]}, from the reports of all its
        query ids, then start an asynchronous client metadata refresh.
        """
        raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS, {})

        # A rank filter registers one query per rank and each reports only its
        # own rank: gather every report before culling, otherwise each query id
        # would cull the ranks reported by the others.
        delayed_ranks = set() # type: Set[int]
        incoming_metrics = []
        for query_id in query[QUERY_IDS]:
            result = self.module.get_mds_perf_counters(query_id)
            self.log.debug("raw_perf_counters={}".format(raw_perf_counters))
            self.log.debug("get_raw_perf_counters={}".format(result))

            # metrics for delayed ranks are tagged as stale
            delayed_ranks.update(extract_mds_ranks_from_report(result['metrics'][0][0]))
            # what's received from MDS
            incoming_metrics.extend(result['metrics'][1])
            # metrics updated (monotonic) time
            self.mx_last_updated = result['metrics'][2][0]

        # cull missing MDSs and clients
        self.cull_missing_entries(raw_perf_counters, incoming_metrics)

        # update our copy once per report entry (differences already culled);
        # this no longer depends on which filesystems have cached metadata
        for counter in incoming_metrics:
            mds_rank = int(counter['k'][0][0])
            client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
            self._update_client_ip(client_id, client_ip)

            raw_counters = raw_perf_counters.setdefault(mds_rank, [False, {}])
            raw_counters[0] = mds_rank in delayed_ranks
            # fresh list rather than in-place mutation: earlier reports may
            # still reference the previous one
            raw_counters[1][client_id] = list(counter['c'])
        # send an asynchronous client metadata refresh
        self.update_client_meta()

    def get_raw_perf_counters_global(self, query):
        """Refresh a user query's global raw counters ({client_id: counters})."""
        raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
        result = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID])

        self.log.debug("raw_perf_counters_global={}".format(raw_perf_counters))
        self.log.debug("get_raw_perf_counters_global={}".format(result))

        global_metrics = result['metrics'][1]
        self.cull_global_metrics(raw_perf_counters, global_metrics)
        for counter in global_metrics:
            client_id, _ = extract_client_id_and_ip(counter['k'][0][0])
            raw_perf_counters[client_id] = list(counter['c'])

    def process_mds_reports(self):
        for query in self.user_queries.values():
            self.get_raw_perf_counters(query)
            self.get_raw_perf_counters_global(query)

    def scrub_expired_queries(self):
        """Unregister user queries not requested within QUERY_EXPIRE_INTERVAL."""
        expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
        for filter_spec in list(self.user_queries.keys()):
            user_query = self.user_queries[filter_spec]
            self.log.debug("scrubbing query={}".format(user_query))
            if user_query[QUERY_LAST_REQUEST] < expire_time:
                expired_query_ids = user_query[QUERY_IDS].copy()
                expired_query_ids.append(user_query[GLOBAL_QUERY_ID])
                self.log.debug("unregistering query={} ids={}".format(user_query, expired_query_ids))
                self.unregister_mds_perf_queries(filter_spec, expired_query_ids)
                del self.user_queries[filter_spec]

    def prepare_mds_perf_query(self, rank, client_id, client_ip):
        """Build the per-rank MDS perf query (rank -1 matches every rank)."""
        if rank == -1:
            mds_rank_regex = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
        else:
            mds_rank_regex = '^({})$'.format(rank)
        client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
        return {
            'key_descriptor' : [
                {'type' : 'mds_rank', 'regex' : mds_rank_regex},
                {'type' : 'client_id', 'regex' : client_regex},
            ],
            'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS,
        }

    def prepare_global_perf_query(self, client_id, client_ip):
        """Build the global (per-client) MDS perf query."""
        client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
        return {
            'key_descriptor' : [
                {'type' : 'client_id', 'regex' : client_regex},
            ],
            'performance_counter_descriptors' : MDS_GLOBAL_PERF_QUERY_COUNTERS,
        }

    def unregister_mds_perf_queries(self, filter_spec, query_ids):
        self.log.info("unregister_mds_perf_queries: filter_spec={0}, query_id={1}".format(
            filter_spec, query_ids))
        for query_id in query_ids:
            self.module.remove_mds_perf_query(query_id)

    def register_mds_perf_query(self, filter_spec):
        """Register one MDS perf query per rank of filter_spec and return their
        ids; on failure the already registered ones are removed again."""
        mds_ranks = filter_spec.mds_ranks
        client_id = filter_spec.client_id
        client_ip = filter_spec.client_ip

        query_ids = []
        try:
            # register per-mds perf query
            for rank in mds_ranks:
                query = self.prepare_mds_perf_query(rank, client_id, client_ip)
                self.log.info("register_mds_perf_query: {}".format(query))

                query_id = self.module.add_mds_perf_query(query)
                if query_id is None: # query id can be 0
                    raise RuntimeError("failed to add MDS perf query: {}".format(query))
                query_ids.append(query_id)
        except Exception:
            for query_id in query_ids:
                self.module.remove_mds_perf_query(query_id)
            raise
        return query_ids

    def register_global_perf_query(self, filter_spec):
        """Register the global perf query of filter_spec and return its id."""
        client_id = filter_spec.client_id
        client_ip = filter_spec.client_ip

        # register a global perf query for metrics
        query = self.prepare_global_perf_query(client_id, client_ip)
        self.log.info("register_global_perf_query: {}".format(query))

        query_id = self.module.add_mds_perf_query(query)
        if query_id is None: # query id can be 0
            raise RuntimeError("failed to add global perf query: {}".format(query))
        return query_id

    def register_query(self, filter_spec):
        """
        Return the bookkeeping dict of the user query for filter_spec. A new
        query is registered and up to 5s are spent waiting for a round of
        report processing; an existing one just gets its last-request time
        refreshed. Caller holds self.lock.
        """
        user_query = self.user_queries.get(filter_spec, None)
        if user_query:
            user_query[QUERY_LAST_REQUEST] = datetime.now()
            return user_query

        query_ids = self.register_mds_perf_query(filter_spec)
        try:
            global_query_id = self.register_global_perf_query(filter_spec)
        except Exception:
            # do not leak the per-rank queries registered above
            self.unregister_mds_perf_queries(filter_spec, query_ids)
            raise
        user_query = {
            QUERY_IDS : query_ids,
            GLOBAL_QUERY_ID : global_query_id,
            QUERY_LAST_REQUEST : datetime.now(),
        }
        self.user_queries[filter_spec] = user_query

        self.q_cv.notify()
        self.r_cv.wait(5)
        return user_query

    def generate_report(self, user_query):
        """Assemble the report dict (counter names, client metadata, global
        and per-mds metrics) for a user query."""
        result = {} # type: Dict
        # start with counter info -- metrics that are global and per mds
        result["version"] = PERF_STATS_VERSION
        result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
        result["counters"] = MDS_PERF_QUERY_COUNTERS

        raw_perfs_global = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {})
        with self.meta_lock:
            # only clients that also have per-mds counters are reported
            raw_counters_clients = set()
            for _, clients in raw_perfs.values():
                raw_counters_clients.update(clients)
            meta = self.client_metadata["metadata"]

            # fill in client metadata and global perf metrics per filesystem
            result_meta = result.setdefault("client_metadata", {})
            global_metrics = result.setdefault("global_metrics", {})
            for fs_name in fs_list:
                fs_meta = meta.get(fs_name)
                if not fs_meta:
                    continue
                for client_id, counters in raw_perfs_global.items():
                    if client_id in fs_meta and client_id in raw_counters_clients:
                        result_meta.setdefault(fs_name, {}).setdefault(client_id, {}).update(
                            fs_meta[client_id])
                        global_metrics.setdefault(fs_name, {})[client_id] = list(counters)

        # and, now per-mds metrics keyed by mds rank along with delayed ranks
        metrics = result.setdefault("metrics", {})
        metrics["delayed_ranks"] = [rank for rank, counters in raw_perfs.items() if counters[0]]
        for rank, counters in raw_perfs.items():
            metrics.setdefault("mds.{}".format(rank), {}).update(counters[1])
        return result

    def extract_query_filters(self, cmd):
        """Build a FilterSpec from the command's optional filter arguments;
        raises ValueError for malformed filters."""
        mds_rank_spec = cmd.get('mds_rank', None)
        client_id_spec = cmd.get('client_id', None)
        client_ip_spec = cmd.get('client_ip', None)

        self.log.debug("mds_rank_spec={0}, client_id_spec={1}, client_ip_spec={2}".format(
            mds_rank_spec, client_id_spec, client_ip_spec))

        mds_ranks = extract_mds_ranks_from_spec(mds_rank_spec)
        client_id = extract_client_id_from_spec(client_id_spec)
        client_ip = extract_client_ip_from_spec(client_ip_spec)

        return FilterSpec(mds_ranks, client_id, client_ip)

    def get_perf_data(self, cmd):
        """Command entry point: return (retcode, json report, error string)."""
        try:
            filter_spec = self.extract_query_filters(cmd)
        except ValueError as e:
            return -errno.EINVAL, "", str(e)

        with self.lock:
            user_query = self.register_query(filter_spec)
            # serialize while the counters cannot change underneath us
            payload = json.dumps(self.generate_report(user_query))
        return 0, payload, ""