2 Telemetry module for ceph-mgr
4 Collect statistics from Ceph cluster and send this back to the Ceph project
17 from datetime
import datetime
, timedelta
18 from prettytable
import PrettyTable
19 from threading
import Event
, Lock
20 from collections
import defaultdict
21 from typing
import cast
, Any
, DefaultDict
, Dict
, List
, Optional
, Tuple
, TypeVar
, TYPE_CHECKING
, Union
23 from mgr_module
import CLICommand
, CLIReadCommand
, MgrModule
, Option
, OptionValue
, ServiceInfoT
# Every telemetry channel a cluster can opt in to.
ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']

# License under which shared telemetry data is made available.
LICENSE = 'sharing-1-0'
LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL = 'https://cdla.io/sharing-1-0/'
33 # Latest revision of the telemetry report. Bump this each time we make
37 # History of revisions
38 # --------------------
41 # Mimic and/or nautilus are lumped together here, since
42 # we didn't track revisions yet.
45 # - added revision tracking, nagging, etc.
46 # - added config option changes
48 # - added explicit license acknowledgement to the opt-in process
51 # - added device health metrics (i.e., SMART data, minus serial number)
53 # - added CephFS metadata (how many MDSs, fs features, how many data pools,
54 # how much metadata is cached, rfiles, rbytes, rsnapshots)
55 # - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
56 # - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
57 # - whether an OSD cluster network is in use
58 # - rbd pool and image count, and rbd mirror mode (pool-level)
59 # - rgw daemons, zones, zonegroups; which rgw frontends
class Collection(str, enum.Enum):
    """Identifiers for the individual telemetry collections.

    Each member's value equals its name; mixing in ``str`` lets a member
    compare equal to, and serialize as, its plain string form.
    """
    basic_base = 'basic_base'
    device_base = 'device_base'
    crash_base = 'crash_base'
    ident_base = 'ident_base'
    perf_perf = 'perf_perf'
    basic_mds_metadata = 'basic_mds_metadata'
    basic_pool_usage = 'basic_pool_usage'
    basic_usage_by_class = 'basic_usage_by_class'
    basic_rook_v01 = 'basic_rook_v01'
    perf_memory_metrics = 'perf_memory_metrics'
74 MODULE_COLLECTION
: List
[Dict
] = [
76 "name": Collection
.basic_base
,
77 "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
82 "name": Collection
.device_base
,
83 "description": "Information about device health metrics",
88 "name": Collection
.crash_base
,
89 "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
94 "name": Collection
.ident_base
,
95 "description": "User-provided identifying information about the cluster",
100 "name": Collection
.perf_perf
,
101 "description": "Information about performance counters of the cluster",
106 "name": Collection
.basic_mds_metadata
,
107 "description": "MDS metadata",
112 "name": Collection
.basic_pool_usage
,
113 "description": "Default pool application and usage statistics",
118 "name": Collection
.basic_usage_by_class
,
119 "description": "Default device class usage statistics",
124 "name": Collection
.basic_rook_v01
,
125 "description": "Basic Rook deployment data",
130 "name": Collection
.perf_memory_metrics
,
131 "description": "Heap stats and mempools for mon and mds",
# Note: a key cannot be both a node and a leaf, e.g.
# a key such as "rook/a/b" rules out also using "rook/a/b/c".
ROOK_KEYS_BY_COLLECTION: List[Tuple[str, Collection]] = [
    # Every Rook key currently belongs to the first Rook collection revision.
    (key, Collection.basic_rook_v01) for key in (
        "rook/version",
        "rook/kubernetes/version",
        "rook/csi/version",
        "rook/node/count/kubernetes-total",
        "rook/node/count/with-ceph-daemons",
        "rook/node/count/with-csi-rbd-plugin",
        "rook/node/count/with-csi-cephfs-plugin",
        "rook/node/count/with-csi-nfs-plugin",
        "rook/usage/storage-class/count/total",
        "rook/usage/storage-class/count/rbd",
        "rook/usage/storage-class/count/cephfs",
        "rook/usage/storage-class/count/nfs",
        "rook/usage/storage-class/count/bucket",
        "rook/cluster/storage/device-set/count/total",
        "rook/cluster/storage/device-set/count/portable",
        "rook/cluster/storage/device-set/count/non-portable",
        "rook/cluster/mon/count",
        "rook/cluster/mon/allow-multiple-per-node",
        "rook/cluster/mon/max-id",
        "rook/cluster/mon/pvc/enabled",
        "rook/cluster/mon/stretch/enabled",
        "rook/cluster/network/provider",
        "rook/cluster/external-mode",
    )
]
166 class Module(MgrModule
):
172 "kernel_description",
174 "distro_description",
181 default
='https://telemetry.ceph.com/report'),
182 Option(name
='device_url',
184 default
='https://telemetry.ceph.com/device'),
185 Option(name
='enabled',
188 Option(name
='last_opt_revision',
191 Option(name
='leaderboard',
194 Option(name
='description',
197 Option(name
='contact',
200 Option(name
='organization',
206 Option(name
='interval',
210 Option(name
='channel_basic',
213 desc
='Share basic cluster information (size, version)'),
214 Option(name
='channel_ident',
217 desc
='Share a user-provided description and/or contact email for the cluster'),
218 Option(name
='channel_crash',
221 desc
='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
222 Option(name
='channel_device',
225 desc
=('Share device health metrics '
226 '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
227 Option(name
='channel_perf',
230 desc
='Share various performance metrics of a cluster'),
234 def config_keys(self
) -> Dict
[str, OptionValue
]:
235 return dict((o
['name'], o
.get('default', None)) for o
in self
.MODULE_OPTIONS
)
237 def __init__(self
, *args
: Any
, **kwargs
: Any
) -> None:
238 super(Module
, self
).__init
__(*args
, **kwargs
)
241 self
.db_collection
: Optional
[List
[str]] = None
242 self
.last_opted_in_ceph_version
: Optional
[int] = None
243 self
.last_opted_out_ceph_version
: Optional
[int] = None
244 self
.last_upload
: Optional
[int] = None
245 self
.last_report
: Dict
[str, Any
] = dict()
246 self
.report_id
: Optional
[str] = None
247 self
.salt
: Optional
[str] = None
248 self
.get_report_lock
= Lock()
249 self
.config_update_module_option()
250 # for mypy which does not run the code
255 self
.last_opt_revision
= 0
256 self
.leaderboard
= ''
259 self
.channel_basic
= True
260 self
.channel_ident
= False
261 self
.channel_crash
= True
262 self
.channel_device
= True
263 self
.channel_perf
= False
264 self
.db_collection
= ['basic_base', 'device_base']
265 self
.last_opted_in_ceph_version
= 17
266 self
.last_opted_out_ceph_version
= 0
268 def config_update_module_option(self
) -> None:
269 for opt
in self
.MODULE_OPTIONS
:
272 self
.get_module_option(opt
['name']))
273 self
.log
.debug(' %s = %s', opt
['name'], getattr(self
, opt
['name']))
275 def config_notify(self
) -> None:
276 self
.config_update_module_option()
277 # wake up serve() thread
280 def load(self
) -> None:
281 last_upload
= self
.get_store('last_upload', None)
282 if last_upload
is None:
283 self
.last_upload
= None
285 self
.last_upload
= int(last_upload
)
287 report_id
= self
.get_store('report_id', None)
288 if report_id
is None:
289 self
.report_id
= str(uuid
.uuid4())
290 self
.set_store('report_id', self
.report_id
)
292 self
.report_id
= report_id
294 salt
= self
.get_store('salt', None)
296 self
.salt
= str(uuid
.uuid4())
297 self
.set_store('salt', self
.salt
)
301 self
.init_collection()
303 last_opted_in_ceph_version
= self
.get_store('last_opted_in_ceph_version', None)
304 if last_opted_in_ceph_version
is None:
305 self
.last_opted_in_ceph_version
= None
307 self
.last_opted_in_ceph_version
= int(last_opted_in_ceph_version
)
309 last_opted_out_ceph_version
= self
.get_store('last_opted_out_ceph_version', None)
310 if last_opted_out_ceph_version
is None:
311 self
.last_opted_out_ceph_version
= None
313 self
.last_opted_out_ceph_version
= int(last_opted_out_ceph_version
)
315 def gather_osd_metadata(self
,
316 osd_map
: Dict
[str, List
[Dict
[str, int]]]) -> Dict
[str, Dict
[str, int]]:
317 keys
= ["osd_objectstore", "rotational"]
318 keys
+= self
.metadata_keys
320 metadata
: Dict
[str, Dict
[str, int]] = dict()
322 metadata
[key
] = defaultdict(int)
324 for osd
in osd_map
['osds']:
325 res
= self
.get_metadata('osd', str(osd
['osd']))
327 self
.log
.debug('Could not get metadata for osd.%s' % str(osd
['osd']))
329 for k
, v
in res
.items():
337 def gather_mon_metadata(self
,
338 mon_map
: Dict
[str, List
[Dict
[str, str]]]) -> Dict
[str, Dict
[str, int]]:
340 keys
+= self
.metadata_keys
342 metadata
: Dict
[str, Dict
[str, int]] = dict()
344 metadata
[key
] = defaultdict(int)
346 for mon
in mon_map
['mons']:
347 res
= self
.get_metadata('mon', mon
['name'])
349 self
.log
.debug('Could not get metadata for mon.%s' % (mon
['name']))
351 for k
, v
in res
.items():
359 def gather_mds_metadata(self
) -> Dict
[str, Dict
[str, int]]:
360 metadata
: Dict
[str, Dict
[str, int]] = dict()
362 res
= self
.get('mds_metadata') # metadata of *all* mds daemons
363 if res
is None or not res
:
364 self
.log
.debug('Could not get metadata for mds daemons')
368 keys
+= self
.metadata_keys
371 metadata
[key
] = defaultdict(int)
373 for mds
in res
.values():
374 for k
, v
in mds
.items():
382 def gather_crush_info(self
) -> Dict
[str, Union
[int,
387 osdmap
= self
.get_osdmap()
388 crush_raw
= osdmap
.get_crush()
389 crush
= crush_raw
.dump()
391 BucketKeyT
= TypeVar('BucketKeyT', int, str)
393 def inc(d
: Dict
[BucketKeyT
, int], k
: BucketKeyT
) -> None:
399 device_classes
: Dict
[str, int] = {}
400 for dev
in crush
['devices']:
401 inc(device_classes
, dev
.get('class', ''))
403 bucket_algs
: Dict
[str, int] = {}
404 bucket_types
: Dict
[str, int] = {}
405 bucket_sizes
: Dict
[int, int] = {}
406 for bucket
in crush
['buckets']:
407 if '~' in bucket
['name']: # ignore shadow buckets
409 inc(bucket_algs
, bucket
['alg'])
410 inc(bucket_types
, bucket
['type_id'])
411 inc(bucket_sizes
, len(bucket
['items']))
414 'num_devices': len(crush
['devices']),
415 'num_types': len(crush
['types']),
416 'num_buckets': len(crush
['buckets']),
417 'num_rules': len(crush
['rules']),
418 'device_classes': list(device_classes
.values()),
419 'tunables': crush
['tunables'],
420 'compat_weight_set': '-1' in crush
['choose_args'],
421 'num_weight_sets': len(crush
['choose_args']),
422 'bucket_algs': bucket_algs
,
423 'bucket_sizes': bucket_sizes
,
424 'bucket_types': bucket_types
,
427 def gather_configs(self
) -> Dict
[str, List
[str]]:
428 # cluster config options
430 r
, outb
, outs
= self
.mon_command({
431 'prefix': 'config dump',
437 dump
= json
.loads(outb
)
438 except json
.decoder
.JSONDecodeError
:
441 name
= opt
.get('name')
444 # daemon-reported options (which may include ceph.conf)
446 ls
= self
.get("modified_config_options")
447 for opt
in ls
.get('options', {}):
450 'cluster_changed': sorted(list(cluster
)),
451 'active_changed': sorted(list(active
)),
454 def anonymize_entity_name(self
, entity_name
:str) -> str:
455 if '.' not in entity_name
:
456 self
.log
.debug(f
"Cannot split entity name ({entity_name}), no '.' is found")
459 (etype
, eid
) = entity_name
.split('.', 1)
462 if self
.salt
is not None:
464 # avoid asserting that salt exists
466 # do not set self.salt to a temp value
467 salt
= f
"no_salt_found_{NO_SALT_CNT}"
469 self
.log
.debug(f
"No salt found, created a temp one: {salt}")
470 m
.update(salt
.encode('utf-8'))
471 m
.update(eid
.encode('utf-8'))
472 m
.update(salt
.encode('utf-8'))
474 return etype
+ '.' + m
.hexdigest()
476 def get_heap_stats(self
) -> Dict
[str, dict]:
477 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
478 anonymized_daemons
= {}
479 osd_map
= self
.get('osd_map')
481 # Combine available daemons
483 for osd
in osd_map
['osds']:
484 daemons
.append('osd'+'.'+str(osd
['osd']))
485 # perf_memory_metrics collection (1/2)
486 if self
.is_enabled_collection(Collection
.perf_memory_metrics
):
487 mon_map
= self
.get('mon_map')
488 mds_metadata
= self
.get('mds_metadata')
489 for mon
in mon_map
['mons']:
490 daemons
.append('mon'+'.'+mon
['name'])
491 for mds
in mds_metadata
:
492 daemons
.append('mds'+'.'+mds
)
494 # Grab output from the "daemon.x heap stats" command
495 for daemon
in daemons
:
496 daemon_type
, daemon_id
= daemon
.split('.', 1)
497 heap_stats
= self
.parse_heap_stats(daemon_type
, daemon_id
)
499 if (daemon_type
!= 'osd'):
500 # Anonymize mon and mds
501 anonymized_daemons
[daemon
] = self
.anonymize_entity_name(daemon
)
502 daemon
= anonymized_daemons
[daemon
]
503 result
[daemon_type
][daemon
] = heap_stats
507 if anonymized_daemons
:
508 # for debugging purposes only, this data is never reported
509 self
.log
.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons
))
512 def parse_heap_stats(self
, daemon_type
: str, daemon_id
: Any
) -> Dict
[str, int]:
519 r
, outb
, outs
= self
.tell_command(daemon_type
, str(daemon_id
), cmd_dict
)
522 self
.log
.error("Invalid command dictionary: {}".format(cmd_dict
))
524 if 'tcmalloc heap stats' in outb
:
525 values
= [int(i
) for i
in outb
.split() if i
.isdigit()]
526 # `categories` must be ordered this way for the correct output to be parsed
527 categories
= ['use_by_application',
528 'page_heap_freelist',
529 'central_cache_freelist',
530 'transfer_cache_freelist',
531 'thread_cache_freelists',
533 'actual_memory_used',
535 'virtual_address_space_used',
537 'thread_heaps_in_use',
538 'tcmalloc_page_size']
539 if len(values
) != len(categories
):
540 self
.log
.error('Received unexpected output from {}.{}; ' \
541 'number of values should match the number' \
542 'of expected categories:\n values: len={} {} '\
543 '~ categories: len={} {} ~ outs: {}'.format(daemon_type
, daemon_id
, len(values
), values
, len(categories
), categories
, outs
))
545 parsed_output
= dict(zip(categories
, values
))
547 self
.log
.error('No heap stats available on {}.{}: {}'.format(daemon_type
, daemon_id
, outs
))
551 def get_mempool(self
, mode
: str = 'separated') -> Dict
[str, dict]:
552 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
553 anonymized_daemons
= {}
554 osd_map
= self
.get('osd_map')
556 # Combine available daemons
558 for osd
in osd_map
['osds']:
559 daemons
.append('osd'+'.'+str(osd
['osd']))
560 # perf_memory_metrics collection (2/2)
561 if self
.is_enabled_collection(Collection
.perf_memory_metrics
):
562 mon_map
= self
.get('mon_map')
563 mds_metadata
= self
.get('mds_metadata')
564 for mon
in mon_map
['mons']:
565 daemons
.append('mon'+'.'+mon
['name'])
566 for mds
in mds_metadata
:
567 daemons
.append('mds'+'.'+mds
)
569 # Grab output from the "dump_mempools" command
570 for daemon
in daemons
:
571 daemon_type
, daemon_id
= daemon
.split('.', 1)
573 'prefix': 'dump_mempools',
576 r
, outb
, outs
= self
.tell_command(daemon_type
, daemon_id
, cmd_dict
)
578 self
.log
.error("Invalid command dictionary: {}".format(cmd_dict
))
582 # This is where the mempool will land.
583 dump
= json
.loads(outb
)
584 if mode
== 'separated':
585 # Anonymize mon and mds
586 if daemon_type
!= 'osd':
587 anonymized_daemons
[daemon
] = self
.anonymize_entity_name(daemon
)
588 daemon
= anonymized_daemons
[daemon
]
589 result
[daemon_type
][daemon
] = dump
['mempool']['by_pool']
590 elif mode
== 'aggregated':
591 for mem_type
in dump
['mempool']['by_pool']:
592 result
[daemon_type
][mem_type
]['bytes'] += dump
['mempool']['by_pool'][mem_type
]['bytes']
593 result
[daemon_type
][mem_type
]['items'] += dump
['mempool']['by_pool'][mem_type
]['items']
595 self
.log
.error("Incorrect mode specified in get_mempool: {}".format(mode
))
596 except (json
.decoder
.JSONDecodeError
, KeyError) as e
:
597 self
.log
.error("Error caught on {}.{}: {}".format(daemon_type
, daemon_id
, e
))
600 if anonymized_daemons
:
601 # for debugging purposes only, this data is never reported
602 self
.log
.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons
))
606 def get_osd_histograms(self
, mode
: str = 'separated') -> List
[Dict
[str, dict]]:
607 # Initialize result dict
608 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(
612 lambda: defaultdict(int))))))
614 # Get list of osd ids from the metadata
615 osd_metadata
= self
.get('osd_metadata')
617 # Grab output from the "osd.x perf histogram dump" command
618 for osd_id
in osd_metadata
:
620 'prefix': 'perf histogram dump',
624 r
, outb
, outs
= self
.osd_command(cmd_dict
)
625 # Check for invalid calls
627 self
.log
.error("Invalid command dictionary: {}".format(cmd_dict
))
631 # This is where the histograms will land if there are any.
632 dump
= json
.loads(outb
)
634 for histogram
in dump
['osd']:
635 # Log axis information. There are two axes, each represented
636 # as a dictionary. Both dictionaries are contained inside a
637 # list called 'axes'.
639 for axis
in dump
['osd'][histogram
]['axes']:
641 # This is the dict that contains information for an individual
642 # axis. It will be appended to the 'axes' list at the end.
643 axis_dict
: Dict
[str, Any
] = defaultdict()
645 # Collecting information for buckets, min, name, etc.
646 axis_dict
['buckets'] = axis
['buckets']
647 axis_dict
['min'] = axis
['min']
648 axis_dict
['name'] = axis
['name']
649 axis_dict
['quant_size'] = axis
['quant_size']
650 axis_dict
['scale_type'] = axis
['scale_type']
652 # Collecting ranges; placing them in lists to
653 # improve readability later on.
655 for _range
in axis
['ranges']:
656 _max
, _min
= None, None
661 ranges
.append([_min
, _max
])
662 axis_dict
['ranges'] = ranges
664 # Now that 'axis_dict' contains all the appropriate
665 # information for the current axis, append it to the 'axes' list.
666 # There will end up being two axes in the 'axes' list, since the
668 axes
.append(axis_dict
)
670 # Add the 'axes' list, containing both axes, to result.
671 # At this point, you will see that the name of the key is the string
672 # form of our axes list (str(axes)). This is there so that histograms
673 # with different axis configs will not be combined.
674 # These key names are later dropped when only the values are returned.
675 result
[str(axes
)][histogram
]['axes'] = axes
677 # Collect current values and make sure they are in
680 for value_list
in dump
['osd'][histogram
]['values']:
681 values
.append([int(v
) for v
in value_list
])
683 if mode
== 'separated':
684 if 'osds' not in result
[str(axes
)][histogram
]:
685 result
[str(axes
)][histogram
]['osds'] = []
686 result
[str(axes
)][histogram
]['osds'].append({'osd_id': int(osd_id
), 'values': values
})
688 elif mode
== 'aggregated':
689 # Aggregate values. If 'values' have already been initialized,
691 if 'values' in result
[str(axes
)][histogram
]:
692 for i
in range (0, len(values
)):
693 for j
in range (0, len(values
[i
])):
694 values
[i
][j
] += result
[str(axes
)][histogram
]['values'][i
][j
]
696 # Add the values to result.
697 result
[str(axes
)][histogram
]['values'] = values
699 # Update num_combined_osds
700 if 'num_combined_osds' not in result
[str(axes
)][histogram
]:
701 result
[str(axes
)][histogram
]['num_combined_osds'] = 1
703 result
[str(axes
)][histogram
]['num_combined_osds'] += 1
705 self
.log
.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode
))
708 # Sometimes, json errors occur if you give it an empty string.
709 # I am also putting in a catch for a KeyError since it could
710 # happen where the code is assuming that a key exists in the
711 # schema when it doesn't. In either case, we'll handle that
712 # by continuing and collecting what we can from other osds.
713 except (json
.decoder
.JSONDecodeError
, KeyError) as e
:
714 self
.log
.error("Error caught on osd.{}: {}".format(osd_id
, e
))
717 return list(result
.values())
719 def get_io_rate(self
) -> dict:
720 return self
.get('io_rate')
722 def get_stats_per_pool(self
) -> dict:
723 result
= self
.get('pg_dump')['pool_stats']
725 # collect application metadata from osd_map
726 osd_map
= self
.get('osd_map')
727 application_metadata
= {pool
['pool']: pool
['application_metadata'] for pool
in osd_map
['pools']}
729 # add application to each pool from pg_dump
731 pool
['application'] = []
732 # Only include default applications
733 for application
in application_metadata
[pool
['poolid']]:
734 if application
in ['cephfs', 'mgr', 'rbd', 'rgw']:
735 pool
['application'].append(application
)
739 def get_stats_per_pg(self
) -> dict:
740 return self
.get('pg_dump')['pg_stats']
742 def get_rocksdb_stats(self
) -> Dict
[str, str]:
744 result
: Dict
[str, str] = defaultdict()
745 version
= self
.get_rocksdb_version()
748 result
['version'] = version
752 def gather_crashinfo(self
) -> List
[Dict
[str, str]]:
753 crashlist
: List
[Dict
[str, str]] = list()
754 errno
, crashids
, err
= self
.remote('crash', 'ls')
757 for crashid
in crashids
.split():
758 errno
, crashinfo
, err
= self
.remote('crash', 'do_info', crashid
)
761 c
= json
.loads(crashinfo
)
764 del c
['utsname_hostname']
766 # entity_name might have more than one '.', beware
767 (etype
, eid
) = c
.get('entity_name', '').split('.', 1)
770 m
.update(self
.salt
.encode('utf-8'))
771 m
.update(eid
.encode('utf-8'))
772 m
.update(self
.salt
.encode('utf-8'))
773 c
['entity_name'] = etype
+ '.' + m
.hexdigest()
775 # redact final line of python tracebacks, as the exception
776 # payload may contain identifying information
777 if 'mgr_module' in c
and 'backtrace' in c
:
778 # backtrace might be empty
779 if len(c
['backtrace']) > 0:
780 c
['backtrace'][-1] = '<redacted>'
785 def gather_perf_counters(self
, mode
: str = 'separated') -> Dict
[str, dict]:
786 # Extract perf counter data with get_all_perf_counters(), a method
787 # from mgr/mgr_module.py. This method returns a nested dictionary that
788 # looks a lot like perf schema, except with some additional fields.
790 # Example of output, a snapshot of a mon daemon:
792 # "bluestore.kv_flush_lat": {
794 # "description": "Average kv_thread flush latency",
802 all_perf_counters
= self
.get_all_perf_counters()
804 # Initialize 'result' dict
805 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(
806 lambda: defaultdict(lambda: defaultdict(int))))
809 anonymized_daemon_dict
= {}
811 for daemon
, all_perf_counters_by_daemon
in all_perf_counters
.items():
812 daemon_type
= daemon
[0:3] # i.e. 'mds', 'osd', 'rgw'
814 if mode
== 'separated':
815 # anonymize individual daemon names except osds
816 if (daemon_type
!= 'osd'):
817 anonymized_daemon
= self
.anonymize_entity_name(daemon
)
818 anonymized_daemon_dict
[anonymized_daemon
] = daemon
819 daemon
= anonymized_daemon
821 # Calculate num combined daemon types if in aggregated mode
822 if mode
== 'aggregated':
823 if 'num_combined_daemons' not in result
[daemon_type
]:
824 result
[daemon_type
]['num_combined_daemons'] = 1
826 result
[daemon_type
]['num_combined_daemons'] += 1
828 for collection
in all_perf_counters_by_daemon
:
829 # Split the collection to avoid redundancy in final report; i.e.:
830 # bluestore.kv_flush_lat, bluestore.kv_final_lat -->
831 # bluestore: kv_flush_lat, kv_final_lat
832 col_0
, col_1
= collection
.split('.')
834 # Debug log for empty keys. This initially was a problem for prioritycache
835 # perf counters, where the col_0 was empty for certain mon counters:
837 # "mon.a": { instead of "mon.a": {
838 # "": { "prioritycache": {
839 # "cache_bytes": {...}, "cache_bytes": {...},
841 # This log is here to detect any future instances of a similar issue.
842 if (daemon
== "") or (col_0
== "") or (col_1
== ""):
843 self
.log
.debug("Instance of an empty key: {}{}".format(daemon
, collection
))
845 if mode
== 'separated':
846 # Add value to result
847 result
[daemon
][col_0
][col_1
]['value'] = \
848 all_perf_counters_by_daemon
[collection
]['value']
850 # Check that 'count' exists, as not all counters have a count field.
851 if 'count' in all_perf_counters_by_daemon
[collection
]:
852 result
[daemon
][col_0
][col_1
]['count'] = \
853 all_perf_counters_by_daemon
[collection
]['count']
854 elif mode
== 'aggregated':
855 # Not every rgw daemon has the same schema. Specifically, each rgw daemon
856 # has a uniquely-named collection that starts off identically (i.e.
857 # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
858 # This bit of code combines these unique counters all under one rgw instance.
859 # Without this check, the schema would remain separeted out in the final report.
860 if col_0
[0:11] == "objecter-0x":
861 col_0
= "objecter-0x"
863 # Check that the value can be incremented. In some cases,
864 # the files are of type 'pair' (real-integer-pair, integer-integer pair).
865 # In those cases, the value is a dictionary, and not a number.
866 # i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
867 if isinstance(all_perf_counters_by_daemon
[collection
]['value'], numbers
.Number
):
868 result
[daemon_type
][col_0
][col_1
]['value'] += \
869 all_perf_counters_by_daemon
[collection
]['value']
871 # Check that 'count' exists, as not all counters have a count field.
872 if 'count' in all_perf_counters_by_daemon
[collection
]:
873 result
[daemon_type
][col_0
][col_1
]['count'] += \
874 all_perf_counters_by_daemon
[collection
]['count']
876 self
.log
.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode
))
879 if mode
== 'separated':
880 # for debugging purposes only, this data is never reported
881 self
.log
.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict
))
885 def get_active_channels(self
) -> List
[str]:
887 if self
.channel_basic
:
889 if self
.channel_crash
:
891 if self
.channel_device
:
893 if self
.channel_ident
:
895 if self
.channel_perf
:
899 def gather_device_report(self
) -> Dict
[str, Dict
[str, Dict
[str, str]]]:
901 time_format
= self
.remote('devicehealth', 'get_time_format')
902 except Exception as e
:
903 self
.log
.debug('Unable to format time: {}'.format(e
))
905 cutoff
= datetime
.utcnow() - timedelta(hours
=self
.interval
* 2)
906 min_sample
= cutoff
.strftime(time_format
)
908 devices
= self
.get('devices')['devices']
910 self
.log
.debug('Unable to get device info from the mgr.')
913 # anon-host-id -> anon-devid -> { timestamp -> record }
914 res
: Dict
[str, Dict
[str, Dict
[str, str]]] = {}
918 # this is a map of stamp -> {device info}
919 m
= self
.remote('devicehealth', 'get_recent_device_metrics',
921 except Exception as e
:
922 self
.log
.error('Unable to get recent metrics from device with id "{}": {}'.format(devid
, e
))
927 host
= d
['location'][0]['host']
928 except (KeyError, IndexError) as e
:
929 self
.log
.error('Unable to get host from device with id "{}": {}'.format(devid
, e
))
931 anon_host
= self
.get_store('host-id/%s' % host
)
933 anon_host
= str(uuid
.uuid1())
934 self
.set_store('host-id/%s' % host
, anon_host
)
936 for dev
, rep
in m
.items():
937 rep
['host_id'] = anon_host
938 if serial
is None and 'serial_number' in rep
:
939 serial
= rep
['serial_number']
941 # anonymize device id
942 anon_devid
= self
.get_store('devid-id/%s' % devid
)
944 # ideally devid is 'vendor_model_serial',
945 # but can also be 'model_serial', 'serial'
947 anon_devid
= f
"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
949 anon_devid
= str(uuid
.uuid1())
950 self
.set_store('devid-id/%s' % devid
, anon_devid
)
951 self
.log
.info('devid %s / %s, host %s / %s' % (devid
, anon_devid
,
954 # anonymize the smartctl report itself
956 m_str
= json
.dumps(m
)
957 m
= json
.loads(m_str
.replace(serial
, 'deleted'))
959 if anon_host
not in res
:
961 res
[anon_host
][anon_devid
] = m
964 def get_latest(self
, daemon_type
: str, daemon_name
: str, stat
: str) -> int:
965 data
= self
.get_counter(daemon_type
, daemon_name
, stat
)[stat
]
971 def compile_report(self
, channels
: Optional
[List
[str]] = None) -> Dict
[str, Any
]:
973 channels
= self
.get_active_channels()
975 'leaderboard': self
.leaderboard
,
977 'report_timestamp': datetime
.utcnow().isoformat(),
978 'report_id': self
.report_id
,
979 'channels': channels
,
980 'channels_available': ALL_CHANNELS
,
982 'collections_available': [c
['name'].name
for c
in MODULE_COLLECTION
],
983 'collections_opted_in': [c
['name'].name
for c
in MODULE_COLLECTION
if self
.is_enabled_collection(c
['name'])],
986 if 'ident' in channels
:
987 for option
in ['description', 'contact', 'organization']:
988 report
[option
] = getattr(self
, option
)
990 if 'basic' in channels
:
991 mon_map
= self
.get('mon_map')
992 osd_map
= self
.get('osd_map')
993 service_map
= self
.get('service_map')
994 fs_map
= self
.get('fs_map')
996 df_pools
= {pool
['id']: pool
for pool
in df
['pools']}
998 report
['created'] = mon_map
['created']
1005 for mon
in mon_map
['mons']:
1006 for a
in mon
['public_addrs']['addrvec']:
1007 if a
['type'] == 'v2':
1009 elif a
['type'] == 'v1':
1011 if a
['addr'].startswith('['):
1016 'count': len(mon_map
['mons']),
1017 'features': mon_map
['features'],
1018 'min_mon_release': mon_map
['min_mon_release'],
1019 'v1_addr_mons': v1_mons
,
1020 'v2_addr_mons': v2_mons
,
1021 'ipv4_addr_mons': ipv4_mons
,
1022 'ipv6_addr_mons': ipv6_mons
,
1025 report
['config'] = self
.gather_configs()
1030 rbd_num_images_by_pool
= []
1031 rbd_mirroring_by_pool
= []
1033 report
['pools'] = list()
1034 for pool
in osd_map
['pools']:
1035 num_pg
+= pool
['pg_num']
1037 if pool
['erasure_code_profile']:
1038 orig
= osd_map
['erasure_code_profiles'].get(
1039 pool
['erasure_code_profile'], {})
1041 k
: orig
[k
] for k
in orig
.keys()
1042 if k
in ['k', 'm', 'plugin', 'technique',
1043 'crush-failure-domain', 'l']
1046 'pool': pool
['pool'],
1047 'pg_num': pool
['pg_num'],
1048 'pgp_num': pool
['pg_placement_num'],
1049 'size': pool
['size'],
1050 'min_size': pool
['min_size'],
1051 'pg_autoscale_mode': pool
['pg_autoscale_mode'],
1052 'target_max_bytes': pool
['target_max_bytes'],
1053 'target_max_objects': pool
['target_max_objects'],
1054 'type': ['', 'replicated', '', 'erasure'][pool
['type']],
1055 'erasure_code_profile': ec_profile
,
1056 'cache_mode': pool
['cache_mode'],
1059 # basic_pool_usage collection
1060 if self
.is_enabled_collection(Collection
.basic_pool_usage
):
1061 pool_data
['application'] = []
1062 for application
in pool
['application_metadata']:
1063 # Only include default applications
1064 if application
in ['cephfs', 'mgr', 'rbd', 'rgw']:
1065 pool_data
['application'].append(application
)
1066 pool_stats
= df_pools
[pool
['pool']]['stats']
1067 pool_data
['stats'] = { # filter out kb_used
1068 'avail_raw': pool_stats
['avail_raw'],
1069 'bytes_used': pool_stats
['bytes_used'],
1070 'compress_bytes_used': pool_stats
['compress_bytes_used'],
1071 'compress_under_bytes': pool_stats
['compress_under_bytes'],
1072 'data_bytes_used': pool_stats
['data_bytes_used'],
1073 'dirty': pool_stats
['dirty'],
1074 'max_avail': pool_stats
['max_avail'],
1075 'objects': pool_stats
['objects'],
1076 'omap_bytes_used': pool_stats
['omap_bytes_used'],
1077 'percent_used': pool_stats
['percent_used'],
1078 'quota_bytes': pool_stats
['quota_bytes'],
1079 'quota_objects': pool_stats
['quota_objects'],
1080 'rd': pool_stats
['rd'],
1081 'rd_bytes': pool_stats
['rd_bytes'],
1082 'stored': pool_stats
['stored'],
1083 'stored_data': pool_stats
['stored_data'],
1084 'stored_omap': pool_stats
['stored_omap'],
1085 'stored_raw': pool_stats
['stored_raw'],
1086 'wr': pool_stats
['wr'],
1087 'wr_bytes': pool_stats
['wr_bytes']
1090 cast(List
[Dict
[str, Any
]], report
['pools']).append(pool_data
)
1091 if 'rbd' in pool
['application_metadata']:
1093 ioctx
= self
.rados
.open_ioctx(pool
['pool_name'])
1094 rbd_num_images_by_pool
.append(
1095 sum(1 for _
in rbd
.RBD().list2(ioctx
)))
1096 rbd_mirroring_by_pool
.append(
1097 rbd
.RBD().mirror_mode_get(ioctx
) != rbd
.RBD_MIRROR_MODE_DISABLED
)
1099 'num_pools': rbd_num_pools
,
1100 'num_images_by_pool': rbd_num_images_by_pool
,
1101 'mirroring_by_pool': rbd_mirroring_by_pool
}
1104 cluster_network
= False
1105 for osd
in osd_map
['osds']:
1106 if osd
['up'] and not cluster_network
:
1107 front_ip
= osd
['public_addrs']['addrvec'][0]['addr'].split(':')[0]
1108 back_ip
= osd
['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
1109 if front_ip
!= back_ip
:
1110 cluster_network
= True
1112 'count': len(osd_map
['osds']),
1113 'require_osd_release': osd_map
['require_osd_release'],
1114 'require_min_compat_client': osd_map
['require_min_compat_client'],
1115 'cluster_network': cluster_network
,
1119 report
['crush'] = self
.gather_crush_info()
1123 'count': len(fs_map
['filesystems']),
1124 'feature_flags': fs_map
['feature_flags'],
1125 'num_standby_mds': len(fs_map
['standbys']),
1128 num_mds
= len(fs_map
['standbys'])
1129 for fsm
in fs_map
['filesystems']:
1139 for gid
, mds
in fs
['info'].items():
1140 num_sessions
+= self
.get_latest('mds', mds
['name'],
1141 'mds_sessions.session_count')
1142 cached_ino
+= self
.get_latest('mds', mds
['name'],
1144 cached_dn
+= self
.get_latest('mds', mds
['name'],
1146 cached_cap
+= self
.get_latest('mds', mds
['name'],
1148 subtrees
+= self
.get_latest('mds', mds
['name'],
1150 if mds
['rank'] == 0:
1151 rfiles
= self
.get_latest('mds', mds
['name'],
1153 rbytes
= self
.get_latest('mds', mds
['name'],
1155 rsnaps
= self
.get_latest('mds', mds
['name'],
1157 report
['fs']['filesystems'].append({ # type: ignore
1158 'max_mds': fs
['max_mds'],
1159 'ever_allowed_features': fs
['ever_allowed_features'],
1160 'explicitly_allowed_features': fs
['explicitly_allowed_features'],
1161 'num_in': len(fs
['in']),
1162 'num_up': len(fs
['up']),
1163 'num_standby_replay': len(
1164 [mds
for gid
, mds
in fs
['info'].items()
1165 if mds
['state'] == 'up:standby-replay']),
1166 'num_mds': len(fs
['info']),
1167 'num_sessions': num_sessions
,
1168 'cached_inos': cached_ino
,
1169 'cached_dns': cached_dn
,
1170 'cached_caps': cached_cap
,
1171 'cached_subtrees': subtrees
,
1172 'balancer_enabled': len(fs
['balancer']) > 0,
1173 'num_data_pools': len(fs
['data_pools']),
1174 'standby_count_wanted': fs
['standby_count_wanted'],
1175 'approx_ctime': fs
['created'][0:7],
1180 num_mds
+= len(fs
['info'])
1181 report
['fs']['total_num_mds'] = num_mds
# type: ignore
1184 report
['metadata'] = dict(osd
=self
.gather_osd_metadata(osd_map
),
1185 mon
=self
.gather_mon_metadata(mon_map
))
1187 if self
.is_enabled_collection(Collection
.basic_mds_metadata
):
1188 report
['metadata']['mds'] = self
.gather_mds_metadata() # type: ignore
1191 servers
= self
.list_servers()
1192 self
.log
.debug('servers %s' % servers
)
1194 'num': len([h
for h
in servers
if h
['hostname']]),
1196 for t
in ['mon', 'mds', 'osd', 'mgr']:
1197 nr_services
= sum(1 for host
in servers
if
1198 any(service
for service
in cast(List
[ServiceInfoT
],
1200 if service
['type'] == t
))
1201 hosts
['num_with_' + t
] = nr_services
1202 report
['hosts'] = hosts
1205 'pools': len(df
['pools']),
1207 'total_used_bytes': df
['stats']['total_used_bytes'],
1208 'total_bytes': df
['stats']['total_bytes'],
1209 'total_avail_bytes': df
['stats']['total_avail_bytes']
1211 # basic_usage_by_class collection
1212 if self
.is_enabled_collection(Collection
.basic_usage_by_class
):
1213 report
['usage']['stats_by_class'] = {} # type: ignore
1214 for device_class
in df
['stats_by_class']:
1215 if device_class
in ['hdd', 'ssd', 'nvme']:
1216 report
['usage']['stats_by_class'][device_class
] = df
['stats_by_class'][device_class
] # type: ignore
1218 services
: DefaultDict
[str, int] = defaultdict(int)
1219 for key
, value
in service_map
['services'].items():
1227 d
= value
.get('daemons', dict())
1228 for k
, v
in d
.items():
1229 if k
== 'summary' and v
:
1231 elif isinstance(v
, dict) and 'metadata' in v
:
1233 zones
.add(v
['metadata']['zone_id'])
1234 zonegroups
.add(v
['metadata']['zonegroup_id'])
1235 frontends
.add(v
['metadata']['frontend_type#0'])
1237 # we could actually iterate over all the keys of
1238 # the dict and check for how many frontends there
1239 # are, but it is unlikely that one would be running
1240 # more than 2 supported ones
1241 f2
= v
['metadata'].get('frontend_type#1', None)
1245 rgw
['count'] = count
1246 rgw
['zones'] = len(zones
)
1247 rgw
['zonegroups'] = len(zonegroups
)
1248 rgw
['frontends'] = list(frontends
) # sets aren't json-serializable
1250 report
['services'] = services
1253 report
['balancer'] = self
.remote('balancer', 'gather_telemetry')
1255 report
['balancer'] = {
1260 self
.get_rook_data(report
)
1262 if 'crash' in channels
:
1263 report
['crashes'] = self
.gather_crashinfo()
1265 if 'perf' in channels
:
1266 if self
.is_enabled_collection(Collection
.perf_perf
):
1267 report
['perf_counters'] = self
.gather_perf_counters('separated')
1268 report
['stats_per_pool'] = self
.get_stats_per_pool()
1269 report
['stats_per_pg'] = self
.get_stats_per_pg()
1270 report
['io_rate'] = self
.get_io_rate()
1271 report
['osd_perf_histograms'] = self
.get_osd_histograms('separated')
1272 report
['mempool'] = self
.get_mempool('separated')
1273 report
['heap_stats'] = self
.get_heap_stats()
1274 report
['rocksdb_stats'] = self
.get_rocksdb_stats()
1276 # NOTE: We do not include the 'device' channel in this report; it is
1277 # sent to a different endpoint.
# [review] Merge Rook-related config-key entries into the telemetry report:
# dump the mon config-key store, then for each (key_path, Collection) pair in
# ROOK_KEYS_BY_COLLECTION whose collection is opted-in, copy the stored value
# into the report via add_kv_to_report().
# NOTE(review): lossy extraction -- original lines 1284-1288 and 1291-1292
# (the tail of the mon_command arguments and the handling around the
# json.loads failure path) are missing from this view; do not infer their
# content from this chunk.
1281 def get_rook_data(self
, report
: Dict
[str, object]) -> None:
1282 r
, outb
, outs
= self
.mon_command({
1283 'prefix': 'config-key dump',
1289 config_kv_dump
= json
.loads(outb
)
1290 except json
.decoder
.JSONDecodeError
:
1293 for elem
in ROOK_KEYS_BY_COLLECTION
:
1294 # elem[0] is the full key path (e.g. "rook/node/count/with-csi-nfs-plugin")
1295 # elem[1] is the Collection this key belongs to
1296 if self
.is_enabled_collection(elem
[1]):
1297 self
.add_kv_to_report(report
, elem
[0], config_kv_dump
.get(elem
[0]))
# [review] Insert `value` into the nested `report` dict at the slash-separated
# `key_path`, walking/creating intermediate dict levels.  Logs an error (rather
# than raising) when the path collides with a non-dict node or when the final
# key already exists.
# NOTE(review): lossy extraction -- original lines 1303, 1305, 1309-1310 and
# 1313-1314 are missing (presumably the creation of missing intermediate
# nodes and the early `return`s after the error logs -- TODO confirm against
# the full file).
1299 def add_kv_to_report(self
, report
: Dict
[str, object], key_path
: str, value
: Any
) -> None:
1300 last_node
= key_path
.split('/')[-1]
1301 for node
in key_path
.split('/')[0:-1]:
1302 if node
not in report
:
1304 report
= report
[node
] # type: ignore
1306 # sanity check of keys correctness
1307 if not isinstance(report
, dict):
1308 self
.log
.error(f
"'{key_path}' is an invalid key, expected type 'dict' but got {type(report)}")
1311 if last_node
in report
:
1312 self
.log
.error(f
"'{key_path}' is an invalid key, last part must not exist at this point")
1315 report
[last_node
] = value
# [review] HTTP PUT `report` as JSON to `url` (optionally through the
# configured self.proxy), returning a failure-reason string on error.
# The broad `except Exception` is deliberate best-effort: any transport or
# HTTP-status failure is logged and reported back as a string, never raised.
# NOTE(review): lossy extraction -- original lines 1319-1320 (proxies dict
# initialization and the `if self.proxy:` guard), 1324 (`try:`) and 1330
# (presumably `return fail_reason` / `return None` -- TODO confirm) are
# missing from this view.
1317 def _try_post(self
, what
: str, url
: str, report
: Dict
[str, Dict
[str, str]]) -> Optional
[str]:
1318 self
.log
.info('Sending %s to: %s' % (what
, url
))
1321 self
.log
.info('Send using HTTP(S) proxy: %s', self
.proxy
)
1322 proxies
['http'] = self
.proxy
1323 proxies
['https'] = self
.proxy
1325 resp
= requests
.put(url
=url
, json
=report
, proxies
=proxies
)
1326 resp
.raise_for_status()
1327 except Exception as e
:
1328 fail_reason
= 'Failed to send %s to %s: %s' % (what
, url
, str(e
))
1329 self
.log
.error(fail_reason
)
# [review] Enum of telemetry upload destinations; send() compares against
# self.EndPoint.ceph and self.EndPoint.device, so those are the expected
# members.  NOTE(review): the member definitions (original lines 1334-1335)
# are not visible in this extraction.
1333 class EndPoint(enum
.Enum
):
# [review] Return the module collections (optionally restricted to the given
# channels) that the user has not yet opted-in to, i.e. those present in
# MODULE_COLLECTION but absent from self.db_collection.  Channels default to
# ALL_CHANNELS; invalid channel names are logged at debug level.
# NOTE(review): lossy extraction -- several original lines are missing
# (1342-1344, 1346-1347, 1350-1351, ...), including the early return when
# db_collection is None and the loop header validating each channel; do not
# infer their exact content from this chunk.
1337 def collection_delta(self
, channels
: Optional
[List
[str]] = None) -> Optional
[List
[Collection
]]:
1339 Find collections that are available in the module, but are not in the db
1341 if self
.db_collection
is None:
1345 channels
= ALL_CHANNELS
1348 if ch
not in ALL_CHANNELS
:
1349 self
.log
.debug(f
"invalid channel name: {ch}")
1352 new_collection
: List
[Collection
] = []
1354 for c
in MODULE_COLLECTION
:
1355 if c
['name'].name
not in self
.db_collection
:
1356 if c
['channel'] in channels
:
1357 new_collection
.append(c
['name'])
1359 return new_collection
# [review] Return True only when the cluster's current min_mon_release is
# newer than the Ceph version at which the user last opted-in
# (self.last_opted_in_ceph_version).  Unknown/zero last-opted-in version is
# treated as "do not nag".
# NOTE(review): lossy extraction -- the `return` statements (original lines
# ~1368, 1375-1377) are missing from this view; only the conditions and the
# debug log are visible.
1361 def is_major_upgrade(self
) -> bool:
1363 Returns True only if the user last opted-in to an older major
1365 if self
.last_opted_in_ceph_version
is None or self
.last_opted_in_ceph_version
== 0:
1366 # we do not know what Ceph version was when the user last opted-in,
1367 # thus we do not wish to nag in case of a major upgrade
1370 mon_map
= self
.get('mon_map')
1371 mon_min
= mon_map
.get("min_mon_release", 0)
1373 if mon_min
- self
.last_opted_in_ceph_version
> 0:
1374 self
.log
.debug(f
"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
# [review] The user counts as opted-in iff db_collection is a non-empty list.
# NOTE(review): original line 1385 (the branch body when db_collection is
# None, presumably `return False` -- TODO confirm) is missing from this
# extraction.
1379 def is_opted_in(self
) -> bool:
1380 # If len is 0 it means that the user is either opted-out (never
1381 # opted-in, or invoked `telemetry off`), or they upgraded from a
1382 # telemetry revision 1 or 2, which required to re-opt in to revision 3,
1383 # regardless, hence is considered as opted-out
1384 if self
.db_collection
is None:
1386 return len(self
.db_collection
) > 0
# [review] Decide whether to raise the TELEMETRY_CHANGED health warning:
# nag when a module collection with nag == True is not yet in db_collection,
# otherwise nag only on a major upgrade combined with any new collections.
# NOTE(review): lossy extraction -- original lines 1406-1407 (presumably the
# `return True` after the debug log -- TODO confirm) and 1416 are missing.
1388 def should_nag(self
) -> bool:
1389 # Find delta between opted-in collections and module collections;
1390 # nag only if module has a collection which is not in db, and nag == True.
1392 # We currently do not nag if the user is opted-out (or never opted-in).
1393 # If we wish to do this in the future, we need to have a tri-mode state
1394 # (opted in, opted out, no action yet), and it needs to be guarded by a
1395 # config option (so that nagging can be turned off via config).
1396 # We also need to add a last_opted_out_ceph_version variable, for the
1397 # major upgrade check.
1399 # check if there are collections the user is not opt-in to
1400 # that we should nag about
1401 if self
.db_collection
is not None:
1402 for c
in MODULE_COLLECTION
:
1403 if c
['name'].name
not in self
.db_collection
:
1404 if c
['nag'] == True:
1405 self
.log
.debug(f
"The collection: {c['name']} is not reported")
1408 # user might be opted-in to the most recent collection, or there is no
1409 # new collection which requires nagging about; thus nag in case it's a
1410 # major upgrade and there are new collections
1411 # (which their own nag == False):
1412 new_collections
= False
1413 col_delta
= self
.collection_delta()
1414 if col_delta
is not None and len(col_delta
) > 0:
1415 new_collections
= True
1417 return self
.is_major_upgrade() and new_collections
# [review] One-time initialization of self.db_collection from the config-key
# store.  On first run after an upgrade (db_collection is None) it seeds the
# 'collection' key: empty list when not opted-in; the four base collections
# when opted-in at the current REVISION; otherwise an empty list forcing a
# re-opt-in.  It then reloads the key and fails hard if it is still None.
# NOTE(review): lossy extraction -- several original lines (1423, 1425, 1428,
# 1435, 1441, 1446, 1451, 1453) are missing, including the `else:` lines that
# separate these branches; branch structure here is inferred from the visible
# comments, not fully visible.
1419 def init_collection(self
) -> None:
1420 # We fetch from db the collections the user had already opted-in to.
1421 # During the transition the results will be empty, but the user might
1422 # be opted-in to an older version (e.g. revision = 3)
1424 collection
= self
.get_store('collection')
1426 if collection
is not None:
1427 self
.db_collection
= json
.loads(collection
)
1429 if self
.db_collection
is None:
1430 # happens once on upgrade
1431 if not self
.enabled
:
1432 # user is not opted-in
1433 self
.set_store('collection', json
.dumps([]))
1434 self
.log
.debug("user is not opted-in")
1436 # user is opted-in, verify the revision:
1437 if self
.last_opt_revision
== REVISION
:
1438 self
.log
.debug(f
"telemetry revision is {REVISION}")
1439 base_collection
= [Collection
.basic_base
.name
, Collection
.device_base
.name
, Collection
.crash_base
.name
, Collection
.ident_base
.name
]
1440 self
.set_store('collection', json
.dumps(base_collection
))
1442 # user is opted-in to an older version, meaning they need
1443 # to re-opt in regardless
1444 self
.set_store('collection', json
.dumps([]))
1445 self
.log
.debug(f
"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")
1447 # reload collection after setting
1448 collection
= self
.get_store('collection')
1449 if collection
is not None:
1450 self
.db_collection
= json
.loads(collection
)
1452 raise RuntimeError('collection is None after initial setting')
1454 # user has already upgraded
1455 self
.log
.debug(f
"user has upgraded already: collection: {self.db_collection}")
# [review] True iff the given Collection's name is recorded in the opted-in
# list loaded from the db.  NOTE(review): original line 1459 (the branch body
# when db_collection is None, presumably `return False` -- TODO confirm) is
# missing from this extraction.
1457 def is_enabled_collection(self
, collection
: Collection
) -> bool:
1458 if self
.db_collection
is None:
1460 return collection
.name
in self
.db_collection
def opt_in_all_collections(self) -> None:
    """
    Opt-in to every collection currently available in the module.

    Each MODULE_COLLECTION entry whose name is not yet recorded in
    ``self.db_collection`` is appended to it, and the updated list is
    persisted to the config-key store under 'collection'.

    Raises RuntimeError if db_collection was never initialized.
    """
    if self.db_collection is None:
        raise RuntimeError('db_collection is None after initial setting')

    for coll in MODULE_COLLECTION:
        # Membership is checked by name, since db_collection holds the
        # names loaded back from JSON.
        if coll['name'].name not in self.db_collection:
            self.db_collection.append(coll['name'])

    self.set_store('collection', json.dumps(self.db_collection))
# [review] Worker that uploads `report` to the requested endpoints (defaults
# to both the ceph and device endpoints).  The ceph endpoint PUTs the report
# via _try_post and on success records last_upload; the device endpoint sends
# the gathered device report to self.device_url.  Returns (retcode, '', text)
# where retcode is 1 when any endpoint failed.
# NOTE(review): lossy extraction -- the `def send(self,` line itself
# (original line 1475) and many interior lines (1478, 1480-1481, 1483,
# 1486, 1488, 1497-1499, 1502-1503, 1505-1506, 1508-1511, 1514, 1518) are
# missing, including the success/failed list initialization and the device
# batching logic; do not infer their content from this chunk.
1476 report
: Dict
[str, Dict
[str, str]],
1477 endpoint
: Optional
[List
[EndPoint
]] = None) -> Tuple
[int, str, str]:
1479 endpoint
= [self
.EndPoint
.ceph
, self
.EndPoint
.device
]
1482 self
.log
.debug('Send endpoints %s' % endpoint
)
1484 if e
== self
.EndPoint
.ceph
:
1485 fail_reason
= self
._try
_post
('ceph report', self
.url
, report
)
1487 failed
.append(fail_reason
)
1489 now
= int(time
.time())
1490 self
.last_upload
= now
1491 self
.set_store('last_upload', str(now
))
1492 success
.append('Ceph report sent to {0}'.format(self
.url
))
1493 self
.log
.info('Sent report to {0}'.format(self
.url
))
1494 elif e
== self
.EndPoint
.device
:
1495 if 'device' in self
.get_active_channels():
1496 devices
= self
.gather_device_report()
1500 for host
, ls
in devices
.items():
1501 self
.log
.debug('host %s devices %s' % (host
, ls
))
1504 fail_reason
= self
._try
_post
('devices', self
.device_url
,
1507 failed
.append(fail_reason
)
1512 success
.append('Reported %d devices from %d hosts across a total of %d hosts' % (
1513 num_devs
, num_hosts
, len(devices
)))
1515 fail_reason
= 'Unable to send device report: Device channel is on, but the generated report was empty.'
1516 failed
.append(fail_reason
)
1517 self
.log
.error(fail_reason
)
1519 return 1, '', '\n'.join(success
+ failed
)
1520 return 0, '', '\n'.join(success
)
# [review] In-place prettification of report['osd_perf_histograms']: converts
# every axis 'ranges' entry and every per-OSD 'values' entry from a list to
# its str() form so the histograms render horizontally.
# NOTE(review): lossy extraction -- the try/except wrapper implied by the
# trailing comment (original lines ~1526, 1543-1545, i.e. `try:` /
# `except KeyError: pass`) is missing from this view.
1522 def format_perf_histogram(self
, report
: Dict
[str, Any
]) -> None:
1523 # Formatting the perf histograms so they are human-readable. This will change the
1524 # ranges and values, which are currently in list form, into strings so that
1525 # they are displayed horizontally instead of vertically.
1527 # Formatting ranges and values in osd_perf_histograms
1528 mode
= 'osd_perf_histograms'
1529 for config
in report
[mode
]:
1530 for histogram
in config
:
1531 # Adjust ranges by converting lists into strings
1532 for axis
in config
[histogram
]['axes']:
1533 for i
in range(0, len(axis
['ranges'])):
1534 axis
['ranges'][i
] = str(axis
['ranges'][i
])
1536 for osd
in config
[histogram
]['osds']:
1537 for i
in range(0, len(osd
['values'])):
1538 osd
['values'][i
] = str(osd
['values'][i
])
1540 # If the perf channel is not enabled, there should be a KeyError since
1541 # 'osd_perf_histograms' would not be present in the report. In that case,
1542 # the show function should pass as usual without trying to format the
# [review] Shared implementation behind the enable/disable channel CLI
# commands: validates that telemetry is on and that each requested channel
# name is valid, then sets the corresponding `channel_<c>` module option to
# True for 'enable' / False for 'disable', accumulating a status message.
# NOTE(review): lossy extraction -- the `return` statements and the loop
# header over `channels` (original lines 1554-1555, 1558, 1561-1562, 1566,
# 1568-1570, 1572-1573) are missing from this view.
1546 def toggle_channel(self
, action
: str, channels
: Optional
[List
[str]] = None) -> Tuple
[int, str, str]:
1548 Enable or disable a list of channels
1550 if not self
.enabled
:
1551 # telemetry should be on for channels to be toggled
1552 msg
= 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1553 'Preview sample reports with `ceph telemetry preview`.'
1556 if channels
is None:
1557 msg
= f
'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
1560 state
= action
== 'enable'
1563 if c
not in ALL_CHANNELS
:
1564 msg
= f
"{msg}{c} is not a valid channel name. "\
1565 f
"Available channels: {ALL_CHANNELS}.\n"
1567 self
.set_module_option(f
"channel_{c}", state
)
1571 msg
= f
"{msg}channel_{c} is {action}d\n"
# [review] `ceph telemetry status`: dump every module option value plus a
# human-readable last_upload timestamp as pretty-printed JSON.
# NOTE(review): lossy extraction -- the initialization of `r` (original
# lines 1579-1580, presumably an empty dict -- TODO confirm) is missing.
1575 @CLIReadCommand('telemetry status')
1576 def status(self
) -> Tuple
[int, str, str]:
1578 Show current configuration
1581 for opt
in self
.MODULE_OPTIONS
:
1582 r
[opt
['name']] = getattr(self
, opt
['name'])
1583 r
['last_upload'] = (time
.ctime(self
.last_upload
)
1584 if self
.last_upload
else self
.last_upload
)
1585 return 0, json
.dumps(r
, indent
=4, sort_keys
=True), ''
# [review] `ceph telemetry diff`: list the module collections the user is not
# opted-in to (filtered by `keys`), or report that telemetry is up to date.
# NOTE(review): lossy extraction -- the initialization of `diff` and `keys`
# (original lines 1591-1594) and the emptiness check / return (1598-1600,
# 1604-1605) are missing from this view.
1587 @CLIReadCommand('telemetry diff')
1588 def diff(self
) -> Tuple
[int, str, str]:
1590 Show the diff between opted-in collection and available collection
1595 for c
in MODULE_COLLECTION
:
1596 if not self
.is_enabled_collection(c
['name']):
1597 diff
.append({key
: val
for key
, val
in c
.items() if key
not in keys
})
1601 r
= "Telemetry is up to date"
1603 r
= json
.dumps(diff
, indent
=4, sort_keys
=True)
# [review] `ceph telemetry on`: requires the exact sharing license string;
# enables the module, opts-in to all collections, records the current
# min_mon_release for future major-upgrade nagging, and reports any disabled
# channels (other than 'ident') in the returned message.
# NOTE(review): lossy extraction -- several original lines (1615, 1617, 1619,
# 1625, 1632, 1636, 1638-1640 including the final return and the serve()
# wakeup implied by the trailing comment) are missing from this view.
1607 @CLICommand('telemetry on')
1608 def on(self
, license
: Optional
[str] = None) -> Tuple
[int, str, str]:
1610 Enable telemetry reports from this cluster
1612 if license
!= LICENSE
:
1613 return -errno
.EPERM
, '', f
'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
1614 To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
1616 self
.set_module_option('enabled', True)
1618 self
.opt_in_all_collections()
1620 # for major releases upgrade nagging
1621 mon_map
= self
.get('mon_map')
1622 mon_min
= mon_map
.get("min_mon_release", 0)
1623 self
.set_store('last_opted_in_ceph_version', str(mon_min
))
1624 self
.last_opted_in_ceph_version
= mon_min
1626 msg
= 'Telemetry is on.'
1627 disabled_channels
= ''
1628 active_channels
= self
.get_active_channels()
1629 for c
in ALL_CHANNELS
:
1630 if c
not in active_channels
and c
!= 'ident':
1631 disabled_channels
= f
"{disabled_channels} {c}"
1633 if len(disabled_channels
) > 0:
1634 msg
= f
"{msg}\nSome channels are disabled, please enable with:\n"\
1635 f
"`ceph telemetry enable channel{disabled_channels}`"
1637 # wake up serve() to reset health warning
# [review] `ceph telemetry off`: disables the module, clears the opted-in
# collection list (both in memory and in the config-key store) and records
# the current min_mon_release as last_opted_out_ceph_version.
# NOTE(review): lossy extraction -- original lines 1652-1653, 1658, 1665 and
# 1667 (including the early and final returns) are missing from this view.
1642 @CLICommand('telemetry off')
1643 def off(self
) -> Tuple
[int, str, str]:
1645 Disable telemetry reports from this cluster
1647 if not self
.enabled
:
1648 # telemetry is already off
1649 msg
= 'Telemetry is currently not enabled, nothing to turn off. '\
1650 'Please consider opting-in with `ceph telemetry on`.\n' \
1651 'Preview sample reports with `ceph telemetry preview`.'
1654 self
.set_module_option('enabled', False)
1655 self
.enabled
= False
1656 self
.set_store('collection', json
.dumps([]))
1657 self
.db_collection
= []
1659 # we might need this info in the future, in case
1660 # of nagging when user is opted-out
1661 mon_map
= self
.get('mon_map')
1662 mon_min
= mon_map
.get("min_mon_release", 0)
1663 self
.set_store('last_opted_out_ceph_version', str(mon_min
))
1664 self
.last_opted_out_ceph_version
= mon_min
1666 msg
= 'Telemetry is now disabled.'
@CLIReadCommand('telemetry enable channel all')
def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
    """
    Turn on every telemetry channel.

    NOTE: the default binds the module-level ALL_CHANNELS list;
    toggle_channel must not mutate it.
    """
    action = 'enable'
    return self.toggle_channel(action, channels)
@CLIReadCommand('telemetry enable channel')
def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    """
    Turn on the given telemetry channels.
    """
    # All validation/messaging is handled by the shared toggler.
    return self.toggle_channel('enable', channels)
@CLIReadCommand('telemetry disable channel all')
def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
    """
    Turn off every telemetry channel.

    NOTE: the default binds the module-level ALL_CHANNELS list;
    toggle_channel must not mutate it.
    """
    action = 'disable'
    return self.toggle_channel(action, channels)
@CLIReadCommand('telemetry disable channel')
def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    """
    Turn off the given telemetry channels.
    """
    # All validation/messaging is handled by the shared toggler.
    return self.toggle_channel('disable', channels)
# [review] `ceph telemetry channel ls`: render a PrettyTable of all channels
# with current value (from the channel_<c> attribute) and the declared
# default/description from MODULE_OPTIONS, sorted by NAME.
# NOTE(review): lossy extraction -- original lines 1720-1727 (presumably the
# per-channel table.add_row(...) call -- TODO confirm) are missing from this
# view.
1697 @CLIReadCommand('telemetry channel ls')
1698 def channel_ls(self
) -> Tuple
[int, str, str]:
1702 table
= PrettyTable(
1704 'NAME', 'ENABLED', 'DEFAULT', 'DESC',
1707 table
.align
['NAME'] = 'l'
1708 table
.align
['ENABLED'] = 'l'
1709 table
.align
['DEFAULT'] = 'l'
1710 table
.align
['DESC'] = 'l'
1711 table
.left_padding_width
= 0
1712 table
.right_padding_width
= 4
1714 for c
in ALL_CHANNELS
:
1715 enabled
= "ON" if getattr(self
, f
"channel_{c}") else "OFF"
1716 for o
in self
.MODULE_OPTIONS
:
1717 if o
['name'] == f
"channel_{c}":
1718 default
= "ON" if o
.get('default', None) else "OFF"
1719 desc
= o
.get('desc', None)
1728 return 0, table
.get_string(sortby
="NAME"), ''
# [review] `ceph telemetry collection ls`: prepend a notice when new
# collections are available, then render a PrettyTable of every module
# collection with a STATUS of REPORTING or "NOT REPORTING: <reason>" derived
# from the opt-in state and the owning channel's enabled flag.
# NOTE(review): lossy extraction -- several original lines are missing
# (e.g. 1754 `name = c['name']`? -- TODO confirm; the `why`/`delimiter`
# initialization around 1761-1765; the add_row call around 1774-1781), so
# the branch structure is only partially visible here.
1730 @CLIReadCommand('telemetry collection ls')
1731 def collection_ls(self
) -> Tuple
[int, str, str]:
1733 List all collections
1735 col_delta
= self
.collection_delta()
1737 if col_delta
is not None and len(col_delta
) > 0:
1738 msg
= f
"New collections are available:\n" \
1739 f
"{sorted([c.name for c in col_delta])}\n" \
1740 f
"Run `ceph telemetry on` to opt-in to these collections.\n"
1742 table
= PrettyTable(
1744 'NAME', 'STATUS', 'DESC',
1747 table
.align
['NAME'] = 'l'
1748 table
.align
['STATUS'] = 'l'
1749 table
.align
['DESC'] = 'l'
1750 table
.left_padding_width
= 0
1751 table
.right_padding_width
= 4
1753 for c
in MODULE_COLLECTION
:
1755 opted_in
= self
.is_enabled_collection(name
)
1756 channel_enabled
= getattr(self
, f
"channel_{c['channel']}")
1759 if channel_enabled
and opted_in
:
1760 status
= "REPORTING"
1766 why
+= "NOT OPTED-IN"
1768 if not channel_enabled
:
1769 why
+= f
"{delimiter}CHANNEL {c['channel']} IS OFF"
1771 status
= f
"NOT REPORTING: {why}"
1773 desc
= c
['description']
1782 # add a new line between message and table output
1785 return 0, f
'{msg}{table.get_string(sortby="NAME")}', ''
# [review] `ceph telemetry send` handler: when not opted-in, a manual send
# additionally requires the exact license string; otherwise it compiles a
# fresh report, caches it on self.last_report and delegates the upload to
# self.send().
# NOTE(review): lossy extraction -- the `def` line itself (original line
# 1788; handler name not visible here) and lines 1791, 1793, 1800 are
# missing from this view.
1787 @CLICommand('telemetry send')
1789 endpoint
: Optional
[List
[EndPoint
]] = None,
1790 license
: Optional
[str] = None) -> Tuple
[int, str, str]:
1792 Send a sample report
1794 if not self
.is_opted_in() and license
!= LICENSE
:
1795 self
.log
.debug(('A telemetry send attempt while opted-out. '
1796 'Asking for license agreement'))
1797 return -errno
.EPERM
, '', f
'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
1798 To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
1799 Please consider enabling the telemetry module with 'ceph telemetry on'.'''
1801 self
.last_report
= self
.compile_report()
1802 return self
.send(self
.last_report
, endpoint
)
# [review] `ceph telemetry show`: compile the opted-in report under the
# report lock, prettify the perf histograms, and return it as JSON; appends
# a pointer to `show-device` when the device channel is on.
# NOTE(review): lossy extraction -- original lines 1813-1814 (the early
# return after `msg` -- TODO confirm) are missing from this view.
1804 @CLIReadCommand('telemetry show')
1805 def show(self
, channels
: Optional
[List
[str]] = None) -> Tuple
[int, str, str]:
1807 Show a sample report of opted-in collections (except for 'device')
1809 if not self
.enabled
:
1810 # if telemetry is off, no report is being sent, hence nothing to show
1811 msg
= 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1812 'Preview sample reports with `ceph telemetry preview`.'
1815 report
= self
.get_report_locked(channels
=channels
)
1816 self
.format_perf_histogram(report
)
1817 report
= json
.dumps(report
, indent
=4, sort_keys
=True)
1819 if self
.channel_device
:
1820 report
+= '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''
1822 return 0, report
, ''
# [review] `ceph telemetry preview`: under the report lock, temporarily swap
# self.db_collection for the full list of module collection names, compile
# the report, then restore the user's opted-in list -- yielding a preview of
# what the newest collections would report without changing opt-in state.
# NOTE(review): lossy extraction -- original lines 1840-1841 (the early
# return after `msg` -- TODO confirm) and 1844 are missing from this view.
1824 @CLIReadCommand('telemetry preview')
1825 def preview(self
, channels
: Optional
[List
[str]] = None) -> Tuple
[int, str, str]:
1827 Preview a sample report of the most recent collections available (except for 'device')
1831 # We use a lock to prevent a scenario where the user wishes to preview
1832 # the report, and at the same time the module hits the interval of
1833 # sending a report with the opted-in collection, which has less data
1834 # than in the preview report.
1835 col_delta
= self
.collection_delta()
1836 with self
.get_report_lock
:
1837 if col_delta
is not None and len(col_delta
) == 0:
1838 # user is already opted-in to the most recent collection
1839 msg
= 'Telemetry is up to date, see report with `ceph telemetry show`.'
1842 # there are collections the user is not opted-in to
1843 next_collection
= []
1845 for c
in MODULE_COLLECTION
:
1846 next_collection
.append(c
['name'].name
)
1848 opted_in_collection
= self
.db_collection
1849 self
.db_collection
= next_collection
1850 report
= self
.get_report(channels
=channels
)
1851 self
.db_collection
= opted_in_collection
1853 self
.format_perf_histogram(report
)
1854 report
= json
.dumps(report
, indent
=4, sort_keys
=True)
1856 if self
.channel_device
:
1857 report
+= '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''
1859 return 0, report
, ''
# [review] `ceph telemetry show-device`: requires both telemetry and the
# device channel to be on, then returns the device report as JSON via
# get_report_locked('device').
# NOTE(review): lossy extraction -- original lines 1870-1871 and 1876-1877
# (the early returns after each `msg` -- TODO confirm) are missing.
1861 @CLIReadCommand('telemetry show-device')
1862 def show_device(self
) -> Tuple
[int, str, str]:
1864 Show a sample device report
1866 if not self
.enabled
:
1867 # if telemetry is off, no report is being sent, hence nothing to show
1868 msg
= 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1869 'Preview sample device reports with `ceph telemetry preview-device`.'
1872 if not self
.channel_device
:
1873 # if device channel is off, device report is not being sent, hence nothing to show
1874 msg
= 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
1875 'Preview sample device reports with `ceph telemetry preview-device`.'
1878 return 0, json
.dumps(self
.get_report_locked('device'), indent
=4, sort_keys
=True), ''
# [review] `ceph telemetry preview-device`: same swap-and-restore trick as
# preview(), limited to the device report; short-circuits to a message when
# the device collection is already up to date and the device channel is on.
# NOTE(review): lossy extraction -- original lines 1893-1894 (the early
# return after `msg` -- TODO confirm), 1898 and 1906 are missing.
1880 @CLIReadCommand('telemetry preview-device')
1881 def preview_device(self
) -> Tuple
[int, str, str]:
1883 Preview a sample device report of the most recent device collection
1887 device_col_delta
= self
.collection_delta(['device'])
1888 with self
.get_report_lock
:
1889 if device_col_delta
is not None and len(device_col_delta
) == 0 and self
.channel_device
:
1890 # user is already opted-in to the most recent device collection,
1891 # and device channel is on, thus `show-device` should be called
1892 msg
= 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
1895 # either the user is not opted-in at all, or there are collections
1896 # they are not opted-in to
1897 next_collection
= []
1899 for c
in MODULE_COLLECTION
:
1900 next_collection
.append(c
['name'].name
)
1902 opted_in_collection
= self
.db_collection
1903 self
.db_collection
= next_collection
1904 report
= self
.get_report('device')
1905 self
.db_collection
= opted_in_collection
1907 report
= json
.dumps(report
, indent
=4, sort_keys
=True)
1908 return 0, report
, ''
# [review] `ceph telemetry show-all`: returns the 'default' report when the
# device channel is off, otherwise the combined 'all' report (regular +
# device), both via get_report_locked.
# NOTE(review): lossy extraction -- original lines 1919-1920 (the early
# return after `msg` -- TODO confirm) are missing from this view.
1910 @CLIReadCommand('telemetry show-all')
1911 def show_all(self
) -> Tuple
[int, str, str]:
1913 Show a sample report of all enabled channels (including 'device' channel)
1915 if not self
.enabled
:
1916 # if telemetry is off, no report is being sent, hence nothing to show
1917 msg
= 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1918 'Preview sample reports with `ceph telemetry preview`.'
1921 if not self
.channel_device
:
1922 # device channel is off, no need to display its report
1923 return 0, json
.dumps(self
.get_report_locked('default'), indent
=4, sort_keys
=True), ''
1925 # telemetry is on and device channel is enabled, show both
1926 return 0, json
.dumps(self
.get_report_locked('all'), indent
=4, sort_keys
=True), ''
# [review] `ceph telemetry preview-all`: preview of the combined 'all'
# report using the same swap-and-restore of db_collection under the report
# lock as preview().
# NOTE(review): lossy extraction -- original lines 1940-1941 (the early
# return after `msg` -- TODO confirm) and 1944 are missing from this view.
1928 @CLIReadCommand('telemetry preview-all')
1929 def preview_all(self
) -> Tuple
[int, str, str]:
1931 Preview a sample report of the most recent collections available of all channels (including 'device')
1935 col_delta
= self
.collection_delta()
1936 with self
.get_report_lock
:
1937 if col_delta
is not None and len(col_delta
) == 0:
1938 # user is already opted-in to the most recent collection
1939 msg
= 'Telemetry is up to date, see report with `ceph telemetry show`.'
1942 # there are collections the user is not opted-in to
1943 next_collection
= []
1945 for c
in MODULE_COLLECTION
:
1946 next_collection
.append(c
['name'].name
)
1948 opted_in_collection
= self
.db_collection
1949 self
.db_collection
= next_collection
1950 report
= self
.get_report('all')
1951 self
.db_collection
= opted_in_collection
1953 self
.format_perf_histogram(report
)
1954 report
= json
.dumps(report
, indent
=4, sort_keys
=True)
1956 return 0, report
, ''
def get_report_locked(self,
                      report_type: str = 'default',
                      channels: Optional[List[str]] = None) -> Dict[str, Any]:
    """
    Thread-safe wrapper around get_report.

    Serializes report compilation on self.get_report_lock so a
    user-triggered report cannot interleave with one compiled elsewhere
    (e.g. the preview paths, which also take this lock).
    """
    with self.get_report_lock:
        result = self.get_report(report_type, channels)
    return result
# [review] Dispatch on report_type: 'default' -> compile_report, 'device' ->
# gather_device_report, 'all' -> both under 'report'/'device_report' keys.
# NOTE(review): lossy extraction -- original line 1977 (the fall-through for
# an unrecognized report_type, presumably `return {}` -- TODO confirm) is
# missing from this view.
1967 def get_report(self
,
1968 report_type
: str = 'default',
1969 channels
: Optional
[List
[str]] = None) -> Dict
[str, Any
]:
1970 if report_type
== 'default':
1971 return self
.compile_report(channels
=channels
)
1972 elif report_type
== 'device':
1973 return self
.gather_device_report()
1974 elif report_type
== 'all':
1975 return {'report': self
.compile_report(channels
=channels
),
1976 'device_report': self
.gather_device_report()}
def self_test(self) -> None:
    """
    Sanity-check report generation.

    Compiles a report and verifies that it is non-empty and carries a
    report_id; raises RuntimeError otherwise.
    """
    generated = self.compile_report()
    if not len(generated):
        raise RuntimeError('Report is empty')
    if 'report_id' not in generated:
        raise RuntimeError('report_id not found in report')
# [review] Module shutdown hook.  NOTE(review): the body (original lines
# 1988-1990) is not visible in this extraction; serve() waits on self.event,
# so presumably this sets a stop flag and signals the event -- TODO confirm.
1987 def shutdown(self
) -> None:
# [review] Raise or clear the TELEMETRY_CHANGED health warning: when the
# module is enabled and should_nag() is true, publish a warning asking the
# user to re-opt-in; always pushes the resulting dict via set_health_checks.
# NOTE(review): lossy extraction -- original lines 1992 (health_checks
# initialization), 1998 (the 'detail' key line) and 2000-2001 are missing.
1991 def refresh_health_checks(self
) -> None:
1993 # TODO do we want to nag also in case the user is not opted-in?
1994 if self
.enabled
and self
.should_nag():
1995 health_checks
['TELEMETRY_CHANGED'] = {
1996 'severity': 'warning',
1997 'summary': 'Telemetry requires re-opt-in',
1999 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
2002 self
.set_health_checks(health_checks
)
# [review] Background loop: refresh health checks; sleep 1800s while the
# user is not (re-)opted-in or the module is disabled; otherwise compile and
# send a report whenever self.interval hours have elapsed since last_upload,
# logging (not raising) on compile failure; then sleep `sleep` seconds on
# self.event.
# NOTE(review): lossy extraction -- the loop header(s), warm-up wait,
# `continue` statements, the try/except around compile_report and the
# computation of `sleep` (original lines 2005-2007, 2009-2013, 2015, 2019,
# 2023-2024, 2029-2031, 2033, 2035, 2037, 2039-2040, 2043) are missing from
# this view; control flow here is only partially visible.
2004 def serve(self
) -> None:
2008 self
.log
.debug('Waiting for mgr to warm up')
2014 self
.refresh_health_checks()
2016 if not self
.is_opted_in():
2017 self
.log
.debug('Not sending report until user re-opts-in')
2018 self
.event
.wait(1800)
2020 if not self
.enabled
:
2021 self
.log
.debug('Not sending report until configured to do so')
2022 self
.event
.wait(1800)
2025 now
= int(time
.time())
2026 if not self
.last_upload
or \
2027 (now
- self
.last_upload
) > self
.interval
* 3600:
2028 self
.log
.info('Compiling and sending report to %s',
2032 self
.last_report
= self
.compile_report()
2034 self
.log
.exception('Exception while compiling report:')
2036 self
.send(self
.last_report
)
2038 self
.log
.debug('Interval for sending new report has not expired')
2041 self
.log
.debug('Sleeping for %d seconds', sleep
)
2042 self
.event
.wait(sleep
)
2045 def can_run() -> Tuple
[bool, str]: