basic_pool_usage = 'basic_pool_usage'
basic_usage_by_class = 'basic_usage_by_class'
basic_rook_v01 = 'basic_rook_v01'
+ perf_memory_metrics = 'perf_memory_metrics'
MODULE_COLLECTION : List[Dict] = [
{
"channel": "basic",
"nag": True
},
+ {
+ "name": Collection.perf_memory_metrics,
+ "description": "Heap stats and mempools for mon and mds",
+ "channel": "perf",
+ "nag": False
+ },
]
ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [
return etype + '.' + m.hexdigest()
def get_heap_stats(self) -> Dict[str, dict]:
    """Collect parsed heap statistics for the cluster's daemons.

    Every OSD listed in the ``osd_map`` is queried.  When the
    ``perf_memory_metrics`` collection is enabled, the monitors from the
    ``mon_map`` and the daemons from ``mds_metadata`` are queried too, and
    their names are replaced by ``anonymize_entity_name`` output before
    being used as keys.

    Returns:
        A mapping ``{daemon_type: {daemon_name: heap_stats}}`` where
        ``heap_stats`` is whatever ``parse_heap_stats`` returned.  Daemons
        for which ``parse_heap_stats`` returned nothing are left out.
    """
    result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
    # anonymized name -> real name; only ever written to the debug log
    anonymized_daemons = {}
    osd_map = self.get('osd_map')

    # Combine available daemons
    daemons = ['osd.' + str(osd['osd']) for osd in osd_map['osds']]
    # perf_memory_metrics collection (1/2)
    if self.is_enabled_collection(Collection.perf_memory_metrics):
        mon_map = self.get('mon_map')
        mds_metadata = self.get('mds_metadata')
        daemons.extend('mon.' + mon['name'] for mon in mon_map['mons'])
        daemons.extend('mds.' + mds for mds in mds_metadata)

    # Grab output from the "daemon.x heap stats" command
    for daemon in daemons:
        # Split on the first dot only: the type prefix never contains a
        # dot, but a mon/mds name may, and an unbounded split would then
        # fail to unpack into two values.
        daemon_type, daemon_id = daemon.split('.', 1)
        heap_stats = self.parse_heap_stats(daemon_type, daemon_id)
        if not heap_stats:
            continue
        if daemon_type != 'osd':
            # Anonymize mon and mds
            anonymized = self.anonymize_entity_name(daemon)
            anonymized_daemons[anonymized] = daemon
            daemon = anonymized
        result[daemon_type][daemon] = heap_stats

    if anonymized_daemons:
        # for debugging purposes only, this data is never reported
        self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons))
    return result
def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]:
    """Run ``heap stats`` on one daemon and parse the numbers it reports.

    The command is sent through ``self.tell_command``.  Its output is only
    parsed when it contains the ``tcmalloc heap stats`` marker; every
    whitespace-separated token made purely of digits is then taken, in
    order, as the value of the matching entry in ``categories``.

    Args:
        daemon_type: Daemon type passed to ``tell_command``.
        daemon_id: Daemon id; converted with ``str()`` before the call.

    Returns:
        A dict mapping each category name to its integer value, or an
        empty dict when the command failed, the marker was missing, or the
        number of values did not match the number of categories.  Each
        failure is reported through ``self.log.error``.
    """
    # `categories` must be ordered this way for the correct output to be parsed
    categories = ['use_by_application',
                  'page_heap_freelist',
                  'central_cache_freelist',
                  'transfer_cache_freelist',
                  'thread_cache_freelists',
                  'malloc_metadata',
                  'actual_memory_used',
                  'released_to_os',
                  'virtual_address_space_used',
                  'spans_in_use',
                  'thread_heaps_in_use',
                  'tcmalloc_page_size']

    cmd_dict = {
        'prefix': 'heap',
        'heapcmd': 'stats'
    }
    r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict)

    if r != 0:
        self.log.error("Invalid command dictionary: {}".format(cmd_dict))
        return {}

    if 'tcmalloc heap stats' not in outb:
        self.log.error('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs))
        return {}

    values = [int(i) for i in outb.split() if i.isdigit()]
    if len(values) != len(categories):
        # Log `outb`, the text the values were actually parsed from, so the
        # mismatch can be diagnosed from the log line alone.
        self.log.error('Received unexpected output from {}.{}; '
                       'number of values should match the number '
                       'of expected categories:\n values: len={} {} '
                       '~ categories: len={} {} ~ outb: {}'.format(
                           daemon_type, daemon_id, len(values), values,
                           len(categories), categories, outb))
        return {}

    return dict(zip(categories, values))
+
def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
    """Collect ``dump_mempools`` output from the cluster's daemons.

    Every OSD listed in the ``osd_map`` is queried.  When the
    ``perf_memory_metrics`` collection is enabled, the monitors from the
    ``mon_map`` and the daemons from ``mds_metadata`` are queried too.

    Args:
        mode: ``'separated'`` stores each daemon's ``by_pool`` dump under
            its own name (mon and mds names are replaced by
            ``anonymize_entity_name`` output); ``'aggregated'`` sums the
            ``bytes`` and ``items`` of every pool per daemon type.  Any
            other value is logged as an error and yields no data.

    Returns:
        ``{daemon_type: {daemon_name: by_pool}}`` in separated mode, or
        ``{daemon_type: {pool_name: {'bytes': int, 'items': int}}}`` in
        aggregated mode.  Daemons whose command fails or whose output
        cannot be decoded are skipped.
    """
    result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
    # anonymized name -> real name; only ever written to the debug log
    anonymized_daemons = {}
    osd_map = self.get('osd_map')

    # Combine available daemons
    daemons = ['osd.' + str(osd['osd']) for osd in osd_map['osds']]
    # perf_memory_metrics collection (2/2)
    if self.is_enabled_collection(Collection.perf_memory_metrics):
        mon_map = self.get('mon_map')
        mds_metadata = self.get('mds_metadata')
        daemons.extend('mon.' + mon['name'] for mon in mon_map['mons'])
        daemons.extend('mds.' + mds for mds in mds_metadata)

    # Grab output from the "dump_mempools" command
    for daemon in daemons:
        # Split on the first dot only: the type prefix never contains a
        # dot, but a mon/mds name may, and an unbounded split would then
        # fail to unpack into two values.
        daemon_type, daemon_id = daemon.split('.', 1)
        cmd_dict = {
            'prefix': 'dump_mempools',
            'format': 'json'
        }
        r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict)
        if r != 0:
            self.log.error("Invalid command dictionary: {}".format(cmd_dict))
            continue

        try:
            # This is where the mempool will land.
            dump = json.loads(outb)
            if mode == 'separated':
                # Look the pools up first so a malformed dump does not
                # leave an entry in the anonymization mapping.
                by_pool = dump['mempool']['by_pool']
                # Anonymize mon and mds
                if daemon_type != 'osd':
                    anonymized = self.anonymize_entity_name(daemon)
                    anonymized_daemons[anonymized] = daemon
                    daemon = anonymized
                result[daemon_type][daemon] = by_pool
            elif mode == 'aggregated':
                for mem_type, pool in dump['mempool']['by_pool'].items():
                    result[daemon_type][mem_type]['bytes'] += pool['bytes']
                    result[daemon_type][mem_type]['items'] += pool['items']
            else:
                self.log.error("Incorrect mode specified in get_mempool: {}".format(mode))
        except (json.decoder.JSONDecodeError, KeyError) as e:
            self.log.error("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e))
            continue

    if anonymized_daemons:
        # for debugging purposes only, this data is never reported
        self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons))

    return result
def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
r, outb, outs = self.osd_command(cmd_dict)
# Check for invalid calls
if r != 0:
- self.log.debug("Invalid command dictionary.")
+ self.log.error("Invalid command dictionary: {}".format(cmd_dict))
continue
else:
try:
# schema when it doesn't. In either case, we'll handle that
# by continuing and collecting what we can from other osds.
except (json.decoder.JSONDecodeError, KeyError) as e:
- self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
+ self.log.error("Error caught on osd.{}: {}".format(osd_id, e))
continue
return list(result.values())
m = self.remote('devicehealth', 'get_recent_device_metrics',
devid, min_sample)
except Exception as e:
- self.log.debug('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
+ self.log.error('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
continue
# anonymize host id
try:
host = d['location'][0]['host']
except (KeyError, IndexError) as e:
- self.log.debug('Unable to get host from device with id "{}": {}'.format(devid, e))
+ self.log.error('Unable to get host from device with id "{}": {}'.format(devid, e))
continue
anon_host = self.get_store('host-id/%s' % host)
if not anon_host:
msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
f"`ceph telemetry enable channel{disabled_channels}`"
+ # wake up serve() to reset health warning
+ self.event.set()
+
return 0, msg, ''
@CLICommand('telemetry off')