]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/telemetry/module.py
import ceph quincy 17.2.4
[ceph.git] / ceph / src / pybind / mgr / telemetry / module.py
index ea97df47741baaff41541cff47e89848e563a018..779cd529b32a749a870236ad14272c9c4941ba5b 100644 (file)
@@ -69,6 +69,7 @@ class Collection(str, enum.Enum):
     basic_pool_usage = 'basic_pool_usage'
     basic_usage_by_class = 'basic_usage_by_class'
     basic_rook_v01 = 'basic_rook_v01'
+    perf_memory_metrics = 'perf_memory_metrics'
 
 MODULE_COLLECTION : List[Dict] = [
     {
@@ -125,6 +126,12 @@ MODULE_COLLECTION : List[Dict] = [
         "channel": "basic",
         "nag": True
     },
+    {
+        "name": Collection.perf_memory_metrics,
+        "description": "Heap stats and mempools for mon and mds",
+        "channel": "perf",
+        "nag": False
+    },
 ]
 
 ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [
@@ -467,85 +474,133 @@ class Module(MgrModule):
         return  etype + '.' + m.hexdigest()
 
     def get_heap_stats(self) -> Dict[str, dict]:
-        # Initialize result dict
-        result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))
-
-        # Get list of osd ids from the metadata
-        osd_metadata = self.get('osd_metadata')
+        result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
+        anonymized_daemons = {}
+        osd_map = self.get('osd_map')
 
-        # Grab output from the "osd.x heap stats" command
-        for osd_id in osd_metadata:
-            cmd_dict = {
-                'prefix': 'heap',
-                'heapcmd': 'stats',
-                'id': str(osd_id),
-            }
-            r, outb, outs = self.osd_command(cmd_dict)
-            if r != 0:
-                self.log.debug("Invalid command dictionary.")
-                continue
+        # Combine available daemons
+        daemons = []
+        for osd in osd_map['osds']:
+            daemons.append('osd'+'.'+str(osd['osd']))
+        # perf_memory_metrics collection (1/2)
+        if self.is_enabled_collection(Collection.perf_memory_metrics):
+            mon_map = self.get('mon_map')
+            mds_metadata = self.get('mds_metadata')
+            for mon in mon_map['mons']:
+                daemons.append('mon'+'.'+mon['name'])
+            for mds in mds_metadata:
+                daemons.append('mds'+'.'+mds)
+
+        # Grab output from the "daemon.x heap stats" command
+        for daemon in daemons:
+            daemon_type, daemon_id = daemon.split('.')
+            heap_stats = self.parse_heap_stats(daemon_type, daemon_id)
+            if heap_stats:
+                if (daemon_type != 'osd'):
+                    # Anonymize mon and mds
+                    anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
+                    daemon = anonymized_daemons[daemon]
+                result[daemon_type][daemon] = heap_stats
             else:
-                if 'tcmalloc heap stats' in outs:
-                    values = [int(i) for i in outs.split() if i.isdigit()]
-                    # `categories` must be ordered this way for the correct output to be parsed
-                    categories = ['use_by_application',
-                                  'page_heap_freelist',
-                                  'central_cache_freelist',
-                                  'transfer_cache_freelist',
-                                  'thread_cache_freelists',
-                                  'malloc_metadata',
-                                  'actual_memory_used',
-                                  'released_to_os',
-                                  'virtual_address_space_used',
-                                  'spans_in_use',
-                                  'thread_heaps_in_use',
-                                  'tcmalloc_page_size']
-                    if len(values) != len(categories):
-                        self.log.debug('Received unexpected output from osd.{}; number of values should match the number of expected categories:\n' \
-                                'values: len={} {} ~ categories: len={} {} ~ outs: {}'.format(osd_id, len(values), values, len(categories), categories, outs))
-                        continue
-                    osd = 'osd.' + str(osd_id)
-                    result[osd] = dict(zip(categories, values))
-                else:
-                    self.log.debug('No heap stats available on osd.{}: {}'.format(osd_id, outs))
-                    continue
+                continue
 
+        if anonymized_daemons:
+            # for debugging purposes only, this data is never reported
+            self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons))
         return result
 
+    def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]:
+        parsed_output = {}
+
+        cmd_dict = {
+            'prefix': 'heap',
+            'heapcmd': 'stats'
+        }
+        r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict)
+
+        if r != 0:
+            self.log.error("Invalid command dictionary: {}".format(cmd_dict))
+        else:
+            if 'tcmalloc heap stats' in outb:
+                values = [int(i) for i in outb.split() if i.isdigit()]
+                # `categories` must be ordered this way for the correct output to be parsed
+                categories = ['use_by_application',
+                              'page_heap_freelist',
+                              'central_cache_freelist',
+                              'transfer_cache_freelist',
+                              'thread_cache_freelists',
+                              'malloc_metadata',
+                              'actual_memory_used',
+                              'released_to_os',
+                              'virtual_address_space_used',
+                              'spans_in_use',
+                              'thread_heaps_in_use',
+                              'tcmalloc_page_size']
+                if len(values) != len(categories):
+                    self.log.error('Received unexpected output from {}.{}; ' \
+                                   'number of values should match the number' \
+                                   'of expected categories:\n values: len={} {} '\
+                                   '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs))
+                else:
+                    parsed_output = dict(zip(categories, values))
+            else:
+                self.log.error('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs))
+        
+        return parsed_output
+
     def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
-        # Initialize result dict
-        result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))
+        result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
+        anonymized_daemons = {}
+        osd_map = self.get('osd_map')
 
-        # Get list of osd ids from the metadata
-        osd_metadata = self.get('osd_metadata')
+        # Combine available daemons
+        daemons = []
+        for osd in osd_map['osds']:
+            daemons.append('osd'+'.'+str(osd['osd']))
+        # perf_memory_metrics collection (2/2)
+        if self.is_enabled_collection(Collection.perf_memory_metrics):
+            mon_map = self.get('mon_map')
+            mds_metadata = self.get('mds_metadata')
+            for mon in mon_map['mons']:
+                daemons.append('mon'+'.'+mon['name'])
+            for mds in mds_metadata:
+                daemons.append('mds'+'.'+mds)
 
-        # Grab output from the "osd.x dump_mempools" command
-        for osd_id in osd_metadata:
+        # Grab output from the "dump_mempools" command
+        for daemon in daemons:
+            daemon_type, daemon_id = daemon.split('.')
             cmd_dict = {
                 'prefix': 'dump_mempools',
-                'id': str(osd_id),
                 'format': 'json'
             }
-            r, outb, outs = self.osd_command(cmd_dict)
+            r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict)
             if r != 0:
-                self.log.debug("Invalid command dictionary.")
+                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
                 continue
             else:
                 try:
                     # This is where the mempool will land.
                     dump = json.loads(outb)
                     if mode == 'separated':
-                        result["osd." + str(osd_id)] = dump['mempool']['by_pool']
+                        # Anonymize mon and mds
+                        if daemon_type != 'osd':
+                            anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
+                            daemon = anonymized_daemons[daemon]
+                        result[daemon_type][daemon] = dump['mempool']['by_pool']
                     elif mode == 'aggregated':
                         for mem_type in dump['mempool']['by_pool']:
-                            result[mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
-                            result[mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
+                            result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
+                            result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
                     else:
-                        self.log.debug("Incorrect mode specified in get_mempool")
+                        self.log.error("Incorrect mode specified in get_mempool: {}".format(mode))
                 except (json.decoder.JSONDecodeError, KeyError) as e:
-                    self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
+                    self.log.error("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e))
                     continue
 
+        if anonymized_daemons:
+            # for debugging purposes only, this data is never reported
+            self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons))
+
         return result
 
     def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
@@ -569,7 +624,7 @@ class Module(MgrModule):
             r, outb, outs = self.osd_command(cmd_dict)
             # Check for invalid calls
             if r != 0:
-                self.log.debug("Invalid command dictionary.")
+                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
                 continue
             else:
                 try:
@@ -656,7 +711,7 @@ class Module(MgrModule):
                 # schema when it doesn't. In either case, we'll handle that
                 # by continuing and collecting what we can from other osds.
                 except (json.decoder.JSONDecodeError, KeyError) as e:
-                    self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
+                    self.log.error("Error caught on osd.{}: {}".format(osd_id, e))
                     continue
 
         return list(result.values())
@@ -864,14 +919,14 @@ class Module(MgrModule):
                 m = self.remote('devicehealth', 'get_recent_device_metrics',
                                 devid, min_sample)
             except Exception as e:
-                self.log.debug('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
+                self.log.error('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
                 continue
 
             # anonymize host id
             try:
                 host = d['location'][0]['host']
             except (KeyError, IndexError) as e:
-                self.log.debug('Unable to get host from device with id "{}": {}'.format(devid, e))
+                self.log.error('Unable to get host from device with id "{}": {}'.format(devid, e))
                 continue
             anon_host = self.get_store('host-id/%s' % host)
             if not anon_host:
@@ -1579,6 +1634,9 @@ To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
                 msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
                         f"`ceph telemetry enable channel{disabled_channels}`"
 
+            # wake up serve() to reset health warning
+            self.event.set()
+
             return 0, msg, ''
 
     @CLICommand('telemetry off')