]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/stats/fs/perf_stats.py
import ceph quincy 17.2.1
[ceph.git] / ceph / src / pybind / mgr / stats / fs / perf_stats.py
index 13d2adc8fc1dac596798d355d5ba2187282fabd3..089479cdec6e886d8172bae1874d71c7a3a96f7e 100644 (file)
@@ -4,13 +4,14 @@ import time
 import uuid
 import errno
 import traceback
+import logging
 from collections import OrderedDict
 from typing import List, Dict, Set
 
 from mgr_module import CommandResult
 
 from datetime import datetime, timedelta
-from threading import Lock, Condition, Thread
+from threading import Lock, Condition, Thread, Timer
 from ipaddress import ip_address
 
 PERF_STATS_VERSION = 1
@@ -41,6 +42,7 @@ MDS_PERF_QUERY_COUNTERS = [] # type: List[str]
 MDS_GLOBAL_PERF_QUERY_COUNTERS = ['cap_hit', 'read_latency', 'write_latency', 'metadata_latency', 'dentry_lease', 'opened_files', 'pinned_icaps', 'opened_inodes', 'read_io_sizes', 'write_io_sizes'] # type: List[str]
 
 QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
+REREGISTER_TIMER_INTERVAL = 1
 
 CLIENT_METADATA_KEY = "client_metadata"
 CLIENT_METADATA_SUBKEYS = ["hostname", "root"]
@@ -48,6 +50,8 @@ CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"]
 
 NON_EXISTENT_KEY_STR = "N/A"
 
+logger = logging.getLogger(__name__)
+
 class FilterSpec(object):
     """
     query filters encapsulated and used as key for query map
@@ -118,6 +122,7 @@ class FSPerfStats(object):
     user_queries = {} # type: Dict[str, Dict]
 
     meta_lock = Lock()
+    rqtimer = None
     client_metadata = {
         'metadata' : {},
         'to_purge' : set(),
@@ -127,6 +132,7 @@ class FSPerfStats(object):
     def __init__(self, module):
         self.module = module
         self.log = module.log
+        self.prev_rank0_gid = None
         # report processor thread
         self.report_processor = Thread(target=self.run)
         self.report_processor.start()
@@ -136,7 +142,7 @@ class FSPerfStats(object):
         if not key in result or not result[key] == meta:
             result[key] = meta
 
-    def notify(self, cmdtag):
+    def notify_cmd(self, cmdtag):
         self.log.debug("cmdtag={0}".format(cmdtag))
         with self.meta_lock:
             try:
@@ -177,19 +183,62 @@ class FSPerfStats(object):
             self.log.debug("client_metadata={0}, to_purge={1}".format(
                 self.client_metadata['metadata'], self.client_metadata['to_purge']))
 
-    def update_client_meta(self, rank_set):
+    def notify_fsmap(self):
+        #Reregister the user queries when there is a new rank0 mds
+        with self.lock:
+            gid_state = FSPerfStats.get_rank0_mds_gid_state(self.module.get('fs_map'))
+            if not gid_state:
+                return
+            rank0_gid, state = gid_state
+            if (rank0_gid and rank0_gid != self.prev_rank0_gid and state == 'up:active'):
+                #the new rank0 MDS is up:active
+                ua_last_updated = time.monotonic()
+                if (self.rqtimer and self.rqtimer.is_alive()):
+                    self.rqtimer.cancel()
+                self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
+                                     self.re_register_queries, args=(rank0_gid, ua_last_updated,))
+                self.rqtimer.start()
+
+    def re_register_queries(self, rank0_gid, ua_last_updated):
+        #reregister queries if the metrics are the latest. Otherwise reschedule the timer and
+        #wait for the empty metrics
+        with self.lock:
+            if self.mx_last_updated >= ua_last_updated:
+                self.log.debug("reregistering queries...")
+                self.module.reregister_mds_perf_queries()
+                self.prev_rank0_gid = rank0_gid
+            else:
+                #reschedule the timer
+                self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
+                                     self.re_register_queries, args=(rank0_gid, ua_last_updated,))
+                self.rqtimer.start()
+
+    @staticmethod
+    def get_rank0_mds_gid_state(fsmap):
+        for fs in fsmap['filesystems']:
+            mds_map = fs['mdsmap']
+            if mds_map is not None:
+                for mds_id, mds_status in mds_map['info'].items():
+                    if mds_status['rank'] == 0:
+                        return mds_status['gid'], mds_status['state']
+        logger.warn("No rank0 mds in the fsmap")
+
+    def update_client_meta(self):
         new_updates = {}
         pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
         with self.meta_lock:
-            for rank in rank_set:
-                if rank in pending_updates:
-                    continue
+            fsmap = self.module.get('fs_map')
+            for fs in fsmap['filesystems']:
+                mdsmap = fs['mdsmap']                
+                gid = mdsmap['up']["mds_0"]
+                if gid in pending_updates:
+                    continue                
                 tag = str(uuid.uuid4())
                 result = CommandResult(tag)
-                new_updates[tag] = (rank, result)
+                new_updates[tag] = (gid, result)
             self.client_metadata['in_progress'].update(new_updates)
 
-        self.log.debug("updating client metadata from {0}".format(new_updates))
+        self.log.debug(f"updating client metadata from {new_updates}")
 
         cmd_dict = {'prefix': 'client ls'}
         for tag,val in new_updates.items():
@@ -277,19 +326,19 @@ class FSPerfStats(object):
             # what's received from MDS
             incoming_metrics = result['metrics'][1]
 
+            # metrics updated (monotonic) time
+            self.mx_last_updated = result['metrics'][2][0]
+
             # cull missing MDSs and clients
             self.cull_missing_entries(raw_perf_counters, incoming_metrics)
 
             # iterate over metrics list and update our copy (note that we have
             # already culled the differences).
-            meta_refresh_ranks = set()
             for counter in incoming_metrics:
                 mds_rank = int(counter['k'][0][0])
                 client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
                 if client_id is not None or not client_ip: # client_id _could_ be 0
                     with self.meta_lock:
-                        if not client_id in self.client_metadata['metadata']:
-                            meta_refresh_ranks.add(mds_rank)
                         self.set_client_metadata(client_id, "IP", client_ip)
                 else:
                     self.log.warn("client metadata for client_id={0} might be unavailable".format(client_id))
@@ -301,7 +350,7 @@ class FSPerfStats(object):
                 del raw_client_counters[:]
                 raw_client_counters.extend(counter['c'])
         # send an asynchronous client metadata refresh
-        self.update_client_meta(meta_refresh_ranks)
+        self.update_client_meta()
 
     def get_raw_perf_counters_global(self, query):
         raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})