Collect statistics from Ceph cluster and send this back to the Ceph project
when user has opted-in
"""
+import logging
+import numbers
+import enum
import errno
import hashlib
import json
import rbd
-import re
import requests
import uuid
import time
from datetime import datetime, timedelta
-from threading import Event
+from prettytable import PrettyTable
+from threading import Event, Lock
from collections import defaultdict
+from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union
-from mgr_module import MgrModule
+from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, ServiceInfoT
-ALL_CHANNELS = ['basic', 'ident', 'crash', 'device']
+ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']
-LICENSE='sharing-1-0'
-LICENSE_NAME='Community Data License Agreement - Sharing - Version 1.0'
-LICENSE_URL='https://cdla.io/sharing-1-0/'
-
-# If the telemetry revision has changed since this point, re-require
-# an opt-in. This should happen each time we add new information to
-# the telemetry report.
-LAST_REVISION_RE_OPT_IN = 2
+LICENSE = 'sharing-1-0'
+LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
+LICENSE_URL = 'https://cdla.io/sharing-1-0/'
+NO_SALT_CNT = 0
# Latest revision of the telemetry report. Bump this each time we make
# *any* change.
# - rgw daemons, zones, zonegroups; which rgw frontends
# - crush map stats
-class Module(MgrModule):
- config = dict()
+class Collection(str, enum.Enum):
+ basic_base = 'basic_base'
+ device_base = 'device_base'
+ crash_base = 'crash_base'
+ ident_base = 'ident_base'
+ perf_perf = 'perf_perf'
+ basic_mds_metadata = 'basic_mds_metadata'
+ basic_pool_usage = 'basic_pool_usage'
+ basic_usage_by_class = 'basic_usage_by_class'
+ basic_rook_v01 = 'basic_rook_v01'
+ perf_memory_metrics = 'perf_memory_metrics'
+
+MODULE_COLLECTION : List[Dict] = [
+ {
+ "name": Collection.basic_base,
+ "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
+ "channel": "basic",
+ "nag": False
+ },
+ {
+ "name": Collection.device_base,
+ "description": "Information about device health metrics",
+ "channel": "device",
+ "nag": False
+ },
+ {
+ "name": Collection.crash_base,
+ "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
+ "channel": "crash",
+ "nag": False
+ },
+ {
+ "name": Collection.ident_base,
+ "description": "User-provided identifying information about the cluster",
+ "channel": "ident",
+ "nag": False
+ },
+ {
+ "name": Collection.perf_perf,
+ "description": "Information about performance counters of the cluster",
+ "channel": "perf",
+ "nag": True
+ },
+ {
+ "name": Collection.basic_mds_metadata,
+ "description": "MDS metadata",
+ "channel": "basic",
+ "nag": False
+ },
+ {
+ "name": Collection.basic_pool_usage,
+ "description": "Default pool application and usage statistics",
+ "channel": "basic",
+ "nag": False
+ },
+ {
+ "name": Collection.basic_usage_by_class,
+ "description": "Default device class usage statistics",
+ "channel": "basic",
+ "nag": False
+ },
+ {
+ "name": Collection.basic_rook_v01,
+ "description": "Basic Rook deployment data",
+ "channel": "basic",
+ "nag": True
+ },
+ {
+ "name": Collection.perf_memory_metrics,
+ "description": "Heap stats and mempools for mon and mds",
+ "channel": "perf",
+ "nag": False
+ },
+]
+
+ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [
+ # Note: a key cannot be both a node and a leaf, e.g.
+ # "rook/a/b"
+ # "rook/a/b/c"
+ ("rook/version", Collection.basic_rook_v01),
+ ("rook/kubernetes/version", Collection.basic_rook_v01),
+ ("rook/csi/version", Collection.basic_rook_v01),
+ ("rook/node/count/kubernetes-total", Collection.basic_rook_v01),
+ ("rook/node/count/with-ceph-daemons", Collection.basic_rook_v01),
+ ("rook/node/count/with-csi-rbd-plugin", Collection.basic_rook_v01),
+ ("rook/node/count/with-csi-cephfs-plugin", Collection.basic_rook_v01),
+ ("rook/node/count/with-csi-nfs-plugin", Collection.basic_rook_v01),
+ ("rook/usage/storage-class/count/total", Collection.basic_rook_v01),
+ ("rook/usage/storage-class/count/rbd", Collection.basic_rook_v01),
+ ("rook/usage/storage-class/count/cephfs", Collection.basic_rook_v01),
+ ("rook/usage/storage-class/count/nfs", Collection.basic_rook_v01),
+ ("rook/usage/storage-class/count/bucket", Collection.basic_rook_v01),
+ ("rook/cluster/storage/device-set/count/total", Collection.basic_rook_v01),
+ ("rook/cluster/storage/device-set/count/portable", Collection.basic_rook_v01),
+ ("rook/cluster/storage/device-set/count/non-portable", Collection.basic_rook_v01),
+ ("rook/cluster/mon/count", Collection.basic_rook_v01),
+ ("rook/cluster/mon/allow-multiple-per-node", Collection.basic_rook_v01),
+ ("rook/cluster/mon/max-id", Collection.basic_rook_v01),
+ ("rook/cluster/mon/pvc/enabled", Collection.basic_rook_v01),
+ ("rook/cluster/mon/stretch/enabled", Collection.basic_rook_v01),
+ ("rook/cluster/network/provider", Collection.basic_rook_v01),
+ ("rook/cluster/external-mode", Collection.basic_rook_v01),
+]
+class Module(MgrModule):
metadata_keys = [
- "arch",
- "ceph_version",
- "os",
- "cpu",
- "kernel_description",
- "kernel_version",
- "distro_description",
- "distro"
+ "arch",
+ "ceph_version",
+ "os",
+ "cpu",
+ "kernel_description",
+ "kernel_version",
+ "distro_description",
+ "distro"
]
MODULE_OPTIONS = [
- {
- 'name': 'url',
- 'type': 'str',
- 'default': 'https://telemetry.ceph.com/report'
- },
- {
- 'name': 'device_url',
- 'type': 'str',
- 'default': 'https://telemetry.ceph.com/device'
- },
- {
- 'name': 'enabled',
- 'type': 'bool',
- 'default': False
- },
- {
- 'name': 'last_opt_revision',
- 'type': 'int',
- 'default': 1,
- },
- {
- 'name': 'leaderboard',
- 'type': 'bool',
- 'default': False
- },
- {
- 'name': 'description',
- 'type': 'str',
- 'default': None
- },
- {
- 'name': 'contact',
- 'type': 'str',
- 'default': None
- },
- {
- 'name': 'organization',
- 'type': 'str',
- 'default': None
- },
- {
- 'name': 'proxy',
- 'type': 'str',
- 'default': None
- },
- {
- 'name': 'interval',
- 'type': 'int',
- 'default': 24,
- 'min': 8
- },
- {
- 'name': 'channel_basic',
- 'type': 'bool',
- 'default': True,
- 'desc': 'Share basic cluster information (size, version)',
- },
- {
- 'name': 'channel_ident',
- 'type': 'bool',
- 'default': False,
- 'description': 'Share a user-provided description and/or contact email for the cluster',
- },
- {
- 'name': 'channel_crash',
- 'type': 'bool',
- 'default': True,
- 'description': 'Share metadata about Ceph daemon crashes (version, stack straces, etc)',
- },
- {
- 'name': 'channel_device',
- 'type': 'bool',
- 'default': True,
- 'description': 'Share device health metrics (e.g., SMART data, minus potentially identifying info like serial numbers)',
- },
- ]
-
- COMMANDS = [
- {
- "cmd": "telemetry status",
- "desc": "Show current configuration",
- "perm": "r"
- },
- {
- "cmd": "telemetry send "
- "name=endpoint,type=CephChoices,strings=ceph|device,n=N,req=false "
- "name=license,type=CephString,req=false",
- "desc": "Force sending data to Ceph telemetry",
- "perm": "rw"
- },
- {
- "cmd": "telemetry show "
- "name=channels,type=CephString,n=N,req=False",
- "desc": "Show last report or report to be sent",
- "perm": "r"
- },
- {
- "cmd": "telemetry show-device",
- "desc": "Show last device report or device report to be sent",
- "perm": "r"
- },
- {
- "cmd": "telemetry show-all",
- "desc": "Show report of all channels",
- "perm": "r"
- },
- {
- "cmd": "telemetry on name=license,type=CephString,req=false",
- "desc": "Enable telemetry reports from this cluster",
- "perm": "rw",
- },
- {
- "cmd": "telemetry off",
- "desc": "Disable telemetry reports from this cluster",
- "perm": "rw",
- },
+ Option(name='url',
+ type='str',
+ default='https://telemetry.ceph.com/report'),
+ Option(name='device_url',
+ type='str',
+ default='https://telemetry.ceph.com/device'),
+ Option(name='enabled',
+ type='bool',
+ default=False),
+ Option(name='last_opt_revision',
+ type='int',
+ default=1),
+ Option(name='leaderboard',
+ type='bool',
+ default=False),
+ Option(name='description',
+ type='str',
+ default=None),
+ Option(name='contact',
+ type='str',
+ default=None),
+ Option(name='organization',
+ type='str',
+ default=None),
+ Option(name='proxy',
+ type='str',
+ default=None),
+ Option(name='interval',
+ type='int',
+ default=24,
+ min=8),
+ Option(name='channel_basic',
+ type='bool',
+ default=True,
+ desc='Share basic cluster information (size, version)'),
+ Option(name='channel_ident',
+ type='bool',
+ default=False,
+ desc='Share a user-provided description and/or contact email for the cluster'),
+ Option(name='channel_crash',
+ type='bool',
+ default=True,
+ desc='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
+ Option(name='channel_device',
+ type='bool',
+ default=True,
+ desc=('Share device health metrics '
+ '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
+ Option(name='channel_perf',
+ type='bool',
+ default=False,
+ desc='Share various performance metrics of a cluster'),
]
@property
- def config_keys(self):
+ def config_keys(self) -> Dict[str, OptionValue]:
return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
self.event = Event()
self.run = False
- self.last_upload = None
- self.last_report = dict()
- self.report_id = None
- self.salt = None
-
- def config_notify(self):
+ self.db_collection: Optional[List[str]] = None
+ self.last_opted_in_ceph_version: Optional[int] = None
+ self.last_opted_out_ceph_version: Optional[int] = None
+ self.last_upload: Optional[int] = None
+ self.last_report: Dict[str, Any] = dict()
+ self.report_id: Optional[str] = None
+ self.salt: Optional[str] = None
+ self.get_report_lock = Lock()
+ self.config_update_module_option()
+ # for mypy which does not run the code
+ if TYPE_CHECKING:
+ self.url = ''
+ self.device_url = ''
+ self.enabled = False
+ self.last_opt_revision = 0
+ self.leaderboard = ''
+ self.interval = 0
+ self.proxy = ''
+ self.channel_basic = True
+ self.channel_ident = False
+ self.channel_crash = True
+ self.channel_device = True
+ self.channel_perf = False
+ self.db_collection = ['basic_base', 'device_base']
+ self.last_opted_in_ceph_version = 17
+ self.last_opted_out_ceph_version = 0
+
+ def config_update_module_option(self) -> None:
for opt in self.MODULE_OPTIONS:
setattr(self,
opt['name'],
self.get_module_option(opt['name']))
self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
+
+ def config_notify(self) -> None:
+ self.config_update_module_option()
# wake up serve() thread
self.event.set()
- def load(self):
- self.last_upload = self.get_store('last_upload', None)
- if self.last_upload is not None:
- self.last_upload = int(self.last_upload)
+ def load(self) -> None:
+ last_upload = self.get_store('last_upload', None)
+ if last_upload is None:
+ self.last_upload = None
+ else:
+ self.last_upload = int(last_upload)
- self.report_id = self.get_store('report_id', None)
- if self.report_id is None:
+ report_id = self.get_store('report_id', None)
+ if report_id is None:
self.report_id = str(uuid.uuid4())
self.set_store('report_id', self.report_id)
+ else:
+ self.report_id = report_id
- self.salt = self.get_store('salt', None)
- if not self.salt:
+ salt = self.get_store('salt', None)
+ if salt is None:
self.salt = str(uuid.uuid4())
self.set_store('salt', self.salt)
+ else:
+ self.salt = salt
+
+ self.init_collection()
+
+ last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None)
+ if last_opted_in_ceph_version is None:
+ self.last_opted_in_ceph_version = None
+ else:
+ self.last_opted_in_ceph_version = int(last_opted_in_ceph_version)
- def gather_osd_metadata(self, osd_map):
+ last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None)
+ if last_opted_out_ceph_version is None:
+ self.last_opted_out_ceph_version = None
+ else:
+ self.last_opted_out_ceph_version = int(last_opted_out_ceph_version)
+
+ def gather_osd_metadata(self,
+ osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]:
keys = ["osd_objectstore", "rotational"]
keys += self.metadata_keys
- metadata = dict()
+ metadata: Dict[str, Dict[str, int]] = dict()
for key in keys:
metadata[key] = defaultdict(int)
for osd in osd_map['osds']:
- res = self.get_metadata('osd', str(osd['osd'])).items()
+ res = self.get_metadata('osd', str(osd['osd']))
if res is None:
self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
continue
- for k, v in res:
+ for k, v in res.items():
if k not in keys:
continue
return metadata
- def gather_mon_metadata(self, mon_map):
+ def gather_mon_metadata(self,
+ mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]:
keys = list()
keys += self.metadata_keys
- metadata = dict()
+ metadata: Dict[str, Dict[str, int]] = dict()
for key in keys:
metadata[key] = defaultdict(int)
for mon in mon_map['mons']:
- res = self.get_metadata('mon', mon['name']).items()
+ res = self.get_metadata('mon', mon['name'])
if res is None:
self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
continue
- for k, v in res:
+ for k, v in res.items():
if k not in keys:
continue
return metadata
- def gather_crush_info(self):
+ def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]:
+ metadata: Dict[str, Dict[str, int]] = dict()
+
+ res = self.get('mds_metadata') # metadata of *all* mds daemons
+ if res is None or not res:
+ self.log.debug('Could not get metadata for mds daemons')
+ return metadata
+
+ keys = list()
+ keys += self.metadata_keys
+
+ for key in keys:
+ metadata[key] = defaultdict(int)
+
+ for mds in res.values():
+ for k, v in mds.items():
+ if k not in keys:
+ continue
+
+ metadata[k][v] += 1
+
+ return metadata
+
+ def gather_crush_info(self) -> Dict[str, Union[int,
+ bool,
+ List[int],
+ Dict[str, int],
+ Dict[int, int]]]:
osdmap = self.get_osdmap()
crush_raw = osdmap.get_crush()
crush = crush_raw.dump()
- def inc(d, k):
+ BucketKeyT = TypeVar('BucketKeyT', int, str)
+
+ def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None:
if k in d:
d[k] += 1
else:
d[k] = 1
- device_classes = {}
+ device_classes: Dict[str, int] = {}
for dev in crush['devices']:
inc(device_classes, dev.get('class', ''))
- bucket_algs = {}
- bucket_types = {}
- bucket_sizes = {}
+ bucket_algs: Dict[str, int] = {}
+ bucket_types: Dict[str, int] = {}
+ bucket_sizes: Dict[int, int] = {}
for bucket in crush['buckets']:
if '~' in bucket['name']: # ignore shadow buckets
continue
'bucket_types': bucket_types,
}
- def gather_configs(self):
+ def gather_configs(self) -> Dict[str, List[str]]:
# cluster config options
cluster = set()
r, outb, outs = self.mon_command({
'prefix': 'config dump',
'format': 'json'
- });
+ })
if r != 0:
return {}
try:
cluster.add(name)
# daemon-reported options (which may include ceph.conf)
active = set()
- ls = self.get("modified_config_options");
+ ls = self.get("modified_config_options")
for opt in ls.get('options', {}):
active.add(opt)
return {
'active_changed': sorted(list(active)),
}
- def gather_crashinfo(self):
- crashlist = list()
+ def anonymize_entity_name(self, entity_name:str) -> str:
+ if '.' not in entity_name:
+ self.log.debug(f"Cannot split entity name ({entity_name}), no '.' is found")
+ return entity_name
+
+ (etype, eid) = entity_name.split('.', 1)
+ m = hashlib.sha1()
+ salt = ''
+ if self.salt is not None:
+ salt = self.salt
+ # avoid asserting that salt exists
+ if not self.salt:
+ # do not set self.salt to a temp value
+ salt = f"no_salt_found_{NO_SALT_CNT}"
+ NO_SALT_CNT += 1
+ self.log.debug(f"No salt found, created a temp one: {salt}")
+ m.update(salt.encode('utf-8'))
+ m.update(eid.encode('utf-8'))
+ m.update(salt.encode('utf-8'))
+
+ return etype + '.' + m.hexdigest()
+
+ def get_heap_stats(self) -> Dict[str, dict]:
+ result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
+ anonymized_daemons = {}
+ osd_map = self.get('osd_map')
+
+ # Combine available daemons
+ daemons = []
+ for osd in osd_map['osds']:
+ daemons.append('osd'+'.'+str(osd['osd']))
+ # perf_memory_metrics collection (1/2)
+ if self.is_enabled_collection(Collection.perf_memory_metrics):
+ mon_map = self.get('mon_map')
+ mds_metadata = self.get('mds_metadata')
+ for mon in mon_map['mons']:
+ daemons.append('mon'+'.'+mon['name'])
+ for mds in mds_metadata:
+ daemons.append('mds'+'.'+mds)
+
+ # Grab output from the "daemon.x heap stats" command
+ for daemon in daemons:
+ daemon_type, daemon_id = daemon.split('.', 1)
+ heap_stats = self.parse_heap_stats(daemon_type, daemon_id)
+ if heap_stats:
+ if (daemon_type != 'osd'):
+ # Anonymize mon and mds
+ anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
+ daemon = anonymized_daemons[daemon]
+ result[daemon_type][daemon] = heap_stats
+ else:
+ continue
+
+ if anonymized_daemons:
+ # for debugging purposes only, this data is never reported
+ self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons))
+ return result
+
+ def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]:
+ parsed_output = {}
+
+ cmd_dict = {
+ 'prefix': 'heap',
+ 'heapcmd': 'stats'
+ }
+ r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict)
+
+ if r != 0:
+ self.log.error("Invalid command dictionary: {}".format(cmd_dict))
+ else:
+ if 'tcmalloc heap stats' in outb:
+ values = [int(i) for i in outb.split() if i.isdigit()]
+ # `categories` must be ordered this way for the correct output to be parsed
+ categories = ['use_by_application',
+ 'page_heap_freelist',
+ 'central_cache_freelist',
+ 'transfer_cache_freelist',
+ 'thread_cache_freelists',
+ 'malloc_metadata',
+ 'actual_memory_used',
+ 'released_to_os',
+ 'virtual_address_space_used',
+ 'spans_in_use',
+ 'thread_heaps_in_use',
+ 'tcmalloc_page_size']
+ if len(values) != len(categories):
+ self.log.error('Received unexpected output from {}.{}; ' \
+ 'number of values should match the number' \
+ 'of expected categories:\n values: len={} {} '\
+ '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs))
+ else:
+ parsed_output = dict(zip(categories, values))
+ else:
+ self.log.error('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs))
+
+ return parsed_output
+
+ def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
+ result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
+ anonymized_daemons = {}
+ osd_map = self.get('osd_map')
+
+ # Combine available daemons
+ daemons = []
+ for osd in osd_map['osds']:
+ daemons.append('osd'+'.'+str(osd['osd']))
+ # perf_memory_metrics collection (2/2)
+ if self.is_enabled_collection(Collection.perf_memory_metrics):
+ mon_map = self.get('mon_map')
+ mds_metadata = self.get('mds_metadata')
+ for mon in mon_map['mons']:
+ daemons.append('mon'+'.'+mon['name'])
+ for mds in mds_metadata:
+ daemons.append('mds'+'.'+mds)
+
+ # Grab output from the "dump_mempools" command
+ for daemon in daemons:
+ daemon_type, daemon_id = daemon.split('.', 1)
+ cmd_dict = {
+ 'prefix': 'dump_mempools',
+ 'format': 'json'
+ }
+ r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict)
+ if r != 0:
+ self.log.error("Invalid command dictionary: {}".format(cmd_dict))
+ continue
+ else:
+ try:
+ # This is where the mempool will land.
+ dump = json.loads(outb)
+ if mode == 'separated':
+ # Anonymize mon and mds
+ if daemon_type != 'osd':
+ anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
+ daemon = anonymized_daemons[daemon]
+ result[daemon_type][daemon] = dump['mempool']['by_pool']
+ elif mode == 'aggregated':
+ for mem_type in dump['mempool']['by_pool']:
+ result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
+ result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
+ else:
+ self.log.error("Incorrect mode specified in get_mempool: {}".format(mode))
+ except (json.decoder.JSONDecodeError, KeyError) as e:
+ self.log.error("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e))
+ continue
+
+ if anonymized_daemons:
+ # for debugging purposes only, this data is never reported
+ self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons))
+
+ return result
+
+ def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
+ # Initialize result dict
+ result: Dict[str, dict] = defaultdict(lambda: defaultdict(
+ lambda: defaultdict(
+ lambda: defaultdict(
+ lambda: defaultdict(
+ lambda: defaultdict(int))))))
+
+ # Get list of osd ids from the metadata
+ osd_metadata = self.get('osd_metadata')
+
+ # Grab output from the "osd.x perf histogram dump" command
+ for osd_id in osd_metadata:
+ cmd_dict = {
+ 'prefix': 'perf histogram dump',
+ 'id': str(osd_id),
+ 'format': 'json'
+ }
+ r, outb, outs = self.osd_command(cmd_dict)
+ # Check for invalid calls
+ if r != 0:
+ self.log.error("Invalid command dictionary: {}".format(cmd_dict))
+ continue
+ else:
+ try:
+ # This is where the histograms will land if there are any.
+ dump = json.loads(outb)
+
+ for histogram in dump['osd']:
+ # Log axis information. There are two axes, each represented
+ # as a dictionary. Both dictionaries are contained inside a
+ # list called 'axes'.
+ axes = []
+ for axis in dump['osd'][histogram]['axes']:
+
+ # This is the dict that contains information for an individual
+ # axis. It will be appended to the 'axes' list at the end.
+ axis_dict: Dict[str, Any] = defaultdict()
+
+ # Collecting information for buckets, min, name, etc.
+ axis_dict['buckets'] = axis['buckets']
+ axis_dict['min'] = axis['min']
+ axis_dict['name'] = axis['name']
+ axis_dict['quant_size'] = axis['quant_size']
+ axis_dict['scale_type'] = axis['scale_type']
+
+ # Collecting ranges; placing them in lists to
+ # improve readability later on.
+ ranges = []
+ for _range in axis['ranges']:
+ _max, _min = None, None
+ if 'max' in _range:
+ _max = _range['max']
+ if 'min' in _range:
+ _min = _range['min']
+ ranges.append([_min, _max])
+ axis_dict['ranges'] = ranges
+
+ # Now that 'axis_dict' contains all the appropriate
+ # information for the current axis, append it to the 'axes' list.
+ # There will end up being two axes in the 'axes' list, since the
+ # histograms are 2D.
+ axes.append(axis_dict)
+
+ # Add the 'axes' list, containing both axes, to result.
+ # At this point, you will see that the name of the key is the string
+ # form of our axes list (str(axes)). This is there so that histograms
+ # with different axis configs will not be combined.
+ # These key names are later dropped when only the values are returned.
+ result[str(axes)][histogram]['axes'] = axes
+
+ # Collect current values and make sure they are in
+ # integer form.
+ values = []
+ for value_list in dump['osd'][histogram]['values']:
+ values.append([int(v) for v in value_list])
+
+ if mode == 'separated':
+ if 'osds' not in result[str(axes)][histogram]:
+ result[str(axes)][histogram]['osds'] = []
+ result[str(axes)][histogram]['osds'].append({'osd_id': int(osd_id), 'values': values})
+
+ elif mode == 'aggregated':
+ # Aggregate values. If 'values' have already been initialized,
+ # we can safely add.
+ if 'values' in result[str(axes)][histogram]:
+ for i in range (0, len(values)):
+ for j in range (0, len(values[i])):
+ values[i][j] += result[str(axes)][histogram]['values'][i][j]
+
+ # Add the values to result.
+ result[str(axes)][histogram]['values'] = values
+
+ # Update num_combined_osds
+ if 'num_combined_osds' not in result[str(axes)][histogram]:
+ result[str(axes)][histogram]['num_combined_osds'] = 1
+ else:
+ result[str(axes)][histogram]['num_combined_osds'] += 1
+ else:
+ self.log.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode))
+ return list()
+
+ # Sometimes, json errors occur if you give it an empty string.
+ # I am also putting in a catch for a KeyError since it could
+ # happen where the code is assuming that a key exists in the
+ # schema when it doesn't. In either case, we'll handle that
+ # by continuing and collecting what we can from other osds.
+ except (json.decoder.JSONDecodeError, KeyError) as e:
+ self.log.error("Error caught on osd.{}: {}".format(osd_id, e))
+ continue
+
+ return list(result.values())
+
+ def get_io_rate(self) -> dict:
+ return self.get('io_rate')
+
+ def get_stats_per_pool(self) -> dict:
+ result = self.get('pg_dump')['pool_stats']
+
+ # collect application metadata from osd_map
+ osd_map = self.get('osd_map')
+ application_metadata = {pool['pool']: pool['application_metadata'] for pool in osd_map['pools']}
+
+ # add application to each pool from pg_dump
+ for pool in result:
+ pool['application'] = []
+ # Only include default applications
+ for application in application_metadata[pool['poolid']]:
+ if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
+ pool['application'].append(application)
+
+ return result
+
+ def get_stats_per_pg(self) -> dict:
+ return self.get('pg_dump')['pg_stats']
+
+ def get_rocksdb_stats(self) -> Dict[str, str]:
+ # Initalizers
+ result: Dict[str, str] = defaultdict()
+ version = self.get_rocksdb_version()
+
+ # Update result
+ result['version'] = version
+
+ return result
+
+ def gather_crashinfo(self) -> List[Dict[str, str]]:
+ crashlist: List[Dict[str, str]] = list()
errno, crashids, err = self.remote('crash', 'ls')
if errno:
- return ''
+ return crashlist
for crashid in crashids.split():
- cmd = {'id': crashid}
- errno, crashinfo, err = self.remote('crash', 'do_info', cmd, '')
+ errno, crashinfo, err = self.remote('crash', 'do_info', crashid)
if errno:
continue
c = json.loads(crashinfo)
+
+ # redact hostname
del c['utsname_hostname']
+
# entity_name might have more than one '.', beware
(etype, eid) = c.get('entity_name', '').split('.', 1)
m = hashlib.sha1()
+ assert self.salt
m.update(self.salt.encode('utf-8'))
m.update(eid.encode('utf-8'))
m.update(self.salt.encode('utf-8'))
c['entity_name'] = etype + '.' + m.hexdigest()
+
+ # redact final line of python tracebacks, as the exception
+ # payload may contain identifying information
+ if 'mgr_module' in c and 'backtrace' in c:
+ # backtrace might be empty
+ if len(c['backtrace']) > 0:
+ c['backtrace'][-1] = '<redacted>'
+
crashlist.append(c)
return crashlist
- def get_active_channels(self):
+ def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
+ # Extract perf counter data with get_all_perf_counters(), a method
+ # from mgr/mgr_module.py. This method returns a nested dictionary that
+ # looks a lot like perf schema, except with some additional fields.
+ #
+ # Example of output, a snapshot of a mon daemon:
+ # "mon.b": {
+ # "bluestore.kv_flush_lat": {
+ # "count": 2431,
+ # "description": "Average kv_thread flush latency",
+ # "nick": "fl_l",
+ # "priority": 8,
+ # "type": 5,
+ # "units": 1,
+ # "value": 88814109
+ # },
+ # },
+ all_perf_counters = self.get_all_perf_counters()
+
+ # Initialize 'result' dict
+ result: Dict[str, dict] = defaultdict(lambda: defaultdict(
+ lambda: defaultdict(lambda: defaultdict(int))))
+
+ # 'separated' mode
+ anonymized_daemon_dict = {}
+
+ for daemon, all_perf_counters_by_daemon in all_perf_counters.items():
+ daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'
+
+ if mode == 'separated':
+ # anonymize individual daemon names except osds
+ if (daemon_type != 'osd'):
+ anonymized_daemon = self.anonymize_entity_name(daemon)
+ anonymized_daemon_dict[anonymized_daemon] = daemon
+ daemon = anonymized_daemon
+
+ # Calculate num combined daemon types if in aggregated mode
+ if mode == 'aggregated':
+ if 'num_combined_daemons' not in result[daemon_type]:
+ result[daemon_type]['num_combined_daemons'] = 1
+ else:
+ result[daemon_type]['num_combined_daemons'] += 1
+
+ for collection in all_perf_counters_by_daemon:
+ # Split the collection to avoid redundancy in final report; i.e.:
+ # bluestore.kv_flush_lat, bluestore.kv_final_lat -->
+ # bluestore: kv_flush_lat, kv_final_lat
+ col_0, col_1 = collection.split('.')
+
+ # Debug log for empty keys. This initially was a problem for prioritycache
+ # perf counters, where the col_0 was empty for certain mon counters:
+ #
+ # "mon.a": { instead of "mon.a": {
+ # "": { "prioritycache": {
+ # "cache_bytes": {...}, "cache_bytes": {...},
+ #
+ # This log is here to detect any future instances of a similar issue.
+ if (daemon == "") or (col_0 == "") or (col_1 == ""):
+ self.log.debug("Instance of an empty key: {}{}".format(daemon, collection))
+
+ if mode == 'separated':
+ # Add value to result
+ result[daemon][col_0][col_1]['value'] = \
+ all_perf_counters_by_daemon[collection]['value']
+
+ # Check that 'count' exists, as not all counters have a count field.
+ if 'count' in all_perf_counters_by_daemon[collection]:
+ result[daemon][col_0][col_1]['count'] = \
+ all_perf_counters_by_daemon[collection]['count']
+ elif mode == 'aggregated':
+ # Not every rgw daemon has the same schema. Specifically, each rgw daemon
+ # has a uniquely-named collection that starts off identically (i.e.
+ # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
+ # This bit of code combines these unique counters all under one rgw instance.
+ # Without this check, the schema would remain separeted out in the final report.
+ if col_0[0:11] == "objecter-0x":
+ col_0 = "objecter-0x"
+
+ # Check that the value can be incremented. In some cases,
+ # the files are of type 'pair' (real-integer-pair, integer-integer pair).
+ # In those cases, the value is a dictionary, and not a number.
+ # i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
+ if isinstance(all_perf_counters_by_daemon[collection]['value'], numbers.Number):
+ result[daemon_type][col_0][col_1]['value'] += \
+ all_perf_counters_by_daemon[collection]['value']
+
+ # Check that 'count' exists, as not all counters have a count field.
+ if 'count' in all_perf_counters_by_daemon[collection]:
+ result[daemon_type][col_0][col_1]['count'] += \
+ all_perf_counters_by_daemon[collection]['count']
+ else:
+ self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
+ return {}
+
+ if mode == 'separated':
+ # for debugging purposes only, this data is never reported
+ self.log.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict))
+
+ return result
+
+ def get_active_channels(self) -> List[str]:
r = []
if self.channel_basic:
r.append('basic')
r.append('device')
if self.channel_ident:
r.append('ident')
+ if self.channel_perf:
+ r.append('perf')
return r
- def gather_device_report(self):
+ def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
try:
time_format = self.remote('devicehealth', 'get_time_format')
- except:
- return None
+ except Exception as e:
+ self.log.debug('Unable to format time: {}'.format(e))
+ return {}
cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
min_sample = cutoff.strftime(time_format)
devices = self.get('devices')['devices']
+ if not devices:
+ self.log.debug('Unable to get device info from the mgr.')
+ return {}
- res = {} # anon-host-id -> anon-devid -> { timestamp -> record }
+ # anon-host-id -> anon-devid -> { timestamp -> record }
+ res: Dict[str, Dict[str, Dict[str, str]]] = {}
for d in devices:
devid = d['devid']
try:
# this is a map of stamp -> {device info}
m = self.remote('devicehealth', 'get_recent_device_metrics',
devid, min_sample)
- except:
+ except Exception as e:
+ self.log.error('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
continue
# anonymize host id
try:
host = d['location'][0]['host']
- except:
+ except (KeyError, IndexError) as e:
+ self.log.error('Unable to get host from device with id "{}": {}'.format(devid, e))
continue
anon_host = self.get_store('host-id/%s' % host)
if not anon_host:
res[anon_host][anon_devid] = m
return res
- def get_latest(self, daemon_type, daemon_name, stat):
+ def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
data = self.get_counter(daemon_type, daemon_name, stat)[stat]
- #self.log.error("get_latest {0} data={1}".format(stat, data))
if data:
return data[-1][1]
else:
return 0
- def compile_report(self, channels=[]):
+ def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
if not channels:
channels = self.get_active_channels()
report = {
- 'leaderboard': False,
+ 'leaderboard': self.leaderboard,
'report_version': 1,
'report_timestamp': datetime.utcnow().isoformat(),
'report_id': self.report_id,
'channels': channels,
'channels_available': ALL_CHANNELS,
'license': LICENSE,
+ 'collections_available': [c['name'].name for c in MODULE_COLLECTION],
+ 'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])],
}
if 'ident' in channels:
- if self.leaderboard:
- report['leaderboard'] = True
for option in ['description', 'contact', 'organization']:
report[option] = getattr(self, option)
service_map = self.get('service_map')
fs_map = self.get('fs_map')
df = self.get('df')
+ df_pools = {pool['id']: pool for pool in df['pools']}
report['created'] = mon_map['created']
report['config'] = self.gather_configs()
# pools
- report['rbd'] = {
- 'num_pools': 0,
- 'num_images_by_pool': [],
- 'mirroring_by_pool': [],
- }
+
+ rbd_num_pools = 0
+ rbd_num_images_by_pool = []
+ rbd_mirroring_by_pool = []
num_pg = 0
report['pools'] = list()
for pool in osd_map['pools']:
if k in ['k', 'm', 'plugin', 'technique',
'crush-failure-domain', 'l']
}
- report['pools'].append(
- {
+ pool_data = {
'pool': pool['pool'],
- 'type': pool['type'],
'pg_num': pool['pg_num'],
'pgp_num': pool['pg_placement_num'],
'size': pool['size'],
'erasure_code_profile': ec_profile,
'cache_mode': pool['cache_mode'],
}
- )
+
+ # basic_pool_usage collection
+ if self.is_enabled_collection(Collection.basic_pool_usage):
+ pool_data['application'] = []
+ for application in pool['application_metadata']:
+ # Only include default applications
+ if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
+ pool_data['application'].append(application)
+ pool_stats = df_pools[pool['pool']]['stats']
+ pool_data['stats'] = { # filter out kb_used
+ 'avail_raw': pool_stats['avail_raw'],
+ 'bytes_used': pool_stats['bytes_used'],
+ 'compress_bytes_used': pool_stats['compress_bytes_used'],
+ 'compress_under_bytes': pool_stats['compress_under_bytes'],
+ 'data_bytes_used': pool_stats['data_bytes_used'],
+ 'dirty': pool_stats['dirty'],
+ 'max_avail': pool_stats['max_avail'],
+ 'objects': pool_stats['objects'],
+ 'omap_bytes_used': pool_stats['omap_bytes_used'],
+ 'percent_used': pool_stats['percent_used'],
+ 'quota_bytes': pool_stats['quota_bytes'],
+ 'quota_objects': pool_stats['quota_objects'],
+ 'rd': pool_stats['rd'],
+ 'rd_bytes': pool_stats['rd_bytes'],
+ 'stored': pool_stats['stored'],
+ 'stored_data': pool_stats['stored_data'],
+ 'stored_omap': pool_stats['stored_omap'],
+ 'stored_raw': pool_stats['stored_raw'],
+ 'wr': pool_stats['wr'],
+ 'wr_bytes': pool_stats['wr_bytes']
+ }
+
+ cast(List[Dict[str, Any]], report['pools']).append(pool_data)
if 'rbd' in pool['application_metadata']:
- report['rbd']['num_pools'] += 1
+ rbd_num_pools += 1
ioctx = self.rados.open_ioctx(pool['pool_name'])
- report['rbd']['num_images_by_pool'].append(
+ rbd_num_images_by_pool.append(
sum(1 for _ in rbd.RBD().list2(ioctx)))
- report['rbd']['mirroring_by_pool'].append(
+ rbd_mirroring_by_pool.append(
rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
+ report['rbd'] = {
+ 'num_pools': rbd_num_pools,
+ 'num_images_by_pool': rbd_num_images_by_pool,
+ 'mirroring_by_pool': rbd_mirroring_by_pool}
# osds
cluster_network = False
'mds.root_rbytes')
rsnaps = self.get_latest('mds', mds['name'],
'mds.root_rsnaps')
- report['fs']['filesystems'].append({
+ report['fs']['filesystems'].append({ # type: ignore
'max_mds': fs['max_mds'],
'ever_allowed_features': fs['ever_allowed_features'],
'explicitly_allowed_features': fs['explicitly_allowed_features'],
'snaps': rsnaps,
})
num_mds += len(fs['info'])
- report['fs']['total_num_mds'] = num_mds
+ report['fs']['total_num_mds'] = num_mds # type: ignore
# daemons
- report['metadata'] = dict()
- report['metadata']['osd'] = self.gather_osd_metadata(osd_map)
- report['metadata']['mon'] = self.gather_mon_metadata(mon_map)
+ report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
+ mon=self.gather_mon_metadata(mon_map))
+
+ if self.is_enabled_collection(Collection.basic_mds_metadata):
+ report['metadata']['mds'] = self.gather_mds_metadata() # type: ignore
# host counts
servers = self.list_servers()
self.log.debug('servers %s' % servers)
- report['hosts'] = {
+ hosts = {
'num': len([h for h in servers if h['hostname']]),
}
for t in ['mon', 'mds', 'osd', 'mgr']:
- report['hosts']['num_with_' + t] = len(
- [h for h in servers
- if len([s for s in h['services'] if s['type'] == t])]
- )
+ nr_services = sum(1 for host in servers if
+ any(service for service in cast(List[ServiceInfoT],
+ host['services'])
+ if service['type'] == t))
+ hosts['num_with_' + t] = nr_services
+ report['hosts'] = hosts
report['usage'] = {
'pools': len(df['pools']),
'total_bytes': df['stats']['total_bytes'],
'total_avail_bytes': df['stats']['total_avail_bytes']
}
-
- report['services'] = defaultdict(int)
+ # basic_usage_by_class collection
+ if self.is_enabled_collection(Collection.basic_usage_by_class):
+ report['usage']['stats_by_class'] = {} # type: ignore
+ for device_class in df['stats_by_class']:
+ if device_class in ['hdd', 'ssd', 'nvme']:
+ report['usage']['stats_by_class'][device_class] = df['stats_by_class'][device_class] # type: ignore
+
+ services: DefaultDict[str, int] = defaultdict(int)
for key, value in service_map['services'].items():
- report['services'][key] += 1
+ services[key] += 1
if key == 'rgw':
- report['rgw'] = {
- 'count': 0,
- }
+ rgw = {}
zones = set()
- realms = set()
zonegroups = set()
frontends = set()
+ count = 0
d = value.get('daemons', dict())
-
- for k,v in d.items():
+ for k, v in d.items():
if k == 'summary' and v:
- report['rgw'][k] = v
+ rgw[k] = v
elif isinstance(v, dict) and 'metadata' in v:
- report['rgw']['count'] += 1
+ count += 1
zones.add(v['metadata']['zone_id'])
zonegroups.add(v['metadata']['zonegroup_id'])
frontends.add(v['metadata']['frontend_type#0'])
if f2:
frontends.add(f2)
- report['rgw']['zones'] = len(zones)
- report['rgw']['zonegroups'] = len(zonegroups)
- report['rgw']['frontends'] = list(frontends) # sets aren't json-serializable
+ rgw['count'] = count
+ rgw['zones'] = len(zones)
+ rgw['zonegroups'] = len(zonegroups)
+ rgw['frontends'] = list(frontends) # sets aren't json-serializable
+ report['rgw'] = rgw
+ report['services'] = services
try:
report['balancer'] = self.remote('balancer', 'gather_telemetry')
'active': False
}
+ # Rook
+ self.get_rook_data(report)
+
if 'crash' in channels:
report['crashes'] = self.gather_crashinfo()
+ if 'perf' in channels:
+ if self.is_enabled_collection(Collection.perf_perf):
+ report['perf_counters'] = self.gather_perf_counters('separated')
+ report['stats_per_pool'] = self.get_stats_per_pool()
+ report['stats_per_pg'] = self.get_stats_per_pg()
+ report['io_rate'] = self.get_io_rate()
+ report['osd_perf_histograms'] = self.get_osd_histograms('separated')
+ report['mempool'] = self.get_mempool('separated')
+ report['heap_stats'] = self.get_heap_stats()
+ report['rocksdb_stats'] = self.get_rocksdb_stats()
+
# NOTE: We do not include the 'device' channel in this report; it is
# sent to a different endpoint.
return report
- def _try_post(self, what, url, report):
+ def get_rook_data(self, report: Dict[str, object]) -> None:
+ r, outb, outs = self.mon_command({
+ 'prefix': 'config-key dump',
+ 'format': 'json'
+ })
+ if r != 0:
+ return
+ try:
+ config_kv_dump = json.loads(outb)
+ except json.decoder.JSONDecodeError:
+ return
+
+ for elem in ROOK_KEYS_BY_COLLECTION:
+ # elem[0] is the full key path (e.g. "rook/node/count/with-csi-nfs-plugin")
+ # elem[1] is the Collection this key belongs to
+ if self.is_enabled_collection(elem[1]):
+ self.add_kv_to_report(report, elem[0], config_kv_dump.get(elem[0]))
+
+ def add_kv_to_report(self, report: Dict[str, object], key_path: str, value: Any) -> None:
+ last_node = key_path.split('/')[-1]
+ for node in key_path.split('/')[0:-1]:
+ if node not in report:
+ report[node] = {}
+ report = report[node] # type: ignore
+
+ # sanity check of keys correctness
+ if not isinstance(report, dict):
+ self.log.error(f"'{key_path}' is an invalid key, expected type 'dict' but got {type(report)}")
+ return
+
+ if last_node in report:
+ self.log.error(f"'{key_path}' is an invalid key, last part must not exist at this point")
+ return
+
+ report[last_node] = value
+
+ def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
self.log.info('Sending %s to: %s' % (what, url))
proxies = dict()
if self.proxy:
return fail_reason
return None
- def send(self, report, endpoint=None):
+ class EndPoint(enum.Enum):
+ ceph = 'ceph'
+ device = 'device'
+
+ def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
+ '''
+ Find collections that are available in the module, but are not in the db
+ '''
+ if self.db_collection is None:
+ return None
+
+ if not channels:
+ channels = ALL_CHANNELS
+ else:
+ for ch in channels:
+ if ch not in ALL_CHANNELS:
+ self.log.debug(f"invalid channel name: {ch}")
+ return None
+
+ new_collection : List[Collection] = []
+
+ for c in MODULE_COLLECTION:
+ if c['name'].name not in self.db_collection:
+ if c['channel'] in channels:
+ new_collection.append(c['name'])
+
+ return new_collection
+
+ def is_major_upgrade(self) -> bool:
+ '''
+ Returns True only if the user last opted-in to an older major
+ '''
+ if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0:
+ # we do not know what Ceph version was when the user last opted-in,
+ # thus we do not wish to nag in case of a major upgrade
+ return False
+
+ mon_map = self.get('mon_map')
+ mon_min = mon_map.get("min_mon_release", 0)
+
+ if mon_min - self.last_opted_in_ceph_version > 0:
+ self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
+ return True
+
+ return False
+
+ def is_opted_in(self) -> bool:
+ # If len is 0 it means that the user is either opted-out (never
+ # opted-in, or invoked `telemetry off`), or they upgraded from a
+ # telemetry revision 1 or 2, which required to re-opt in to revision 3,
+ # regardless, hence is considered as opted-out
+ if self.db_collection is None:
+ return False
+ return len(self.db_collection) > 0
+
+ def should_nag(self) -> bool:
+ # Find delta between opted-in collections and module collections;
+ # nag only if module has a collection which is not in db, and nag == True.
+
+ # We currently do not nag if the user is opted-out (or never opted-in).
+ # If we wish to do this in the future, we need to have a tri-mode state
+ # (opted in, opted out, no action yet), and it needs to be guarded by a
+ # config option (so that nagging can be turned off via config).
+ # We also need to add a last_opted_out_ceph_version variable, for the
+ # major upgrade check.
+
+ # check if there are collections the user is not opt-in to
+ # that we should nag about
+ if self.db_collection is not None:
+ for c in MODULE_COLLECTION:
+ if c['name'].name not in self.db_collection:
+ if c['nag'] == True:
+ self.log.debug(f"The collection: {c['name']} is not reported")
+ return True
+
+ # user might be opted-in to the most recent collection, or there is no
+ # new collection which requires nagging about; thus nag in case it's a
+ # major upgrade and there are new collections
+ # (which their own nag == False):
+ new_collections = False
+ col_delta = self.collection_delta()
+ if col_delta is not None and len(col_delta) > 0:
+ new_collections = True
+
+ return self.is_major_upgrade() and new_collections
+
+ def init_collection(self) -> None:
+ # We fetch from db the collections the user had already opted-in to.
+ # During the transition the results will be empty, but the user might
+ # be opted-in to an older version (e.g. revision = 3)
+
+ collection = self.get_store('collection')
+
+ if collection is not None:
+ self.db_collection = json.loads(collection)
+
+ if self.db_collection is None:
+ # happens once on upgrade
+ if not self.enabled:
+ # user is not opted-in
+ self.set_store('collection', json.dumps([]))
+ self.log.debug("user is not opted-in")
+ else:
+ # user is opted-in, verify the revision:
+ if self.last_opt_revision == REVISION:
+ self.log.debug(f"telemetry revision is {REVISION}")
+ base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
+ self.set_store('collection', json.dumps(base_collection))
+ else:
+ # user is opted-in to an older version, meaning they need
+ # to re-opt in regardless
+ self.set_store('collection', json.dumps([]))
+ self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")
+
+ # reload collection after setting
+ collection = self.get_store('collection')
+ if collection is not None:
+ self.db_collection = json.loads(collection)
+ else:
+ raise RuntimeError('collection is None after initial setting')
+ else:
+ # user has already upgraded
+ self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
+
+ def is_enabled_collection(self, collection: Collection) -> bool:
+ if self.db_collection is None:
+ return False
+ return collection.name in self.db_collection
+
+ def opt_in_all_collections(self) -> None:
+ """
+ Opt-in to all collections; Update db with the currently available collections in the module
+ """
+ if self.db_collection is None:
+ raise RuntimeError('db_collection is None after initial setting')
+
+ for c in MODULE_COLLECTION:
+ if c['name'].name not in self.db_collection:
+ self.db_collection.append(c['name'])
+
+ self.set_store('collection', json.dumps(self.db_collection))
+
+ def send(self,
+ report: Dict[str, Dict[str, str]],
+ endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
if not endpoint:
- endpoint = ['ceph', 'device']
+ endpoint = [self.EndPoint.ceph, self.EndPoint.device]
failed = []
success = []
self.log.debug('Send endpoints %s' % endpoint)
for e in endpoint:
- if e == 'ceph':
+ if e == self.EndPoint.ceph:
fail_reason = self._try_post('ceph report', self.url, report)
if fail_reason:
failed.append(fail_reason)
self.set_store('last_upload', str(now))
success.append('Ceph report sent to {0}'.format(self.url))
self.log.info('Sent report to {0}'.format(self.url))
- elif e == 'device':
+ elif e == self.EndPoint.device:
if 'device' in self.get_active_channels():
devices = self.gather_device_report()
- num_devs = 0
- num_hosts = 0
- for host, ls in devices.items():
- self.log.debug('host %s devices %s' % (host, ls))
- if not len(ls):
- continue
- fail_reason = self._try_post('devices', self.device_url,
- ls)
- if fail_reason:
- failed.append(fail_reason)
- else:
- num_devs += len(ls)
- num_hosts += 1
- if num_devs:
- success.append('Reported %d devices across %d hosts' % (
- num_devs, len(devices)))
+ if devices:
+ num_devs = 0
+ num_hosts = 0
+ for host, ls in devices.items():
+ self.log.debug('host %s devices %s' % (host, ls))
+ if not len(ls):
+ continue
+ fail_reason = self._try_post('devices', self.device_url,
+ ls)
+ if fail_reason:
+ failed.append(fail_reason)
+ else:
+ num_devs += len(ls)
+ num_hosts += 1
+ if num_devs:
+ success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
+ num_devs, num_hosts, len(devices)))
+ else:
+ fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
+ failed.append(fail_reason)
+ self.log.error(fail_reason)
if failed:
return 1, '', '\n'.join(success + failed)
return 0, '', '\n'.join(success)
- def handle_command(self, inbuf, command):
- if command['prefix'] == 'telemetry status':
- r = {}
- for opt in self.MODULE_OPTIONS:
- r[opt['name']] = getattr(self, opt['name'])
- r['last_upload'] = time.ctime(self.last_upload) if self.last_upload else self.last_upload
- return 0, json.dumps(r, indent=4, sort_keys=True), ''
- elif command['prefix'] == 'telemetry on':
- if command.get('license') != LICENSE:
- return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo enable, add '--license " + LICENSE + "' to the 'ceph telemetry on' command."
- self.on()
- return 0, '', ''
- elif command['prefix'] == 'telemetry off':
- self.off()
- return 0, '', ''
- elif command['prefix'] == 'telemetry send':
- if self.last_opt_revision < LAST_REVISION_RE_OPT_IN and command.get('license') != LICENSE:
- self.log.debug('A telemetry send attempt while opted-out. Asking for license agreement')
- return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo manually send telemetry data, add '--license " + LICENSE + "' to the 'ceph telemetry send' command.\nPlease consider enabling the telemetry module with 'ceph telemetry on'."
- self.last_report = self.compile_report()
- return self.send(self.last_report, command.get('endpoint'))
-
- elif command['prefix'] == 'telemetry show':
- report = self.get_report(channels=command.get('channels', None))
- report = json.dumps(report, indent=4, sort_keys=True)
- if self.channel_device:
- report += '\n \nDevice report is generated separately. To see it run \'ceph telemetry show-device\'.'
- return 0, report, ''
- elif command['prefix'] == 'telemetry show-device':
- return 0, json.dumps(self.get_report('device'), indent=4, sort_keys=True), ''
- elif command['prefix'] == 'telemetry show-all':
- return 0, json.dumps(self.get_report('all'), indent=4, sort_keys=True), ''
+ def format_perf_histogram(self, report: Dict[str, Any]) -> None:
+ # Formatting the perf histograms so they are human-readable. This will change the
+ # ranges and values, which are currently in list form, into strings so that
+ # they are displayed horizontally instead of vertically.
+ try:
+ # Formatting ranges and values in osd_perf_histograms
+ mode = 'osd_perf_histograms'
+ for config in report[mode]:
+ for histogram in config:
+ # Adjust ranges by converting lists into strings
+ for axis in config[histogram]['axes']:
+ for i in range(0, len(axis['ranges'])):
+ axis['ranges'][i] = str(axis['ranges'][i])
+
+ for osd in config[histogram]['osds']:
+ for i in range(0, len(osd['values'])):
+ osd['values'][i] = str(osd['values'][i])
+ except KeyError:
+ # If the perf channel is not enabled, there should be a KeyError since
+ # 'osd_perf_histograms' would not be present in the report. In that case,
+ # the show function should pass as usual without trying to format the
+ # histograms.
+ pass
+
+ def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Enable or disable a list of channels
+ '''
+ if not self.enabled:
+ # telemetry should be on for channels to be toggled
+ msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+ 'Preview sample reports with `ceph telemetry preview`.'
+ return 0, msg, ''
+
+ if channels is None:
+ msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
+ return 0, msg, ''
+
+ state = action == 'enable'
+ msg = ''
+ for c in channels:
+ if c not in ALL_CHANNELS:
+ msg = f"{msg}{c} is not a valid channel name. "\
+ f"Available channels: {ALL_CHANNELS}.\n"
+ else:
+ self.set_module_option(f"channel_{c}", state)
+ setattr(self,
+ f"channel_{c}",
+ state)
+ msg = f"{msg}channel_{c} is {action}d\n"
+
+ return 0, msg, ''
+
+ @CLIReadCommand('telemetry status')
+ def status(self) -> Tuple[int, str, str]:
+ '''
+ Show current configuration
+ '''
+ r = {}
+ for opt in self.MODULE_OPTIONS:
+ r[opt['name']] = getattr(self, opt['name'])
+ r['last_upload'] = (time.ctime(self.last_upload)
+ if self.last_upload else self.last_upload)
+ return 0, json.dumps(r, indent=4, sort_keys=True), ''
+
+ @CLIReadCommand('telemetry diff')
+ def diff(self) -> Tuple[int, str, str]:
+ '''
+ Show the diff between opted-in collection and available collection
+ '''
+ diff = []
+ keys = ['nag']
+
+ for c in MODULE_COLLECTION:
+ if not self.is_enabled_collection(c['name']):
+ diff.append({key: val for key, val in c.items() if key not in keys})
+
+ r = None
+ if diff == []:
+ r = "Telemetry is up to date"
+ else:
+ r = json.dumps(diff, indent=4, sort_keys=True)
+
+ return 0, r, ''
+
+ @CLICommand('telemetry on')
+ def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Enable telemetry reports from this cluster
+ '''
+ if license != LICENSE:
+ return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
+To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
else:
- return (-errno.EINVAL, '',
- "Command not found '{0}'".format(command['prefix']))
+ self.set_module_option('enabled', True)
+ self.enabled = True
+ self.opt_in_all_collections()
- def on(self):
- self.set_module_option('enabled', True)
- self.set_module_option('last_opt_revision', REVISION)
+ # for major releases upgrade nagging
+ mon_map = self.get('mon_map')
+ mon_min = mon_map.get("min_mon_release", 0)
+ self.set_store('last_opted_in_ceph_version', str(mon_min))
+ self.last_opted_in_ceph_version = mon_min
+
+ msg = 'Telemetry is on.'
+ disabled_channels = ''
+ active_channels = self.get_active_channels()
+ for c in ALL_CHANNELS:
+ if c not in active_channels and c != 'ident':
+ disabled_channels = f"{disabled_channels} {c}"
+
+ if len(disabled_channels) > 0:
+ msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
+ f"`ceph telemetry enable channel{disabled_channels}`"
+
+ # wake up serve() to reset health warning
+ self.event.set()
+
+ return 0, msg, ''
+
+ @CLICommand('telemetry off')
+ def off(self) -> Tuple[int, str, str]:
+ '''
+ Disable telemetry reports from this cluster
+ '''
+ if not self.enabled:
+ # telemetry is already off
+ msg = 'Telemetry is currently not enabled, nothing to turn off. '\
+ 'Please consider opting-in with `ceph telemetry on`.\n' \
+ 'Preview sample reports with `ceph telemetry preview`.'
+ return 0, msg, ''
- def off(self):
self.set_module_option('enabled', False)
- self.set_module_option('last_opt_revision', 1)
+ self.enabled = False
+ self.set_store('collection', json.dumps([]))
+ self.db_collection = []
+
+ # we might need this info in the future, in case
+ # of nagging when user is opted-out
+ mon_map = self.get('mon_map')
+ mon_min = mon_map.get("min_mon_release", 0)
+ self.set_store('last_opted_out_ceph_version', str(mon_min))
+ self.last_opted_out_ceph_version = mon_min
+
+ msg = 'Telemetry is now disabled.'
+ return 0, msg, ''
+
+ @CLIReadCommand('telemetry enable channel all')
+ def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
+ '''
+ Enable all channels
+ '''
+ return self.toggle_channel('enable', channels)
+
+ @CLIReadCommand('telemetry enable channel')
+ def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Enable a list of channels
+ '''
+ return self.toggle_channel('enable', channels)
+
+ @CLIReadCommand('telemetry disable channel all')
+ def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
+ '''
+ Disable all channels
+ '''
+ return self.toggle_channel('disable', channels)
+
+ @CLIReadCommand('telemetry disable channel')
+ def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Disable a list of channels
+ '''
+ return self.toggle_channel('disable', channels)
+
+ @CLIReadCommand('telemetry channel ls')
+ def channel_ls(self) -> Tuple[int, str, str]:
+ '''
+ List all channels
+ '''
+ table = PrettyTable(
+ [
+ 'NAME', 'ENABLED', 'DEFAULT', 'DESC',
+ ],
+ border=False)
+ table.align['NAME'] = 'l'
+ table.align['ENABLED'] = 'l'
+ table.align['DEFAULT'] = 'l'
+ table.align['DESC'] = 'l'
+ table.left_padding_width = 0
+ table.right_padding_width = 4
+
+ for c in ALL_CHANNELS:
+ enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
+ for o in self.MODULE_OPTIONS:
+ if o['name'] == f"channel_{c}":
+ default = "ON" if o.get('default', None) else "OFF"
+ desc = o.get('desc', None)
+
+ table.add_row((
+ c,
+ enabled,
+ default,
+ desc,
+ ))
+
+ return 0, table.get_string(sortby="NAME"), ''
+
+ @CLIReadCommand('telemetry collection ls')
+ def collection_ls(self) -> Tuple[int, str, str]:
+ '''
+ List all collections
+ '''
+ col_delta = self.collection_delta()
+ msg = ''
+ if col_delta is not None and len(col_delta) > 0:
+ msg = f"New collections are available:\n" \
+ f"{sorted([c.name for c in col_delta])}\n" \
+ f"Run `ceph telemetry on` to opt-in to these collections.\n"
+
+ table = PrettyTable(
+ [
+ 'NAME', 'STATUS', 'DESC',
+ ],
+ border=False)
+ table.align['NAME'] = 'l'
+ table.align['STATUS'] = 'l'
+ table.align['DESC'] = 'l'
+ table.left_padding_width = 0
+ table.right_padding_width = 4
+
+ for c in MODULE_COLLECTION:
+ name = c['name']
+ opted_in = self.is_enabled_collection(name)
+ channel_enabled = getattr(self, f"channel_{c['channel']}")
+
+ status = ''
+ if channel_enabled and opted_in:
+ status = "REPORTING"
+ else:
+ why = ''
+ delimiter = ''
+
+ if not opted_in:
+ why += "NOT OPTED-IN"
+ delimiter = ', '
+ if not channel_enabled:
+ why += f"{delimiter}CHANNEL {c['channel']} IS OFF"
+
+ status = f"NOT REPORTING: {why}"
+
+ desc = c['description']
+
+ table.add_row((
+ name,
+ status,
+ desc,
+ ))
+
+ if len(msg):
+ # add a new line between message and table output
+ msg = f"{msg} \n"
+
+ return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
+
+ @CLICommand('telemetry send')
+ def do_send(self,
+ endpoint: Optional[List[EndPoint]] = None,
+ license: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Send a sample report
+ '''
+ if not self.is_opted_in() and license != LICENSE:
+ self.log.debug(('A telemetry send attempt while opted-out. '
+ 'Asking for license agreement'))
+ return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
+To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
+Please consider enabling the telemetry module with 'ceph telemetry on'.'''
+ else:
+ self.last_report = self.compile_report()
+ return self.send(self.last_report, endpoint)
+
+ @CLIReadCommand('telemetry show')
+ def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Show a sample report of opted-in collections (except for 'device')
+ '''
+ if not self.enabled:
+ # if telemetry is off, no report is being sent, hence nothing to show
+ msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+ 'Preview sample reports with `ceph telemetry preview`.'
+ return 0, msg, ''
+
+ report = self.get_report_locked(channels=channels)
+ self.format_perf_histogram(report)
+ report = json.dumps(report, indent=4, sort_keys=True)
- def get_report(self, report_type='default', channels=None):
+ if self.channel_device:
+ report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''
+
+ return 0, report, ''
+
+ @CLIReadCommand('telemetry preview')
+ def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Preview a sample report of the most recent collections available (except for 'device')
+ '''
+ report = {}
+
+ # We use a lock to prevent a scenario where the user wishes to preview
+ # the report, and at the same time the module hits the interval of
+ # sending a report with the opted-in collection, which has less data
+ # than in the preview report.
+ col_delta = self.collection_delta()
+ with self.get_report_lock:
+ if col_delta is not None and len(col_delta) == 0:
+ # user is already opted-in to the most recent collection
+ msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
+ return 0, msg, ''
+ else:
+ # there are collections the user is not opted-in to
+ next_collection = []
+
+ for c in MODULE_COLLECTION:
+ next_collection.append(c['name'].name)
+
+ opted_in_collection = self.db_collection
+ self.db_collection = next_collection
+ report = self.get_report(channels=channels)
+ self.db_collection = opted_in_collection
+
+ self.format_perf_histogram(report)
+ report = json.dumps(report, indent=4, sort_keys=True)
+
+ if self.channel_device:
+ report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''
+
+ return 0, report, ''
+
+ @CLIReadCommand('telemetry show-device')
+ def show_device(self) -> Tuple[int, str, str]:
+ '''
+ Show a sample device report
+ '''
+ if not self.enabled:
+ # if telemetry is off, no report is being sent, hence nothing to show
+ msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+ 'Preview sample device reports with `ceph telemetry preview-device`.'
+ return 0, msg, ''
+
+ if not self.channel_device:
+ # if device channel is off, device report is not being sent, hence nothing to show
+ msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
+ 'Preview sample device reports with `ceph telemetry preview-device`.'
+ return 0, msg, ''
+
+ return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
+
+ @CLIReadCommand('telemetry preview-device')
+ def preview_device(self) -> Tuple[int, str, str]:
+ '''
+ Preview a sample device report of the most recent device collection
+ '''
+ report = {}
+
+ device_col_delta = self.collection_delta(['device'])
+ with self.get_report_lock:
+ if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
+ # user is already opted-in to the most recent device collection,
+ # and device channel is on, thus `show-device` should be called
+ msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
+ return 0, msg, ''
+
+ # either the user is not opted-in at all, or there are collections
+ # they are not opted-in to
+ next_collection = []
+
+ for c in MODULE_COLLECTION:
+ next_collection.append(c['name'].name)
+
+ opted_in_collection = self.db_collection
+ self.db_collection = next_collection
+ report = self.get_report('device')
+ self.db_collection = opted_in_collection
+
+ report = json.dumps(report, indent=4, sort_keys=True)
+ return 0, report, ''
+
+ @CLIReadCommand('telemetry show-all')
+ def show_all(self) -> Tuple[int, str, str]:
+ '''
+ Show a sample report of all enabled channels (including 'device' channel)
+ '''
+ if not self.enabled:
+ # if telemetry is off, no report is being sent, hence nothing to show
+ msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+ 'Preview sample reports with `ceph telemetry preview`.'
+ return 0, msg, ''
+
+ if not self.channel_device:
+ # device channel is off, no need to display its report
+ return 0, json.dumps(self.get_report_locked('default'), indent=4, sort_keys=True), ''
+
+ # telemetry is on and device channel is enabled, show both
+ return 0, json.dumps(self.get_report_locked('all'), indent=4, sort_keys=True), ''
+
+ @CLIReadCommand('telemetry preview-all')
+ def preview_all(self) -> Tuple[int, str, str]:
+ '''
+ Preview a sample report of the most recent collections available of all channels (including 'device')
+ '''
+ report = {}
+
+ col_delta = self.collection_delta()
+ with self.get_report_lock:
+ if col_delta is not None and len(col_delta) == 0:
+ # user is already opted-in to the most recent collection
+ msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
+ return 0, msg, ''
+
+ # there are collections the user is not opted-in to
+ next_collection = []
+
+ for c in MODULE_COLLECTION:
+ next_collection.append(c['name'].name)
+
+ opted_in_collection = self.db_collection
+ self.db_collection = next_collection
+ report = self.get_report('all')
+ self.db_collection = opted_in_collection
+
+ self.format_perf_histogram(report)
+ report = json.dumps(report, indent=4, sort_keys=True)
+
+ return 0, report, ''
+
+ def get_report_locked(self,
+ report_type: str = 'default',
+ channels: Optional[List[str]] = None) -> Dict[str, Any]:
+ '''
+ A wrapper around get_report to allow for compiling a report of the most recent module collections
+ '''
+ with self.get_report_lock:
+ return self.get_report(report_type, channels)
+
+ def get_report(self,
+ report_type: str = 'default',
+ channels: Optional[List[str]] = None) -> Dict[str, Any]:
if report_type == 'default':
return self.compile_report(channels=channels)
elif report_type == 'device':
'device_report': self.gather_device_report()}
return {}
- def self_test(self):
+ def self_test(self) -> None:
report = self.compile_report()
if len(report) == 0:
raise RuntimeError('Report is empty')
if 'report_id' not in report:
raise RuntimeError('report_id not found in report')
- def shutdown(self):
+ def shutdown(self) -> None:
self.run = False
self.event.set()
- def refresh_health_checks(self):
+ def refresh_health_checks(self) -> None:
health_checks = {}
- if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
+ # TODO do we want to nag also in case the user is not opted-in?
+ if self.enabled and self.should_nag():
health_checks['TELEMETRY_CHANGED'] = {
'severity': 'warning',
'summary': 'Telemetry requires re-opt-in',
'detail': [
- 'telemetry report includes new information; must re-opt-in (or out)'
+ 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
]
}
self.set_health_checks(health_checks)
- def serve(self):
+ def serve(self) -> None:
self.load()
- self.config_notify()
self.run = True
self.log.debug('Waiting for mgr to warm up')
- self.event.wait(10)
+ time.sleep(10)
while self.run:
self.event.clear()
self.refresh_health_checks()
- if self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
+ if not self.is_opted_in():
self.log.debug('Not sending report until user re-opts-in')
self.event.wait(1800)
continue
continue
now = int(time.time())
- if not self.last_upload or (now - self.last_upload) > \
- self.interval * 3600:
+ if not self.last_upload or \
+ (now - self.last_upload) > self.interval * 3600:
self.log.info('Compiling and sending report to %s',
self.url)
try:
self.last_report = self.compile_report()
- except:
+ except Exception:
self.log.exception('Exception while compiling report:')
self.send(self.last_report)
self.log.debug('Sleeping for %d seconds', sleep)
self.event.wait(sleep)
- def self_test(self):
- self.compile_report()
- return True
-
@staticmethod
- def can_run():
+ def can_run() -> Tuple[bool, str]:
return True, ''