import uuid
import errno
import traceback
+import logging
from collections import OrderedDict
from typing import List, Dict, Set
from mgr_module import CommandResult
from datetime import datetime, timedelta
-from threading import Lock, Condition, Thread
+from threading import Lock, Condition, Thread, Timer
from ipaddress import ip_address
PERF_STATS_VERSION = 1
MDS_GLOBAL_PERF_QUERY_COUNTERS = ['cap_hit', 'read_latency', 'write_latency', 'metadata_latency', 'dentry_lease', 'opened_files', 'pinned_icaps', 'opened_inodes', 'read_io_sizes', 'write_io_sizes'] # type: List[str]
QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
+REREGISTER_TIMER_INTERVAL = 1
CLIENT_METADATA_KEY = "client_metadata"
CLIENT_METADATA_SUBKEYS = ["hostname", "root"]
NON_EXISTENT_KEY_STR = "N/A"
+logger = logging.getLogger(__name__)
+
class FilterSpec(object):
"""
query filters encapsulated and used as key for query map
user_queries = {} # type: Dict[str, Dict]
meta_lock = Lock()
+ rqtimer = None
client_metadata = {
'metadata' : {},
'to_purge' : set(),
def __init__(self, module):
self.module = module
self.log = module.log
+ self.prev_rank0_gid = None
# report processor thread
self.report_processor = Thread(target=self.run)
self.report_processor.start()
if not key in result or not result[key] == meta:
result[key] = meta
- def notify(self, cmdtag):
+ def notify_cmd(self, cmdtag):
self.log.debug("cmdtag={0}".format(cmdtag))
with self.meta_lock:
try:
self.log.debug("client_metadata={0}, to_purge={1}".format(
self.client_metadata['metadata'], self.client_metadata['to_purge']))
- def update_client_meta(self, rank_set):
+    def notify_fsmap(self):
+        # Re-register the user (perf) queries when a new rank-0 MDS shows up
+        # in the fsmap: query registrations held against the old rank-0 are
+        # lost on failover.
+        with self.lock:
+            gid_state = FSPerfStats.get_rank0_mds_gid_state(self.module.get('fs_map'))
+            if not gid_state:
+                # no rank-0 MDS found in the fsmap; nothing to re-register
+                return
+            rank0_gid, state = gid_state
+            if (rank0_gid and rank0_gid != self.prev_rank0_gid and state == 'up:active'):
+                # The new rank-0 MDS is up:active.  Record when we observed it
+                # so re_register_queries() can tell whether later metrics are
+                # fresher than the transition.
+                # NOTE(review): time.monotonic() needs 'import time', which is
+                # not visible in this hunk's import block — confirm it exists
+                # elsewhere in the file.
+                ua_last_updated = time.monotonic()
+                # keep at most one pending re-register timer
+                if (self.rqtimer and self.rqtimer.is_alive()):
+                    self.rqtimer.cancel()
+                self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
+                    self.re_register_queries, args=(rank0_gid, ua_last_updated,))
+                self.rqtimer.start()
+
+    def re_register_queries(self, rank0_gid, ua_last_updated):
+        # Timer callback: re-register the perf queries only once metrics newer
+        # than the rank-0 transition have been received (self.mx_last_updated
+        # is advanced by the metrics handler).  Otherwise reschedule this
+        # timer and keep waiting.
+        with self.lock:
+            if self.mx_last_updated >= ua_last_updated:
+                self.log.debug("reregistering queries...")
+                self.module.reregister_mds_perf_queries()
+                # remember the gid so notify_fsmap() does not re-arm a timer
+                # for the same rank-0 MDS again
+                self.prev_rank0_gid = rank0_gid
+            else:
+                # metrics not fresh yet — reschedule the timer
+                self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
+                    self.re_register_queries, args=(rank0_gid, ua_last_updated,))
+                self.rqtimer.start()
+
+    @staticmethod
+    def get_rank0_mds_gid_state(fsmap):
+        """Return (gid, state) of the first rank-0 MDS found in *fsmap*.
+
+        Scans every filesystem's mdsmap; returns None (implicitly) and logs
+        a warning when no rank-0 MDS exists.
+        """
+        for fs in fsmap['filesystems']:
+            mds_map = fs['mdsmap']
+            if mds_map is not None:
+                for mds_id, mds_status in mds_map['info'].items():
+                    if mds_status['rank'] == 0:
+                        return mds_status['gid'], mds_status['state']
+        # TODO(review): Logger.warn is deprecated — prefer logger.warning()
+        logger.warn("No rank0 mds in the fsmap")
+
+ def update_client_meta(self):
new_updates = {}
pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
with self.meta_lock:
- for rank in rank_set:
- if rank in pending_updates:
- continue
+ fsmap = self.module.get('fs_map')
+ for fs in fsmap['filesystems']:
+ mdsmap = fs['mdsmap']
+ gid = mdsmap['up']["mds_0"]
+ if gid in pending_updates:
+ continue
tag = str(uuid.uuid4())
result = CommandResult(tag)
- new_updates[tag] = (rank, result)
+ new_updates[tag] = (gid, result)
self.client_metadata['in_progress'].update(new_updates)
- self.log.debug("updating client metadata from {0}".format(new_updates))
+ self.log.debug(f"updating client metadata from {new_updates}")
cmd_dict = {'prefix': 'client ls'}
for tag,val in new_updates.items():
# what's received from MDS
incoming_metrics = result['metrics'][1]
+ # metrics updated (monotonic) time
+ self.mx_last_updated = result['metrics'][2][0]
+
# cull missing MDSs and clients
self.cull_missing_entries(raw_perf_counters, incoming_metrics)
# iterate over metrics list and update our copy (note that we have
# already culled the differences).
- meta_refresh_ranks = set()
for counter in incoming_metrics:
mds_rank = int(counter['k'][0][0])
client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
if client_id is not None or not client_ip: # client_id _could_ be 0
with self.meta_lock:
- if not client_id in self.client_metadata['metadata']:
- meta_refresh_ranks.add(mds_rank)
self.set_client_metadata(client_id, "IP", client_ip)
else:
self.log.warn("client metadata for client_id={0} might be unavailable".format(client_id))
del raw_client_counters[:]
raw_client_counters.extend(counter['c'])
# send an asynchronous client metadata refresh
- self.update_client_meta(meta_refresh_ranks)
+ self.update_client_meta()
def get_raw_perf_counters_global(self, query):
raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})