"""
Telemetry module for ceph-mgr

Collect statistics from the Ceph cluster and send them back to the Ceph
project.
"""
import enum
import hashlib
import json
import numbers
import uuid

from collections import defaultdict
from datetime import datetime, timedelta
from prettytable import PrettyTable
from threading import Event, Lock
from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, \
    TypeVar, TYPE_CHECKING, Union

from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, \
    OptionValue, ServiceInfoT
# Every telemetry channel this module can ever report on.
ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']

# License under which telemetry data is shared; the user must explicitly
# acknowledge it as part of the opt-in process.
LICENSE = 'sharing-1-0'
LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL = 'https://cdla.io/sharing-1-0/'
# Latest revision of the telemetry report.  Bump this each time we make a
# change to what the report contains.

# History of revisions
# --------------------
# (per-revision numbering was lost in this copy of the file)
# Mimic and/or nautilus are lumped together here, since
# we didn't track revisions yet.
# - added revision tracking, nagging, etc.
# - added config option changes
# - added explicit license acknowledgement to the opt-in process
# - added device health metrics (i.e., SMART data, minus serial number)
# - added CephFS metadata (how many MDSs, fs features, how many data pools,
#   how much metadata is cached, rfiles, rbytes, rsnapshots)
# - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
# - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
# - whether an OSD cluster network is in use
# - rbd pool and image count, and rbd mirror mode (pool-level)
# - rgw daemons, zones, zonegroups; which rgw frontends
class Collection(str, enum.Enum):
    """Named data collections a cluster can opt in to reporting.

    Mixing in ``str`` lets a member compare equal to (and serialize as) its
    plain string value, which is how enabled collections are persisted and
    reported.
    """
    basic_base = 'basic_base'
    device_base = 'device_base'
    crash_base = 'crash_base'
    ident_base = 'ident_base'
    perf_perf = 'perf_perf'
    basic_mds_metadata = 'basic_mds_metadata'
    basic_pool_usage = 'basic_pool_usage'
    basic_usage_by_class = 'basic_usage_by_class'
    basic_rook_v01 = 'basic_rook_v01'
    perf_memory_metrics = 'perf_memory_metrics'
    basic_pool_options_bluestore = 'basic_pool_options_bluestore'
# Catalog of all collections known to this module, in the order they were
# introduced.  Each entry names a Collection member and carries a
# human-readable description.
# NOTE(review): this copy of the file dropped the per-entry dict delimiters
# and may also have dropped additional per-entry fields (e.g. channel/nag
# flags) -- confirm the entry schema against the canonical source.
MODULE_COLLECTION: List[Dict] = [
    {
        "name": Collection.basic_base,
        "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
    },
    {
        "name": Collection.device_base,
        "description": "Information about device health metrics",
    },
    {
        "name": Collection.crash_base,
        "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
    },
    {
        "name": Collection.ident_base,
        "description": "User-provided identifying information about the cluster",
    },
    {
        "name": Collection.perf_perf,
        "description": "Information about performance counters of the cluster",
    },
    {
        "name": Collection.basic_mds_metadata,
        "description": "MDS metadata",
    },
    {
        "name": Collection.basic_pool_usage,
        "description": "Default pool application and usage statistics",
    },
    {
        "name": Collection.basic_usage_by_class,
        "description": "Default device class usage statistics",
    },
    {
        "name": Collection.basic_rook_v01,
        "description": "Basic Rook deployment data",
    },
    {
        "name": Collection.perf_memory_metrics,
        "description": "Heap stats and mempools for mon and mds",
    },
    {
        "name": Collection.basic_pool_options_bluestore,
        "description": "Per-pool bluestore config options",
    },
]
# Flat list of (config-meta key, owning collection) pairs reported for Rook
# deployments.  Keys form a hierarchy separated by '/'.
# Note: a key cannot be both an interior node and a leaf of that hierarchy.
ROOK_KEYS_BY_COLLECTION: List[Tuple[str, Collection]] = [
    ("rook/version", Collection.basic_rook_v01),
    ("rook/kubernetes/version", Collection.basic_rook_v01),
    ("rook/csi/version", Collection.basic_rook_v01),
    ("rook/node/count/kubernetes-total", Collection.basic_rook_v01),
    ("rook/node/count/with-ceph-daemons", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-rbd-plugin", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-cephfs-plugin", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-nfs-plugin", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/total", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/rbd", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/cephfs", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/nfs", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/bucket", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/total", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/portable", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/non-portable", Collection.basic_rook_v01),
    ("rook/cluster/mon/count", Collection.basic_rook_v01),
    ("rook/cluster/mon/allow-multiple-per-node", Collection.basic_rook_v01),
    ("rook/cluster/mon/max-id", Collection.basic_rook_v01),
    ("rook/cluster/mon/pvc/enabled", Collection.basic_rook_v01),
    ("rook/cluster/mon/stretch/enabled", Collection.basic_rook_v01),
    ("rook/cluster/network/provider", Collection.basic_rook_v01),
    ("rook/cluster/external-mode", Collection.basic_rook_v01),
]
173 class Module(MgrModule
):
179 "kernel_description",
181 "distro_description",
188 default
='https://telemetry.ceph.com/report'),
189 Option(name
='device_url',
191 default
='https://telemetry.ceph.com/device'),
192 Option(name
='enabled',
195 Option(name
='last_opt_revision',
198 Option(name
='leaderboard',
201 Option(name
='leaderboard_description',
204 Option(name
='description',
207 Option(name
='contact',
210 Option(name
='organization',
216 Option(name
='interval',
220 Option(name
='channel_basic',
223 desc
='Share basic cluster information (size, version)'),
224 Option(name
='channel_ident',
227 desc
='Share a user-provided description and/or contact email for the cluster'),
228 Option(name
='channel_crash',
231 desc
='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
232 Option(name
='channel_device',
235 desc
=('Share device health metrics '
236 '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
237 Option(name
='channel_perf',
240 desc
='Share various performance metrics of a cluster'),
244 def config_keys(self
) -> Dict
[str, OptionValue
]:
245 return dict((o
['name'], o
.get('default', None)) for o
in self
.MODULE_OPTIONS
)
247 def __init__(self
, *args
: Any
, **kwargs
: Any
) -> None:
248 super(Module
, self
).__init
__(*args
, **kwargs
)
251 self
.db_collection
: Optional
[List
[str]] = None
252 self
.last_opted_in_ceph_version
: Optional
[int] = None
253 self
.last_opted_out_ceph_version
: Optional
[int] = None
254 self
.last_upload
: Optional
[int] = None
255 self
.last_report
: Dict
[str, Any
] = dict()
256 self
.report_id
: Optional
[str] = None
257 self
.salt
: Optional
[str] = None
258 self
.get_report_lock
= Lock()
259 self
.config_update_module_option()
260 # for mypy which does not run the code
265 self
.last_opt_revision
= 0
266 self
.leaderboard
= ''
267 self
.leaderboard_description
= ''
270 self
.channel_basic
= True
271 self
.channel_ident
= False
272 self
.channel_crash
= True
273 self
.channel_device
= True
274 self
.channel_perf
= False
275 self
.db_collection
= ['basic_base', 'device_base']
276 self
.last_opted_in_ceph_version
= 17
277 self
.last_opted_out_ceph_version
= 0
279 def config_update_module_option(self
) -> None:
280 for opt
in self
.MODULE_OPTIONS
:
283 self
.get_module_option(opt
['name']))
284 self
.log
.debug(' %s = %s', opt
['name'], getattr(self
, opt
['name']))
286 def config_notify(self
) -> None:
287 self
.config_update_module_option()
288 # wake up serve() thread
291 def load(self
) -> None:
292 last_upload
= self
.get_store('last_upload', None)
293 if last_upload
is None:
294 self
.last_upload
= None
296 self
.last_upload
= int(last_upload
)
298 report_id
= self
.get_store('report_id', None)
299 if report_id
is None:
300 self
.report_id
= str(uuid
.uuid4())
301 self
.set_store('report_id', self
.report_id
)
303 self
.report_id
= report_id
305 salt
= self
.get_store('salt', None)
307 self
.salt
= str(uuid
.uuid4())
308 self
.set_store('salt', self
.salt
)
312 self
.init_collection()
314 last_opted_in_ceph_version
= self
.get_store('last_opted_in_ceph_version', None)
315 if last_opted_in_ceph_version
is None:
316 self
.last_opted_in_ceph_version
= None
318 self
.last_opted_in_ceph_version
= int(last_opted_in_ceph_version
)
320 last_opted_out_ceph_version
= self
.get_store('last_opted_out_ceph_version', None)
321 if last_opted_out_ceph_version
is None:
322 self
.last_opted_out_ceph_version
= None
324 self
.last_opted_out_ceph_version
= int(last_opted_out_ceph_version
)
326 def gather_osd_metadata(self
,
327 osd_map
: Dict
[str, List
[Dict
[str, int]]]) -> Dict
[str, Dict
[str, int]]:
328 keys
= ["osd_objectstore", "rotational"]
329 keys
+= self
.metadata_keys
331 metadata
: Dict
[str, Dict
[str, int]] = dict()
333 metadata
[key
] = defaultdict(int)
335 for osd
in osd_map
['osds']:
336 res
= self
.get_metadata('osd', str(osd
['osd']))
338 self
.log
.debug('Could not get metadata for osd.%s' % str(osd
['osd']))
340 for k
, v
in res
.items():
348 def gather_mon_metadata(self
,
349 mon_map
: Dict
[str, List
[Dict
[str, str]]]) -> Dict
[str, Dict
[str, int]]:
351 keys
+= self
.metadata_keys
353 metadata
: Dict
[str, Dict
[str, int]] = dict()
355 metadata
[key
] = defaultdict(int)
357 for mon
in mon_map
['mons']:
358 res
= self
.get_metadata('mon', mon
['name'])
360 self
.log
.debug('Could not get metadata for mon.%s' % (mon
['name']))
362 for k
, v
in res
.items():
370 def gather_mds_metadata(self
) -> Dict
[str, Dict
[str, int]]:
371 metadata
: Dict
[str, Dict
[str, int]] = dict()
373 res
= self
.get('mds_metadata') # metadata of *all* mds daemons
374 if res
is None or not res
:
375 self
.log
.debug('Could not get metadata for mds daemons')
379 keys
+= self
.metadata_keys
382 metadata
[key
] = defaultdict(int)
384 for mds
in res
.values():
385 for k
, v
in mds
.items():
393 def gather_crush_info(self
) -> Dict
[str, Union
[int,
398 osdmap
= self
.get_osdmap()
399 crush_raw
= osdmap
.get_crush()
400 crush
= crush_raw
.dump()
402 BucketKeyT
= TypeVar('BucketKeyT', int, str)
404 def inc(d
: Dict
[BucketKeyT
, int], k
: BucketKeyT
) -> None:
410 device_classes
: Dict
[str, int] = {}
411 for dev
in crush
['devices']:
412 inc(device_classes
, dev
.get('class', ''))
414 bucket_algs
: Dict
[str, int] = {}
415 bucket_types
: Dict
[str, int] = {}
416 bucket_sizes
: Dict
[int, int] = {}
417 for bucket
in crush
['buckets']:
418 if '~' in bucket
['name']: # ignore shadow buckets
420 inc(bucket_algs
, bucket
['alg'])
421 inc(bucket_types
, bucket
['type_id'])
422 inc(bucket_sizes
, len(bucket
['items']))
425 'num_devices': len(crush
['devices']),
426 'num_types': len(crush
['types']),
427 'num_buckets': len(crush
['buckets']),
428 'num_rules': len(crush
['rules']),
429 'device_classes': list(device_classes
.values()),
430 'tunables': crush
['tunables'],
431 'compat_weight_set': '-1' in crush
['choose_args'],
432 'num_weight_sets': len(crush
['choose_args']),
433 'bucket_algs': bucket_algs
,
434 'bucket_sizes': bucket_sizes
,
435 'bucket_types': bucket_types
,
438 def gather_configs(self
) -> Dict
[str, List
[str]]:
439 # cluster config options
441 r
, outb
, outs
= self
.mon_command({
442 'prefix': 'config dump',
448 dump
= json
.loads(outb
)
449 except json
.decoder
.JSONDecodeError
:
452 name
= opt
.get('name')
455 # daemon-reported options (which may include ceph.conf)
457 ls
= self
.get("modified_config_options")
458 for opt
in ls
.get('options', {}):
461 'cluster_changed': sorted(list(cluster
)),
462 'active_changed': sorted(list(active
)),
465 def anonymize_entity_name(self
, entity_name
:str) -> str:
466 if '.' not in entity_name
:
467 self
.log
.debug(f
"Cannot split entity name ({entity_name}), no '.' is found")
470 (etype
, eid
) = entity_name
.split('.', 1)
473 if self
.salt
is not None:
475 # avoid asserting that salt exists
477 # do not set self.salt to a temp value
478 salt
= f
"no_salt_found_{NO_SALT_CNT}"
480 self
.log
.debug(f
"No salt found, created a temp one: {salt}")
481 m
.update(salt
.encode('utf-8'))
482 m
.update(eid
.encode('utf-8'))
483 m
.update(salt
.encode('utf-8'))
485 return etype
+ '.' + m
.hexdigest()
487 def get_heap_stats(self
) -> Dict
[str, dict]:
488 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
489 anonymized_daemons
= {}
490 osd_map
= self
.get('osd_map')
492 # Combine available daemons
494 for osd
in osd_map
['osds']:
495 daemons
.append('osd'+'.'+str(osd
['osd']))
496 # perf_memory_metrics collection (1/2)
497 if self
.is_enabled_collection(Collection
.perf_memory_metrics
):
498 mon_map
= self
.get('mon_map')
499 mds_metadata
= self
.get('mds_metadata')
500 for mon
in mon_map
['mons']:
501 daemons
.append('mon'+'.'+mon
['name'])
502 for mds
in mds_metadata
:
503 daemons
.append('mds'+'.'+mds
)
505 # Grab output from the "daemon.x heap stats" command
506 for daemon
in daemons
:
507 daemon_type
, daemon_id
= daemon
.split('.', 1)
508 heap_stats
= self
.parse_heap_stats(daemon_type
, daemon_id
)
510 if (daemon_type
!= 'osd'):
511 # Anonymize mon and mds
512 anonymized_daemons
[daemon
] = self
.anonymize_entity_name(daemon
)
513 daemon
= anonymized_daemons
[daemon
]
514 result
[daemon_type
][daemon
] = heap_stats
518 if anonymized_daemons
:
519 # for debugging purposes only, this data is never reported
520 self
.log
.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons
))
523 def parse_heap_stats(self
, daemon_type
: str, daemon_id
: Any
) -> Dict
[str, int]:
530 r
, outb
, outs
= self
.tell_command(daemon_type
, str(daemon_id
), cmd_dict
)
533 self
.log
.error("Invalid command dictionary: {}".format(cmd_dict
))
535 if 'tcmalloc heap stats' in outb
:
536 values
= [int(i
) for i
in outb
.split() if i
.isdigit()]
537 # `categories` must be ordered this way for the correct output to be parsed
538 categories
= ['use_by_application',
539 'page_heap_freelist',
540 'central_cache_freelist',
541 'transfer_cache_freelist',
542 'thread_cache_freelists',
544 'actual_memory_used',
546 'virtual_address_space_used',
548 'thread_heaps_in_use',
549 'tcmalloc_page_size']
550 if len(values
) != len(categories
):
551 self
.log
.error('Received unexpected output from {}.{}; ' \
552 'number of values should match the number' \
553 'of expected categories:\n values: len={} {} '\
554 '~ categories: len={} {} ~ outs: {}'.format(daemon_type
, daemon_id
, len(values
), values
, len(categories
), categories
, outs
))
556 parsed_output
= dict(zip(categories
, values
))
558 self
.log
.error('No heap stats available on {}.{}: {}'.format(daemon_type
, daemon_id
, outs
))
562 def get_mempool(self
, mode
: str = 'separated') -> Dict
[str, dict]:
563 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
564 anonymized_daemons
= {}
565 osd_map
= self
.get('osd_map')
567 # Combine available daemons
569 for osd
in osd_map
['osds']:
570 daemons
.append('osd'+'.'+str(osd
['osd']))
571 # perf_memory_metrics collection (2/2)
572 if self
.is_enabled_collection(Collection
.perf_memory_metrics
):
573 mon_map
= self
.get('mon_map')
574 mds_metadata
= self
.get('mds_metadata')
575 for mon
in mon_map
['mons']:
576 daemons
.append('mon'+'.'+mon
['name'])
577 for mds
in mds_metadata
:
578 daemons
.append('mds'+'.'+mds
)
580 # Grab output from the "dump_mempools" command
581 for daemon
in daemons
:
582 daemon_type
, daemon_id
= daemon
.split('.', 1)
584 'prefix': 'dump_mempools',
587 r
, outb
, outs
= self
.tell_command(daemon_type
, daemon_id
, cmd_dict
)
589 self
.log
.error("Invalid command dictionary: {}".format(cmd_dict
))
593 # This is where the mempool will land.
594 dump
= json
.loads(outb
)
595 if mode
== 'separated':
596 # Anonymize mon and mds
597 if daemon_type
!= 'osd':
598 anonymized_daemons
[daemon
] = self
.anonymize_entity_name(daemon
)
599 daemon
= anonymized_daemons
[daemon
]
600 result
[daemon_type
][daemon
] = dump
['mempool']['by_pool']
601 elif mode
== 'aggregated':
602 for mem_type
in dump
['mempool']['by_pool']:
603 result
[daemon_type
][mem_type
]['bytes'] += dump
['mempool']['by_pool'][mem_type
]['bytes']
604 result
[daemon_type
][mem_type
]['items'] += dump
['mempool']['by_pool'][mem_type
]['items']
606 self
.log
.error("Incorrect mode specified in get_mempool: {}".format(mode
))
607 except (json
.decoder
.JSONDecodeError
, KeyError) as e
:
608 self
.log
.exception("Error caught on {}.{}: {}".format(daemon_type
, daemon_id
, e
))
611 if anonymized_daemons
:
612 # for debugging purposes only, this data is never reported
613 self
.log
.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons
))
617 def get_osd_histograms(self
, mode
: str = 'separated') -> List
[Dict
[str, dict]]:
618 # Initialize result dict
619 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(
623 lambda: defaultdict(int))))))
625 # Get list of osd ids from the metadata
626 osd_metadata
= self
.get('osd_metadata')
628 # Grab output from the "osd.x perf histogram dump" command
629 for osd_id
in osd_metadata
:
631 'prefix': 'perf histogram dump',
635 r
, outb
, outs
= self
.osd_command(cmd_dict
)
636 # Check for invalid calls
638 self
.log
.error("Invalid command dictionary: {}".format(cmd_dict
))
642 # This is where the histograms will land if there are any.
643 dump
= json
.loads(outb
)
645 for histogram
in dump
['osd']:
646 # Log axis information. There are two axes, each represented
647 # as a dictionary. Both dictionaries are contained inside a
648 # list called 'axes'.
650 for axis
in dump
['osd'][histogram
]['axes']:
652 # This is the dict that contains information for an individual
653 # axis. It will be appended to the 'axes' list at the end.
654 axis_dict
: Dict
[str, Any
] = defaultdict()
656 # Collecting information for buckets, min, name, etc.
657 axis_dict
['buckets'] = axis
['buckets']
658 axis_dict
['min'] = axis
['min']
659 axis_dict
['name'] = axis
['name']
660 axis_dict
['quant_size'] = axis
['quant_size']
661 axis_dict
['scale_type'] = axis
['scale_type']
663 # Collecting ranges; placing them in lists to
664 # improve readability later on.
666 for _range
in axis
['ranges']:
667 _max
, _min
= None, None
672 ranges
.append([_min
, _max
])
673 axis_dict
['ranges'] = ranges
675 # Now that 'axis_dict' contains all the appropriate
676 # information for the current axis, append it to the 'axes' list.
677 # There will end up being two axes in the 'axes' list, since the
679 axes
.append(axis_dict
)
681 # Add the 'axes' list, containing both axes, to result.
682 # At this point, you will see that the name of the key is the string
683 # form of our axes list (str(axes)). This is there so that histograms
684 # with different axis configs will not be combined.
685 # These key names are later dropped when only the values are returned.
686 result
[str(axes
)][histogram
]['axes'] = axes
688 # Collect current values and make sure they are in
691 for value_list
in dump
['osd'][histogram
]['values']:
692 values
.append([int(v
) for v
in value_list
])
694 if mode
== 'separated':
695 if 'osds' not in result
[str(axes
)][histogram
]:
696 result
[str(axes
)][histogram
]['osds'] = []
697 result
[str(axes
)][histogram
]['osds'].append({'osd_id': int(osd_id
), 'values': values
})
699 elif mode
== 'aggregated':
700 # Aggregate values. If 'values' have already been initialized,
702 if 'values' in result
[str(axes
)][histogram
]:
703 for i
in range (0, len(values
)):
704 for j
in range (0, len(values
[i
])):
705 values
[i
][j
] += result
[str(axes
)][histogram
]['values'][i
][j
]
707 # Add the values to result.
708 result
[str(axes
)][histogram
]['values'] = values
710 # Update num_combined_osds
711 if 'num_combined_osds' not in result
[str(axes
)][histogram
]:
712 result
[str(axes
)][histogram
]['num_combined_osds'] = 1
714 result
[str(axes
)][histogram
]['num_combined_osds'] += 1
716 self
.log
.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode
))
719 # Sometimes, json errors occur if you give it an empty string.
720 # I am also putting in a catch for a KeyError since it could
721 # happen where the code is assuming that a key exists in the
722 # schema when it doesn't. In either case, we'll handle that
723 # by continuing and collecting what we can from other osds.
724 except (json
.decoder
.JSONDecodeError
, KeyError) as e
:
725 self
.log
.exception("Error caught on osd.{}: {}".format(osd_id
, e
))
728 return list(result
.values())
730 def get_io_rate(self
) -> dict:
731 return self
.get('io_rate')
733 def get_stats_per_pool(self
) -> dict:
734 result
= self
.get('pg_dump')['pool_stats']
736 # collect application metadata from osd_map
737 osd_map
= self
.get('osd_map')
738 application_metadata
= {pool
['pool']: pool
['application_metadata'] for pool
in osd_map
['pools']}
740 # add application to each pool from pg_dump
742 pool
['application'] = []
743 # Only include default applications
744 for application
in application_metadata
[pool
['poolid']]:
745 if application
in ['cephfs', 'mgr', 'rbd', 'rgw']:
746 pool
['application'].append(application
)
750 def get_stats_per_pg(self
) -> dict:
751 return self
.get('pg_dump')['pg_stats']
753 def get_rocksdb_stats(self
) -> Dict
[str, str]:
755 result
: Dict
[str, str] = defaultdict()
756 version
= self
.get_rocksdb_version()
759 result
['version'] = version
763 def gather_crashinfo(self
) -> List
[Dict
[str, str]]:
764 crashlist
: List
[Dict
[str, str]] = list()
765 errno
, crashids
, err
= self
.remote('crash', 'ls')
768 for crashid
in crashids
.split():
769 errno
, crashinfo
, err
= self
.remote('crash', 'do_info', crashid
)
772 c
= json
.loads(crashinfo
)
775 del c
['utsname_hostname']
777 # entity_name might have more than one '.', beware
778 (etype
, eid
) = c
.get('entity_name', '').split('.', 1)
781 m
.update(self
.salt
.encode('utf-8'))
782 m
.update(eid
.encode('utf-8'))
783 m
.update(self
.salt
.encode('utf-8'))
784 c
['entity_name'] = etype
+ '.' + m
.hexdigest()
786 # redact final line of python tracebacks, as the exception
787 # payload may contain identifying information
788 if 'mgr_module' in c
and 'backtrace' in c
:
789 # backtrace might be empty
790 if len(c
['backtrace']) > 0:
791 c
['backtrace'][-1] = '<redacted>'
796 def gather_perf_counters(self
, mode
: str = 'separated') -> Dict
[str, dict]:
797 # Extract perf counter data with get_unlabeled_perf_counters(), a method
798 # from mgr/mgr_module.py. This method returns a nested dictionary that
799 # looks a lot like perf schema, except with some additional fields.
801 # Example of output, a snapshot of a mon daemon:
803 # "bluestore.kv_flush_lat": {
805 # "description": "Average kv_thread flush latency",
813 perf_counters
= self
.get_unlabeled_perf_counters()
815 # Initialize 'result' dict
816 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(
817 lambda: defaultdict(lambda: defaultdict(int))))
820 anonymized_daemon_dict
= {}
822 for daemon
, perf_counters_by_daemon
in perf_counters
.items():
823 daemon_type
= daemon
[0:3] # i.e. 'mds', 'osd', 'rgw'
825 if mode
== 'separated':
826 # anonymize individual daemon names except osds
827 if (daemon_type
!= 'osd'):
828 anonymized_daemon
= self
.anonymize_entity_name(daemon
)
829 anonymized_daemon_dict
[anonymized_daemon
] = daemon
830 daemon
= anonymized_daemon
832 # Calculate num combined daemon types if in aggregated mode
833 if mode
== 'aggregated':
834 if 'num_combined_daemons' not in result
[daemon_type
]:
835 result
[daemon_type
]['num_combined_daemons'] = 1
837 result
[daemon_type
]['num_combined_daemons'] += 1
839 for collection
in perf_counters_by_daemon
:
840 # Split the collection to avoid redundancy in final report; i.e.:
841 # bluestore.kv_flush_lat, bluestore.kv_final_lat -->
842 # bluestore: kv_flush_lat, kv_final_lat
843 col_0
, col_1
= collection
.split('.')
845 # Debug log for empty keys. This initially was a problem for prioritycache
846 # perf counters, where the col_0 was empty for certain mon counters:
848 # "mon.a": { instead of "mon.a": {
849 # "": { "prioritycache": {
850 # "cache_bytes": {...}, "cache_bytes": {...},
852 # This log is here to detect any future instances of a similar issue.
853 if (daemon
== "") or (col_0
== "") or (col_1
== ""):
854 self
.log
.debug("Instance of an empty key: {}{}".format(daemon
, collection
))
856 if mode
== 'separated':
857 # Add value to result
858 result
[daemon
][col_0
][col_1
]['value'] = \
859 perf_counters_by_daemon
[collection
]['value']
861 # Check that 'count' exists, as not all counters have a count field.
862 if 'count' in perf_counters_by_daemon
[collection
]:
863 result
[daemon
][col_0
][col_1
]['count'] = \
864 perf_counters_by_daemon
[collection
]['count']
865 elif mode
== 'aggregated':
866 # Not every rgw daemon has the same schema. Specifically, each rgw daemon
867 # has a uniquely-named collection that starts off identically (i.e.
868 # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
869 # This bit of code combines these unique counters all under one rgw instance.
870 # Without this check, the schema would remain separeted out in the final report.
871 if col_0
[0:11] == "objecter-0x":
872 col_0
= "objecter-0x"
874 # Check that the value can be incremented. In some cases,
875 # the files are of type 'pair' (real-integer-pair, integer-integer pair).
876 # In those cases, the value is a dictionary, and not a number.
877 # i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
878 if isinstance(perf_counters_by_daemon
[collection
]['value'], numbers
.Number
):
879 result
[daemon_type
][col_0
][col_1
]['value'] += \
880 perf_counters_by_daemon
[collection
]['value']
882 # Check that 'count' exists, as not all counters have a count field.
883 if 'count' in perf_counters_by_daemon
[collection
]:
884 result
[daemon_type
][col_0
][col_1
]['count'] += \
885 perf_counters_by_daemon
[collection
]['count']
887 self
.log
.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode
))
890 if mode
== 'separated':
891 # for debugging purposes only, this data is never reported
892 self
.log
.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict
))
896 def get_active_channels(self
) -> List
[str]:
898 if self
.channel_basic
:
900 if self
.channel_crash
:
902 if self
.channel_device
:
904 if self
.channel_ident
:
906 if self
.channel_perf
:
910 def gather_device_report(self
) -> Dict
[str, Dict
[str, Dict
[str, str]]]:
912 time_format
= self
.remote('devicehealth', 'get_time_format')
913 except Exception as e
:
914 self
.log
.debug('Unable to format time: {}'.format(e
))
916 cutoff
= datetime
.utcnow() - timedelta(hours
=self
.interval
* 2)
917 min_sample
= cutoff
.strftime(time_format
)
919 devices
= self
.get('devices')['devices']
921 self
.log
.debug('Unable to get device info from the mgr.')
924 # anon-host-id -> anon-devid -> { timestamp -> record }
925 res
: Dict
[str, Dict
[str, Dict
[str, str]]] = {}
929 # this is a map of stamp -> {device info}
930 m
= self
.remote('devicehealth', 'get_recent_device_metrics',
932 except Exception as e
:
933 self
.log
.error('Unable to get recent metrics from device with id "{}": {}'.format(devid
, e
))
938 host
= d
['location'][0]['host']
939 except (KeyError, IndexError) as e
:
940 self
.log
.exception('Unable to get host from device with id "{}": {}'.format(devid
, e
))
942 anon_host
= self
.get_store('host-id/%s' % host
)
944 anon_host
= str(uuid
.uuid1())
945 self
.set_store('host-id/%s' % host
, anon_host
)
947 for dev
, rep
in m
.items():
948 rep
['host_id'] = anon_host
949 if serial
is None and 'serial_number' in rep
:
950 serial
= rep
['serial_number']
952 # anonymize device id
953 anon_devid
= self
.get_store('devid-id/%s' % devid
)
955 # ideally devid is 'vendor_model_serial',
956 # but can also be 'model_serial', 'serial'
958 anon_devid
= f
"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
960 anon_devid
= str(uuid
.uuid1())
961 self
.set_store('devid-id/%s' % devid
, anon_devid
)
962 self
.log
.info('devid %s / %s, host %s / %s' % (devid
, anon_devid
,
965 # anonymize the smartctl report itself
967 m_str
= json
.dumps(m
)
968 m
= json
.loads(m_str
.replace(serial
, 'deleted'))
970 if anon_host
not in res
:
972 res
[anon_host
][anon_devid
] = m
975 def get_latest(self
, daemon_type
: str, daemon_name
: str, stat
: str) -> int:
976 data
= self
.get_counter(daemon_type
, daemon_name
, stat
)[stat
]
982 def compile_report(self
, channels
: Optional
[List
[str]] = None) -> Dict
[str, Any
]:
984 channels
= self
.get_active_channels()
986 'leaderboard': self
.leaderboard
,
987 'leaderboard_description': self
.leaderboard_description
,
989 'report_timestamp': datetime
.utcnow().isoformat(),
990 'report_id': self
.report_id
,
991 'channels': channels
,
992 'channels_available': ALL_CHANNELS
,
994 'collections_available': [c
['name'].name
for c
in MODULE_COLLECTION
],
995 'collections_opted_in': [c
['name'].name
for c
in MODULE_COLLECTION
if self
.is_enabled_collection(c
['name'])],
998 if 'ident' in channels
:
999 for option
in ['description', 'contact', 'organization']:
1000 report
[option
] = getattr(self
, option
)
1002 if 'basic' in channels
:
1003 mon_map
= self
.get('mon_map')
1004 osd_map
= self
.get('osd_map')
1005 service_map
= self
.get('service_map')
1006 fs_map
= self
.get('fs_map')
1008 df_pools
= {pool
['id']: pool
for pool
in df
['pools']}
1010 report
['created'] = mon_map
['created']
1017 for mon
in mon_map
['mons']:
1018 for a
in mon
['public_addrs']['addrvec']:
1019 if a
['type'] == 'v2':
1021 elif a
['type'] == 'v1':
1023 if a
['addr'].startswith('['):
1028 'count': len(mon_map
['mons']),
1029 'features': mon_map
['features'],
1030 'min_mon_release': mon_map
['min_mon_release'],
1031 'v1_addr_mons': v1_mons
,
1032 'v2_addr_mons': v2_mons
,
1033 'ipv4_addr_mons': ipv4_mons
,
1034 'ipv6_addr_mons': ipv6_mons
,
1037 report
['config'] = self
.gather_configs()
1042 rbd_num_images_by_pool
= []
1043 rbd_mirroring_by_pool
= []
1045 report
['pools'] = list()
1046 for pool
in osd_map
['pools']:
1047 num_pg
+= pool
['pg_num']
1049 if pool
['erasure_code_profile']:
1050 orig
= osd_map
['erasure_code_profiles'].get(
1051 pool
['erasure_code_profile'], {})
1053 k
: orig
[k
] for k
in orig
.keys()
1054 if k
in ['k', 'm', 'plugin', 'technique',
1055 'crush-failure-domain', 'l']
1058 'pool': pool
['pool'],
1059 'pg_num': pool
['pg_num'],
1060 'pgp_num': pool
['pg_placement_num'],
1061 'size': pool
['size'],
1062 'min_size': pool
['min_size'],
1063 'pg_autoscale_mode': pool
['pg_autoscale_mode'],
1064 'target_max_bytes': pool
['target_max_bytes'],
1065 'target_max_objects': pool
['target_max_objects'],
1066 'type': ['', 'replicated', '', 'erasure'][pool
['type']],
1067 'erasure_code_profile': ec_profile
,
1068 'cache_mode': pool
['cache_mode'],
1071 # basic_pool_usage collection
1072 if self
.is_enabled_collection(Collection
.basic_pool_usage
):
1073 pool_data
['application'] = []
1074 for application
in pool
['application_metadata']:
1075 # Only include default applications
1076 if application
in ['cephfs', 'mgr', 'rbd', 'rgw']:
1077 pool_data
['application'].append(application
)
1078 pool_stats
= df_pools
[pool
['pool']]['stats']
1079 pool_data
['stats'] = { # filter out kb_used
1080 'avail_raw': pool_stats
['avail_raw'],
1081 'bytes_used': pool_stats
['bytes_used'],
1082 'compress_bytes_used': pool_stats
['compress_bytes_used'],
1083 'compress_under_bytes': pool_stats
['compress_under_bytes'],
1084 'data_bytes_used': pool_stats
['data_bytes_used'],
1085 'dirty': pool_stats
['dirty'],
1086 'max_avail': pool_stats
['max_avail'],
1087 'objects': pool_stats
['objects'],
1088 'omap_bytes_used': pool_stats
['omap_bytes_used'],
1089 'percent_used': pool_stats
['percent_used'],
1090 'quota_bytes': pool_stats
['quota_bytes'],
1091 'quota_objects': pool_stats
['quota_objects'],
1092 'rd': pool_stats
['rd'],
1093 'rd_bytes': pool_stats
['rd_bytes'],
1094 'stored': pool_stats
['stored'],
1095 'stored_data': pool_stats
['stored_data'],
1096 'stored_omap': pool_stats
['stored_omap'],
1097 'stored_raw': pool_stats
['stored_raw'],
1098 'wr': pool_stats
['wr'],
1099 'wr_bytes': pool_stats
['wr_bytes']
1101 pool_data
['options'] = {}
1102 # basic_pool_options_bluestore collection
1103 if self
.is_enabled_collection(Collection
.basic_pool_options_bluestore
):
1104 bluestore_options
= ['compression_algorithm',
1106 'compression_required_ratio',
1107 'compression_min_blob_size',
1108 'compression_max_blob_size']
1109 for option
in bluestore_options
:
1110 if option
in pool
['options']:
1111 pool_data
['options'][option
] = pool
['options'][option
]
1112 cast(List
[Dict
[str, Any
]], report
['pools']).append(pool_data
)
1113 if 'rbd' in pool
['application_metadata']:
1115 ioctx
= self
.rados
.open_ioctx(pool
['pool_name'])
1116 rbd_num_images_by_pool
.append(
1117 sum(1 for _
in rbd
.RBD().list2(ioctx
)))
1118 rbd_mirroring_by_pool
.append(
1119 rbd
.RBD().mirror_mode_get(ioctx
) != rbd
.RBD_MIRROR_MODE_DISABLED
)
1121 'num_pools': rbd_num_pools
,
1122 'num_images_by_pool': rbd_num_images_by_pool
,
1123 'mirroring_by_pool': rbd_mirroring_by_pool
}
1126 cluster_network
= False
1127 for osd
in osd_map
['osds']:
1128 if osd
['up'] and not cluster_network
:
1129 front_ip
= osd
['public_addrs']['addrvec'][0]['addr'].split(':')[0]
1130 back_ip
= osd
['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
1131 if front_ip
!= back_ip
:
1132 cluster_network
= True
1134 'count': len(osd_map
['osds']),
1135 'require_osd_release': osd_map
['require_osd_release'],
1136 'require_min_compat_client': osd_map
['require_min_compat_client'],
1137 'cluster_network': cluster_network
,
1141 report
['crush'] = self
.gather_crush_info()
1145 'count': len(fs_map
['filesystems']),
1146 'feature_flags': fs_map
['feature_flags'],
1147 'num_standby_mds': len(fs_map
['standbys']),
1150 num_mds
= len(fs_map
['standbys'])
1151 for fsm
in fs_map
['filesystems']:
1161 for gid
, mds
in fs
['info'].items():
1162 num_sessions
+= self
.get_latest('mds', mds
['name'],
1163 'mds_sessions.session_count')
1164 cached_ino
+= self
.get_latest('mds', mds
['name'],
1166 cached_dn
+= self
.get_latest('mds', mds
['name'],
1168 cached_cap
+= self
.get_latest('mds', mds
['name'],
1170 subtrees
+= self
.get_latest('mds', mds
['name'],
1172 if mds
['rank'] == 0:
1173 rfiles
= self
.get_latest('mds', mds
['name'],
1175 rbytes
= self
.get_latest('mds', mds
['name'],
1177 rsnaps
= self
.get_latest('mds', mds
['name'],
1179 report
['fs']['filesystems'].append({ # type: ignore
1180 'max_mds': fs
['max_mds'],
1181 'ever_allowed_features': fs
['ever_allowed_features'],
1182 'explicitly_allowed_features': fs
['explicitly_allowed_features'],
1183 'num_in': len(fs
['in']),
1184 'num_up': len(fs
['up']),
1185 'num_standby_replay': len(
1186 [mds
for gid
, mds
in fs
['info'].items()
1187 if mds
['state'] == 'up:standby-replay']),
1188 'num_mds': len(fs
['info']),
1189 'num_sessions': num_sessions
,
1190 'cached_inos': cached_ino
,
1191 'cached_dns': cached_dn
,
1192 'cached_caps': cached_cap
,
1193 'cached_subtrees': subtrees
,
1194 'balancer_enabled': len(fs
['balancer']) > 0,
1195 'num_data_pools': len(fs
['data_pools']),
1196 'standby_count_wanted': fs
['standby_count_wanted'],
1197 'approx_ctime': fs
['created'][0:7],
1202 num_mds
+= len(fs
['info'])
1203 report
['fs']['total_num_mds'] = num_mds
# type: ignore
1206 report
['metadata'] = dict(osd
=self
.gather_osd_metadata(osd_map
),
1207 mon
=self
.gather_mon_metadata(mon_map
))
1209 if self
.is_enabled_collection(Collection
.basic_mds_metadata
):
1210 report
['metadata']['mds'] = self
.gather_mds_metadata() # type: ignore
1213 servers
= self
.list_servers()
1214 self
.log
.debug('servers %s' % servers
)
1216 'num': len([h
for h
in servers
if h
['hostname']]),
1218 for t
in ['mon', 'mds', 'osd', 'mgr']:
1219 nr_services
= sum(1 for host
in servers
if
1220 any(service
for service
in cast(List
[ServiceInfoT
],
1222 if service
['type'] == t
))
1223 hosts
['num_with_' + t
] = nr_services
1224 report
['hosts'] = hosts
1227 'pools': len(df
['pools']),
1229 'total_used_bytes': df
['stats']['total_used_bytes'],
1230 'total_bytes': df
['stats']['total_bytes'],
1231 'total_avail_bytes': df
['stats']['total_avail_bytes']
1233 # basic_usage_by_class collection
1234 if self
.is_enabled_collection(Collection
.basic_usage_by_class
):
1235 report
['usage']['stats_by_class'] = {} # type: ignore
1236 for device_class
in df
['stats_by_class']:
1237 if device_class
in ['hdd', 'ssd', 'nvme']:
1238 report
['usage']['stats_by_class'][device_class
] = df
['stats_by_class'][device_class
] # type: ignore
1240 services
: DefaultDict
[str, int] = defaultdict(int)
1241 for key
, value
in service_map
['services'].items():
1249 d
= value
.get('daemons', dict())
1250 for k
, v
in d
.items():
1251 if k
== 'summary' and v
:
1253 elif isinstance(v
, dict) and 'metadata' in v
:
1255 zones
.add(v
['metadata']['zone_id'])
1256 zonegroups
.add(v
['metadata']['zonegroup_id'])
1257 frontends
.add(v
['metadata']['frontend_type#0'])
1259 # we could actually iterate over all the keys of
1260 # the dict and check for how many frontends there
1261 # are, but it is unlikely that one would be running
1262 # more than 2 supported ones
1263 f2
= v
['metadata'].get('frontend_type#1', None)
1267 rgw
['count'] = count
1268 rgw
['zones'] = len(zones
)
1269 rgw
['zonegroups'] = len(zonegroups
)
1270 rgw
['frontends'] = list(frontends
) # sets aren't json-serializable
1272 report
['services'] = services
1275 report
['balancer'] = self
.remote('balancer', 'gather_telemetry')
1277 report
['balancer'] = {
1282 self
.get_rook_data(report
)
1284 if 'crash' in channels
:
1285 report
['crashes'] = self
.gather_crashinfo()
1287 if 'perf' in channels
:
1288 if self
.is_enabled_collection(Collection
.perf_perf
):
1289 report
['perf_counters'] = self
.gather_perf_counters('separated')
1290 report
['stats_per_pool'] = self
.get_stats_per_pool()
1291 report
['stats_per_pg'] = self
.get_stats_per_pg()
1292 report
['io_rate'] = self
.get_io_rate()
1293 report
['osd_perf_histograms'] = self
.get_osd_histograms('separated')
1294 report
['mempool'] = self
.get_mempool('separated')
1295 report
['heap_stats'] = self
.get_heap_stats()
1296 report
['rocksdb_stats'] = self
.get_rocksdb_stats()
1298 # NOTE: We do not include the 'device' channel in this report; it is
1299 # sent to a different endpoint.
def get_rook_data(self, report: Dict[str, object]) -> None:
    """Merge Rook-related config-key values into *report*.

    Dumps the cluster's config-key store and, for every Rook key whose
    collection is opted-in, grafts its value into the report tree.
    """
    r, outb, outs = self.mon_command({
        'prefix': 'config-key dump',
        'format': 'json'
    })
    if r != 0:
        return
    try:
        config_kv_dump = json.loads(outb)
    except json.decoder.JSONDecodeError:
        return

    for elem in ROOK_KEYS_BY_COLLECTION:
        # elem[0] is the full key path (e.g. "rook/node/count/with-csi-nfs-plugin")
        # elem[1] is the Collection this key belongs to
        if self.is_enabled_collection(elem[1]):
            self.add_kv_to_report(report, elem[0], config_kv_dump.get(elem[0]))
def add_kv_to_report(self, report: Dict[str, object], key_path: str, value: Any) -> None:
    """Insert *value* into *report* at the nested location named by *key_path*.

    *key_path* is a slash-separated path; intermediate dicts are created
    as needed. Logs an error and gives up if the path runs into a
    non-dict node or if the leaf already exists.
    """
    *branch, leaf = key_path.split('/')
    for node in branch:
        if node not in report:
            report[node] = {}
        report = report[node]  # type: ignore

        # sanity check of keys correctness
        if not isinstance(report, dict):
            self.log.error(f"'{key_path}' is an invalid key, expected type 'dict' but got {type(report)}")
            return

    if leaf in report:
        self.log.error(f"'{key_path}' is an invalid key, last part must not exist at this point")
        return

    report[leaf] = value
def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
    """PUT *report* as JSON to *url*, honoring the configured proxy.

    Returns None on success, or a human-readable failure string on any
    exception (which is also logged).
    """
    self.log.info('Sending %s to: %s' % (what, url))
    proxies = dict()
    if self.proxy:
        self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
        proxies['http'] = self.proxy
        proxies['https'] = self.proxy
    try:
        resp = requests.put(url=url, json=report, proxies=proxies)
        resp.raise_for_status()
    except Exception as e:
        fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
        self.log.error(fail_reason)
        return fail_reason
    return None
class EndPoint(enum.Enum):
    """Destinations a telemetry report can be sent to."""
    ceph = 'ceph'
    device = 'device'
def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
    """
    Find collections that are available in the module, but are not in the db.

    Returns None when the db has not been initialized yet or when an
    invalid channel name is supplied.
    """
    if self.db_collection is None:
        return None

    if channels is None:
        channels = ALL_CHANNELS
    else:
        for ch in channels:
            if ch not in ALL_CHANNELS:
                self.log.debug(f"invalid channel name: {ch}")
                return None

    new_collection: List[Collection] = []

    for c in MODULE_COLLECTION:
        if c['name'].name not in self.db_collection and c['channel'] in channels:
            new_collection.append(c['name'])

    return new_collection
def is_major_upgrade(self) -> bool:
    """
    Returns True only if the user last opted-in to an older major release
    than the cluster's current minimum monitor release.
    """
    last = self.last_opted_in_ceph_version
    if last is None or last == 0:
        # we do not know what Ceph version was when the user last opted-in,
        # thus we do not wish to nag in case of a major upgrade
        return False

    mon_map = self.get('mon_map')
    mon_min = mon_map.get("min_mon_release", 0)

    if mon_min - last > 0:
        self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
        return True

    return False
def is_opted_in(self) -> bool:
    """Return True when the user is currently opted-in to telemetry."""
    # If len is 0 it means that the user is either opted-out (never
    # opted-in, or invoked `telemetry off`), or they upgraded from a
    # telemetry revision 1 or 2, which required to re-opt in to revision 3,
    # regardless, hence is considered as opted-out
    if self.db_collection is None:
        return False
    return bool(self.db_collection)
def should_nag(self) -> bool:
    """Return True when the user should get a health warning to re-opt-in.

    Nag when the module carries a collection that is not yet in the db and
    that collection has nag set, or when a major upgrade happened and new
    collections exist.
    """
    # Find delta between opted-in collections and module collections;
    # nag only if module has a collection which is not in db, and nag == True.

    # We currently do not nag if the user is opted-out (or never opted-in).
    # If we wish to do this in the future, we need to have a tri-mode state
    # (opted in, opted out, no action yet), and it needs to be guarded by a
    # config option (so that nagging can be turned off via config).
    # We also need to add a last_opted_out_ceph_version variable, for the
    # major upgrade check.

    # check if there are collections the user is not opt-in to
    # that we should nag about
    if self.db_collection is not None:
        for c in MODULE_COLLECTION:
            if c['name'].name not in self.db_collection:
                # fixed: truthiness test instead of '== True' comparison
                if c['nag']:
                    self.log.debug(f"The collection: {c['name']} is not reported")
                    return True

    # user might be opted-in to the most recent collection, or there is no
    # new collection which requires nagging about; thus nag in case it's a
    # major upgrade and there are new collections
    # (which their own nag == False):
    new_collections = False
    col_delta = self.collection_delta()
    if col_delta is not None and len(col_delta) > 0:
        new_collections = True

    return self.is_major_upgrade() and new_collections
def init_collection(self) -> None:
    """Initialize self.db_collection from the persistent store.

    On first run after an upgrade the store is empty; seed it according
    to whether (and at which revision) the user was previously opted-in.
    """
    # We fetch from db the collections the user had already opted-in to.
    # During the transition the results will be empty, but the user might
    # be opted-in to an older version (e.g. revision = 3)

    collection = self.get_store('collection')

    if collection is not None:
        self.db_collection = json.loads(collection)

    if self.db_collection is None:
        # happens once on upgrade
        if not self.enabled:
            # user is not opted-in
            self.set_store('collection', json.dumps([]))
            self.log.debug("user is not opted-in")
        else:
            # user is opted-in, verify the revision:
            if self.last_opt_revision == REVISION:
                self.log.debug(f"telemetry revision is {REVISION}")
                base_collection = [Collection.basic_base.name,
                                   Collection.device_base.name,
                                   Collection.crash_base.name,
                                   Collection.ident_base.name]
                self.set_store('collection', json.dumps(base_collection))
            else:
                # user is opted-in to an older version, meaning they need
                # to re-opt in regardless
                self.set_store('collection', json.dumps([]))
                self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")

        # reload collection after setting
        collection = self.get_store('collection')
        if collection is not None:
            self.db_collection = json.loads(collection)
        else:
            raise RuntimeError('collection is None after initial setting')
    else:
        # user has already upgraded
        self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
def is_enabled_collection(self, collection: Collection) -> bool:
    """Return True when *collection* is among the opted-in collections."""
    if self.db_collection is None:
        return False
    return collection.name in self.db_collection
def opt_in_all_collections(self) -> None:
    """
    Opt-in to all collections; update db with the collections currently
    available in the module.
    """
    if self.db_collection is None:
        raise RuntimeError('db_collection is None after initial setting')

    for c in MODULE_COLLECTION:
        if c['name'].name not in self.db_collection:
            self.db_collection.append(c['name'])

    self.set_store('collection', json.dumps(self.db_collection))
def send(self,
         report: Dict[str, Dict[str, str]],
         endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
    """Send *report* to the requested endpoints (defaults to both).

    Returns a CLI-style (retval, stdout, stderr) tuple summarizing which
    uploads succeeded and which failed.
    """
    if not endpoint:
        endpoint = [self.EndPoint.ceph, self.EndPoint.device]
    failed = []
    success = []
    self.log.debug('Send endpoints %s' % endpoint)
    for e in endpoint:
        if e == self.EndPoint.ceph:
            fail_reason = self._try_post('ceph report', self.url, report)
            if fail_reason:
                failed.append(fail_reason)
            else:
                now = int(time.time())
                self.last_upload = now
                self.set_store('last_upload', str(now))
                success.append('Ceph report sent to {0}'.format(self.url))
                self.log.info('Sent report to {0}'.format(self.url))
        elif e == self.EndPoint.device:
            if 'device' in self.get_active_channels():
                devices = self.gather_device_report()
                if devices:
                    num_devs = 0
                    num_hosts = 0
                    for host, ls in devices.items():
                        self.log.debug('host %s devices %s' % (host, ls))
                        if not len(ls):
                            continue
                        fail_reason = self._try_post('devices', self.device_url,
                                                     ls)
                        if fail_reason:
                            failed.append(fail_reason)
                        else:
                            num_devs += len(ls)
                            num_hosts += 1
                    if num_devs:
                        success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
                            num_devs, num_hosts, len(devices)))
                else:
                    fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
                    failed.append(fail_reason)
                    self.log.error(fail_reason)
    if failed:
        return 1, '', '\n'.join(success + failed)
    return 0, '', '\n'.join(success)
def format_perf_histogram(self, report: Dict[str, Any]) -> None:
    """Make the perf histograms in *report* human-readable, in place.

    Converts the 'ranges' and 'values' lists into strings so they render
    horizontally rather than vertically.
    """
    if 'report' in report:
        report = report['report']

    try:
        # Formatting ranges and values in osd_perf_histograms
        for config in report['osd_perf_histograms']:
            for histogram in config:
                # Adjust ranges by converting lists into strings
                for axis in config[histogram]['axes']:
                    axis['ranges'] = [str(r) for r in axis['ranges']]

                for osd in config[histogram]['osds']:
                    osd['values'] = [str(v) for v in osd['values']]
    except KeyError:
        # If the perf channel is not enabled, there should be a KeyError since
        # 'osd_perf_histograms' would not be present in the report. In that case,
        # the show function should pass as usual without trying to format the
        # histograms.
        pass
def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    '''
    Enable or disable a list of channels
    '''
    if not self.enabled:
        # telemetry should be on for channels to be toggled
        msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample reports with `ceph telemetry preview`.'
        return 0, msg, ''

    if channels is None:
        msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
        return 0, msg, ''

    state = action == 'enable'
    msg = ''
    for c in channels:
        if c not in ALL_CHANNELS:
            msg = f"{msg}{c} is not a valid channel name. "\
                  f"Available channels: {ALL_CHANNELS}.\n"
        else:
            self.set_module_option(f"channel_{c}", state)
            setattr(self, f"channel_{c}", state)
            msg = f"{msg}channel_{c} is {action}d\n"

    return 0, msg, ''
@CLIReadCommand('telemetry status')
def status(self) -> Tuple[int, str, str]:
    '''
    Show current configuration
    '''
    r = {}
    for opt in self.MODULE_OPTIONS:
        r[opt['name']] = getattr(self, opt['name'])
    r['last_upload'] = (time.ctime(self.last_upload)
                        if self.last_upload else self.last_upload)
    return 0, json.dumps(r, indent=4, sort_keys=True), ''
@CLIReadCommand('telemetry diff')
def diff(self) -> Tuple[int, str, str]:
    '''
    Show the diff between opted-in collection and available collection
    '''
    diff = []
    keys = ['nag']

    for c in MODULE_COLLECTION:
        if not self.is_enabled_collection(c['name']):
            # strip internal-only keys (e.g. 'nag') from the output
            diff.append({key: val for key, val in c.items() if key not in keys})

    if not diff:
        r = "Telemetry is up to date"
    else:
        r = json.dumps(diff, indent=4, sort_keys=True)

    return 0, r, ''
@CLICommand('telemetry on')
def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
    '''
    Enable telemetry reports from this cluster
    '''
    if license != LICENSE:
        return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''

    self.set_module_option('enabled', True)
    self.enabled = True
    self.opt_in_all_collections()

    # for major releases upgrade nagging
    mon_map = self.get('mon_map')
    mon_min = mon_map.get("min_mon_release", 0)
    self.set_store('last_opted_in_ceph_version', str(mon_min))
    self.last_opted_in_ceph_version = mon_min

    msg = 'Telemetry is on.'
    disabled_channels = ''
    active_channels = self.get_active_channels()
    for c in ALL_CHANNELS:
        if c not in active_channels and c != 'ident':
            disabled_channels = f"{disabled_channels} {c}"

    if len(disabled_channels) > 0:
        msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
              f"`ceph telemetry enable channel{disabled_channels}`"

    # wake up serve() to reset health warning
    self.event.set()

    return 0, msg, ''
@CLICommand('telemetry off')
def off(self) -> Tuple[int, str, str]:
    '''
    Disable telemetry reports from this cluster
    '''
    if not self.enabled:
        # telemetry is already off
        msg = 'Telemetry is currently not enabled, nothing to turn off. '\
              'Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample reports with `ceph telemetry preview`.'
        return 0, msg, ''

    self.set_module_option('enabled', False)
    self.enabled = False
    self.set_store('collection', json.dumps([]))
    self.db_collection = []

    # we might need this info in the future, in case
    # of nagging when user is opted-out
    mon_map = self.get('mon_map')
    mon_min = mon_map.get("min_mon_release", 0)
    self.set_store('last_opted_out_ceph_version', str(mon_min))
    self.last_opted_out_ceph_version = mon_min

    msg = 'Telemetry is now disabled.'
    return 0, msg, ''
@CLIReadCommand('telemetry enable channel all')
def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
    '''
    Enable all channels
    '''
    return self.toggle_channel('enable', channels)
@CLIReadCommand('telemetry enable channel')
def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    '''
    Enable a list of channels
    '''
    return self.toggle_channel('enable', channels)
@CLIReadCommand('telemetry disable channel all')
def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
    '''
    Disable all channels
    '''
    return self.toggle_channel('disable', channels)
@CLIReadCommand('telemetry disable channel')
def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    '''
    Disable a list of channels
    '''
    return self.toggle_channel('disable', channels)
@CLIReadCommand('telemetry channel ls')
def channel_ls(self) -> Tuple[int, str, str]:
    '''
    List all channels
    '''
    table = PrettyTable(
        [
            'NAME', 'ENABLED', 'DEFAULT', 'DESC',
        ],
        border=False)
    table.align['NAME'] = 'l'
    table.align['ENABLED'] = 'l'
    table.align['DEFAULT'] = 'l'
    table.align['DESC'] = 'l'
    table.left_padding_width = 0
    table.right_padding_width = 4

    for c in ALL_CHANNELS:
        enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
        # Initialize per-channel so a channel with no matching module
        # option cannot hit an unbound local (or inherit the previous
        # channel's values).
        default = "OFF"
        desc = None
        for o in self.MODULE_OPTIONS:
            if o['name'] == f"channel_{c}":
                default = "ON" if o.get('default', None) else "OFF"
                desc = o.get('desc', None)

        table.add_row((
            c,
            enabled,
            default,
            desc,
        ))

    return 0, table.get_string(sortby="NAME"), ''
@CLIReadCommand('telemetry collection ls')
def collection_ls(self) -> Tuple[int, str, str]:
    '''
    List all collections
    '''
    col_delta = self.collection_delta()
    msg = ''
    if col_delta is not None and len(col_delta) > 0:
        msg = f"New collections are available:\n" \
              f"{sorted([c.name for c in col_delta])}\n" \
              f"Run `ceph telemetry on` to opt-in to these collections.\n"

    table = PrettyTable(
        [
            'NAME', 'STATUS', 'DESC',
        ],
        border=False)
    table.align['NAME'] = 'l'
    table.align['STATUS'] = 'l'
    table.align['DESC'] = 'l'
    table.left_padding_width = 0
    table.right_padding_width = 4

    for c in MODULE_COLLECTION:
        name = c['name']
        opted_in = self.is_enabled_collection(name)
        channel_enabled = getattr(self, f"channel_{c['channel']}")

        if channel_enabled and opted_in:
            status = "REPORTING"
        else:
            # explain exactly why this collection is not reported
            why = ''
            delimiter = ''
            if not opted_in:
                why += "NOT OPTED-IN"
                delimiter = ', '
            if not channel_enabled:
                why += f"{delimiter}CHANNEL {c['channel']} IS OFF"
            status = f"NOT REPORTING: {why}"

        desc = c['description']

        table.add_row((
            name,
            status,
            desc,
        ))

    if len(msg):
        # add a new line between message and table output
        msg = f"{msg}\n"

    return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
@CLICommand('telemetry send')
def do_send(self,
            endpoint: Optional[List[EndPoint]] = None,
            license: Optional[str] = None) -> Tuple[int, str, str]:
    '''
    Send a sample report
    '''
    if not self.is_opted_in() and license != LICENSE:
        self.log.debug(('A telemetry send attempt while opted-out. '
                        'Asking for license agreement'))
        return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
Please consider enabling the telemetry module with 'ceph telemetry on'.'''

    self.last_report = self.compile_report()
    return self.send(self.last_report, endpoint)
@CLIReadCommand('telemetry show')
def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    '''
    Show a sample report of opted-in collections (except for 'device')
    '''
    if not self.enabled:
        # if telemetry is off, no report is being sent, hence nothing to show
        msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample reports with `ceph telemetry preview`.'
        return 0, msg, ''

    report = self.get_report_locked(channels=channels)
    self.format_perf_histogram(report)
    report = json.dumps(report, indent=4, sort_keys=True)

    if self.channel_device:
        report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''

    return 0, report, ''
@CLIReadCommand('telemetry preview')
def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    '''
    Preview a sample report of the most recent collections available (except for 'device')
    '''
    report = {}

    # We use a lock to prevent a scenario where the user wishes to preview
    # the report, and at the same time the module hits the interval of
    # sending a report with the opted-in collection, which has less data
    # than in the preview report.
    col_delta = self.collection_delta()
    with self.get_report_lock:
        if col_delta is not None and len(col_delta) == 0:
            # user is already opted-in to the most recent collection
            msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
            return 0, msg, ''

        # there are collections the user is not opted-in to
        next_collection = []

        for c in MODULE_COLLECTION:
            next_collection.append(c['name'].name)

        # temporarily swap in the full collection list so the report
        # includes everything, then restore the opted-in list
        opted_in_collection = self.db_collection
        self.db_collection = next_collection
        report = self.get_report(channels=channels)
        self.db_collection = opted_in_collection

    self.format_perf_histogram(report)
    report = json.dumps(report, indent=4, sort_keys=True)

    if self.channel_device:
        report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''

    return 0, report, ''
@CLIReadCommand('telemetry show-device')
def show_device(self) -> Tuple[int, str, str]:
    '''
    Show a sample device report
    '''
    if not self.enabled:
        # if telemetry is off, no report is being sent, hence nothing to show
        msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample device reports with `ceph telemetry preview-device`.'
        return 0, msg, ''

    if not self.channel_device:
        # if device channel is off, device report is not being sent, hence nothing to show
        msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
              'Preview sample device reports with `ceph telemetry preview-device`.'
        return 0, msg, ''

    return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
@CLIReadCommand('telemetry preview-device')
def preview_device(self) -> Tuple[int, str, str]:
    '''
    Preview a sample device report of the most recent device collection
    '''
    report = {}

    device_col_delta = self.collection_delta(['device'])
    with self.get_report_lock:
        if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
            # user is already opted-in to the most recent device collection,
            # and device channel is on, thus `show-device` should be called
            msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
            return 0, msg, ''

        # either the user is not opted-in at all, or there are collections
        # they are not opted-in to
        next_collection = []

        for c in MODULE_COLLECTION:
            next_collection.append(c['name'].name)

        # temporarily swap in the full collection list, then restore
        opted_in_collection = self.db_collection
        self.db_collection = next_collection
        report = self.get_report('device')
        self.db_collection = opted_in_collection

    report = json.dumps(report, indent=4, sort_keys=True)
    return 0, report, ''
@CLIReadCommand('telemetry show-all')
def show_all(self) -> Tuple[int, str, str]:
    '''
    Show a sample report of all enabled channels (including 'device' channel)
    '''
    if not self.enabled:
        # if telemetry is off, no report is being sent, hence nothing to show
        msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample reports with `ceph telemetry preview`.'
        return 0, msg, ''

    if not self.channel_device:
        # device channel is off, no need to display its report
        report = self.get_report_locked('default')
    else:
        # telemetry is on and device channel is enabled, show both
        report = self.get_report_locked('all')

    self.format_perf_histogram(report)
    return 0, json.dumps(report, indent=4, sort_keys=True), ''
@CLIReadCommand('telemetry preview-all')
def preview_all(self) -> Tuple[int, str, str]:
    '''
    Preview a sample report of the most recent collections available of all channels (including 'device')
    '''
    report = {}

    col_delta = self.collection_delta()
    with self.get_report_lock:
        if col_delta is not None and len(col_delta) == 0:
            # user is already opted-in to the most recent collection
            msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
            return 0, msg, ''

        # there are collections the user is not opted-in to
        next_collection = []

        for c in MODULE_COLLECTION:
            next_collection.append(c['name'].name)

        # temporarily swap in the full collection list, then restore
        opted_in_collection = self.db_collection
        self.db_collection = next_collection
        report = self.get_report('all')
        self.db_collection = opted_in_collection

    self.format_perf_histogram(report)
    report = json.dumps(report, indent=4, sort_keys=True)

    return 0, report, ''
def get_report_locked(self,
                      report_type: str = 'default',
                      channels: Optional[List[str]] = None) -> Dict[str, Any]:
    '''
    A wrapper around get_report to allow for compiling a report of the most recent module collections
    '''
    with self.get_report_lock:
        return self.get_report(report_type, channels)
def get_report(self,
               report_type: str = 'default',
               channels: Optional[List[str]] = None) -> Dict[str, Any]:
    """Build a report of the requested type: 'default', 'device' or 'all'."""
    if report_type == 'default':
        return self.compile_report(channels=channels)
    if report_type == 'device':
        return self.gather_device_report()
    if report_type == 'all':
        return {'report': self.compile_report(channels=channels),
                'device_report': self.gather_device_report()}
    return {}
def self_test(self) -> None:
    """Sanity-check report generation; raise RuntimeError on failure."""
    self.opt_in_all_collections()
    report = self.compile_report(channels=ALL_CHANNELS)
    if not report:
        raise RuntimeError('Report is empty')

    if 'report_id' not in report:
        raise RuntimeError('report_id not found in report')
def shutdown(self) -> None:
    """Stop the serve() loop and wake it so it can exit promptly."""
    self.run = False
    self.event.set()
def refresh_health_checks(self) -> None:
    """Raise or clear the TELEMETRY_CHANGED health warning."""
    health_checks = {}
    # TODO do we want to nag also in case the user is not opted-in?
    if self.enabled and self.should_nag():
        health_checks['TELEMETRY_CHANGED'] = {
            'severity': 'warning',
            'summary': 'Telemetry requires re-opt-in',
            'count': 1,
            'detail': [
                'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
            ]
        }
    self.set_health_checks(health_checks)
def serve(self) -> None:
    """Main module loop: periodically compile and send the telemetry report.

    Wakes every hour (or when self.event is set, e.g. by on()/off()) and
    uploads when the configured interval has elapsed.
    """
    self.load()
    self.run = True

    self.log.debug('Waiting for mgr to warm up')
    self.event.wait(10)

    while self.run:
        self.event.clear()

        self.refresh_health_checks()

        if not self.is_opted_in():
            self.log.debug('Not sending report until user re-opts-in')
            self.event.wait(1800)
            continue
        if not self.enabled:
            self.log.debug('Not sending report until configured to do so')
            self.event.wait(1800)
            continue

        now = int(time.time())
        if not self.last_upload or \
           (now - self.last_upload) > self.interval * 3600:
            self.log.info('Compiling and sending report to %s',
                          self.url)

            try:
                self.last_report = self.compile_report()
            except Exception:
                self.log.exception('Exception while compiling report:')

            self.send(self.last_report)
        else:
            self.log.debug('Interval for sending new report has not expired')

        sleep = 3600
        self.log.debug('Sleeping for %d seconds', sleep)
        self.event.wait(sleep)
2073 def can_run() -> Tuple
[bool, str]: