7 from collections
import OrderedDict
8 from typing
import List
, Dict
, Set
10 from mgr_module
import CommandResult
12 from datetime
import datetime
, timedelta
13 from threading
import Lock
, Condition
, Thread
15 PERF_STATS_VERSION
= 1
17 QUERY_IDS
= "query_ids"
18 GLOBAL_QUERY_ID
= "global_query_id"
19 QUERY_LAST_REQUEST
= "last_time_stamp"
20 QUERY_RAW_COUNTERS
= "query_raw_counters"
21 QUERY_RAW_COUNTERS_GLOBAL
= "query_raw_counters_global"
27 MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
= '^(.*)$'
28 MDS_PERF_QUERY_REGEX_MATCH_CLIENTS
= '^(client.{0}\s+{1}):.*'
29 MDS_PERF_QUERY_COUNTERS_MAP
= OrderedDict({'cap_hit': 0,
32 'metadata_latency': 3,
39 MDS_PERF_QUERY_COUNTERS
= [] # type: List[str]
40 MDS_GLOBAL_PERF_QUERY_COUNTERS
= ['cap_hit', 'read_latency', 'write_latency', 'metadata_latency', 'dentry_lease', 'opened_files', 'pinned_icaps', 'opened_inodes', 'read_io_sizes', 'write_io_sizes'] # type: List[str]
42 QUERY_EXPIRE_INTERVAL
= timedelta(minutes
=1)
44 CLIENT_METADATA_KEY
= "client_metadata"
45 CLIENT_METADATA_SUBKEYS
= ["hostname", "root"]
46 CLIENT_METADATA_SUBKEYS_OPTIONAL
= ["mount_point"]
48 NON_EXISTENT_KEY_STR
= "N/A"
class FilterSpec(object):
    """
    Query filters encapsulated and used as key for query map.

    Two specs are equal (and hash alike) exactly when their
    (mds_ranks, client_id, client_ip) triples are equal.
    """
    def __init__(self, mds_ranks, client_id, client_ip):
        self.mds_ranks = mds_ranks
        self.client_id = client_id
        self.client_ip = client_ip

    def _key(self):
        # single source of truth for hashing and equality so the two
        # can never drift apart.
        return (self.mds_ranks, self.client_id, self.client_ip)

    def __hash__(self):
        return hash(self._key())

    def __eq__(self, other):
        # comparing against a foreign type is "not equal", not an error
        if not isinstance(other, FilterSpec):
            return NotImplemented
        # compare against other's client_ip -- comparing self.client_ip with
        # itself made specs that differ only by client ip look identical.
        return self._key() == other._key()

    def __ne__(self, other):
        return not (self == other)
68 def extract_mds_ranks_from_spec(mds_rank_spec
):
71 match
= re
.match(r
'^(\d[,\d]*)$', mds_rank_spec
)
73 raise ValueError("invalid mds filter spec: {}".format(mds_rank_spec
))
74 return tuple(int(mds_rank
) for mds_rank
in match
.group(0).split(','))
76 def extract_client_id_from_spec(client_id_spec
):
77 if not client_id_spec
:
79 # the client id is the spec itself since it'll be a part
80 # of client filter regex.
83 def extract_client_ip_from_spec(client_ip_spec
):
84 if not client_ip_spec
:
86 # TODO: validate if it is an ip address (or a subset of it).
87 # the client ip is the spec itself since it'll be a part
88 # of client filter regex.
91 def extract_mds_ranks_from_report(mds_ranks_str
):
94 return [int(x
) for x
in mds_ranks_str
.split(',')]
96 def extract_client_id_and_ip(client
):
97 match
= re
.match(r
'^(client\.\d+)\s(.*)', client
)
99 return match
.group(1), match
.group(2)
102 class FSPerfStats(object):
104 q_cv
= Condition(lock
)
105 r_cv
= Condition(lock
)
107 user_queries
= {} # type: Dict[str, Dict]
116 def __init__(self
, module
):
118 self
.log
= module
.log
119 # report processor thread
120 self
.report_processor
= Thread(target
=self
.run
)
121 self
.report_processor
.start()
123 def set_client_metadata(self
, client_id
, key
, meta
):
124 result
= self
.client_metadata
['metadata'].setdefault(client_id
, {})
125 if not key
in result
or not result
[key
] == meta
:
128 def notify(self
, cmdtag
):
129 self
.log
.debug("cmdtag={0}".format(cmdtag
))
132 result
= self
.client_metadata
['in_progress'].pop(cmdtag
)
134 self
.log
.warn(f
"cmdtag {cmdtag} not found in client metadata")
137 client_meta
= result
[1].wait()
138 if client_meta
[0] != 0:
139 self
.log
.warn("failed to fetch client metadata from rank {0}, err={1}".format(
140 result
[0], client_meta
[2]))
142 self
.log
.debug("notify: client metadata={0}".format(json
.loads(client_meta
[1])))
143 for metadata
in json
.loads(client_meta
[1]):
144 client_id
= "client.{0}".format(metadata
['id'])
145 result
= self
.client_metadata
['metadata'].setdefault(client_id
, {})
146 for subkey
in CLIENT_METADATA_SUBKEYS
:
147 self
.set_client_metadata(client_id
, subkey
, metadata
[CLIENT_METADATA_KEY
][subkey
])
148 for subkey
in CLIENT_METADATA_SUBKEYS_OPTIONAL
:
149 self
.set_client_metadata(client_id
, subkey
,
150 metadata
[CLIENT_METADATA_KEY
].get(subkey
, NON_EXISTENT_KEY_STR
))
151 metric_features
= int(metadata
[CLIENT_METADATA_KEY
]["metric_spec"]["metric_flags"]["feature_bits"], 16)
152 supported_metrics
= [metric
for metric
, bit
in MDS_PERF_QUERY_COUNTERS_MAP
.items() if metric_features
& (1 << bit
)]
153 self
.set_client_metadata(client_id
, "valid_metrics", supported_metrics
)
154 kver
= metadata
[CLIENT_METADATA_KEY
].get("kernel_version", None)
156 self
.set_client_metadata(client_id
, "kernel_version", kver
)
157 # when all async requests are done, purge clients metadata if any.
158 if not self
.client_metadata
['in_progress']:
159 for client
in self
.client_metadata
['to_purge']:
161 self
.log
.info("purge client metadata for {0}".format(client
))
162 self
.client_metadata
['metadata'].remove(client
)
165 self
.client_metadata
['to_purge'].clear()
166 self
.log
.debug("client_metadata={0}, to_purge={1}".format(
167 self
.client_metadata
['metadata'], self
.client_metadata
['to_purge']))
169 def update_client_meta(self
, rank_set
):
171 pending_updates
= [v
[0] for v
in self
.client_metadata
['in_progress'].values()]
173 for rank
in rank_set
:
174 if rank
in pending_updates
:
176 tag
= str(uuid
.uuid4())
177 result
= CommandResult(tag
)
178 new_updates
[tag
] = (rank
, result
)
179 self
.client_metadata
['in_progress'].update(new_updates
)
181 self
.log
.debug("updating client metadata from {0}".format(new_updates
))
183 cmd_dict
= {'prefix': 'client ls'}
184 for tag
,val
in new_updates
.items():
185 self
.module
.send_command(val
[1], "mds", str(val
[0]), json
.dumps(cmd_dict
), tag
)
189 self
.log
.info("FSPerfStats::report_processor starting...")
192 self
.scrub_expired_queries()
193 self
.process_mds_reports()
196 stats_period
= int(self
.module
.get_ceph_option("mgr_stats_period"))
197 self
.q_cv
.wait(stats_period
)
198 self
.log
.debug("FSPerfStats::tick")
199 except Exception as e
:
200 self
.log
.fatal("fatal error: {}".format(traceback
.format_exc()))
def cull_mds_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
    """
    Purge tracked MDS ranks that are absent from the incoming report.

    :param raw_perf_counters: tracked counters keyed by rank; each value is
        indexed here as ``[0]`` (flag logged as "laggy") and ``[1]``
        (mapping keyed by client). Mutated in place.
    :param incoming_metrics: iterable of counters; ``counter['k'][0][0]``
        is converted with ``int()`` to obtain the reporting rank.
    :param missing_clients: set that receives the clients of every purged
        rank. Mutated in place.
    """
    # ranks present in the incoming report
    reported_ranks = {int(metric['k'][0][0]) for metric in incoming_metrics}
    # whatever we track but did not hear about goes away wholesale
    stale_ranks = set(raw_perf_counters.keys()) - reported_ranks
    for stale_rank in stale_ranks:
        dropped = raw_perf_counters.pop(stale_rank)
        laggy = "yes" if dropped[0] else "no"
        self.log.info("culled {0} client entries from rank {1} (laggy: {2})".format(
            len(dropped[1]), stale_rank, laggy))
        missing_clients.update(dropped[1].keys())
def cull_client_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
    """
    Purge tracked clients that are absent from the incoming report.

    For each tracked rank, every client that does not show up anywhere in
    ``incoming_metrics`` is removed from that rank's client map and added
    to ``missing_clients``. Note that the reported client set is built from
    the whole report, not per rank.

    :param raw_perf_counters: tracked counters keyed by rank; index ``[1]``
        of each value is the per-client mapping. Mutated in place.
    :param incoming_metrics: iterable of counters; ``counter['k'][1][0]``
        is parsed with ``extract_client_id_and_ip()``.
    :param missing_clients: set that receives the purged clients.
    """
    # this is a bit more involved -- for each rank figure out what clients
    # are missing in incoming report and purge them from our tracked map.
    # but, if this is invoked _after_ cull_mds_entries(), the rank set
    # is same, so we can loop based on that assumption.
    if not raw_perf_counters:
        # nothing tracked, nothing to cull (and nothing to parse)
        return

    # the set of reported clients does not depend on the rank being
    # visited, so parse the report once instead of once per rank.
    available_clients = set()
    for counter in incoming_metrics:
        client_id, _ = extract_client_id_and_ip(counter['k'][1][0])
        if client_id is not None:
            available_clients.add(client_id)

    for rank in raw_perf_counters.keys():
        client_counters = raw_perf_counters[rank][1]
        for client in set(client_counters.keys()) - available_clients:
            client_counters.pop(client)
            self.log.info("culled {0} from rank {1}".format(client, rank))
            missing_clients.add(client)
228 def cull_missing_entries(self
, raw_perf_counters
, incoming_metrics
):
229 missing_clients
= set() # type: Set[str]
230 self
.cull_mds_entries(raw_perf_counters
, incoming_metrics
, missing_clients
)
231 self
.cull_client_entries(raw_perf_counters
, incoming_metrics
, missing_clients
)
233 self
.log
.debug("missing_clients={0}".format(missing_clients
))
235 if self
.client_metadata
['in_progress']:
236 self
.client_metadata
['to_purge'].update(missing_clients
)
237 self
.log
.info("deferring client metadata purge (now {0} client(s))".format(
238 len(self
.client_metadata
['to_purge'])))
240 for client
in missing_clients
:
242 self
.log
.info("purge client metadata for {0}".format(client
))
243 self
.client_metadata
['metadata'].pop(client
)
246 self
.log
.debug("client_metadata={0}".format(self
.client_metadata
['metadata']))
def cull_global_metrics(self, raw_perf_counters, incoming_metrics):
    """
    Drop tracked global entries that the incoming report does not mention.

    :param raw_perf_counters: tracked global counters, keyed the same way as
        ``counter['k'][0][0]`` is compared here. Mutated in place.
    :param incoming_metrics: iterable of counters; ``counter['k'][0][0]``
        is the key looked for among the tracked entries.
    """
    reported = {metric['k'][0][0] for metric in incoming_metrics}
    # collect first, then remove -- never resize the dict while walking it
    stale = [tracked for tracked in raw_perf_counters.keys() if tracked not in reported]
    for tracked in stale:
        raw_perf_counters.pop(tracked)
254 def get_raw_perf_counters(self
, query
):
255 raw_perf_counters
= query
.setdefault(QUERY_RAW_COUNTERS
, {})
257 for query_id
in query
[QUERY_IDS
]:
258 result
= self
.module
.get_mds_perf_counters(query_id
)
259 self
.log
.debug("raw_perf_counters={}".format(raw_perf_counters
))
260 self
.log
.debug("get_raw_perf_counters={}".format(result
))
262 # extract passed in delayed ranks. metrics for delayed ranks are tagged
264 delayed_ranks
= extract_mds_ranks_from_report(result
['metrics'][0][0])
266 # what's received from MDS
267 incoming_metrics
= result
['metrics'][1]
269 # cull missing MDSs and clients
270 self
.cull_missing_entries(raw_perf_counters
, incoming_metrics
)
272 # iterate over metrics list and update our copy (note that we have
273 # already culled the differences).
274 meta_refresh_ranks
= set()
275 for counter
in incoming_metrics
:
276 mds_rank
= int(counter
['k'][0][0])
277 client_id
, client_ip
= extract_client_id_and_ip(counter
['k'][1][0])
278 if client_id
is not None or not client_ip
: # client_id _could_ be 0
280 if not client_id
in self
.client_metadata
['metadata']:
281 meta_refresh_ranks
.add(mds_rank
)
282 self
.set_client_metadata(client_id
, "IP", client_ip
)
284 self
.log
.warn("client metadata for client_id={0} might be unavailable".format(client_id
))
286 raw_counters
= raw_perf_counters
.setdefault(mds_rank
, [False, {}])
287 raw_counters
[0] = True if mds_rank
in delayed_ranks
else False
288 raw_client_counters
= raw_counters
[1].setdefault(client_id
, [])
290 del raw_client_counters
[:]
291 raw_client_counters
.extend(counter
['c'])
292 # send an asynchronous client metadata refresh
293 self
.update_client_meta(meta_refresh_ranks
)
def get_raw_perf_counters_global(self, query):
    """
    Refresh the global (not per-rank) raw counters cached in ``query``.

    Fetches the report for ``query[GLOBAL_QUERY_ID]`` from the module,
    culls entries no longer reported and then overwrites, per client id,
    the cached counter list with the freshly reported ``counter['c']``.

    :param query: user query dict; the cache lives under
        ``QUERY_RAW_COUNTERS_GLOBAL`` and is created when absent.
    """
    cached = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
    report = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID])

    self.log.debug("raw_perf_counters_global={}".format(cached))
    self.log.debug("get_raw_perf_counters_global={}".format(report))

    reported_metrics = report['metrics'][1]
    self.cull_global_metrics(cached, reported_metrics)
    for metric in reported_metrics:
        # only the client id is needed as key here; the address is dropped
        client_id = extract_client_id_and_ip(metric['k'][0][0])[0]
        per_client = cached.setdefault(client_id, [])
        # reuse the same list object so existing references stay valid
        per_client.clear()
        per_client.extend(metric['c'])
def process_mds_reports(self):
    """
    Refresh cached raw counters for every registered user query.

    For each query the per-mds counters are refreshed first, then the
    global ones.
    """
    for tracked_query in self.user_queries.values():
        self.get_raw_perf_counters(tracked_query)
        self.get_raw_perf_counters_global(tracked_query)
def scrub_expired_queries(self):
    """
    Unregister and forget user queries that have not been requested lately.

    A query expires when its ``QUERY_LAST_REQUEST`` timestamp is older than
    ``QUERY_EXPIRE_INTERVAL`` measured from now. Both its per-mds query ids
    and its global query id are unregistered before the entry is deleted
    from ``self.user_queries``.
    """
    cutoff = datetime.now() - QUERY_EXPIRE_INTERVAL
    # walk a snapshot of the keys since entries get deleted on the way
    for filter_spec in tuple(self.user_queries.keys()):
        candidate = self.user_queries[filter_spec]
        self.log.debug("scrubbing query={}".format(candidate))
        if not candidate[QUERY_LAST_REQUEST] < cutoff:
            continue
        # per-mds ids first, global id last; the stored list is left intact
        stale_ids = [*candidate[QUERY_IDS], candidate[GLOBAL_QUERY_ID]]
        self.log.debug("unregistering query={} ids={}".format(candidate, stale_ids))
        self.unregister_mds_perf_queries(filter_spec, stale_ids)
        del self.user_queries[filter_spec]
327 def prepare_mds_perf_query(self
, rank
, client_id
, client_ip
):
328 mds_rank_regex
= MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
330 mds_rank_regex
= '^({})$'.format(rank
)
331 client_regex
= MDS_PERF_QUERY_REGEX_MATCH_CLIENTS
.format(client_id
, client_ip
)
334 {'type' : 'mds_rank', 'regex' : mds_rank_regex
},
335 {'type' : 'client_id', 'regex' : client_regex
},
337 'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS
,
340 def prepare_global_perf_query(self
, client_id
, client_ip
):
341 client_regex
= MDS_PERF_QUERY_REGEX_MATCH_CLIENTS
.format(client_id
, client_ip
)
344 {'type' : 'client_id', 'regex' : client_regex
},
346 'performance_counter_descriptors' : MDS_GLOBAL_PERF_QUERY_COUNTERS
,
def unregister_mds_perf_queries(self, filter_spec, query_ids):
    """
    Remove each of the given perf queries through the module.

    :param filter_spec: filter the queries belong to; only used for logging.
    :param query_ids: iterable of query ids handed, one by one and in order,
        to ``self.module.remove_mds_perf_query()``.
    """
    message = "unregister_mds_perf_queries: filter_spec={0}, query_id={1}".format(
        filter_spec, query_ids)
    self.log.info(message)
    for registered_id in query_ids:
        self.module.remove_mds_perf_query(registered_id)
355 def register_mds_perf_query(self
, filter_spec
):
356 mds_ranks
= filter_spec
.mds_ranks
357 client_id
= filter_spec
.client_id
358 client_ip
= filter_spec
.client_ip
362 # register per-mds perf query
363 for rank
in mds_ranks
:
364 query
= self
.prepare_mds_perf_query(rank
, client_id
, client_ip
)
365 self
.log
.info("register_mds_perf_query: {}".format(query
))
367 query_id
= self
.module
.add_mds_perf_query(query
)
368 if query_id
is None: # query id can be 0
369 raise RuntimeError("failed to add MDS perf query: {}".format(query
))
370 query_ids
.append(query_id
)
372 for query_id
in query_ids
:
373 self
.module
.remove_mds_perf_query(query_id
)
377 def register_global_perf_query(self
, filter_spec
):
378 client_id
= filter_spec
.client_id
379 client_ip
= filter_spec
.client_ip
381 # register a global perf query for metrics
382 query
= self
.prepare_global_perf_query(client_id
, client_ip
)
383 self
.log
.info("register_global_perf_query: {}".format(query
))
385 query_id
= self
.module
.add_mds_perf_query(query
)
386 if query_id
is None: # query id can be 0
387 raise RuntimeError("failed to add global perf query: {}".format(query
))
390 def register_query(self
, filter_spec
):
391 user_query
= self
.user_queries
.get(filter_spec
, None)
394 QUERY_IDS
: self
.register_mds_perf_query(filter_spec
),
395 GLOBAL_QUERY_ID
: self
.register_global_perf_query(filter_spec
),
396 QUERY_LAST_REQUEST
: datetime
.now(),
398 self
.user_queries
[filter_spec
] = user_query
403 user_query
[QUERY_LAST_REQUEST
] = datetime
.now()
406 def generate_report(self
, user_query
):
407 result
= {} # type: Dict
408 # start with counter info -- metrics that are global and per mds
409 result
["version"] = PERF_STATS_VERSION
410 result
["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
411 result
["counters"] = MDS_PERF_QUERY_COUNTERS
413 # fill in client metadata
414 raw_perfs
= user_query
.setdefault(QUERY_RAW_COUNTERS_GLOBAL
, {})
416 result_meta
= result
.setdefault("client_metadata", {})
417 for client_id
in raw_perfs
.keys():
418 if client_id
in self
.client_metadata
["metadata"]:
419 client_meta
= result_meta
.setdefault(client_id
, {})
420 client_meta
.update(self
.client_metadata
["metadata"][client_id
])
422 # start populating global perf metrics w/ client metadata
423 metrics
= result
.setdefault("global_metrics", {})
424 for client_id
, counters
in raw_perfs
.items():
425 global_client_metrics
= metrics
.setdefault(client_id
, [])
426 del global_client_metrics
[:]
427 global_client_metrics
.extend(counters
)
429 # and, now per-mds metrics keyed by mds rank along with delayed ranks
430 raw_perfs
= user_query
.setdefault(QUERY_RAW_COUNTERS
, {})
431 metrics
= result
.setdefault("metrics", {})
433 metrics
["delayed_ranks"] = [rank
for rank
,counters
in raw_perfs
.items() if counters
[0]]
434 for rank
, counters
in raw_perfs
.items():
435 mds_key
= "mds.{}".format(rank
)
436 mds_metrics
= metrics
.setdefault(mds_key
, {})
437 mds_metrics
.update(counters
[1])
def extract_query_filters(self, cmd):
    """
    Build a ``FilterSpec`` from the filter arguments of a command.

    :param cmd: command dict; the optional keys ``mds_rank``, ``client_id``
        and ``client_ip`` are read (``None`` when absent) and each one is
        passed through its ``extract_*_from_spec()`` helper.
    :return: ``FilterSpec`` built from the three extracted values.
    """
    mds_rank_spec, client_id_spec, client_ip_spec = [
        cmd.get(name, None) for name in ('mds_rank', 'client_id', 'client_ip')]

    self.log.debug("mds_rank_spec={0}, client_id_spec={1}, client_ip_spec={2}".format(
        mds_rank_spec, client_id_spec, client_ip_spec))

    # arguments are evaluated left to right: ranks, then id, then ip
    return FilterSpec(
        extract_mds_ranks_from_spec(mds_rank_spec),
        extract_client_id_from_spec(client_id_spec),
        extract_client_ip_from_spec(client_ip_spec))
454 def get_perf_data(self
, cmd
):
455 filter_spec
= self
.extract_query_filters(cmd
)
459 user_query
= self
.register_query(filter_spec
)
460 result
= self
.generate_report(user_query
)
461 return 0, json
.dumps(result
), ""