]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/telemetry/module.py
import ceph quincy 17.2.5
[ceph.git] / ceph / src / pybind / mgr / telemetry / module.py
index f7cb29df3a2756b85abc916bb84900f1ac1a72ed..f3d49055a5ee3f0206cbd0931753951169a4c5f1 100644 (file)
@@ -28,6 +28,7 @@ ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']
 LICENSE = 'sharing-1-0'
 LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
 LICENSE_URL = 'https://cdla.io/sharing-1-0/'
+NO_SALT_CNT = 0
 
 # Latest revision of the telemetry report.  Bump this each time we make
 # *any* change.
@@ -67,6 +68,8 @@ class Collection(str, enum.Enum):
     basic_mds_metadata = 'basic_mds_metadata'
     basic_pool_usage = 'basic_pool_usage'
     basic_usage_by_class = 'basic_usage_by_class'
+    basic_rook_v01 = 'basic_rook_v01'
+    perf_memory_metrics = 'perf_memory_metrics'
 
 MODULE_COLLECTION : List[Dict] = [
     {
@@ -116,7 +119,48 @@ MODULE_COLLECTION : List[Dict] = [
         "description": "Default device class usage statistics",
         "channel": "basic",
         "nag": False
-    }
+    },
+    {
+        "name": Collection.basic_rook_v01,
+        "description": "Basic Rook deployment data",
+        "channel": "basic",
+        "nag": True
+    },
+    {
+        "name": Collection.perf_memory_metrics,
+        "description": "Heap stats and mempools for mon and mds",
+        "channel": "perf",
+        "nag": False
+    },
+]
+
+ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [
+        # Note: a key cannot be both a node and a leaf, e.g.
+        # "rook/a/b"
+        # "rook/a/b/c"
+        ("rook/version", Collection.basic_rook_v01),
+        ("rook/kubernetes/version", Collection.basic_rook_v01),
+        ("rook/csi/version", Collection.basic_rook_v01),
+        ("rook/node/count/kubernetes-total", Collection.basic_rook_v01),
+        ("rook/node/count/with-ceph-daemons", Collection.basic_rook_v01),
+        ("rook/node/count/with-csi-rbd-plugin", Collection.basic_rook_v01),
+        ("rook/node/count/with-csi-cephfs-plugin", Collection.basic_rook_v01),
+        ("rook/node/count/with-csi-nfs-plugin", Collection.basic_rook_v01),
+        ("rook/usage/storage-class/count/total", Collection.basic_rook_v01),
+        ("rook/usage/storage-class/count/rbd", Collection.basic_rook_v01),
+        ("rook/usage/storage-class/count/cephfs", Collection.basic_rook_v01),
+        ("rook/usage/storage-class/count/nfs", Collection.basic_rook_v01),
+        ("rook/usage/storage-class/count/bucket", Collection.basic_rook_v01),
+        ("rook/cluster/storage/device-set/count/total", Collection.basic_rook_v01),
+        ("rook/cluster/storage/device-set/count/portable", Collection.basic_rook_v01),
+        ("rook/cluster/storage/device-set/count/non-portable", Collection.basic_rook_v01),
+        ("rook/cluster/mon/count", Collection.basic_rook_v01),
+        ("rook/cluster/mon/allow-multiple-per-node", Collection.basic_rook_v01),
+        ("rook/cluster/mon/max-id", Collection.basic_rook_v01),
+        ("rook/cluster/mon/pvc/enabled", Collection.basic_rook_v01),
+        ("rook/cluster/mon/stretch/enabled", Collection.basic_rook_v01),
+        ("rook/cluster/network/provider", Collection.basic_rook_v01),
+        ("rook/cluster/external-mode", Collection.basic_rook_v01),
 ]
 
 class Module(MgrModule):
@@ -407,86 +451,156 @@ class Module(MgrModule):
             'active_changed': sorted(list(active)),
         }
 
-    def get_heap_stats(self) -> Dict[str, dict]:
-        # Initialize result dict
-        result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))
+    def anonymize_entity_name(self, entity_name:str) -> str:
+        if '.' not in entity_name:
+            self.log.debug(f"Cannot split entity name ({entity_name}), no '.' is found")
+            return entity_name
+
+        (etype, eid) = entity_name.split('.', 1)
+        m = hashlib.sha1()
+        salt = ''
+        if self.salt is not None:
+            salt = self.salt
+        # avoid asserting that salt exists
+        if not self.salt:
+            # do not set self.salt to a temp value
+            salt = f"no_salt_found_{NO_SALT_CNT}"
+            NO_SALT_CNT += 1
+            self.log.debug(f"No salt found, created a temp one: {salt}")
+        m.update(salt.encode('utf-8'))
+        m.update(eid.encode('utf-8'))
+        m.update(salt.encode('utf-8'))
+
+        return  etype + '.' + m.hexdigest()
 
-        # Get list of osd ids from the metadata
-        osd_metadata = self.get('osd_metadata')
+    def get_heap_stats(self) -> Dict[str, dict]:
+        result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
+        anonymized_daemons = {}
+        osd_map = self.get('osd_map')
 
-        # Grab output from the "osd.x heap stats" command
-        for osd_id in osd_metadata:
-            cmd_dict = {
-                'prefix': 'heap',
-                'heapcmd': 'stats',
-                'id': str(osd_id),
-            }
-            r, outb, outs = self.osd_command(cmd_dict)
-            if r != 0:
-                self.log.debug("Invalid command dictionary.")
-                continue
+        # Combine available daemons
+        daemons = []
+        for osd in osd_map['osds']:
+            daemons.append('osd'+'.'+str(osd['osd']))
+        # perf_memory_metrics collection (1/2)
+        if self.is_enabled_collection(Collection.perf_memory_metrics):
+            mon_map = self.get('mon_map')
+            mds_metadata = self.get('mds_metadata')
+            for mon in mon_map['mons']:
+                daemons.append('mon'+'.'+mon['name'])
+            for mds in mds_metadata:
+                daemons.append('mds'+'.'+mds)
+
+        # Grab output from the "daemon.x heap stats" command
+        for daemon in daemons:
+            daemon_type, daemon_id = daemon.split('.', 1)
+            heap_stats = self.parse_heap_stats(daemon_type, daemon_id)
+            if heap_stats:
+                if (daemon_type != 'osd'):
+                    # Anonymize mon and mds
+                    anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
+                    daemon = anonymized_daemons[daemon]
+                result[daemon_type][daemon] = heap_stats
             else:
-                if 'tcmalloc heap stats' in outs:
-                    values = [int(i) for i in outs.split() if i.isdigit()]
-                    # `categories` must be ordered this way for the correct output to be parsed
-                    categories = ['use_by_application',
-                                  'page_heap_freelist',
-                                  'central_cache_freelist',
-                                  'transfer_cache_freelist',
-                                  'thread_cache_freelists',
-                                  'malloc_metadata',
-                                  'actual_memory_used',
-                                  'released_to_os',
-                                  'virtual_address_space_used',
-                                  'spans_in_use',
-                                  'thread_heaps_in_use',
-                                  'tcmalloc_page_size']
-                    if len(values) != len(categories):
-                        self.log.debug('Received unexpected output from osd.{}; number of values should match the number of expected categories:\n' \
-                                'values: len={} {} ~ categories: len={} {} ~ outs: {}'.format(osd_id, len(values), values, len(categories), categories, outs))
-                        continue
-                    osd = 'osd.' + str(osd_id)
-                    result[osd] = dict(zip(categories, values))
-                else:
-                    self.log.debug('No heap stats available on osd.{}: {}'.format(osd_id, outs))
-                    continue
+                continue
 
+        if anonymized_daemons:
+            # for debugging purposes only, this data is never reported
+            self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons))
         return result
 
+    def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]:
+        parsed_output = {}
+
+        cmd_dict = {
+            'prefix': 'heap',
+            'heapcmd': 'stats'
+        }
+        r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict)
+
+        if r != 0:
+            self.log.error("Invalid command dictionary: {}".format(cmd_dict))
+        else:
+            if 'tcmalloc heap stats' in outb:
+                values = [int(i) for i in outb.split() if i.isdigit()]
+                # `categories` must be ordered this way for the correct output to be parsed
+                categories = ['use_by_application',
+                              'page_heap_freelist',
+                              'central_cache_freelist',
+                              'transfer_cache_freelist',
+                              'thread_cache_freelists',
+                              'malloc_metadata',
+                              'actual_memory_used',
+                              'released_to_os',
+                              'virtual_address_space_used',
+                              'spans_in_use',
+                              'thread_heaps_in_use',
+                              'tcmalloc_page_size']
+                if len(values) != len(categories):
+                    self.log.error('Received unexpected output from {}.{}; ' \
+                                   'number of values should match the number' \
+                                   'of expected categories:\n values: len={} {} '\
+                                   '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs))
+                else:
+                    parsed_output = dict(zip(categories, values))
+            else:
+                self.log.error('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs))
+        
+        return parsed_output
+
     def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
-        # Initialize result dict
-        result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))
+        result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
+        anonymized_daemons = {}
+        osd_map = self.get('osd_map')
 
-        # Get list of osd ids from the metadata
-        osd_metadata = self.get('osd_metadata')
+        # Combine available daemons
+        daemons = []
+        for osd in osd_map['osds']:
+            daemons.append('osd'+'.'+str(osd['osd']))
+        # perf_memory_metrics collection (2/2)
+        if self.is_enabled_collection(Collection.perf_memory_metrics):
+            mon_map = self.get('mon_map')
+            mds_metadata = self.get('mds_metadata')
+            for mon in mon_map['mons']:
+                daemons.append('mon'+'.'+mon['name'])
+            for mds in mds_metadata:
+                daemons.append('mds'+'.'+mds)
 
-        # Grab output from the "osd.x dump_mempools" command
-        for osd_id in osd_metadata:
+        # Grab output from the "dump_mempools" command
+        for daemon in daemons:
+            daemon_type, daemon_id = daemon.split('.', 1)
             cmd_dict = {
                 'prefix': 'dump_mempools',
-                'id': str(osd_id),
                 'format': 'json'
             }
-            r, outb, outs = self.osd_command(cmd_dict)
+            r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict)
             if r != 0:
-                self.log.debug("Invalid command dictionary.")
+                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
                 continue
             else:
                 try:
                     # This is where the mempool will land.
                     dump = json.loads(outb)
                     if mode == 'separated':
-                        result["osd." + str(osd_id)] = dump['mempool']['by_pool']
+                        # Anonymize mon and mds
+                        if daemon_type != 'osd':
+                            anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
+                            daemon = anonymized_daemons[daemon]
+                        result[daemon_type][daemon] = dump['mempool']['by_pool']
                     elif mode == 'aggregated':
                         for mem_type in dump['mempool']['by_pool']:
-                            result[mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
-                            result[mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
+                            result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
+                            result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
                     else:
-                        self.log.debug("Incorrect mode specified in get_mempool")
+                        self.log.error("Incorrect mode specified in get_mempool: {}".format(mode))
                 except (json.decoder.JSONDecodeError, KeyError) as e:
-                    self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
+                    self.log.error("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e))
                     continue
 
+        if anonymized_daemons:
+            # for debugging purposes only, this data is never reported
+            self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons))
+
         return result
 
     def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
@@ -510,7 +624,7 @@ class Module(MgrModule):
             r, outb, outs = self.osd_command(cmd_dict)
             # Check for invalid calls
             if r != 0:
-                self.log.debug("Invalid command dictionary.")
+                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
                 continue
             else:
                 try:
@@ -597,7 +711,7 @@ class Module(MgrModule):
                 # schema when it doesn't. In either case, we'll handle that
                 # by continuing and collecting what we can from other osds.
                 except (json.decoder.JSONDecodeError, KeyError) as e:
-                    self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
+                    self.log.error("Error caught on osd.{}: {}".format(osd_id, e))
                     continue
 
         return list(result.values())
@@ -691,17 +805,27 @@ class Module(MgrModule):
         result: Dict[str, dict] = defaultdict(lambda: defaultdict(
             lambda: defaultdict(lambda: defaultdict(int))))
 
-        for daemon in all_perf_counters:
+        # 'separated' mode
+        anonymized_daemon_dict = {}
+
+        for daemon, all_perf_counters_by_daemon in all_perf_counters.items():
+            daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'
+
+            if mode == 'separated':
+                # anonymize individual daemon names except osds
+                if (daemon_type != 'osd'):
+                    anonymized_daemon = self.anonymize_entity_name(daemon)
+                    anonymized_daemon_dict[anonymized_daemon] = daemon
+                    daemon = anonymized_daemon
 
             # Calculate num combined daemon types if in aggregated mode
             if mode == 'aggregated':
-                daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'
                 if 'num_combined_daemons' not in result[daemon_type]:
                     result[daemon_type]['num_combined_daemons'] = 1
                 else:
                     result[daemon_type]['num_combined_daemons'] += 1
 
-            for collection in all_perf_counters[daemon]:
+            for collection in all_perf_counters_by_daemon:
                 # Split the collection to avoid redundancy in final report; i.e.:
                 #   bluestore.kv_flush_lat, bluestore.kv_final_lat -->
                 #   bluestore: kv_flush_lat, kv_final_lat
@@ -721,12 +845,12 @@ class Module(MgrModule):
                 if mode == 'separated':
                     # Add value to result
                     result[daemon][col_0][col_1]['value'] = \
-                            all_perf_counters[daemon][collection]['value']
+                            all_perf_counters_by_daemon[collection]['value']
 
                     # Check that 'count' exists, as not all counters have a count field.
-                    if 'count' in all_perf_counters[daemon][collection]:
+                    if 'count' in all_perf_counters_by_daemon[collection]:
                         result[daemon][col_0][col_1]['count'] = \
-                                all_perf_counters[daemon][collection]['count']
+                                all_perf_counters_by_daemon[collection]['count']
                 elif mode == 'aggregated':
                     # Not every rgw daemon has the same schema. Specifically, each rgw daemon
                     # has a uniquely-named collection that starts off identically (i.e.
@@ -740,18 +864,22 @@ class Module(MgrModule):
                     # the files are of type 'pair' (real-integer-pair, integer-integer pair).
                     # In those cases, the value is a dictionary, and not a number.
                     #   i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
-                    if isinstance(all_perf_counters[daemon][collection]['value'], numbers.Number):
+                    if isinstance(all_perf_counters_by_daemon[collection]['value'], numbers.Number):
                         result[daemon_type][col_0][col_1]['value'] += \
-                                all_perf_counters[daemon][collection]['value']
+                                all_perf_counters_by_daemon[collection]['value']
 
                     # Check that 'count' exists, as not all counters have a count field.
-                    if 'count' in all_perf_counters[daemon][collection]:
+                    if 'count' in all_perf_counters_by_daemon[collection]:
                         result[daemon_type][col_0][col_1]['count'] += \
-                                all_perf_counters[daemon][collection]['count']
+                                all_perf_counters_by_daemon[collection]['count']
                 else:
                     self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
                     return {}
 
+        if mode == 'separated':
+            # for debugging purposes only, this data is never reported
+            self.log.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict))
+
         return result
 
     def get_active_channels(self) -> List[str]:
@@ -791,14 +919,14 @@ class Module(MgrModule):
                 m = self.remote('devicehealth', 'get_recent_device_metrics',
                                 devid, min_sample)
             except Exception as e:
-                self.log.debug('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
+                self.log.error('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
                 continue
 
             # anonymize host id
             try:
                 host = d['location'][0]['host']
             except (KeyError, IndexError) as e:
-                self.log.debug('Unable to get host from device with id "{}": {}'.format(devid, e))
+                self.log.error('Unable to get host from device with id "{}": {}'.format(devid, e))
                 continue
             anon_host = self.get_store('host-id/%s' % host)
             if not anon_host:
@@ -1128,6 +1256,9 @@ class Module(MgrModule):
                     'active': False
                 }
 
+            # Rook
+            self.get_rook_data(report)
+
         if 'crash' in channels:
             report['crashes'] = self.gather_crashinfo()
 
@@ -1147,6 +1278,42 @@ class Module(MgrModule):
 
         return report
 
+    def get_rook_data(self, report: Dict[str, object]) -> None:
+        r, outb, outs = self.mon_command({
+            'prefix': 'config-key dump',
+            'format': 'json'
+        })
+        if r != 0:
+            return
+        try:
+            config_kv_dump = json.loads(outb)
+        except json.decoder.JSONDecodeError:
+            return
+
+        for elem in ROOK_KEYS_BY_COLLECTION:
+            # elem[0] is the full key path (e.g. "rook/node/count/with-csi-nfs-plugin")
+            # elem[1] is the Collection this key belongs to
+            if self.is_enabled_collection(elem[1]):
+                self.add_kv_to_report(report, elem[0], config_kv_dump.get(elem[0]))
+
+    def add_kv_to_report(self, report: Dict[str, object], key_path: str, value: Any) -> None:
+        last_node = key_path.split('/')[-1]
+        for node in key_path.split('/')[0:-1]:
+            if node not in report:
+                report[node] = {}
+            report = report[node]  # type: ignore
+
+            # sanity check of keys correctness
+            if not isinstance(report, dict):
+                self.log.error(f"'{key_path}' is an invalid key, expected type 'dict' but got {type(report)}")
+                return
+
+        if last_node in report:
+            self.log.error(f"'{key_path}' is an invalid key, last part must not exist at this point")
+            return
+
+        report[last_node] = value
+
     def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
         self.log.info('Sending %s to: %s' % (what, url))
         proxies = dict()
@@ -1467,6 +1634,9 @@ To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
                 msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
                         f"`ceph telemetry enable channel{disabled_channels}`"
 
+            # wake up serve() to reset health warning
+            self.event.set()
+
             return 0, msg, ''
 
     @CLICommand('telemetry off')