8 from collections
import OrderedDict
9 from typing
import List
, Dict
, Set
11 from mgr_module
import CommandResult
13 from datetime
import datetime
, timedelta
14 from threading
import Lock
, Condition
, Thread
, Timer
15 from ipaddress
import ip_address
17 PERF_STATS_VERSION
= 1
19 QUERY_IDS
= "query_ids"
20 GLOBAL_QUERY_ID
= "global_query_id"
21 QUERY_LAST_REQUEST
= "last_time_stamp"
22 QUERY_RAW_COUNTERS
= "query_raw_counters"
23 QUERY_RAW_COUNTERS_GLOBAL
= "query_raw_counters_global"
29 MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
= '^(.*)$'
30 MDS_PERF_QUERY_REGEX_MATCH_CLIENTS
= '^(client.{0}\s+{1}):.*'
31 MDS_PERF_QUERY_COUNTERS_MAP
= OrderedDict({'cap_hit': 0,
34 'metadata_latency': 3,
41 'avg_read_latency': 10,
42 'stdev_read_latency': 11,
43 'avg_write_latency': 12,
44 'stdev_write_latency': 13,
45 'avg_metadata_latency': 14,
46 'stdev_metadata_latency': 15})
47 MDS_PERF_QUERY_COUNTERS
= [] # type: List[str]
48 MDS_GLOBAL_PERF_QUERY_COUNTERS
= list(MDS_PERF_QUERY_COUNTERS_MAP
.keys())
50 QUERY_EXPIRE_INTERVAL
= timedelta(minutes
=1)
51 REREGISTER_TIMER_INTERVAL
= 1
53 CLIENT_METADATA_KEY
= "client_metadata"
54 CLIENT_METADATA_SUBKEYS
= ["hostname", "root"]
55 CLIENT_METADATA_SUBKEYS_OPTIONAL
= ["mount_point"]
57 NON_EXISTENT_KEY_STR
= "N/A"
59 logger
= logging
.getLogger(__name__
)
61 class FilterSpec(object):
63 query filters encapsulated and used as key for query map
65 def __init__(self
, mds_ranks
, client_id
, client_ip
):
66 self
.mds_ranks
= mds_ranks
67 self
.client_id
= client_id
68 self
.client_ip
= client_ip
71 return hash((self
.mds_ranks
, self
.client_id
, self
.client_ip
))
73 def __eq__(self
, other
):
74 return (self
.mds_ranks
, self
.client_id
, self
.client_ip
) == (other
.mds_ranks
, other
.client_id
, self
.client_ip
)
76 def __ne__(self
, other
):
77 return not(self
== other
)
79 def extract_mds_ranks_from_spec(mds_rank_spec
):
82 match
= re
.match(r
'^\d+(,\d+)*$', mds_rank_spec
)
84 raise ValueError("invalid mds filter spec: {}".format(mds_rank_spec
))
85 return tuple(int(mds_rank
) for mds_rank
in match
.group(0).split(','))
87 def extract_client_id_from_spec(client_id_spec
):
88 if not client_id_spec
:
90 # the client id is the spec itself since it'll be a part
91 # of client filter regex.
92 if not client_id_spec
.isdigit():
93 raise ValueError('invalid client_id filter spec: {}'.format(client_id_spec
))
96 def extract_client_ip_from_spec(client_ip_spec
):
97 if not client_ip_spec
:
100 client_ip
= client_ip_spec
101 if client_ip
.startswith('v1:'):
102 client_ip
= client_ip
.replace('v1:', '')
103 elif client_ip
.startswith('v2:'):
104 client_ip
= client_ip
.replace('v2:', '')
107 ip_address(client_ip
)
108 return client_ip_spec
110 raise ValueError('invalid client_ip filter spec: {}'.format(client_ip_spec
))
112 def extract_mds_ranks_from_report(mds_ranks_str
):
113 if not mds_ranks_str
:
115 return [int(x
) for x
in mds_ranks_str
.split(',')]
117 def extract_client_id_and_ip(client
):
118 match
= re
.match(r
'^(client\.\d+)\s(.*)', client
)
120 return match
.group(1), match
.group(2)
123 class FSPerfStats(object):
125 q_cv
= Condition(lock
)
126 r_cv
= Condition(lock
)
128 user_queries
= {} # type: Dict[str, Dict]
138 def __init__(self
, module
):
140 self
.log
= module
.log
141 self
.prev_rank0_gid
= None
142 # report processor thread
143 self
.report_processor
= Thread(target
=self
.run
)
144 self
.report_processor
.start()
146 def set_client_metadata(self
, client_id
, key
, meta
):
147 result
= self
.client_metadata
['metadata'].setdefault(client_id
, {})
148 if not key
in result
or not result
[key
] == meta
:
151 def notify_cmd(self
, cmdtag
):
152 self
.log
.debug("cmdtag={0}".format(cmdtag
))
155 result
= self
.client_metadata
['in_progress'].pop(cmdtag
)
157 self
.log
.warn(f
"cmdtag {cmdtag} not found in client metadata")
160 client_meta
= result
[1].wait()
161 if client_meta
[0] != 0:
162 self
.log
.warn("failed to fetch client metadata from rank {0}, err={1}".format(
163 result
[0], client_meta
[2]))
165 self
.log
.debug("notify: client metadata={0}".format(json
.loads(client_meta
[1])))
166 for metadata
in json
.loads(client_meta
[1]):
167 client_id
= "client.{0}".format(metadata
['id'])
168 result
= self
.client_metadata
['metadata'].setdefault(client_id
, {})
169 for subkey
in CLIENT_METADATA_SUBKEYS
:
170 self
.set_client_metadata(client_id
, subkey
, metadata
[CLIENT_METADATA_KEY
][subkey
])
171 for subkey
in CLIENT_METADATA_SUBKEYS_OPTIONAL
:
172 self
.set_client_metadata(client_id
, subkey
,
173 metadata
[CLIENT_METADATA_KEY
].get(subkey
, NON_EXISTENT_KEY_STR
))
174 metric_features
= int(metadata
[CLIENT_METADATA_KEY
]["metric_spec"]["metric_flags"]["feature_bits"], 16)
175 supported_metrics
= [metric
for metric
, bit
in MDS_PERF_QUERY_COUNTERS_MAP
.items() if metric_features
& (1 << bit
)]
176 self
.set_client_metadata(client_id
, "valid_metrics", supported_metrics
)
177 kver
= metadata
[CLIENT_METADATA_KEY
].get("kernel_version", None)
179 self
.set_client_metadata(client_id
, "kernel_version", kver
)
180 # when all async requests are done, purge clients metadata if any.
181 if not self
.client_metadata
['in_progress']:
182 for client
in self
.client_metadata
['to_purge']:
184 self
.log
.info("purge client metadata for {0}".format(client
))
185 self
.client_metadata
['metadata'].remove(client
)
188 self
.client_metadata
['to_purge'].clear()
189 self
.log
.debug("client_metadata={0}, to_purge={1}".format(
190 self
.client_metadata
['metadata'], self
.client_metadata
['to_purge']))
192 def notify_fsmap(self
):
193 #Reregister the user queries when there is a new rank0 mds
195 gid_state
= FSPerfStats
.get_rank0_mds_gid_state(self
.module
.get('fs_map'))
198 rank0_gid
, state
= gid_state
199 if (rank0_gid
and rank0_gid
!= self
.prev_rank0_gid
and state
== 'up:active'):
200 #the new rank0 MDS is up:active
201 ua_last_updated
= time
.monotonic()
202 if (self
.rqtimer
and self
.rqtimer
.is_alive()):
203 self
.rqtimer
.cancel()
204 self
.rqtimer
= Timer(REREGISTER_TIMER_INTERVAL
,
205 self
.re_register_queries
, args
=(rank0_gid
, ua_last_updated
,))
208 def re_register_queries(self
, rank0_gid
, ua_last_updated
):
209 #reregister queries if the metrics are the latest. Otherwise reschedule the timer and
210 #wait for the empty metrics
212 if self
.mx_last_updated
>= ua_last_updated
:
213 self
.log
.debug("reregistering queries...")
214 self
.module
.reregister_mds_perf_queries()
215 self
.prev_rank0_gid
= rank0_gid
217 #reschedule the timer
218 self
.rqtimer
= Timer(REREGISTER_TIMER_INTERVAL
,
219 self
.re_register_queries
, args
=(rank0_gid
, ua_last_updated
,))
223 def get_rank0_mds_gid_state(fsmap
):
224 for fs
in fsmap
['filesystems']:
225 mds_map
= fs
['mdsmap']
226 if mds_map
is not None:
227 for mds_id
, mds_status
in mds_map
['info'].items():
228 if mds_status
['rank'] == 0:
229 return mds_status
['gid'], mds_status
['state']
230 logger
.warn("No rank0 mds in the fsmap")
232 def update_client_meta(self
):
234 pending_updates
= [v
[0] for v
in self
.client_metadata
['in_progress'].values()]
236 fsmap
= self
.module
.get('fs_map')
237 for fs
in fsmap
['filesystems']:
238 mdsmap
= fs
['mdsmap']
239 gid
= mdsmap
['up']["mds_0"]
240 if gid
in pending_updates
:
242 tag
= str(uuid
.uuid4())
243 result
= CommandResult(tag
)
244 new_updates
[tag
] = (gid
, result
)
245 self
.client_metadata
['in_progress'].update(new_updates
)
247 self
.log
.debug(f
"updating client metadata from {new_updates}")
249 cmd_dict
= {'prefix': 'client ls'}
250 for tag
,val
in new_updates
.items():
251 self
.module
.send_command(val
[1], "mds", str(val
[0]), json
.dumps(cmd_dict
), tag
)
255 self
.log
.info("FSPerfStats::report_processor starting...")
258 self
.scrub_expired_queries()
259 self
.process_mds_reports()
262 stats_period
= int(self
.module
.get_ceph_option("mgr_stats_period"))
263 self
.q_cv
.wait(stats_period
)
264 self
.log
.debug("FSPerfStats::tick")
265 except Exception as e
:
266 self
.log
.fatal("fatal error: {}".format(traceback
.format_exc()))
268 def cull_mds_entries(self
, raw_perf_counters
, incoming_metrics
, missing_clients
):
269 # this is pretty straight forward -- find what MDSs are missing from
270 # what is tracked vs what we received in incoming report and purge
272 tracked_ranks
= raw_perf_counters
.keys()
273 available_ranks
= [int(counter
['k'][0][0]) for counter
in incoming_metrics
]
274 for rank
in set(tracked_ranks
) - set(available_ranks
):
275 culled
= raw_perf_counters
.pop(rank
)
276 self
.log
.info("culled {0} client entries from rank {1} (laggy: {2})".format(
277 len(culled
[1]), rank
, "yes" if culled
[0] else "no"))
278 missing_clients
.update(list(culled
[1].keys()))
280 def cull_client_entries(self
, raw_perf_counters
, incoming_metrics
, missing_clients
):
281 # this is a bit more involed -- for each rank figure out what clients
282 # are missing in incoming report and purge them from our tracked map.
283 # but, if this is invoked _after_ cull_mds_entries(), the rank set
284 # is same, so we can loop based on that assumption.
285 ranks
= raw_perf_counters
.keys()
287 tracked_clients
= raw_perf_counters
[rank
][1].keys()
288 available_clients
= [extract_client_id_and_ip(counter
['k'][1][0]) for counter
in incoming_metrics
]
289 for client
in set(tracked_clients
) - set([c
[0] for c
in available_clients
if c
[0] is not None]):
290 raw_perf_counters
[rank
][1].pop(client
)
291 self
.log
.info("culled {0} from rank {1}".format(client
, rank
))
292 missing_clients
.add(client
)
294 def cull_missing_entries(self
, raw_perf_counters
, incoming_metrics
):
295 missing_clients
= set() # type: Set[str]
296 self
.cull_mds_entries(raw_perf_counters
, incoming_metrics
, missing_clients
)
297 self
.cull_client_entries(raw_perf_counters
, incoming_metrics
, missing_clients
)
299 self
.log
.debug("missing_clients={0}".format(missing_clients
))
301 if self
.client_metadata
['in_progress']:
302 self
.client_metadata
['to_purge'].update(missing_clients
)
303 self
.log
.info("deferring client metadata purge (now {0} client(s))".format(
304 len(self
.client_metadata
['to_purge'])))
306 for client
in missing_clients
:
308 self
.log
.info("purge client metadata for {0}".format(client
))
309 self
.client_metadata
['metadata'].pop(client
)
312 self
.log
.debug("client_metadata={0}".format(self
.client_metadata
['metadata']))
314 def cull_global_metrics(self
, raw_perf_counters
, incoming_metrics
):
315 tracked_clients
= raw_perf_counters
.keys()
316 available_clients
= [counter
['k'][0][0] for counter
in incoming_metrics
]
317 for client
in set(tracked_clients
) - set(available_clients
):
318 raw_perf_counters
.pop(client
)
320 def get_raw_perf_counters(self
, query
):
321 raw_perf_counters
= query
.setdefault(QUERY_RAW_COUNTERS
, {})
323 for query_id
in query
[QUERY_IDS
]:
324 result
= self
.module
.get_mds_perf_counters(query_id
)
325 self
.log
.debug("raw_perf_counters={}".format(raw_perf_counters
))
326 self
.log
.debug("get_raw_perf_counters={}".format(result
))
328 # extract passed in delayed ranks. metrics for delayed ranks are tagged
330 delayed_ranks
= extract_mds_ranks_from_report(result
['metrics'][0][0])
332 # what's received from MDS
333 incoming_metrics
= result
['metrics'][1]
335 # metrics updated (monotonic) time
336 self
.mx_last_updated
= result
['metrics'][2][0]
338 # cull missing MDSs and clients
339 self
.cull_missing_entries(raw_perf_counters
, incoming_metrics
)
341 # iterate over metrics list and update our copy (note that we have
342 # already culled the differences).
343 for counter
in incoming_metrics
:
344 mds_rank
= int(counter
['k'][0][0])
345 client_id
, client_ip
= extract_client_id_and_ip(counter
['k'][1][0])
346 if client_id
is not None or not client_ip
: # client_id _could_ be 0
348 self
.set_client_metadata(client_id
, "IP", client_ip
)
350 self
.log
.warn("client metadata for client_id={0} might be unavailable".format(client_id
))
352 raw_counters
= raw_perf_counters
.setdefault(mds_rank
, [False, {}])
353 raw_counters
[0] = True if mds_rank
in delayed_ranks
else False
354 raw_client_counters
= raw_counters
[1].setdefault(client_id
, [])
356 del raw_client_counters
[:]
357 raw_client_counters
.extend(counter
['c'])
358 # send an asynchronous client metadata refresh
359 self
.update_client_meta()
361 def get_raw_perf_counters_global(self
, query
):
362 raw_perf_counters
= query
.setdefault(QUERY_RAW_COUNTERS_GLOBAL
, {})
363 result
= self
.module
.get_mds_perf_counters(query
[GLOBAL_QUERY_ID
])
365 self
.log
.debug("raw_perf_counters_global={}".format(raw_perf_counters
))
366 self
.log
.debug("get_raw_perf_counters_global={}".format(result
))
368 global_metrics
= result
['metrics'][1]
369 self
.cull_global_metrics(raw_perf_counters
, global_metrics
)
370 for counter
in global_metrics
:
371 client_id
, _
= extract_client_id_and_ip(counter
['k'][0][0])
372 raw_client_counters
= raw_perf_counters
.setdefault(client_id
, [])
373 del raw_client_counters
[:]
374 raw_client_counters
.extend(counter
['c'])
376 def process_mds_reports(self
):
377 for query
in self
.user_queries
.values():
378 self
.get_raw_perf_counters(query
)
379 self
.get_raw_perf_counters_global(query
)
381 def scrub_expired_queries(self
):
382 expire_time
= datetime
.now() - QUERY_EXPIRE_INTERVAL
383 for filter_spec
in list(self
.user_queries
.keys()):
384 user_query
= self
.user_queries
[filter_spec
]
385 self
.log
.debug("scrubbing query={}".format(user_query
))
386 if user_query
[QUERY_LAST_REQUEST
] < expire_time
:
387 expired_query_ids
= user_query
[QUERY_IDS
].copy()
388 expired_query_ids
.append(user_query
[GLOBAL_QUERY_ID
])
389 self
.log
.debug("unregistering query={} ids={}".format(user_query
, expired_query_ids
))
390 self
.unregister_mds_perf_queries(filter_spec
, expired_query_ids
)
391 del self
.user_queries
[filter_spec
]
393 def prepare_mds_perf_query(self
, rank
, client_id
, client_ip
):
394 mds_rank_regex
= MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
396 mds_rank_regex
= '^({})$'.format(rank
)
397 client_regex
= MDS_PERF_QUERY_REGEX_MATCH_CLIENTS
.format(client_id
, client_ip
)
400 {'type' : 'mds_rank', 'regex' : mds_rank_regex
},
401 {'type' : 'client_id', 'regex' : client_regex
},
403 'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS
,
406 def prepare_global_perf_query(self
, client_id
, client_ip
):
407 client_regex
= MDS_PERF_QUERY_REGEX_MATCH_CLIENTS
.format(client_id
, client_ip
)
410 {'type' : 'client_id', 'regex' : client_regex
},
412 'performance_counter_descriptors' : MDS_GLOBAL_PERF_QUERY_COUNTERS
,
415 def unregister_mds_perf_queries(self
, filter_spec
, query_ids
):
416 self
.log
.info("unregister_mds_perf_queries: filter_spec={0}, query_id={1}".format(
417 filter_spec
, query_ids
))
418 for query_id
in query_ids
:
419 self
.module
.remove_mds_perf_query(query_id
)
421 def register_mds_perf_query(self
, filter_spec
):
422 mds_ranks
= filter_spec
.mds_ranks
423 client_id
= filter_spec
.client_id
424 client_ip
= filter_spec
.client_ip
428 # register per-mds perf query
429 for rank
in mds_ranks
:
430 query
= self
.prepare_mds_perf_query(rank
, client_id
, client_ip
)
431 self
.log
.info("register_mds_perf_query: {}".format(query
))
433 query_id
= self
.module
.add_mds_perf_query(query
)
434 if query_id
is None: # query id can be 0
435 raise RuntimeError("failed to add MDS perf query: {}".format(query
))
436 query_ids
.append(query_id
)
438 for query_id
in query_ids
:
439 self
.module
.remove_mds_perf_query(query_id
)
443 def register_global_perf_query(self
, filter_spec
):
444 client_id
= filter_spec
.client_id
445 client_ip
= filter_spec
.client_ip
447 # register a global perf query for metrics
448 query
= self
.prepare_global_perf_query(client_id
, client_ip
)
449 self
.log
.info("register_global_perf_query: {}".format(query
))
451 query_id
= self
.module
.add_mds_perf_query(query
)
452 if query_id
is None: # query id can be 0
453 raise RuntimeError("failed to add global perf query: {}".format(query
))
456 def register_query(self
, filter_spec
):
457 user_query
= self
.user_queries
.get(filter_spec
, None)
460 QUERY_IDS
: self
.register_mds_perf_query(filter_spec
),
461 GLOBAL_QUERY_ID
: self
.register_global_perf_query(filter_spec
),
462 QUERY_LAST_REQUEST
: datetime
.now(),
464 self
.user_queries
[filter_spec
] = user_query
469 user_query
[QUERY_LAST_REQUEST
] = datetime
.now()
472 def generate_report(self
, user_query
):
473 result
= {} # type: Dict
474 # start with counter info -- metrics that are global and per mds
475 result
["version"] = PERF_STATS_VERSION
476 result
["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
477 result
["counters"] = MDS_PERF_QUERY_COUNTERS
479 # fill in client metadata
480 raw_perfs
= user_query
.setdefault(QUERY_RAW_COUNTERS_GLOBAL
, {})
482 result_meta
= result
.setdefault("client_metadata", {})
483 for client_id
in raw_perfs
.keys():
484 if client_id
in self
.client_metadata
["metadata"]:
485 client_meta
= result_meta
.setdefault(client_id
, {})
486 client_meta
.update(self
.client_metadata
["metadata"][client_id
])
488 # start populating global perf metrics w/ client metadata
489 metrics
= result
.setdefault("global_metrics", {})
490 for client_id
, counters
in raw_perfs
.items():
491 global_client_metrics
= metrics
.setdefault(client_id
, [])
492 del global_client_metrics
[:]
493 global_client_metrics
.extend(counters
)
495 # and, now per-mds metrics keyed by mds rank along with delayed ranks
496 raw_perfs
= user_query
.setdefault(QUERY_RAW_COUNTERS
, {})
497 metrics
= result
.setdefault("metrics", {})
499 metrics
["delayed_ranks"] = [rank
for rank
,counters
in raw_perfs
.items() if counters
[0]]
500 for rank
, counters
in raw_perfs
.items():
501 mds_key
= "mds.{}".format(rank
)
502 mds_metrics
= metrics
.setdefault(mds_key
, {})
503 mds_metrics
.update(counters
[1])
506 def extract_query_filters(self
, cmd
):
507 mds_rank_spec
= cmd
.get('mds_rank', None)
508 client_id_spec
= cmd
.get('client_id', None)
509 client_ip_spec
= cmd
.get('client_ip', None)
511 self
.log
.debug("mds_rank_spec={0}, client_id_spec={1}, client_ip_spec={2}".format(
512 mds_rank_spec
, client_id_spec
, client_ip_spec
))
514 mds_ranks
= extract_mds_ranks_from_spec(mds_rank_spec
)
515 client_id
= extract_client_id_from_spec(client_id_spec
)
516 client_ip
= extract_client_ip_from_spec(client_ip_spec
)
518 return FilterSpec(mds_ranks
, client_id
, client_ip
)
520 def get_perf_data(self
, cmd
):
522 filter_spec
= self
.extract_query_filters(cmd
)
523 except ValueError as e
:
524 return -errno
.EINVAL
, "", str(e
)
528 user_query
= self
.register_query(filter_spec
)
529 result
= self
.generate_report(user_query
)
530 return 0, json
.dumps(result
), ""