# License metadata attached to the telemetry payload: reported data is
# shared under the CDLA-Sharing-1.0 terms.
LICENSE = 'sharing-1-0'
LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL = 'https://cdla.io/sharing-1-0/'
# Module-level counter used to build unique placeholder salts when no
# persistent salt is available (see anonymize_entity_name).
NO_SALT_CNT = 0
# Latest revision of the telemetry report. Bump this each time we make
# *any* change.
basic_mds_metadata = 'basic_mds_metadata'
basic_pool_usage = 'basic_pool_usage'
basic_usage_by_class = 'basic_usage_by_class'
+ basic_rook_v01 = 'basic_rook_v01'
+ perf_memory_metrics = 'perf_memory_metrics'
# Registry of telemetry collections: each entry names a Collection member,
# describes it, assigns it to a reporting channel, and says whether users
# should be nagged to opt in.
MODULE_COLLECTION : List[Dict] = [
    {
        # NOTE(review): this entry appears truncated in this view — its
        # "name" field (a Collection member) is presumably elided; confirm
        # against the full file.
        "description": "Default device class usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_rook_v01,
        "description": "Basic Rook deployment data",
        "channel": "basic",
        "nag": True
    },
    {
        "name": Collection.perf_memory_metrics,
        "description": "Heap stats and mempools for mon and mds",
        "channel": "perf",
        "nag": False
    },
]
+
# Config-key paths written by Rook that telemetry may report, each paired
# with the collection that gates it. Currently every key belongs to
# basic_rook_v01, so the pairs are generated from a flat tuple of paths.
# Note: a key cannot be both a node and a leaf, e.g.
#   "rook/a/b"
#   "rook/a/b/c"
_ROOK_BASIC_V01_KEY_PATHS = (
    "rook/version",
    "rook/kubernetes/version",
    "rook/csi/version",
    "rook/node/count/kubernetes-total",
    "rook/node/count/with-ceph-daemons",
    "rook/node/count/with-csi-rbd-plugin",
    "rook/node/count/with-csi-cephfs-plugin",
    "rook/node/count/with-csi-nfs-plugin",
    "rook/usage/storage-class/count/total",
    "rook/usage/storage-class/count/rbd",
    "rook/usage/storage-class/count/cephfs",
    "rook/usage/storage-class/count/nfs",
    "rook/usage/storage-class/count/bucket",
    "rook/cluster/storage/device-set/count/total",
    "rook/cluster/storage/device-set/count/portable",
    "rook/cluster/storage/device-set/count/non-portable",
    "rook/cluster/mon/count",
    "rook/cluster/mon/allow-multiple-per-node",
    "rook/cluster/mon/max-id",
    "rook/cluster/mon/pvc/enabled",
    "rook/cluster/mon/stretch/enabled",
    "rook/cluster/network/provider",
    "rook/cluster/external-mode",
)

ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [
    (path, Collection.basic_rook_v01) for path in _ROOK_BASIC_V01_KEY_PATHS
]
class Module(MgrModule):
'active_changed': sorted(list(active)),
}
- def get_heap_stats(self) -> Dict[str, dict]:
- # Initialize result dict
- result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))
+ def anonymize_entity_name(self, entity_name:str) -> str:
+ if '.' not in entity_name:
+ self.log.debug(f"Cannot split entity name ({entity_name}), no '.' is found")
+ return entity_name
+
+ (etype, eid) = entity_name.split('.', 1)
+ m = hashlib.sha1()
+ salt = ''
+ if self.salt is not None:
+ salt = self.salt
+ # avoid asserting that salt exists
+ if not self.salt:
+ # do not set self.salt to a temp value
+ salt = f"no_salt_found_{NO_SALT_CNT}"
+ NO_SALT_CNT += 1
+ self.log.debug(f"No salt found, created a temp one: {salt}")
+ m.update(salt.encode('utf-8'))
+ m.update(eid.encode('utf-8'))
+ m.update(salt.encode('utf-8'))
+
+ return etype + '.' + m.hexdigest()
    def get_heap_stats(self) -> Dict[str, dict]:
        """Collect tcmalloc heap stats from cluster daemons.

        Returns a nested dict keyed by daemon type, then by daemon name,
        mapping tcmalloc categories to values. OSDs are always polled; mon
        and mds daemons are only polled when the perf_memory_metrics
        collection is enabled, and their names are anonymized.
        """
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
        anonymized_daemons = {}
        osd_map = self.get('osd_map')

        # Combine available daemons: every OSD present in the osdmap...
        daemons = []
        for osd in osd_map['osds']:
            daemons.append('osd'+'.'+str(osd['osd']))
        # ...plus mon/mds, but only when the perf_memory_metrics
        # collection is opted in (1/2 — get_mempool is the other user).
        if self.is_enabled_collection(Collection.perf_memory_metrics):
            mon_map = self.get('mon_map')
            mds_metadata = self.get('mds_metadata')
            for mon in mon_map['mons']:
                daemons.append('mon'+'.'+mon['name'])
            for mds in mds_metadata:
                daemons.append('mds'+'.'+mds)

        # Grab output from the "daemon.x heap stats" command
        for daemon in daemons:
            daemon_type, daemon_id = daemon.split('.', 1)
            # Empty dict means the command failed or output was unparseable;
            # such daemons are simply skipped (best effort).
            heap_stats = self.parse_heap_stats(daemon_type, daemon_id)
            if heap_stats:
                if (daemon_type != 'osd'):
                    # Anonymize mon and mds names; keep the mapping locally
                    # for debug logging below.
                    anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
                    daemon = anonymized_daemons[daemon]
                result[daemon_type][daemon] = heap_stats
            else:
                continue
        if anonymized_daemons:
            # for debugging purposes only, this data is never reported
            self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons))

        return result
+ def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]:
+ parsed_output = {}
+
+ cmd_dict = {
+ 'prefix': 'heap',
+ 'heapcmd': 'stats'
+ }
+ r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict)
+
+ if r != 0:
+ self.log.error("Invalid command dictionary: {}".format(cmd_dict))
+ else:
+ if 'tcmalloc heap stats' in outb:
+ values = [int(i) for i in outb.split() if i.isdigit()]
+ # `categories` must be ordered this way for the correct output to be parsed
+ categories = ['use_by_application',
+ 'page_heap_freelist',
+ 'central_cache_freelist',
+ 'transfer_cache_freelist',
+ 'thread_cache_freelists',
+ 'malloc_metadata',
+ 'actual_memory_used',
+ 'released_to_os',
+ 'virtual_address_space_used',
+ 'spans_in_use',
+ 'thread_heaps_in_use',
+ 'tcmalloc_page_size']
+ if len(values) != len(categories):
+ self.log.error('Received unexpected output from {}.{}; ' \
+ 'number of values should match the number' \
+ 'of expected categories:\n values: len={} {} '\
+ '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs))
+ else:
+ parsed_output = dict(zip(categories, values))
+ else:
+ self.log.error('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs))
+
+ return parsed_output
+
    def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
        """Collect mempool dumps from cluster daemons.

        mode='separated' keys results by daemon type then daemon name
        (mon/mds names anonymized); mode='aggregated' sums bytes/items per
        mempool type across daemons of the same type. OSDs are always
        polled; mon and mds only when perf_memory_metrics is enabled.
        """
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
        anonymized_daemons = {}
        osd_map = self.get('osd_map')

        # Combine available daemons: every OSD in the osdmap...
        daemons = []
        for osd in osd_map['osds']:
            daemons.append('osd'+'.'+str(osd['osd']))
        # ...plus mon/mds when the perf_memory_metrics collection is
        # opted in (2/2 — get_heap_stats is the other user).
        if self.is_enabled_collection(Collection.perf_memory_metrics):
            mon_map = self.get('mon_map')
            mds_metadata = self.get('mds_metadata')
            for mon in mon_map['mons']:
                daemons.append('mon'+'.'+mon['name'])
            for mds in mds_metadata:
                daemons.append('mds'+'.'+mds)

        # Grab output from the "dump_mempools" command
        for daemon in daemons:
            daemon_type, daemon_id = daemon.split('.', 1)
            cmd_dict = {
                'prefix': 'dump_mempools',
                'format': 'json'
            }
            r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict)
            if r != 0:
                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
                continue
            else:
                try:
                    # This is where the mempool will land.
                    dump = json.loads(outb)
                    if mode == 'separated':
                        # Anonymize mon and mds names; keep the mapping
                        # locally for the debug log below.
                        if daemon_type != 'osd':
                            anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
                            daemon = anonymized_daemons[daemon]
                        result[daemon_type][daemon] = dump['mempool']['by_pool']
                    elif mode == 'aggregated':
                        # Sum per-pool counters across all daemons of the
                        # same type; names never appear in this mode.
                        for mem_type in dump['mempool']['by_pool']:
                            result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
                            result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
                    else:
                        self.log.error("Incorrect mode specified in get_mempool: {}".format(mode))
                except (json.decoder.JSONDecodeError, KeyError) as e:
                    # Best effort: a daemon with bad/missing output is
                    # skipped, the rest are still collected.
                    self.log.error("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e))
                    continue
        if anonymized_daemons:
            # for debugging purposes only, this data is never reported
            self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons))

        return result
def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
r, outb, outs = self.osd_command(cmd_dict)
# Check for invalid calls
if r != 0:
- self.log.debug("Invalid command dictionary.")
+ self.log.error("Invalid command dictionary: {}".format(cmd_dict))
continue
else:
try:
# schema when it doesn't. In either case, we'll handle that
# by continuing and collecting what we can from other osds.
except (json.decoder.JSONDecodeError, KeyError) as e:
- self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
+ self.log.error("Error caught on osd.{}: {}".format(osd_id, e))
continue
return list(result.values())
result: Dict[str, dict] = defaultdict(lambda: defaultdict(
lambda: defaultdict(lambda: defaultdict(int))))
- for daemon in all_perf_counters:
+ # 'separated' mode
+ anonymized_daemon_dict = {}
+
+ for daemon, all_perf_counters_by_daemon in all_perf_counters.items():
+ daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'
+
+ if mode == 'separated':
+ # anonymize individual daemon names except osds
+ if (daemon_type != 'osd'):
+ anonymized_daemon = self.anonymize_entity_name(daemon)
+ anonymized_daemon_dict[anonymized_daemon] = daemon
+ daemon = anonymized_daemon
# Calculate num combined daemon types if in aggregated mode
if mode == 'aggregated':
- daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'
if 'num_combined_daemons' not in result[daemon_type]:
result[daemon_type]['num_combined_daemons'] = 1
else:
result[daemon_type]['num_combined_daemons'] += 1
- for collection in all_perf_counters[daemon]:
+ for collection in all_perf_counters_by_daemon:
# Split the collection to avoid redundancy in final report; i.e.:
# bluestore.kv_flush_lat, bluestore.kv_final_lat -->
# bluestore: kv_flush_lat, kv_final_lat
if mode == 'separated':
# Add value to result
result[daemon][col_0][col_1]['value'] = \
- all_perf_counters[daemon][collection]['value']
+ all_perf_counters_by_daemon[collection]['value']
# Check that 'count' exists, as not all counters have a count field.
- if 'count' in all_perf_counters[daemon][collection]:
+ if 'count' in all_perf_counters_by_daemon[collection]:
result[daemon][col_0][col_1]['count'] = \
- all_perf_counters[daemon][collection]['count']
+ all_perf_counters_by_daemon[collection]['count']
elif mode == 'aggregated':
# Not every rgw daemon has the same schema. Specifically, each rgw daemon
# has a uniquely-named collection that starts off identically (i.e.
# the files are of type 'pair' (real-integer-pair, integer-integer pair).
# In those cases, the value is a dictionary, and not a number.
# i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
- if isinstance(all_perf_counters[daemon][collection]['value'], numbers.Number):
+ if isinstance(all_perf_counters_by_daemon[collection]['value'], numbers.Number):
result[daemon_type][col_0][col_1]['value'] += \
- all_perf_counters[daemon][collection]['value']
+ all_perf_counters_by_daemon[collection]['value']
# Check that 'count' exists, as not all counters have a count field.
- if 'count' in all_perf_counters[daemon][collection]:
+ if 'count' in all_perf_counters_by_daemon[collection]:
result[daemon_type][col_0][col_1]['count'] += \
- all_perf_counters[daemon][collection]['count']
+ all_perf_counters_by_daemon[collection]['count']
else:
self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
return {}
+ if mode == 'separated':
+ # for debugging purposes only, this data is never reported
+ self.log.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict))
+
return result
def get_active_channels(self) -> List[str]:
m = self.remote('devicehealth', 'get_recent_device_metrics',
devid, min_sample)
except Exception as e:
- self.log.debug('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
+ self.log.error('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
continue
# anonymize host id
try:
host = d['location'][0]['host']
except (KeyError, IndexError) as e:
- self.log.debug('Unable to get host from device with id "{}": {}'.format(devid, e))
+ self.log.error('Unable to get host from device with id "{}": {}'.format(devid, e))
continue
anon_host = self.get_store('host-id/%s' % host)
if not anon_host:
'active': False
}
+ # Rook
+ self.get_rook_data(report)
+
if 'crash' in channels:
report['crashes'] = self.gather_crashinfo()
return report
+ def get_rook_data(self, report: Dict[str, object]) -> None:
+ r, outb, outs = self.mon_command({
+ 'prefix': 'config-key dump',
+ 'format': 'json'
+ })
+ if r != 0:
+ return
+ try:
+ config_kv_dump = json.loads(outb)
+ except json.decoder.JSONDecodeError:
+ return
+
+ for elem in ROOK_KEYS_BY_COLLECTION:
+ # elem[0] is the full key path (e.g. "rook/node/count/with-csi-nfs-plugin")
+ # elem[1] is the Collection this key belongs to
+ if self.is_enabled_collection(elem[1]):
+ self.add_kv_to_report(report, elem[0], config_kv_dump.get(elem[0]))
+
+ def add_kv_to_report(self, report: Dict[str, object], key_path: str, value: Any) -> None:
+ last_node = key_path.split('/')[-1]
+ for node in key_path.split('/')[0:-1]:
+ if node not in report:
+ report[node] = {}
+ report = report[node] # type: ignore
+
+ # sanity check of keys correctness
+ if not isinstance(report, dict):
+ self.log.error(f"'{key_path}' is an invalid key, expected type 'dict' but got {type(report)}")
+ return
+
+ if last_node in report:
+ self.log.error(f"'{key_path}' is an invalid key, last part must not exist at this point")
+ return
+
+ report[last_node] = value
+
def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
self.log.info('Sending %s to: %s' % (what, url))
proxies = dict()
msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
f"`ceph telemetry enable channel{disabled_channels}`"
+ # wake up serve() to reset health warning
+ self.event.set()
+
return 0, msg, ''
@CLICommand('telemetry off')