]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/telemetry/module.py
import ceph quincy 17.2.5
[ceph.git] / ceph / src / pybind / mgr / telemetry / module.py
index e986bbb42347c11973e98f0d8c752bb746da94ce..f3d49055a5ee3f0206cbd0931753951169a4c5f1 100644 (file)
@@ -4,148 +4,329 @@ Telemetry module for ceph-mgr
 Collect statistics from Ceph cluster and send this back to the Ceph project
 when user has opted-in
 """
+import logging
+import numbers
+import enum
 import errno
+import hashlib
 import json
-import re
+import rbd
 import requests
 import uuid
 import time
-from datetime import datetime
-from threading import Event
+from datetime import datetime, timedelta
+from prettytable import PrettyTable
+from threading import Event, Lock
 from collections import defaultdict
+from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union
 
-from mgr_module import MgrModule
+from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, ServiceInfoT
 
 
-class Module(MgrModule):
-    config = dict()
+ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']
+
+LICENSE = 'sharing-1-0'
+LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
+LICENSE_URL = 'https://cdla.io/sharing-1-0/'
+NO_SALT_CNT = 0
+
+# Latest revision of the telemetry report.  Bump this each time we make
+# *any* change.
+REVISION = 3
+
+# History of revisions
+# --------------------
+#
+# Version 1:
+#   Mimic and/or nautilus are lumped together here, since
+#   we didn't track revisions yet.
+#
+# Version 2:
+#   - added revision tracking, nagging, etc.
+#   - added config option changes
+#   - added channels
+#   - added explicit license acknowledgement to the opt-in process
+#
+# Version 3:
+#   - added device health metrics (i.e., SMART data, minus serial number)
+#   - remove crush_rule
+#   - added CephFS metadata (how many MDSs, fs features, how many data pools,
+#     how much metadata is cached, rfiles, rbytes, rsnapshots)
+#   - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
+#   - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
+#   - whether an OSD cluster network is in use
+#   - rbd pool and image count, and rbd mirror mode (pool-level)
+#   - rgw daemons, zones, zonegroups; which rgw frontends
+#   - crush map stats
+
+class Collection(str, enum.Enum):
+    basic_base = 'basic_base'
+    device_base = 'device_base'
+    crash_base = 'crash_base'
+    ident_base = 'ident_base'
+    perf_perf = 'perf_perf'
+    basic_mds_metadata = 'basic_mds_metadata'
+    basic_pool_usage = 'basic_pool_usage'
+    basic_usage_by_class = 'basic_usage_by_class'
+    basic_rook_v01 = 'basic_rook_v01'
+    perf_memory_metrics = 'perf_memory_metrics'
 
+MODULE_COLLECTION : List[Dict] = [
+    {
+        "name": Collection.basic_base,
+        "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
+        "channel": "basic",
+        "nag": False
+    },
+    {
+        "name": Collection.device_base,
+        "description": "Information about device health metrics",
+        "channel": "device",
+        "nag": False
+    },
+    {
+        "name": Collection.crash_base,
+        "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
+        "channel": "crash",
+        "nag": False
+    },
+    {
+        "name": Collection.ident_base,
+        "description": "User-provided identifying information about the cluster",
+        "channel": "ident",
+        "nag": False
+    },
+    {
+        "name": Collection.perf_perf,
+        "description": "Information about performance counters of the cluster",
+        "channel": "perf",
+        "nag": True
+    },
+    {
+        "name": Collection.basic_mds_metadata,
+        "description": "MDS metadata",
+        "channel": "basic",
+        "nag": False
+    },
+    {
+        "name": Collection.basic_pool_usage,
+        "description": "Default pool application and usage statistics",
+        "channel": "basic",
+        "nag": False
+    },
+    {
+        "name": Collection.basic_usage_by_class,
+        "description": "Default device class usage statistics",
+        "channel": "basic",
+        "nag": False
+    },
+    {
+        "name": Collection.basic_rook_v01,
+        "description": "Basic Rook deployment data",
+        "channel": "basic",
+        "nag": True
+    },
+    {
+        "name": Collection.perf_memory_metrics,
+        "description": "Heap stats and mempools for mon and mds",
+        "channel": "perf",
+        "nag": False
+    },
+]
+
+ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [
+        # Note: a key cannot be both a node and a leaf, e.g.
+        # "rook/a/b"
+        # "rook/a/b/c"
+        ("rook/version", Collection.basic_rook_v01),
+        ("rook/kubernetes/version", Collection.basic_rook_v01),
+        ("rook/csi/version", Collection.basic_rook_v01),
+        ("rook/node/count/kubernetes-total", Collection.basic_rook_v01),
+        ("rook/node/count/with-ceph-daemons", Collection.basic_rook_v01),
+        ("rook/node/count/with-csi-rbd-plugin", Collection.basic_rook_v01),
+        ("rook/node/count/with-csi-cephfs-plugin", Collection.basic_rook_v01),
+        ("rook/node/count/with-csi-nfs-plugin", Collection.basic_rook_v01),
+        ("rook/usage/storage-class/count/total", Collection.basic_rook_v01),
+        ("rook/usage/storage-class/count/rbd", Collection.basic_rook_v01),
+        ("rook/usage/storage-class/count/cephfs", Collection.basic_rook_v01),
+        ("rook/usage/storage-class/count/nfs", Collection.basic_rook_v01),
+        ("rook/usage/storage-class/count/bucket", Collection.basic_rook_v01),
+        ("rook/cluster/storage/device-set/count/total", Collection.basic_rook_v01),
+        ("rook/cluster/storage/device-set/count/portable", Collection.basic_rook_v01),
+        ("rook/cluster/storage/device-set/count/non-portable", Collection.basic_rook_v01),
+        ("rook/cluster/mon/count", Collection.basic_rook_v01),
+        ("rook/cluster/mon/allow-multiple-per-node", Collection.basic_rook_v01),
+        ("rook/cluster/mon/max-id", Collection.basic_rook_v01),
+        ("rook/cluster/mon/pvc/enabled", Collection.basic_rook_v01),
+        ("rook/cluster/mon/stretch/enabled", Collection.basic_rook_v01),
+        ("rook/cluster/network/provider", Collection.basic_rook_v01),
+        ("rook/cluster/external-mode", Collection.basic_rook_v01),
+]
+
+class Module(MgrModule):
     metadata_keys = [
-            "arch",
-            "ceph_version",
-            "os",
-            "cpu",
-            "kernel_description",
-            "kernel_version",
-            "distro_description",
-            "distro"
+        "arch",
+        "ceph_version",
+        "os",
+        "cpu",
+        "kernel_description",
+        "kernel_version",
+        "distro_description",
+        "distro"
     ]
 
     MODULE_OPTIONS = [
-        {
-            'name': 'url',
-            'type': 'str',
-            'default': 'https://telemetry.ceph.com/report'
-        },
-        {
-            'name': 'enabled',
-            'type': 'bool',
-            'default': False
-        },
-        {
-            'name': 'leaderboard',
-            'type': 'bool',
-            'default': False
-        },
-        {
-            'name': 'description',
-            'type': 'str',
-            'default': None
-        },
-        {
-            'name': 'contact',
-            'type': 'str',
-            'default': None
-        },
-        {
-            'name': 'organization',
-            'type': 'str',
-            'default': None
-        },
-        {
-            'name': 'proxy',
-            'type': 'str',
-            'default': None
-        },
-        {
-            'name': 'interval',
-            'type': 'int',
-            'default': 24,
-            'min': 8
-        }
-    ]
-
-    COMMANDS = [
-        {
-            "cmd": "telemetry status",
-            "desc": "Show current configuration",
-            "perm": "r"
-        },
-        {
-            "cmd": "telemetry send",
-            "desc": "Force sending data to Ceph telemetry",
-            "perm": "rw"
-        },
-        {
-            "cmd": "telemetry show",
-            "desc": "Show last report or report to be sent",
-            "perm": "r"
-        },
-        {
-            "cmd": "telemetry on",
-            "desc": "Enable telemetry reports from this cluster",
-            "perm": "rw",
-        },
-        {
-            "cmd": "telemetry off",
-            "desc": "Disable telemetry reports from this cluster",
-            "perm": "rw",
-        },
+        Option(name='url',
+               type='str',
+               default='https://telemetry.ceph.com/report'),
+        Option(name='device_url',
+               type='str',
+               default='https://telemetry.ceph.com/device'),
+        Option(name='enabled',
+               type='bool',
+               default=False),
+        Option(name='last_opt_revision',
+               type='int',
+               default=1),
+        Option(name='leaderboard',
+               type='bool',
+               default=False),
+        Option(name='description',
+               type='str',
+               default=None),
+        Option(name='contact',
+               type='str',
+               default=None),
+        Option(name='organization',
+               type='str',
+               default=None),
+        Option(name='proxy',
+               type='str',
+               default=None),
+        Option(name='interval',
+               type='int',
+               default=24,
+               min=8),
+        Option(name='channel_basic',
+               type='bool',
+               default=True,
+               desc='Share basic cluster information (size, version)'),
+        Option(name='channel_ident',
+               type='bool',
+               default=False,
+               desc='Share a user-provided description and/or contact email for the cluster'),
+        Option(name='channel_crash',
+               type='bool',
+               default=True,
+               desc='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
+        Option(name='channel_device',
+               type='bool',
+               default=True,
+               desc=('Share device health metrics '
+                     '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
+        Option(name='channel_perf',
+               type='bool',
+               default=False,
+               desc='Share various performance metrics of a cluster'),
     ]
 
     @property
-    def config_keys(self):
+    def config_keys(self) -> Dict[str, OptionValue]:
         return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         super(Module, self).__init__(*args, **kwargs)
         self.event = Event()
         self.run = False
-        self.last_upload = None
-        self.last_report = dict()
-        self.report_id = None
+        self.db_collection: Optional[List[str]] = None
+        self.last_opted_in_ceph_version: Optional[int] = None
+        self.last_opted_out_ceph_version: Optional[int] = None
+        self.last_upload: Optional[int] = None
+        self.last_report: Dict[str, Any] = dict()
+        self.report_id: Optional[str] = None
+        self.salt: Optional[str] = None
+        self.get_report_lock = Lock()
+        self.config_update_module_option()
+        # for mypy which does not run the code
+        if TYPE_CHECKING:
+            self.url = ''
+            self.device_url = ''
+            self.enabled = False
+            self.last_opt_revision = 0
+            self.leaderboard = ''
+            self.interval = 0
+            self.proxy = ''
+            self.channel_basic = True
+            self.channel_ident = False
+            self.channel_crash = True
+            self.channel_device = True
+            self.channel_perf = False
+            self.db_collection = ['basic_base', 'device_base']
+            self.last_opted_in_ceph_version = 17
+            self.last_opted_out_ceph_version = 0
 
-    def config_notify(self):
+    def config_update_module_option(self) -> None:
         for opt in self.MODULE_OPTIONS:
             setattr(self,
                     opt['name'],
                     self.get_module_option(opt['name']))
             self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
 
-    @staticmethod
-    def parse_timestamp(timestamp):
-        return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
+    def config_notify(self) -> None:
+        self.config_update_module_option()
+        # wake up serve() thread
+        self.event.set()
 
-    def load(self):
-        self.last_upload = self.get_store('last_upload', None)
-        if self.last_upload is not None:
-            self.last_upload = int(self.last_upload)
+    def load(self) -> None:
+        last_upload = self.get_store('last_upload', None)
+        if last_upload is None:
+            self.last_upload = None
+        else:
+            self.last_upload = int(last_upload)
 
-        self.report_id = self.get_store('report_id', None)
-        if self.report_id is None:
+        report_id = self.get_store('report_id', None)
+        if report_id is None:
             self.report_id = str(uuid.uuid4())
             self.set_store('report_id', self.report_id)
+        else:
+            self.report_id = report_id
+
+        salt = self.get_store('salt', None)
+        if salt is None:
+            self.salt = str(uuid.uuid4())
+            self.set_store('salt', self.salt)
+        else:
+            self.salt = salt
+
+        self.init_collection()
 
-    def gather_osd_metadata(self, osd_map):
+        last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None)
+        if last_opted_in_ceph_version is None:
+            self.last_opted_in_ceph_version = None
+        else:
+            self.last_opted_in_ceph_version = int(last_opted_in_ceph_version)
+
+        last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None)
+        if last_opted_out_ceph_version is None:
+            self.last_opted_out_ceph_version = None
+        else:
+            self.last_opted_out_ceph_version = int(last_opted_out_ceph_version)
+
+    def gather_osd_metadata(self,
+                            osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]:
         keys = ["osd_objectstore", "rotational"]
         keys += self.metadata_keys
 
-        metadata = dict()
+        metadata: Dict[str, Dict[str, int]] = dict()
         for key in keys:
             metadata[key] = defaultdict(int)
 
         for osd in osd_map['osds']:
-            for k, v in self.get_metadata('osd', str(osd['osd'])).items():
+            res = self.get_metadata('osd', str(osd['osd']))
+            if res is None:
+                self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
+                continue
+            for k, v in res.items():
                 if k not in keys:
                     continue
 
@@ -153,16 +334,21 @@ class Module(MgrModule):
 
         return metadata
 
-    def gather_mon_metadata(self, mon_map):
+    def gather_mon_metadata(self,
+                            mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]:
         keys = list()
         keys += self.metadata_keys
 
-        metadata = dict()
+        metadata: Dict[str, Dict[str, int]] = dict()
         for key in keys:
             metadata[key] = defaultdict(int)
 
         for mon in mon_map['mons']:
-            for k, v in self.get_metadata('mon', mon['name']).items():
+            res = self.get_metadata('mon', mon['name'])
+            if res is None:
+                self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
+                continue
+            for k, v in res.items():
                 if k not in keys:
                     continue
 
@@ -170,142 +356,1627 @@ class Module(MgrModule):
 
         return metadata
 
-    def gather_crashinfo(self):
-        crashlist = list()
-        errno, crashids, err = self.remote('crash', 'do_ls', '', '')
+    def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]:
+        metadata: Dict[str, Dict[str, int]] = dict()
+
+        res = self.get('mds_metadata')  # metadata of *all* mds daemons
+        if res is None or not res:
+            self.log.debug('Could not get metadata for mds daemons')
+            return metadata
+
+        keys = list()
+        keys += self.metadata_keys
+
+        for key in keys:
+            metadata[key] = defaultdict(int)
+
+        for mds in res.values():
+            for k, v in mds.items():
+                if k not in keys:
+                    continue
+
+                metadata[k][v] += 1
+
+        return metadata
+
+    def gather_crush_info(self) -> Dict[str, Union[int,
+                                                   bool,
+                                                   List[int],
+                                                   Dict[str, int],
+                                                   Dict[int, int]]]:
+        osdmap = self.get_osdmap()
+        crush_raw = osdmap.get_crush()
+        crush = crush_raw.dump()
+
+        BucketKeyT = TypeVar('BucketKeyT', int, str)
+
+        def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None:
+            if k in d:
+                d[k] += 1
+            else:
+                d[k] = 1
+
+        device_classes: Dict[str, int] = {}
+        for dev in crush['devices']:
+            inc(device_classes, dev.get('class', ''))
+
+        bucket_algs: Dict[str, int] = {}
+        bucket_types: Dict[str, int] = {}
+        bucket_sizes: Dict[int, int] = {}
+        for bucket in crush['buckets']:
+            if '~' in bucket['name']:  # ignore shadow buckets
+                continue
+            inc(bucket_algs, bucket['alg'])
+            inc(bucket_types, bucket['type_id'])
+            inc(bucket_sizes, len(bucket['items']))
+
+        return {
+            'num_devices': len(crush['devices']),
+            'num_types': len(crush['types']),
+            'num_buckets': len(crush['buckets']),
+            'num_rules': len(crush['rules']),
+            'device_classes': list(device_classes.values()),
+            'tunables': crush['tunables'],
+            'compat_weight_set': '-1' in crush['choose_args'],
+            'num_weight_sets': len(crush['choose_args']),
+            'bucket_algs': bucket_algs,
+            'bucket_sizes': bucket_sizes,
+            'bucket_types': bucket_types,
+        }
+
+    def gather_configs(self) -> Dict[str, List[str]]:
+        # cluster config options
+        cluster = set()
+        r, outb, outs = self.mon_command({
+            'prefix': 'config dump',
+            'format': 'json'
+        })
+        if r != 0:
+            return {}
+        try:
+            dump = json.loads(outb)
+        except json.decoder.JSONDecodeError:
+            return {}
+        for opt in dump:
+            name = opt.get('name')
+            if name:
+                cluster.add(name)
+        # daemon-reported options (which may include ceph.conf)
+        active = set()
+        ls = self.get("modified_config_options")
+        for opt in ls.get('options', {}):
+            active.add(opt)
+        return {
+            'cluster_changed': sorted(list(cluster)),
+            'active_changed': sorted(list(active)),
+        }
+
+    def anonymize_entity_name(self, entity_name:str) -> str:
+        if '.' not in entity_name:
+            self.log.debug(f"Cannot split entity name ({entity_name}), no '.' is found")
+            return entity_name
+
+        (etype, eid) = entity_name.split('.', 1)
+        m = hashlib.sha1()
+        salt = ''
+        if self.salt is not None:
+            salt = self.salt
+        # avoid asserting that salt exists
+        if not self.salt:
+            # do not set self.salt to a temp value
+            salt = f"no_salt_found_{NO_SALT_CNT}"
+            NO_SALT_CNT += 1
+            self.log.debug(f"No salt found, created a temp one: {salt}")
+        m.update(salt.encode('utf-8'))
+        m.update(eid.encode('utf-8'))
+        m.update(salt.encode('utf-8'))
+
+        return  etype + '.' + m.hexdigest()
+
+    def get_heap_stats(self) -> Dict[str, dict]:
+        result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
+        anonymized_daemons = {}
+        osd_map = self.get('osd_map')
+
+        # Combine available daemons
+        daemons = []
+        for osd in osd_map['osds']:
+            daemons.append('osd'+'.'+str(osd['osd']))
+        # perf_memory_metrics collection (1/2)
+        if self.is_enabled_collection(Collection.perf_memory_metrics):
+            mon_map = self.get('mon_map')
+            mds_metadata = self.get('mds_metadata')
+            for mon in mon_map['mons']:
+                daemons.append('mon'+'.'+mon['name'])
+            for mds in mds_metadata:
+                daemons.append('mds'+'.'+mds)
+
+        # Grab output from the "daemon.x heap stats" command
+        for daemon in daemons:
+            daemon_type, daemon_id = daemon.split('.', 1)
+            heap_stats = self.parse_heap_stats(daemon_type, daemon_id)
+            if heap_stats:
+                if (daemon_type != 'osd'):
+                    # Anonymize mon and mds
+                    anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
+                    daemon = anonymized_daemons[daemon]
+                result[daemon_type][daemon] = heap_stats
+            else:
+                continue
+
+        if anonymized_daemons:
+            # for debugging purposes only, this data is never reported
+            self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons))
+        return result
+
+    def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]:
+        parsed_output = {}
+
+        cmd_dict = {
+            'prefix': 'heap',
+            'heapcmd': 'stats'
+        }
+        r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict)
+
+        if r != 0:
+            self.log.error("Invalid command dictionary: {}".format(cmd_dict))
+        else:
+            if 'tcmalloc heap stats' in outb:
+                values = [int(i) for i in outb.split() if i.isdigit()]
+                # `categories` must be ordered this way for the correct output to be parsed
+                categories = ['use_by_application',
+                              'page_heap_freelist',
+                              'central_cache_freelist',
+                              'transfer_cache_freelist',
+                              'thread_cache_freelists',
+                              'malloc_metadata',
+                              'actual_memory_used',
+                              'released_to_os',
+                              'virtual_address_space_used',
+                              'spans_in_use',
+                              'thread_heaps_in_use',
+                              'tcmalloc_page_size']
+                if len(values) != len(categories):
+                    self.log.error('Received unexpected output from {}.{}; ' \
+                                   'number of values should match the number' \
+                                   'of expected categories:\n values: len={} {} '\
+                                   '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs))
+                else:
+                    parsed_output = dict(zip(categories, values))
+            else:
+                self.log.error('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs))
+        
+        return parsed_output
+
+    def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
+        result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
+        anonymized_daemons = {}
+        osd_map = self.get('osd_map')
+
+        # Combine available daemons
+        daemons = []
+        for osd in osd_map['osds']:
+            daemons.append('osd'+'.'+str(osd['osd']))
+        # perf_memory_metrics collection (2/2)
+        if self.is_enabled_collection(Collection.perf_memory_metrics):
+            mon_map = self.get('mon_map')
+            mds_metadata = self.get('mds_metadata')
+            for mon in mon_map['mons']:
+                daemons.append('mon'+'.'+mon['name'])
+            for mds in mds_metadata:
+                daemons.append('mds'+'.'+mds)
+
+        # Grab output from the "dump_mempools" command
+        for daemon in daemons:
+            daemon_type, daemon_id = daemon.split('.', 1)
+            cmd_dict = {
+                'prefix': 'dump_mempools',
+                'format': 'json'
+            }
+            r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict)
+            if r != 0:
+                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
+                continue
+            else:
+                try:
+                    # This is where the mempool will land.
+                    dump = json.loads(outb)
+                    if mode == 'separated':
+                        # Anonymize mon and mds
+                        if daemon_type != 'osd':
+                            anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
+                            daemon = anonymized_daemons[daemon]
+                        result[daemon_type][daemon] = dump['mempool']['by_pool']
+                    elif mode == 'aggregated':
+                        for mem_type in dump['mempool']['by_pool']:
+                            result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
+                            result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
+                    else:
+                        self.log.error("Incorrect mode specified in get_mempool: {}".format(mode))
+                except (json.decoder.JSONDecodeError, KeyError) as e:
+                    self.log.error("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e))
+                    continue
+
+        if anonymized_daemons:
+            # for debugging purposes only, this data is never reported
+            self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons))
+
+        return result
+
+    def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
+        # Initialize result dict
+        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
+                                              lambda: defaultdict(
+                                              lambda: defaultdict(
+                                              lambda: defaultdict(
+                                              lambda: defaultdict(int))))))
+
+        # Get list of osd ids from the metadata
+        osd_metadata = self.get('osd_metadata')
+
+        # Grab output from the "osd.x perf histogram dump" command
+        for osd_id in osd_metadata:
+            cmd_dict = {
+                'prefix': 'perf histogram dump',
+                'id': str(osd_id),
+                'format': 'json'
+            }
+            r, outb, outs = self.osd_command(cmd_dict)
+            # Check for invalid calls
+            if r != 0:
+                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
+                continue
+            else:
+                try:
+                    # This is where the histograms will land if there are any.
+                    dump = json.loads(outb)
+
+                    for histogram in dump['osd']:
+                        # Log axis information. There are two axes, each represented
+                        # as a dictionary. Both dictionaries are contained inside a
+                        # list called 'axes'.
+                        axes = []
+                        for axis in dump['osd'][histogram]['axes']:
+
+                            # This is the dict that contains information for an individual
+                            # axis. It will be appended to the 'axes' list at the end.
+                            axis_dict: Dict[str, Any] = defaultdict()
+
+                            # Collecting information for buckets, min, name, etc.
+                            axis_dict['buckets'] = axis['buckets']
+                            axis_dict['min'] = axis['min']
+                            axis_dict['name'] = axis['name']
+                            axis_dict['quant_size'] = axis['quant_size']
+                            axis_dict['scale_type'] = axis['scale_type']
+
+                            # Collecting ranges; placing them in lists to
+                            # improve readability later on.
+                            ranges = []
+                            for _range in axis['ranges']:
+                                _max, _min = None, None
+                                if 'max' in _range:
+                                    _max = _range['max']
+                                if 'min' in _range:
+                                    _min = _range['min']
+                                ranges.append([_min, _max])
+                            axis_dict['ranges'] = ranges
+
+                            # Now that 'axis_dict' contains all the appropriate
+                            # information for the current axis, append it to the 'axes' list.
+                            # There will end up being two axes in the 'axes' list, since the
+                            # histograms are 2D.
+                            axes.append(axis_dict)
+
+                        # Add the 'axes' list, containing both axes, to result.
+                        # At this point, you will see that the name of the key is the string
+                        # form of our axes list (str(axes)). This is there so that histograms
+                        # with different axis configs will not be combined.
+                        # These key names are later dropped when only the values are returned.
+                        result[str(axes)][histogram]['axes'] = axes
+
+                        # Collect current values and make sure they are in
+                        # integer form.
+                        values = []
+                        for value_list in dump['osd'][histogram]['values']:
+                            values.append([int(v) for v in value_list])
+
+                        if mode == 'separated':
+                            if 'osds' not in result[str(axes)][histogram]:
+                                result[str(axes)][histogram]['osds'] = []
+                            result[str(axes)][histogram]['osds'].append({'osd_id': int(osd_id), 'values': values})
+
+                        elif mode == 'aggregated':
+                            # Aggregate values. If 'values' have already been initialized,
+                            # we can safely add.
+                            if 'values' in result[str(axes)][histogram]:
+                                for i in range (0, len(values)):
+                                    for j in range (0, len(values[i])):
+                                        values[i][j] += result[str(axes)][histogram]['values'][i][j]
+
+                            # Add the values to result.
+                            result[str(axes)][histogram]['values'] = values
+
+                            # Update num_combined_osds
+                            if 'num_combined_osds' not in result[str(axes)][histogram]:
+                                result[str(axes)][histogram]['num_combined_osds'] = 1
+                            else:
+                                result[str(axes)][histogram]['num_combined_osds'] += 1
+                        else:
+                            self.log.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode))
+                            return list()
+
+                # Sometimes, json errors occur if you give it an empty string.
+                # I am also putting in a catch for a KeyError since it could
+                # happen where the code is assuming that a key exists in the
+                # schema when it doesn't. In either case, we'll handle that
+                # by continuing and collecting what we can from other osds.
+                except (json.decoder.JSONDecodeError, KeyError) as e:
+                    self.log.error("Error caught on osd.{}: {}".format(osd_id, e))
+                    continue
+
+        return list(result.values())
+
+    def get_io_rate(self) -> dict:
+        return self.get('io_rate')
+
+    def get_stats_per_pool(self) -> dict:
+        result = self.get('pg_dump')['pool_stats']
+
+        # collect application metadata from osd_map
+        osd_map = self.get('osd_map')
+        application_metadata = {pool['pool']: pool['application_metadata'] for pool in osd_map['pools']}
+
+        # add application to each pool from pg_dump
+        for pool in result:
+            pool['application'] = []
+            # Only include default applications
+            for application in application_metadata[pool['poolid']]:
+                if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
+                    pool['application'].append(application)
+
+        return result
+
+    def get_stats_per_pg(self) -> dict:
+        return self.get('pg_dump')['pg_stats']
+
+    def get_rocksdb_stats(self) -> Dict[str, str]:
+        # Initalizers
+        result: Dict[str, str] = defaultdict()
+        version = self.get_rocksdb_version()
+
+        # Update result
+        result['version'] = version
+
+        return result
+
+    def gather_crashinfo(self) -> List[Dict[str, str]]:
+        crashlist: List[Dict[str, str]] = list()
+        errno, crashids, err = self.remote('crash', 'ls')
         if errno:
-            return ''
+            return crashlist
         for crashid in crashids.split():
-            cmd = {'id': crashid}
-            errno, crashinfo, err = self.remote('crash', 'do_info', cmd, '')
+            errno, crashinfo, err = self.remote('crash', 'do_info', crashid)
             if errno:
                 continue
             c = json.loads(crashinfo)
+
+            # redact hostname
             del c['utsname_hostname']
+
+            # entity_name might have more than one '.', beware
+            (etype, eid) = c.get('entity_name', '').split('.', 1)
+            m = hashlib.sha1()
+            assert self.salt
+            m.update(self.salt.encode('utf-8'))
+            m.update(eid.encode('utf-8'))
+            m.update(self.salt.encode('utf-8'))
+            c['entity_name'] = etype + '.' + m.hexdigest()
+
+            # redact final line of python tracebacks, as the exception
+            # payload may contain identifying information
+            if 'mgr_module' in c and 'backtrace' in c:
+                # backtrace might be empty
+                if len(c['backtrace']) > 0:
+                    c['backtrace'][-1] = '<redacted>'
+
             crashlist.append(c)
         return crashlist
 
-    def compile_report(self):
+    def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
+        # Extract perf counter data with get_all_perf_counters(), a method
+        # from mgr/mgr_module.py. This method returns a nested dictionary that
+        # looks a lot like perf schema, except with some additional fields.
+        #
+        # Example of output, a snapshot of a mon daemon:
+        #   "mon.b": {
+        #       "bluestore.kv_flush_lat": {
+        #           "count": 2431,
+        #           "description": "Average kv_thread flush latency",
+        #           "nick": "fl_l",
+        #           "priority": 8,
+        #           "type": 5,
+        #           "units": 1,
+        #           "value": 88814109
+        #       },
+        #   },
+        all_perf_counters = self.get_all_perf_counters()
+
+        # Initialize 'result' dict
+        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
+            lambda: defaultdict(lambda: defaultdict(int))))
+
+        # 'separated' mode
+        anonymized_daemon_dict = {}
+
+        for daemon, all_perf_counters_by_daemon in all_perf_counters.items():
+            daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'
+
+            if mode == 'separated':
+                # anonymize individual daemon names except osds
+                if (daemon_type != 'osd'):
+                    anonymized_daemon = self.anonymize_entity_name(daemon)
+                    anonymized_daemon_dict[anonymized_daemon] = daemon
+                    daemon = anonymized_daemon
+
+            # Calculate num combined daemon types if in aggregated mode
+            if mode == 'aggregated':
+                if 'num_combined_daemons' not in result[daemon_type]:
+                    result[daemon_type]['num_combined_daemons'] = 1
+                else:
+                    result[daemon_type]['num_combined_daemons'] += 1
+
+            for collection in all_perf_counters_by_daemon:
+                # Split the collection to avoid redundancy in final report; i.e.:
+                #   bluestore.kv_flush_lat, bluestore.kv_final_lat -->
+                #   bluestore: kv_flush_lat, kv_final_lat
+                col_0, col_1 = collection.split('.')
+
+                # Debug log for empty keys. This initially was a problem for prioritycache
+                # perf counters, where the col_0 was empty for certain mon counters:
+                #
+                # "mon.a": {                  instead of    "mon.a": {
+                #      "": {                                     "prioritycache": {
+                #        "cache_bytes": {...},                          "cache_bytes": {...},
+                #
+                # This log is here to detect any future instances of a similar issue.
+                if (daemon == "") or (col_0 == "") or (col_1 == ""):
+                    self.log.debug("Instance of an empty key: {}{}".format(daemon, collection))
+
+                if mode == 'separated':
+                    # Add value to result
+                    result[daemon][col_0][col_1]['value'] = \
+                            all_perf_counters_by_daemon[collection]['value']
+
+                    # Check that 'count' exists, as not all counters have a count field.
+                    if 'count' in all_perf_counters_by_daemon[collection]:
+                        result[daemon][col_0][col_1]['count'] = \
+                                all_perf_counters_by_daemon[collection]['count']
+                elif mode == 'aggregated':
+                    # Not every rgw daemon has the same schema. Specifically, each rgw daemon
+                    # has a uniquely-named collection that starts off identically (i.e.
+                    # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
+                    # This bit of code combines these unique counters all under one rgw instance.
+                    # Without this check, the schema would remain separeted out in the final report.
+                    if col_0[0:11] == "objecter-0x":
+                        col_0 = "objecter-0x"
+
+                    # Check that the value can be incremented. In some cases,
+                    # the files are of type 'pair' (real-integer-pair, integer-integer pair).
+                    # In those cases, the value is a dictionary, and not a number.
+                    #   i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
+                    if isinstance(all_perf_counters_by_daemon[collection]['value'], numbers.Number):
+                        result[daemon_type][col_0][col_1]['value'] += \
+                                all_perf_counters_by_daemon[collection]['value']
+
+                    # Check that 'count' exists, as not all counters have a count field.
+                    if 'count' in all_perf_counters_by_daemon[collection]:
+                        result[daemon_type][col_0][col_1]['count'] += \
+                                all_perf_counters_by_daemon[collection]['count']
+                else:
+                    self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
+                    return {}
+
+        if mode == 'separated':
+            # for debugging purposes only, this data is never reported
+            self.log.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict))
+
+        return result
+
+    def get_active_channels(self) -> List[str]:
+        r = []
+        if self.channel_basic:
+            r.append('basic')
+        if self.channel_crash:
+            r.append('crash')
+        if self.channel_device:
+            r.append('device')
+        if self.channel_ident:
+            r.append('ident')
+        if self.channel_perf:
+            r.append('perf')
+        return r
+
+    def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
+        try:
+            time_format = self.remote('devicehealth', 'get_time_format')
+        except Exception as e:
+            self.log.debug('Unable to format time: {}'.format(e))
+            return {}
+        cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
+        min_sample = cutoff.strftime(time_format)
+
+        devices = self.get('devices')['devices']
+        if not devices:
+            self.log.debug('Unable to get device info from the mgr.')
+            return {}
+
+        # anon-host-id -> anon-devid -> { timestamp -> record }
+        res: Dict[str, Dict[str, Dict[str, str]]] = {}
+        for d in devices:
+            devid = d['devid']
+            try:
+                # this is a map of stamp -> {device info}
+                m = self.remote('devicehealth', 'get_recent_device_metrics',
+                                devid, min_sample)
+            except Exception as e:
+                self.log.error('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
+                continue
+
+            # anonymize host id
+            try:
+                host = d['location'][0]['host']
+            except (KeyError, IndexError) as e:
+                self.log.error('Unable to get host from device with id "{}": {}'.format(devid, e))
+                continue
+            anon_host = self.get_store('host-id/%s' % host)
+            if not anon_host:
+                anon_host = str(uuid.uuid1())
+                self.set_store('host-id/%s' % host, anon_host)
+            serial = None
+            for dev, rep in m.items():
+                rep['host_id'] = anon_host
+                if serial is None and 'serial_number' in rep:
+                    serial = rep['serial_number']
+
+            # anonymize device id
+            anon_devid = self.get_store('devid-id/%s' % devid)
+            if not anon_devid:
+                # ideally devid is 'vendor_model_serial',
+                # but can also be 'model_serial', 'serial'
+                if '_' in devid:
+                    anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
+                else:
+                    anon_devid = str(uuid.uuid1())
+                self.set_store('devid-id/%s' % devid, anon_devid)
+            self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
+                                                           host, anon_host))
+
+            # anonymize the smartctl report itself
+            if serial:
+                m_str = json.dumps(m)
+                m = json.loads(m_str.replace(serial, 'deleted'))
+
+            if anon_host not in res:
+                res[anon_host] = {}
+            res[anon_host][anon_devid] = m
+        return res
+
+    def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
+        data = self.get_counter(daemon_type, daemon_name, stat)[stat]
+        if data:
+            return data[-1][1]
+        else:
+            return 0
+
+    def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
+        if not channels:
+            channels = self.get_active_channels()
         report = {
-            'leaderboard': False,
+            'leaderboard': self.leaderboard,
             'report_version': 1,
-            'report_timestamp': datetime.utcnow().isoformat()
+            'report_timestamp': datetime.utcnow().isoformat(),
+            'report_id': self.report_id,
+            'channels': channels,
+            'channels_available': ALL_CHANNELS,
+            'license': LICENSE,
+            'collections_available': [c['name'].name for c in MODULE_COLLECTION],
+            'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])],
         }
 
-        if self.leaderboard:
-            report['leaderboard'] = True
+        if 'ident' in channels:
+            for option in ['description', 'contact', 'organization']:
+                report[option] = getattr(self, option)
 
-        for option in ['description', 'contact', 'organization']:
-            report[option] = getattr(self, option)
+        if 'basic' in channels:
+            mon_map = self.get('mon_map')
+            osd_map = self.get('osd_map')
+            service_map = self.get('service_map')
+            fs_map = self.get('fs_map')
+            df = self.get('df')
+            df_pools = {pool['id']: pool for pool in df['pools']}
 
-        mon_map = self.get('mon_map')
-        osd_map = self.get('osd_map')
-        service_map = self.get('service_map')
-        fs_map = self.get('fs_map')
-        df = self.get('df')
+            report['created'] = mon_map['created']
 
-        report['report_id'] = self.report_id
-        report['created'] = self.parse_timestamp(mon_map['created']).isoformat()
+            # mons
+            v1_mons = 0
+            v2_mons = 0
+            ipv4_mons = 0
+            ipv6_mons = 0
+            for mon in mon_map['mons']:
+                for a in mon['public_addrs']['addrvec']:
+                    if a['type'] == 'v2':
+                        v2_mons += 1
+                    elif a['type'] == 'v1':
+                        v1_mons += 1
+                    if a['addr'].startswith('['):
+                        ipv6_mons += 1
+                    else:
+                        ipv4_mons += 1
+            report['mon'] = {
+                'count': len(mon_map['mons']),
+                'features': mon_map['features'],
+                'min_mon_release': mon_map['min_mon_release'],
+                'v1_addr_mons': v1_mons,
+                'v2_addr_mons': v2_mons,
+                'ipv4_addr_mons': ipv4_mons,
+                'ipv6_addr_mons': ipv6_mons,
+            }
 
-        report['mon'] = {
-            'count': len(mon_map['mons']),
-            'features': mon_map['features']
-        }
+            report['config'] = self.gather_configs()
 
-        num_pg = 0
-        report['pools'] = list()
-        for pool in osd_map['pools']:
-            num_pg += pool['pg_num']
-            report['pools'].append(
-                {
-                    'pool': pool['pool'],
-                    'type': pool['type'],
-                    'pg_num': pool['pg_num'],
-                    'pgp_num': pool['pg_placement_num'],
-                    'size': pool['size'],
-                    'min_size': pool['min_size'],
-                    'crush_rule': pool['crush_rule']
-                }
-            )
+            # pools
 
-        report['osd'] = {
-            'count': len(osd_map['osds']),
-            'require_osd_release': osd_map['require_osd_release'],
-            'require_min_compat_client': osd_map['require_min_compat_client']
-        }
+            rbd_num_pools = 0
+            rbd_num_images_by_pool = []
+            rbd_mirroring_by_pool = []
+            num_pg = 0
+            report['pools'] = list()
+            for pool in osd_map['pools']:
+                num_pg += pool['pg_num']
+                ec_profile = {}
+                if pool['erasure_code_profile']:
+                    orig = osd_map['erasure_code_profiles'].get(
+                        pool['erasure_code_profile'], {})
+                    ec_profile = {
+                        k: orig[k] for k in orig.keys()
+                        if k in ['k', 'm', 'plugin', 'technique',
+                                 'crush-failure-domain', 'l']
+                    }
+                pool_data = {
+                        'pool': pool['pool'],
+                        'pg_num': pool['pg_num'],
+                        'pgp_num': pool['pg_placement_num'],
+                        'size': pool['size'],
+                        'min_size': pool['min_size'],
+                        'pg_autoscale_mode': pool['pg_autoscale_mode'],
+                        'target_max_bytes': pool['target_max_bytes'],
+                        'target_max_objects': pool['target_max_objects'],
+                        'type': ['', 'replicated', '', 'erasure'][pool['type']],
+                        'erasure_code_profile': ec_profile,
+                        'cache_mode': pool['cache_mode'],
+                    }
 
-        report['fs'] = {
-            'count': len(fs_map['filesystems'])
-        }
+                # basic_pool_usage collection
+                if self.is_enabled_collection(Collection.basic_pool_usage):
+                    pool_data['application'] = []
+                    for application in pool['application_metadata']:
+                        # Only include default applications
+                        if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
+                            pool_data['application'].append(application)
+                    pool_stats = df_pools[pool['pool']]['stats']
+                    pool_data['stats'] = { # filter out kb_used
+                                            'avail_raw': pool_stats['avail_raw'],
+                                            'bytes_used': pool_stats['bytes_used'],
+                                            'compress_bytes_used': pool_stats['compress_bytes_used'],
+                                            'compress_under_bytes': pool_stats['compress_under_bytes'],
+                                            'data_bytes_used': pool_stats['data_bytes_used'],
+                                            'dirty': pool_stats['dirty'],
+                                            'max_avail': pool_stats['max_avail'],
+                                            'objects': pool_stats['objects'],
+                                            'omap_bytes_used': pool_stats['omap_bytes_used'],
+                                            'percent_used': pool_stats['percent_used'],
+                                            'quota_bytes': pool_stats['quota_bytes'],
+                                            'quota_objects': pool_stats['quota_objects'],
+                                            'rd': pool_stats['rd'],
+                                            'rd_bytes': pool_stats['rd_bytes'],
+                                            'stored': pool_stats['stored'],
+                                            'stored_data': pool_stats['stored_data'],
+                                            'stored_omap': pool_stats['stored_omap'],
+                                            'stored_raw': pool_stats['stored_raw'],
+                                            'wr': pool_stats['wr'],
+                                            'wr_bytes': pool_stats['wr_bytes']
+                        }
 
-        report['metadata'] = dict()
-        report['metadata']['osd'] = self.gather_osd_metadata(osd_map)
-        report['metadata']['mon'] = self.gather_mon_metadata(mon_map)
+                cast(List[Dict[str, Any]], report['pools']).append(pool_data)
+                if 'rbd' in pool['application_metadata']:
+                    rbd_num_pools += 1
+                    ioctx = self.rados.open_ioctx(pool['pool_name'])
+                    rbd_num_images_by_pool.append(
+                        sum(1 for _ in rbd.RBD().list2(ioctx)))
+                    rbd_mirroring_by_pool.append(
+                        rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
+            report['rbd'] = {
+                'num_pools': rbd_num_pools,
+                'num_images_by_pool': rbd_num_images_by_pool,
+                'mirroring_by_pool': rbd_mirroring_by_pool}
 
-        report['usage'] = {
-            'pools': len(df['pools']),
-            'pg_num:': num_pg,
-            'total_used_bytes': df['stats']['total_used_bytes'],
-            'total_bytes': df['stats']['total_bytes'],
-            'total_avail_bytes': df['stats']['total_avail_bytes']
-        }
+            # osds
+            cluster_network = False
+            for osd in osd_map['osds']:
+                if osd['up'] and not cluster_network:
+                    front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
+                    back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
+                    if front_ip != back_ip:
+                        cluster_network = True
+            report['osd'] = {
+                'count': len(osd_map['osds']),
+                'require_osd_release': osd_map['require_osd_release'],
+                'require_min_compat_client': osd_map['require_min_compat_client'],
+                'cluster_network': cluster_network,
+            }
+
+            # crush
+            report['crush'] = self.gather_crush_info()
+
+            # cephfs
+            report['fs'] = {
+                'count': len(fs_map['filesystems']),
+                'feature_flags': fs_map['feature_flags'],
+                'num_standby_mds': len(fs_map['standbys']),
+                'filesystems': [],
+            }
+            num_mds = len(fs_map['standbys'])
+            for fsm in fs_map['filesystems']:
+                fs = fsm['mdsmap']
+                num_sessions = 0
+                cached_ino = 0
+                cached_dn = 0
+                cached_cap = 0
+                subtrees = 0
+                rfiles = 0
+                rbytes = 0
+                rsnaps = 0
+                for gid, mds in fs['info'].items():
+                    num_sessions += self.get_latest('mds', mds['name'],
+                                                    'mds_sessions.session_count')
+                    cached_ino += self.get_latest('mds', mds['name'],
+                                                  'mds_mem.ino')
+                    cached_dn += self.get_latest('mds', mds['name'],
+                                                 'mds_mem.dn')
+                    cached_cap += self.get_latest('mds', mds['name'],
+                                                  'mds_mem.cap')
+                    subtrees += self.get_latest('mds', mds['name'],
+                                                'mds.subtrees')
+                    if mds['rank'] == 0:
+                        rfiles = self.get_latest('mds', mds['name'],
+                                                 'mds.root_rfiles')
+                        rbytes = self.get_latest('mds', mds['name'],
+                                                 'mds.root_rbytes')
+                        rsnaps = self.get_latest('mds', mds['name'],
+                                                 'mds.root_rsnaps')
+                report['fs']['filesystems'].append({  # type: ignore
+                    'max_mds': fs['max_mds'],
+                    'ever_allowed_features': fs['ever_allowed_features'],
+                    'explicitly_allowed_features': fs['explicitly_allowed_features'],
+                    'num_in': len(fs['in']),
+                    'num_up': len(fs['up']),
+                    'num_standby_replay': len(
+                        [mds for gid, mds in fs['info'].items()
+                         if mds['state'] == 'up:standby-replay']),
+                    'num_mds': len(fs['info']),
+                    'num_sessions': num_sessions,
+                    'cached_inos': cached_ino,
+                    'cached_dns': cached_dn,
+                    'cached_caps': cached_cap,
+                    'cached_subtrees': subtrees,
+                    'balancer_enabled': len(fs['balancer']) > 0,
+                    'num_data_pools': len(fs['data_pools']),
+                    'standby_count_wanted': fs['standby_count_wanted'],
+                    'approx_ctime': fs['created'][0:7],
+                    'files': rfiles,
+                    'bytes': rbytes,
+                    'snaps': rsnaps,
+                })
+                num_mds += len(fs['info'])
+            report['fs']['total_num_mds'] = num_mds  # type: ignore
+
+            # daemons
+            report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
+                                      mon=self.gather_mon_metadata(mon_map))
+
+            if self.is_enabled_collection(Collection.basic_mds_metadata):
+                report['metadata']['mds'] = self.gather_mds_metadata()  # type: ignore
+
+            # host counts
+            servers = self.list_servers()
+            self.log.debug('servers %s' % servers)
+            hosts = {
+                'num': len([h for h in servers if h['hostname']]),
+            }
+            for t in ['mon', 'mds', 'osd', 'mgr']:
+                nr_services = sum(1 for host in servers if
+                                  any(service for service in cast(List[ServiceInfoT],
+                                                                  host['services'])
+                                      if service['type'] == t))
+                hosts['num_with_' + t] = nr_services
+            report['hosts'] = hosts
+
+            report['usage'] = {
+                'pools': len(df['pools']),
+                'pg_num': num_pg,
+                'total_used_bytes': df['stats']['total_used_bytes'],
+                'total_bytes': df['stats']['total_bytes'],
+                'total_avail_bytes': df['stats']['total_avail_bytes']
+            }
+            # basic_usage_by_class collection
+            if self.is_enabled_collection(Collection.basic_usage_by_class):
+                report['usage']['stats_by_class'] = {} # type: ignore
+                for device_class in df['stats_by_class']:
+                    if device_class in ['hdd', 'ssd', 'nvme']:
+                        report['usage']['stats_by_class'][device_class] = df['stats_by_class'][device_class] # type: ignore
 
-        report['services'] = defaultdict(int)
-        for key, value in service_map['services'].items():
-            report['services'][key] += 1
+            services: DefaultDict[str, int] = defaultdict(int)
+            for key, value in service_map['services'].items():
+                services[key] += 1
+                if key == 'rgw':
+                    rgw = {}
+                    zones = set()
+                    zonegroups = set()
+                    frontends = set()
+                    count = 0
+                    d = value.get('daemons', dict())
+                    for k, v in d.items():
+                        if k == 'summary' and v:
+                            rgw[k] = v
+                        elif isinstance(v, dict) and 'metadata' in v:
+                            count += 1
+                            zones.add(v['metadata']['zone_id'])
+                            zonegroups.add(v['metadata']['zonegroup_id'])
+                            frontends.add(v['metadata']['frontend_type#0'])
 
-        report['crashes'] = self.gather_crashinfo()
+                            # we could actually iterate over all the keys of
+                            # the dict and check for how many frontends there
+                            # are, but it is unlikely that one would be running
+                            # more than 2 supported ones
+                            f2 = v['metadata'].get('frontend_type#1', None)
+                            if f2:
+                                frontends.add(f2)
+
+                    rgw['count'] = count
+                    rgw['zones'] = len(zones)
+                    rgw['zonegroups'] = len(zonegroups)
+                    rgw['frontends'] = list(frontends)  # sets aren't json-serializable
+                    report['rgw'] = rgw
+            report['services'] = services
+
+            try:
+                report['balancer'] = self.remote('balancer', 'gather_telemetry')
+            except ImportError:
+                report['balancer'] = {
+                    'active': False
+                }
+
+            # Rook
+            self.get_rook_data(report)
+
+        if 'crash' in channels:
+            report['crashes'] = self.gather_crashinfo()
+
+        if 'perf' in channels:
+            if self.is_enabled_collection(Collection.perf_perf):
+                report['perf_counters'] = self.gather_perf_counters('separated')
+                report['stats_per_pool'] = self.get_stats_per_pool()
+                report['stats_per_pg'] = self.get_stats_per_pg()
+                report['io_rate'] = self.get_io_rate()
+                report['osd_perf_histograms'] = self.get_osd_histograms('separated')
+                report['mempool'] = self.get_mempool('separated')
+                report['heap_stats'] = self.get_heap_stats()
+                report['rocksdb_stats'] = self.get_rocksdb_stats()
+
+        # NOTE: We do not include the 'device' channel in this report; it is
+        # sent to a different endpoint.
 
         return report
 
-    def send(self, report):
-        self.log.info('Upload report to: %s', self.url)
+    def get_rook_data(self, report: Dict[str, object]) -> None:
+        r, outb, outs = self.mon_command({
+            'prefix': 'config-key dump',
+            'format': 'json'
+        })
+        if r != 0:
+            return
+        try:
+            config_kv_dump = json.loads(outb)
+        except json.decoder.JSONDecodeError:
+            return
+
+        for elem in ROOK_KEYS_BY_COLLECTION:
+            # elem[0] is the full key path (e.g. "rook/node/count/with-csi-nfs-plugin")
+            # elem[1] is the Collection this key belongs to
+            if self.is_enabled_collection(elem[1]):
+                self.add_kv_to_report(report, elem[0], config_kv_dump.get(elem[0]))
+
+    def add_kv_to_report(self, report: Dict[str, object], key_path: str, value: Any) -> None:
+        last_node = key_path.split('/')[-1]
+        for node in key_path.split('/')[0:-1]:
+            if node not in report:
+                report[node] = {}
+            report = report[node]  # type: ignore
+
+            # sanity check of keys correctness
+            if not isinstance(report, dict):
+                self.log.error(f"'{key_path}' is an invalid key, expected type 'dict' but got {type(report)}")
+                return
+
+        if last_node in report:
+            self.log.error(f"'{key_path}' is an invalid key, last part must not exist at this point")
+            return
+
+        report[last_node] = value
+
+    def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
+        self.log.info('Sending %s to: %s' % (what, url))
         proxies = dict()
         if self.proxy:
-            self.log.info('Using HTTP(S) proxy: %s', self.proxy)
+            self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
             proxies['http'] = self.proxy
             proxies['https'] = self.proxy
+        try:
+            resp = requests.put(url=url, json=report, proxies=proxies)
+            resp.raise_for_status()
+        except Exception as e:
+            fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
+            self.log.error(fail_reason)
+            return fail_reason
+        return None
+
+    class EndPoint(enum.Enum):
+        ceph = 'ceph'
+        device = 'device'
+
+    def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
+        '''
+        Find collections that are available in the module, but are not in the db
+        '''
+        if self.db_collection is None:
+            return None
+
+        if not channels:
+            channels = ALL_CHANNELS
+        else:
+            for ch in channels:
+                if ch not in ALL_CHANNELS:
+                    self.log.debug(f"invalid channel name: {ch}")
+                    return None
+
+        new_collection : List[Collection] = []
+
+        for c in MODULE_COLLECTION:
+            if c['name'].name not in self.db_collection:
+                if c['channel'] in channels:
+                    new_collection.append(c['name'])
+
+        return new_collection
+
+    def is_major_upgrade(self) -> bool:
+        '''
+        Returns True only if the user last opted-in to an older major
+        '''
+        if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0:
+            # we do not know what Ceph version was when the user last opted-in,
+            # thus we do not wish to nag in case of a major upgrade
+            return False
+
+        mon_map = self.get('mon_map')
+        mon_min = mon_map.get("min_mon_release", 0)
+
+        if mon_min - self.last_opted_in_ceph_version > 0:
+            self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
+            return True
+
+        return False
+
+    def is_opted_in(self) -> bool:
+        # If len is 0 it means that the user is either opted-out (never
+        # opted-in, or invoked `telemetry off`), or they upgraded from a
+        # telemetry revision 1 or 2, which required to re-opt in to revision 3,
+        # regardless, hence is considered as opted-out
+        if self.db_collection is None:
+            return False
+        return len(self.db_collection) > 0
+
+    def should_nag(self) -> bool:
+        # Find delta between opted-in collections and module collections;
+        # nag only if module has a collection which is not in db, and nag == True.
 
-        resp = requests.put(url=self.url, json=report, proxies=proxies)
-        if not resp.ok:
-            self.log.error("Report send failed: %d %s %s" %
-                           (resp.status_code, resp.reason, resp.text))
-        return resp
-
-    def handle_command(self, inbuf, command):
-        if command['prefix'] == 'telemetry status':
-            r = {}
-            for opt in self.MODULE_OPTIONS:
-                r[opt['name']] = getattr(self, opt['name'])
-            return 0, json.dumps(r, indent=4), ''
-        elif command['prefix'] == 'telemetry on':
+        # We currently do not nag if the user is opted-out (or never opted-in).
+        # If we wish to do this in the future, we need to have a tri-mode state
+        # (opted in, opted out, no action yet), and it needs to be guarded by a
+        # config option (so that nagging can be turned off via config).
+        # We also need to add a last_opted_out_ceph_version variable, for the
+        # major upgrade check.
+
+        # check if there are collections the user is not opt-in to
+        # that we should nag about
+        if self.db_collection is not None:
+            for c in MODULE_COLLECTION:
+                if c['name'].name not in self.db_collection:
+                    if c['nag'] == True:
+                        self.log.debug(f"The collection: {c['name']} is not reported")
+                        return True
+
+        # user might be opted-in to the most recent collection, or there is no
+        # new collection which requires nagging about; thus nag in case it's a
+        # major upgrade and there are new collections
+        # (which their own nag == False):
+        new_collections = False
+        col_delta = self.collection_delta()
+        if col_delta is not None and len(col_delta) > 0:
+            new_collections = True
+
+        return self.is_major_upgrade() and new_collections
+
+    def init_collection(self) -> None:
+        # We fetch from db the collections the user had already opted-in to.
+        # During the transition the results will be empty, but the user might
+        # be opted-in to an older version (e.g. revision = 3)
+
+        collection = self.get_store('collection')
+
+        if collection is not None:
+            self.db_collection = json.loads(collection)
+
+        if self.db_collection is None:
+            # happens once on upgrade
+            if not self.enabled:
+                # user is not opted-in
+                self.set_store('collection', json.dumps([]))
+                self.log.debug("user is not opted-in")
+            else:
+                # user is opted-in, verify the revision:
+                if self.last_opt_revision == REVISION:
+                    self.log.debug(f"telemetry revision is {REVISION}")
+                    base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
+                    self.set_store('collection', json.dumps(base_collection))
+                else:
+                    # user is opted-in to an older version, meaning they need
+                    # to re-opt in regardless
+                    self.set_store('collection', json.dumps([]))
+                    self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")
+
+            # reload collection after setting
+            collection = self.get_store('collection')
+            if collection is not None:
+                self.db_collection = json.loads(collection)
+            else:
+                raise RuntimeError('collection is None after initial setting')
+        else:
+            # user has already upgraded
+            self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
+
+    def is_enabled_collection(self, collection: Collection) -> bool:
+        if self.db_collection is None:
+            return False
+        return collection.name in self.db_collection
+
+    def opt_in_all_collections(self) -> None:
+        """
+        Opt-in to all collections; Update db with the currently available collections in the module
+        """
+        if self.db_collection is None:
+            raise RuntimeError('db_collection is None after initial setting')
+
+        for c in MODULE_COLLECTION:
+            if c['name'].name not in self.db_collection:
+                self.db_collection.append(c['name'])
+
+        self.set_store('collection', json.dumps(self.db_collection))
+
+    def send(self,
+             report: Dict[str, Dict[str, str]],
+             endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
+        if not endpoint:
+            endpoint = [self.EndPoint.ceph, self.EndPoint.device]
+        failed = []
+        success = []
+        self.log.debug('Send endpoints %s' % endpoint)
+        for e in endpoint:
+            if e == self.EndPoint.ceph:
+                fail_reason = self._try_post('ceph report', self.url, report)
+                if fail_reason:
+                    failed.append(fail_reason)
+                else:
+                    now = int(time.time())
+                    self.last_upload = now
+                    self.set_store('last_upload', str(now))
+                    success.append('Ceph report sent to {0}'.format(self.url))
+                    self.log.info('Sent report to {0}'.format(self.url))
+            elif e == self.EndPoint.device:
+                if 'device' in self.get_active_channels():
+                    devices = self.gather_device_report()
+                    if devices:
+                        num_devs = 0
+                        num_hosts = 0
+                        for host, ls in devices.items():
+                            self.log.debug('host %s devices %s' % (host, ls))
+                            if not len(ls):
+                                continue
+                            fail_reason = self._try_post('devices', self.device_url,
+                                                         ls)
+                            if fail_reason:
+                                failed.append(fail_reason)
+                            else:
+                                num_devs += len(ls)
+                                num_hosts += 1
+                        if num_devs:
+                            success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
+                                num_devs, num_hosts, len(devices)))
+                    else:
+                        fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
+                        failed.append(fail_reason)
+                        self.log.error(fail_reason)
+        if failed:
+            return 1, '', '\n'.join(success + failed)
+        return 0, '', '\n'.join(success)
+
+    def format_perf_histogram(self, report: Dict[str, Any]) -> None:
+        # Formatting the perf histograms so they are human-readable. This will change the
+        # ranges and values, which are currently in list form, into strings so that
+        # they are displayed horizontally instead of vertically.
+        try:
+            # Formatting ranges and values in osd_perf_histograms
+            mode = 'osd_perf_histograms'
+            for config in report[mode]:
+                for histogram in config:
+                    # Adjust ranges by converting lists into strings
+                    for axis in config[histogram]['axes']:
+                        for i in range(0, len(axis['ranges'])):
+                            axis['ranges'][i] = str(axis['ranges'][i])
+
+                    for osd in config[histogram]['osds']:
+                        for i in range(0, len(osd['values'])):
+                            osd['values'][i] = str(osd['values'][i])
+        except KeyError:
+            # If the perf channel is not enabled, there should be a KeyError since
+            # 'osd_perf_histograms' would not be present in the report. In that case,
+            # the show function should pass as usual without trying to format the
+            # histograms.
+            pass
+
+    def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+        '''
+        Enable or disable a list of channels
+        '''
+        if not self.enabled:
+            # telemetry should be on for channels to be toggled
+            msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+                  'Preview sample reports with `ceph telemetry preview`.'
+            return 0, msg, ''
+
+        if channels is None:
+            msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
+            return 0, msg, ''
+
+        state = action == 'enable'
+        msg = ''
+        for c in channels:
+            if c not in ALL_CHANNELS:
+                msg = f"{msg}{c} is not a valid channel name. "\
+                        f"Available channels: {ALL_CHANNELS}.\n"
+            else:
+                self.set_module_option(f"channel_{c}", state)
+                setattr(self,
+                        f"channel_{c}",
+                        state)
+                msg = f"{msg}channel_{c} is {action}d\n"
+
+        return 0, msg, ''
+
+    @CLIReadCommand('telemetry status')
+    def status(self) -> Tuple[int, str, str]:
+        '''
+        Show current configuration
+        '''
+        r = {}
+        for opt in self.MODULE_OPTIONS:
+            r[opt['name']] = getattr(self, opt['name'])
+        r['last_upload'] = (time.ctime(self.last_upload)
+                            if self.last_upload else self.last_upload)
+        return 0, json.dumps(r, indent=4, sort_keys=True), ''
+
+    @CLIReadCommand('telemetry diff')
+    def diff(self) -> Tuple[int, str, str]:
+        '''
+        Show the diff between opted-in collection and available collection
+        '''
+        diff = []
+        keys = ['nag']
+
+        for c in MODULE_COLLECTION:
+            if not self.is_enabled_collection(c['name']):
+                diff.append({key: val for key, val in c.items() if key not in keys})
+
+        r = None
+        if diff == []:
+            r = "Telemetry is up to date"
+        else:
+            r = json.dumps(diff, indent=4, sort_keys=True)
+
+        return 0, r, ''
+
+    @CLICommand('telemetry on')
+    def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
+        '''
+        Enable telemetry reports from this cluster
+        '''
+        if license != LICENSE:
+            return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
+To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
+        else:
             self.set_module_option('enabled', True)
-            return 0, '', ''
-        elif command['prefix'] == 'telemetry off':
-            self.set_module_option('enabled', False)
-            return 0, '', ''
-        elif command['prefix'] == 'telemetry send':
-            self.last_report = self.compile_report()
-            resp = self.send(self.last_report)
-            if resp.ok:
-                return 0, 'Report sent to {0}'.format(self.url), ''
-            return 1, '', 'Failed to send report to %s: %d %s %s' % (
-                self.url,
-                resp.status_code,
-                resp.reason,
-                resp.text
-            )
-
-        elif command['prefix'] == 'telemetry show':
-            report = self.last_report
-            if not report:
-                report = self.compile_report()
-            return 0, json.dumps(report, indent=4), ''
+            self.enabled = True
+            self.opt_in_all_collections()
+
+            # for major releases upgrade nagging
+            mon_map = self.get('mon_map')
+            mon_min = mon_map.get("min_mon_release", 0)
+            self.set_store('last_opted_in_ceph_version', str(mon_min))
+            self.last_opted_in_ceph_version = mon_min
+
+            msg = 'Telemetry is on.'
+            disabled_channels = ''
+            active_channels = self.get_active_channels()
+            for c in ALL_CHANNELS:
+                if c not in active_channels and c != 'ident':
+                    disabled_channels = f"{disabled_channels} {c}"
+
+            if len(disabled_channels) > 0:
+                msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
+                        f"`ceph telemetry enable channel{disabled_channels}`"
+
+            # wake up serve() to reset health warning
+            self.event.set()
+
+            return 0, msg, ''
+
+    @CLICommand('telemetry off')
+    def off(self) -> Tuple[int, str, str]:
+        '''
+        Disable telemetry reports from this cluster
+        '''
+        if not self.enabled:
+            # telemetry is already off
+            msg = 'Telemetry is currently not enabled, nothing to turn off. '\
+                    'Please consider opting-in with `ceph telemetry on`.\n' \
+                  'Preview sample reports with `ceph telemetry preview`.'
+            return 0, msg, ''
+
+        self.set_module_option('enabled', False)
+        self.enabled = False
+        self.set_store('collection', json.dumps([]))
+        self.db_collection = []
+
+        # we might need this info in the future, in case
+        # of nagging when user is opted-out
+        mon_map = self.get('mon_map')
+        mon_min = mon_map.get("min_mon_release", 0)
+        self.set_store('last_opted_out_ceph_version', str(mon_min))
+        self.last_opted_out_ceph_version = mon_min
+
+        msg = 'Telemetry is now disabled.'
+        return 0, msg, ''
+
+    @CLIReadCommand('telemetry enable channel all')
+    def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
+        '''
+        Enable all channels
+        '''
+        return self.toggle_channel('enable', channels)
+
+    @CLIReadCommand('telemetry enable channel')
+    def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+        '''
+        Enable a list of channels
+        '''
+        return self.toggle_channel('enable', channels)
+
+    @CLIReadCommand('telemetry disable channel all')
+    def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
+        '''
+        Disable all channels
+        '''
+        return self.toggle_channel('disable', channels)
+
+    @CLIReadCommand('telemetry disable channel')
+    def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+        '''
+        Disable a list of channels
+        '''
+        return self.toggle_channel('disable', channels)
+
+    @CLIReadCommand('telemetry channel ls')
+    def channel_ls(self) -> Tuple[int, str, str]:
+        '''
+        List all channels
+        '''
+        table = PrettyTable(
+            [
+                'NAME', 'ENABLED', 'DEFAULT', 'DESC',
+            ],
+            border=False)
+        table.align['NAME'] = 'l'
+        table.align['ENABLED'] = 'l'
+        table.align['DEFAULT'] = 'l'
+        table.align['DESC'] = 'l'
+        table.left_padding_width = 0
+        table.right_padding_width = 4
+
+        for c in ALL_CHANNELS:
+            enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
+            for o in self.MODULE_OPTIONS:
+                if o['name'] == f"channel_{c}":
+                    default = "ON" if o.get('default', None) else "OFF"
+                    desc = o.get('desc', None)
+
+            table.add_row((
+                c,
+                enabled,
+                default,
+                desc,
+            ))
+
+        return 0, table.get_string(sortby="NAME"), ''
+
+    @CLIReadCommand('telemetry collection ls')
+    def collection_ls(self) -> Tuple[int, str, str]:
+        '''
+        List all collections
+        '''
+        col_delta = self.collection_delta()
+        msg = ''
+        if col_delta is not None and len(col_delta) > 0:
+            msg = f"New collections are available:\n" \
+                  f"{sorted([c.name for c in col_delta])}\n" \
+                  f"Run `ceph telemetry on` to opt-in to these collections.\n"
+
+        table = PrettyTable(
+            [
+                'NAME', 'STATUS', 'DESC',
+            ],
+            border=False)
+        table.align['NAME'] = 'l'
+        table.align['STATUS'] = 'l'
+        table.align['DESC'] = 'l'
+        table.left_padding_width = 0
+        table.right_padding_width = 4
+
+        for c in MODULE_COLLECTION:
+            name = c['name']
+            opted_in = self.is_enabled_collection(name)
+            channel_enabled = getattr(self, f"channel_{c['channel']}")
+
+            status = ''
+            if channel_enabled and opted_in:
+                status = "REPORTING"
+            else:
+                why = ''
+                delimiter = ''
+
+                if not opted_in:
+                    why += "NOT OPTED-IN"
+                    delimiter = ', '
+                if not channel_enabled:
+                    why += f"{delimiter}CHANNEL {c['channel']} IS OFF"
+
+                status = f"NOT REPORTING: {why}"
+
+            desc = c['description']
+
+            table.add_row((
+                name,
+                status,
+                desc,
+            ))
+
+        if len(msg):
+            # add a new line between message and table output
+            msg = f"{msg} \n"
+
+        return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
+
+    @CLICommand('telemetry send')
+    def do_send(self,
+                endpoint: Optional[List[EndPoint]] = None,
+                license: Optional[str] = None) -> Tuple[int, str, str]:
+        '''
+        Send a sample report
+        '''
+        if not self.is_opted_in() and license != LICENSE:
+            self.log.debug(('A telemetry send attempt while opted-out. '
+                            'Asking for license agreement'))
+            return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
+To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
+Please consider enabling the telemetry module with 'ceph telemetry on'.'''
         else:
-            return (-errno.EINVAL, '',
-                    "Command not found '{0}'".format(command['prefix']))
+            self.last_report = self.compile_report()
+            return self.send(self.last_report, endpoint)
+
+    @CLIReadCommand('telemetry show')
+    def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+        '''
+        Show a sample report of opted-in collections (except for 'device')
+        '''
+        if not self.enabled:
+            # if telemetry is off, no report is being sent, hence nothing to show
+            msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+                  'Preview sample reports with `ceph telemetry preview`.'
+            return 0, msg, ''
+
+        report = self.get_report_locked(channels=channels)
+        self.format_perf_histogram(report)
+        report = json.dumps(report, indent=4, sort_keys=True)
+
+        if self.channel_device:
+            report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''
+
+        return 0, report, ''
+
+    @CLIReadCommand('telemetry preview')
+    def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+        '''
+        Preview a sample report of the most recent collections available (except for 'device')
+        '''
+        report = {}
 
-    def self_test(self):
+        # We use a lock to prevent a scenario where the user wishes to preview
+        # the report, and at the same time the module hits the interval of
+        # sending a report with the opted-in collection, which has less data
+        # than in the preview report.
+        col_delta = self.collection_delta()
+        with self.get_report_lock:
+            if col_delta is not None and len(col_delta) == 0:
+                # user is already opted-in to the most recent collection
+                msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
+                return 0, msg, ''
+            else:
+                # there are collections the user is not opted-in to
+                next_collection = []
+
+                for c in MODULE_COLLECTION:
+                    next_collection.append(c['name'].name)
+
+                opted_in_collection = self.db_collection
+                self.db_collection = next_collection
+                report = self.get_report(channels=channels)
+                self.db_collection = opted_in_collection
+
+        self.format_perf_histogram(report)
+        report = json.dumps(report, indent=4, sort_keys=True)
+
+        if self.channel_device:
+            report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''
+
+        return 0, report, ''
+
+    @CLIReadCommand('telemetry show-device')
+    def show_device(self) -> Tuple[int, str, str]:
+        '''
+        Show a sample device report
+        '''
+        if not self.enabled:
+            # if telemetry is off, no report is being sent, hence nothing to show
+            msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+                  'Preview sample device reports with `ceph telemetry preview-device`.'
+            return 0, msg, ''
+
+        if not self.channel_device:
+            # if device channel is off, device report is not being sent, hence nothing to show
+            msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
+                  'Preview sample device reports with `ceph telemetry preview-device`.'
+            return 0, msg, ''
+
+        return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
+
+    @CLIReadCommand('telemetry preview-device')
+    def preview_device(self) -> Tuple[int, str, str]:
+        '''
+        Preview a sample device report of the most recent device collection
+        '''
+        report = {}
+
+        device_col_delta = self.collection_delta(['device'])
+        with self.get_report_lock:
+            if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
+                # user is already opted-in to the most recent device collection,
+                # and device channel is on, thus `show-device` should be called
+                msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
+                return 0, msg, ''
+
+            # either the user is not opted-in at all, or there are collections
+            # they are not opted-in to
+            next_collection = []
+
+            for c in MODULE_COLLECTION:
+                next_collection.append(c['name'].name)
+
+            opted_in_collection = self.db_collection
+            self.db_collection = next_collection
+            report = self.get_report('device')
+            self.db_collection = opted_in_collection
+
+        report = json.dumps(report, indent=4, sort_keys=True)
+        return 0, report, ''
+
+    @CLIReadCommand('telemetry show-all')
+    def show_all(self) -> Tuple[int, str, str]:
+        '''
+        Show a sample report of all enabled channels (including 'device' channel)
+        '''
+        if not self.enabled:
+            # if telemetry is off, no report is being sent, hence nothing to show
+            msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+                  'Preview sample reports with `ceph telemetry preview`.'
+            return 0, msg, ''
+
+        if not self.channel_device:
+            # device channel is off, no need to display its report
+            return 0, json.dumps(self.get_report_locked('default'), indent=4, sort_keys=True), ''
+
+        # telemetry is on and device channel is enabled, show both
+        return 0, json.dumps(self.get_report_locked('all'), indent=4, sort_keys=True), ''
+
+    @CLIReadCommand('telemetry preview-all')
+    def preview_all(self) -> Tuple[int, str, str]:
+        '''
+        Preview a sample report of the most recent collections available of all channels (including 'device')
+        '''
+        report = {}
+
+        col_delta = self.collection_delta()
+        with self.get_report_lock:
+            if col_delta is not None and len(col_delta) == 0:
+                # user is already opted-in to the most recent collection
+                msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
+                return 0, msg, ''
+
+            # there are collections the user is not opted-in to
+            next_collection = []
+
+            for c in MODULE_COLLECTION:
+                next_collection.append(c['name'].name)
+
+            opted_in_collection = self.db_collection
+            self.db_collection = next_collection
+            report = self.get_report('all')
+            self.db_collection = opted_in_collection
+
+        self.format_perf_histogram(report)
+        report = json.dumps(report, indent=4, sort_keys=True)
+
+        return 0, report, ''
+
+    def get_report_locked(self,
+                          report_type: str = 'default',
+                          channels: Optional[List[str]] = None) -> Dict[str, Any]:
+        '''
+        A wrapper around get_report to allow for compiling a report of the most recent module collections
+        '''
+        with self.get_report_lock:
+            return self.get_report(report_type, channels)
+
+    def get_report(self,
+                   report_type: str = 'default',
+                   channels: Optional[List[str]] = None) -> Dict[str, Any]:
+        if report_type == 'default':
+            return self.compile_report(channels=channels)
+        elif report_type == 'device':
+            return self.gather_device_report()
+        elif report_type == 'all':
+            return {'report': self.compile_report(channels=channels),
+                    'device_report': self.gather_device_report()}
+        return {}
+
+    def self_test(self) -> None:
         report = self.compile_report()
         if len(report) == 0:
             raise RuntimeError('Report is empty')
@@ -313,55 +1984,63 @@ class Module(MgrModule):
         if 'report_id' not in report:
             raise RuntimeError('report_id not found in report')
 
-    def shutdown(self):
+    def shutdown(self) -> None:
         self.run = False
         self.event.set()
 
-    def serve(self):
+    def refresh_health_checks(self) -> None:
+        health_checks = {}
+        # TODO do we want to nag also in case the user is not opted-in?
+        if self.enabled and self.should_nag():
+            health_checks['TELEMETRY_CHANGED'] = {
+                'severity': 'warning',
+                'summary': 'Telemetry requires re-opt-in',
+                'detail': [
+                    'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
+                ]
+            }
+        self.set_health_checks(health_checks)
+
+    def serve(self) -> None:
         self.load()
-        self.config_notify()
         self.run = True
 
         self.log.debug('Waiting for mgr to warm up')
-        self.event.wait(10)
+        time.sleep(10)
 
         while self.run:
+            self.event.clear()
+
+            self.refresh_health_checks()
+
+            if not self.is_opted_in():
+                self.log.debug('Not sending report until user re-opts-in')
+                self.event.wait(1800)
+                continue
             if not self.enabled:
-                self.log.info('Not sending report until configured to do so')
+                self.log.debug('Not sending report until configured to do so')
                 self.event.wait(1800)
                 continue
 
             now = int(time.time())
-            if not self.last_upload or (now - self.last_upload) > \
-                            self.interval * 3600:
+            if not self.last_upload or \
+               (now - self.last_upload) > self.interval * 3600:
                 self.log.info('Compiling and sending report to %s',
                               self.url)
 
                 try:
                     self.last_report = self.compile_report()
-                except:
+                except Exception:
                     self.log.exception('Exception while compiling report:')
 
-                try:
-                    resp = self.send(self.last_report)
-                    # self.send logs on failure; only update last_upload
-                    # if we succeed
-                    if resp.ok:
-                        self.last_upload = now
-                        self.set_store('last_upload', str(now))
-                except:
-                    self.log.exception('Exception while sending report:')
+                self.send(self.last_report)
             else:
-                self.log.info('Interval for sending new report has not expired')
+                self.log.debug('Interval for sending new report has not expired')
 
             sleep = 3600
             self.log.debug('Sleeping for %d seconds', sleep)
             self.event.wait(sleep)
 
-    def self_test(self):
-        self.compile_report()
-        return True
-
     @staticmethod
-    def can_run():
+    def can_run() -> Tuple[bool, str]:
         return True, ''