2 Telemetry module for ceph-mgr
4 Collect statistics from Ceph cluster and send this back to the Ceph project
17 from datetime
import datetime
, timedelta
18 from prettytable
import PrettyTable
19 from threading
import Event
, Lock
20 from collections
import defaultdict
21 from typing
import cast
, Any
, DefaultDict
, Dict
, List
, Optional
, Tuple
, TypeVar
, TYPE_CHECKING
, Union
23 from mgr_module
import CLICommand
, CLIReadCommand
, MgrModule
, Option
, OptionValue
, ServiceInfoT
# Every telemetry channel a user can opt in to or out of.
ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']

# License under which the shared telemetry data is made available; the
# opt-in flow references these values when asking for acknowledgement.
LICENSE = 'sharing-1-0'
LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL = 'https://cdla.io/sharing-1-0/'
32 # Latest revision of the telemetry report. Bump this each time we make
36 # History of revisions
37 # --------------------
40 # Mimic and/or nautilus are lumped together here, since
41 # we didn't track revisions yet.
44 # - added revision tracking, nagging, etc.
45 # - added config option changes
47 # - added explicit license acknowledgement to the opt-in process
50 # - added device health metrics (i.e., SMART data, minus serial number)
52 # - added CephFS metadata (how many MDSs, fs features, how many data pools,
53 # how much metadata is cached, rfiles, rbytes, rsnapshots)
54 # - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
55 # - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
56 # - whether an OSD cluster network is in use
57 # - rbd pool and image count, and rbd mirror mode (pool-level)
58 # - rgw daemons, zones, zonegroups; which rgw frontends
class Collection(str, enum.Enum):
    """Names of the telemetry data collections a cluster can opt in to.

    Subclassing ``str`` lets members compare equal to (and serialize as)
    their plain-string collection names.
    """
    basic_base = 'basic_base'
    device_base = 'device_base'
    crash_base = 'crash_base'
    ident_base = 'ident_base'
    perf_perf = 'perf_perf'
    basic_mds_metadata = 'basic_mds_metadata'
    basic_pool_usage = 'basic_pool_usage'
    basic_usage_by_class = 'basic_usage_by_class'
71 MODULE_COLLECTION
: List
[Dict
] = [
73 "name": Collection
.basic_base
,
74 "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
79 "name": Collection
.device_base
,
80 "description": "Information about device health metrics",
85 "name": Collection
.crash_base
,
86 "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
91 "name": Collection
.ident_base
,
92 "description": "User-provided identifying information about the cluster",
97 "name": Collection
.perf_perf
,
98 "description": "Information about performance counters of the cluster",
103 "name": Collection
.basic_mds_metadata
,
104 "description": "MDS metadata",
109 "name": Collection
.basic_pool_usage
,
110 "description": "Default pool application and usage statistics",
115 "name": Collection
.basic_usage_by_class
,
116 "description": "Default device class usage statistics",
122 class Module(MgrModule
):
128 "kernel_description",
130 "distro_description",
137 default
='https://telemetry.ceph.com/report'),
138 Option(name
='device_url',
140 default
='https://telemetry.ceph.com/device'),
141 Option(name
='enabled',
144 Option(name
='last_opt_revision',
147 Option(name
='leaderboard',
150 Option(name
='description',
153 Option(name
='contact',
156 Option(name
='organization',
162 Option(name
='interval',
166 Option(name
='channel_basic',
169 desc
='Share basic cluster information (size, version)'),
170 Option(name
='channel_ident',
173 desc
='Share a user-provided description and/or contact email for the cluster'),
174 Option(name
='channel_crash',
177 desc
='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
178 Option(name
='channel_device',
181 desc
=('Share device health metrics '
182 '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
183 Option(name
='channel_perf',
186 desc
='Share various performance metrics of a cluster'),
def config_keys(self) -> Dict[str, OptionValue]:
    """Map every module option name to its declared default (None if absent)."""
    return {opt['name']: opt.get('default', None)
            for opt in self.MODULE_OPTIONS}
193 def __init__(self
, *args
: Any
, **kwargs
: Any
) -> None:
194 super(Module
, self
).__init
__(*args
, **kwargs
)
197 self
.db_collection
: Optional
[List
[str]] = None
198 self
.last_opted_in_ceph_version
: Optional
[int] = None
199 self
.last_opted_out_ceph_version
: Optional
[int] = None
200 self
.last_upload
: Optional
[int] = None
201 self
.last_report
: Dict
[str, Any
] = dict()
202 self
.report_id
: Optional
[str] = None
203 self
.salt
: Optional
[str] = None
204 self
.get_report_lock
= Lock()
205 self
.config_update_module_option()
206 # for mypy which does not run the code
211 self
.last_opt_revision
= 0
212 self
.leaderboard
= ''
215 self
.channel_basic
= True
216 self
.channel_ident
= False
217 self
.channel_crash
= True
218 self
.channel_device
= True
219 self
.channel_perf
= False
220 self
.db_collection
= ['basic_base', 'device_base']
221 self
.last_opted_in_ceph_version
= 17
222 self
.last_opted_out_ceph_version
= 0
224 def config_update_module_option(self
) -> None:
225 for opt
in self
.MODULE_OPTIONS
:
228 self
.get_module_option(opt
['name']))
229 self
.log
.debug(' %s = %s', opt
['name'], getattr(self
, opt
['name']))
231 def config_notify(self
) -> None:
232 self
.config_update_module_option()
233 # wake up serve() thread
236 def load(self
) -> None:
237 last_upload
= self
.get_store('last_upload', None)
238 if last_upload
is None:
239 self
.last_upload
= None
241 self
.last_upload
= int(last_upload
)
243 report_id
= self
.get_store('report_id', None)
244 if report_id
is None:
245 self
.report_id
= str(uuid
.uuid4())
246 self
.set_store('report_id', self
.report_id
)
248 self
.report_id
= report_id
250 salt
= self
.get_store('salt', None)
252 self
.salt
= str(uuid
.uuid4())
253 self
.set_store('salt', self
.salt
)
257 self
.init_collection()
259 last_opted_in_ceph_version
= self
.get_store('last_opted_in_ceph_version', None)
260 if last_opted_in_ceph_version
is None:
261 self
.last_opted_in_ceph_version
= None
263 self
.last_opted_in_ceph_version
= int(last_opted_in_ceph_version
)
265 last_opted_out_ceph_version
= self
.get_store('last_opted_out_ceph_version', None)
266 if last_opted_out_ceph_version
is None:
267 self
.last_opted_out_ceph_version
= None
269 self
.last_opted_out_ceph_version
= int(last_opted_out_ceph_version
)
271 def gather_osd_metadata(self
,
272 osd_map
: Dict
[str, List
[Dict
[str, int]]]) -> Dict
[str, Dict
[str, int]]:
273 keys
= ["osd_objectstore", "rotational"]
274 keys
+= self
.metadata_keys
276 metadata
: Dict
[str, Dict
[str, int]] = dict()
278 metadata
[key
] = defaultdict(int)
280 for osd
in osd_map
['osds']:
281 res
= self
.get_metadata('osd', str(osd
['osd']))
283 self
.log
.debug('Could not get metadata for osd.%s' % str(osd
['osd']))
285 for k
, v
in res
.items():
293 def gather_mon_metadata(self
,
294 mon_map
: Dict
[str, List
[Dict
[str, str]]]) -> Dict
[str, Dict
[str, int]]:
296 keys
+= self
.metadata_keys
298 metadata
: Dict
[str, Dict
[str, int]] = dict()
300 metadata
[key
] = defaultdict(int)
302 for mon
in mon_map
['mons']:
303 res
= self
.get_metadata('mon', mon
['name'])
305 self
.log
.debug('Could not get metadata for mon.%s' % (mon
['name']))
307 for k
, v
in res
.items():
315 def gather_mds_metadata(self
) -> Dict
[str, Dict
[str, int]]:
316 metadata
: Dict
[str, Dict
[str, int]] = dict()
318 res
= self
.get('mds_metadata') # metadata of *all* mds daemons
319 if res
is None or not res
:
320 self
.log
.debug('Could not get metadata for mds daemons')
324 keys
+= self
.metadata_keys
327 metadata
[key
] = defaultdict(int)
329 for mds
in res
.values():
330 for k
, v
in mds
.items():
338 def gather_crush_info(self
) -> Dict
[str, Union
[int,
343 osdmap
= self
.get_osdmap()
344 crush_raw
= osdmap
.get_crush()
345 crush
= crush_raw
.dump()
347 BucketKeyT
= TypeVar('BucketKeyT', int, str)
349 def inc(d
: Dict
[BucketKeyT
, int], k
: BucketKeyT
) -> None:
355 device_classes
: Dict
[str, int] = {}
356 for dev
in crush
['devices']:
357 inc(device_classes
, dev
.get('class', ''))
359 bucket_algs
: Dict
[str, int] = {}
360 bucket_types
: Dict
[str, int] = {}
361 bucket_sizes
: Dict
[int, int] = {}
362 for bucket
in crush
['buckets']:
363 if '~' in bucket
['name']: # ignore shadow buckets
365 inc(bucket_algs
, bucket
['alg'])
366 inc(bucket_types
, bucket
['type_id'])
367 inc(bucket_sizes
, len(bucket
['items']))
370 'num_devices': len(crush
['devices']),
371 'num_types': len(crush
['types']),
372 'num_buckets': len(crush
['buckets']),
373 'num_rules': len(crush
['rules']),
374 'device_classes': list(device_classes
.values()),
375 'tunables': crush
['tunables'],
376 'compat_weight_set': '-1' in crush
['choose_args'],
377 'num_weight_sets': len(crush
['choose_args']),
378 'bucket_algs': bucket_algs
,
379 'bucket_sizes': bucket_sizes
,
380 'bucket_types': bucket_types
,
383 def gather_configs(self
) -> Dict
[str, List
[str]]:
384 # cluster config options
386 r
, outb
, outs
= self
.mon_command({
387 'prefix': 'config dump',
393 dump
= json
.loads(outb
)
394 except json
.decoder
.JSONDecodeError
:
397 name
= opt
.get('name')
400 # daemon-reported options (which may include ceph.conf)
402 ls
= self
.get("modified_config_options")
403 for opt
in ls
.get('options', {}):
406 'cluster_changed': sorted(list(cluster
)),
407 'active_changed': sorted(list(active
)),
410 def get_heap_stats(self
) -> Dict
[str, dict]:
411 # Initialize result dict
412 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(int))
414 # Get list of osd ids from the metadata
415 osd_metadata
= self
.get('osd_metadata')
417 # Grab output from the "osd.x heap stats" command
418 for osd_id
in osd_metadata
:
424 r
, outb
, outs
= self
.osd_command(cmd_dict
)
426 self
.log
.debug("Invalid command dictionary.")
429 if 'tcmalloc heap stats' in outs
:
430 values
= [int(i
) for i
in outs
.split() if i
.isdigit()]
431 # `categories` must be ordered this way for the correct output to be parsed
432 categories
= ['use_by_application',
433 'page_heap_freelist',
434 'central_cache_freelist',
435 'transfer_cache_freelist',
436 'thread_cache_freelists',
438 'actual_memory_used',
440 'virtual_address_space_used',
442 'thread_heaps_in_use',
443 'tcmalloc_page_size']
444 if len(values
) != len(categories
):
445 self
.log
.debug('Received unexpected output from osd.{}; number of values should match the number of expected categories:\n' \
446 'values: len={} {} ~ categories: len={} {} ~ outs: {}'.format(osd_id
, len(values
), values
, len(categories
), categories
, outs
))
448 osd
= 'osd.' + str(osd_id
)
449 result
[osd
] = dict(zip(categories
, values
))
451 self
.log
.debug('No heap stats available on osd.{}: {}'.format(osd_id
, outs
))
456 def get_mempool(self
, mode
: str = 'separated') -> Dict
[str, dict]:
457 # Initialize result dict
458 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(int))
460 # Get list of osd ids from the metadata
461 osd_metadata
= self
.get('osd_metadata')
463 # Grab output from the "osd.x dump_mempools" command
464 for osd_id
in osd_metadata
:
466 'prefix': 'dump_mempools',
470 r
, outb
, outs
= self
.osd_command(cmd_dict
)
472 self
.log
.debug("Invalid command dictionary.")
476 # This is where the mempool will land.
477 dump
= json
.loads(outb
)
478 if mode
== 'separated':
479 result
["osd." + str(osd_id
)] = dump
['mempool']['by_pool']
480 elif mode
== 'aggregated':
481 for mem_type
in dump
['mempool']['by_pool']:
482 result
[mem_type
]['bytes'] += dump
['mempool']['by_pool'][mem_type
]['bytes']
483 result
[mem_type
]['items'] += dump
['mempool']['by_pool'][mem_type
]['items']
485 self
.log
.debug("Incorrect mode specified in get_mempool")
486 except (json
.decoder
.JSONDecodeError
, KeyError) as e
:
487 self
.log
.debug("Error caught on osd.{}: {}".format(osd_id
, e
))
492 def get_osd_histograms(self
, mode
: str = 'separated') -> List
[Dict
[str, dict]]:
493 # Initialize result dict
494 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(
498 lambda: defaultdict(int))))))
500 # Get list of osd ids from the metadata
501 osd_metadata
= self
.get('osd_metadata')
503 # Grab output from the "osd.x perf histogram dump" command
504 for osd_id
in osd_metadata
:
506 'prefix': 'perf histogram dump',
510 r
, outb
, outs
= self
.osd_command(cmd_dict
)
511 # Check for invalid calls
513 self
.log
.debug("Invalid command dictionary.")
517 # This is where the histograms will land if there are any.
518 dump
= json
.loads(outb
)
520 for histogram
in dump
['osd']:
521 # Log axis information. There are two axes, each represented
522 # as a dictionary. Both dictionaries are contained inside a
523 # list called 'axes'.
525 for axis
in dump
['osd'][histogram
]['axes']:
527 # This is the dict that contains information for an individual
528 # axis. It will be appended to the 'axes' list at the end.
529 axis_dict
: Dict
[str, Any
] = defaultdict()
531 # Collecting information for buckets, min, name, etc.
532 axis_dict
['buckets'] = axis
['buckets']
533 axis_dict
['min'] = axis
['min']
534 axis_dict
['name'] = axis
['name']
535 axis_dict
['quant_size'] = axis
['quant_size']
536 axis_dict
['scale_type'] = axis
['scale_type']
538 # Collecting ranges; placing them in lists to
539 # improve readability later on.
541 for _range
in axis
['ranges']:
542 _max
, _min
= None, None
547 ranges
.append([_min
, _max
])
548 axis_dict
['ranges'] = ranges
550 # Now that 'axis_dict' contains all the appropriate
551 # information for the current axis, append it to the 'axes' list.
552 # There will end up being two axes in the 'axes' list, since the
554 axes
.append(axis_dict
)
556 # Add the 'axes' list, containing both axes, to result.
557 # At this point, you will see that the name of the key is the string
558 # form of our axes list (str(axes)). This is there so that histograms
559 # with different axis configs will not be combined.
560 # These key names are later dropped when only the values are returned.
561 result
[str(axes
)][histogram
]['axes'] = axes
563 # Collect current values and make sure they are in
566 for value_list
in dump
['osd'][histogram
]['values']:
567 values
.append([int(v
) for v
in value_list
])
569 if mode
== 'separated':
570 if 'osds' not in result
[str(axes
)][histogram
]:
571 result
[str(axes
)][histogram
]['osds'] = []
572 result
[str(axes
)][histogram
]['osds'].append({'osd_id': int(osd_id
), 'values': values
})
574 elif mode
== 'aggregated':
575 # Aggregate values. If 'values' have already been initialized,
577 if 'values' in result
[str(axes
)][histogram
]:
578 for i
in range (0, len(values
)):
579 for j
in range (0, len(values
[i
])):
580 values
[i
][j
] += result
[str(axes
)][histogram
]['values'][i
][j
]
582 # Add the values to result.
583 result
[str(axes
)][histogram
]['values'] = values
585 # Update num_combined_osds
586 if 'num_combined_osds' not in result
[str(axes
)][histogram
]:
587 result
[str(axes
)][histogram
]['num_combined_osds'] = 1
589 result
[str(axes
)][histogram
]['num_combined_osds'] += 1
591 self
.log
.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode
))
594 # Sometimes, json errors occur if you give it an empty string.
595 # I am also putting in a catch for a KeyError since it could
596 # happen where the code is assuming that a key exists in the
597 # schema when it doesn't. In either case, we'll handle that
598 # by continuing and collecting what we can from other osds.
599 except (json
.decoder
.JSONDecodeError
, KeyError) as e
:
600 self
.log
.debug("Error caught on osd.{}: {}".format(osd_id
, e
))
603 return list(result
.values())
def get_io_rate(self) -> dict:
    """Return the cluster-wide I/O rate information reported by the mgr."""
    io_rate = self.get('io_rate')
    return io_rate
608 def get_stats_per_pool(self
) -> dict:
609 result
= self
.get('pg_dump')['pool_stats']
611 # collect application metadata from osd_map
612 osd_map
= self
.get('osd_map')
613 application_metadata
= {pool
['pool']: pool
['application_metadata'] for pool
in osd_map
['pools']}
615 # add application to each pool from pg_dump
617 pool
['application'] = []
618 # Only include default applications
619 for application
in application_metadata
[pool
['poolid']]:
620 if application
in ['cephfs', 'mgr', 'rbd', 'rgw']:
621 pool
['application'].append(application
)
def get_stats_per_pg(self) -> dict:
    """Return the per-PG statistics section of the mgr's 'pg_dump'."""
    pg_dump = self.get('pg_dump')
    return pg_dump['pg_stats']
628 def get_rocksdb_stats(self
) -> Dict
[str, str]:
630 result
: Dict
[str, str] = defaultdict()
631 version
= self
.get_rocksdb_version()
634 result
['version'] = version
638 def gather_crashinfo(self
) -> List
[Dict
[str, str]]:
639 crashlist
: List
[Dict
[str, str]] = list()
640 errno
, crashids
, err
= self
.remote('crash', 'ls')
643 for crashid
in crashids
.split():
644 errno
, crashinfo
, err
= self
.remote('crash', 'do_info', crashid
)
647 c
= json
.loads(crashinfo
)
650 del c
['utsname_hostname']
652 # entity_name might have more than one '.', beware
653 (etype
, eid
) = c
.get('entity_name', '').split('.', 1)
656 m
.update(self
.salt
.encode('utf-8'))
657 m
.update(eid
.encode('utf-8'))
658 m
.update(self
.salt
.encode('utf-8'))
659 c
['entity_name'] = etype
+ '.' + m
.hexdigest()
661 # redact final line of python tracebacks, as the exception
662 # payload may contain identifying information
663 if 'mgr_module' in c
and 'backtrace' in c
:
664 # backtrace might be empty
665 if len(c
['backtrace']) > 0:
666 c
['backtrace'][-1] = '<redacted>'
671 def gather_perf_counters(self
, mode
: str = 'separated') -> Dict
[str, dict]:
672 # Extract perf counter data with get_all_perf_counters(), a method
673 # from mgr/mgr_module.py. This method returns a nested dictionary that
674 # looks a lot like perf schema, except with some additional fields.
676 # Example of output, a snapshot of a mon daemon:
678 # "bluestore.kv_flush_lat": {
680 # "description": "Average kv_thread flush latency",
688 all_perf_counters
= self
.get_all_perf_counters()
690 # Initialize 'result' dict
691 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(
692 lambda: defaultdict(lambda: defaultdict(int))))
694 for daemon
in all_perf_counters
:
696 # Calculate num combined daemon types if in aggregated mode
697 if mode
== 'aggregated':
698 daemon_type
= daemon
[0:3] # i.e. 'mds', 'osd', 'rgw'
699 if 'num_combined_daemons' not in result
[daemon_type
]:
700 result
[daemon_type
]['num_combined_daemons'] = 1
702 result
[daemon_type
]['num_combined_daemons'] += 1
704 for collection
in all_perf_counters
[daemon
]:
705 # Split the collection to avoid redundancy in final report; i.e.:
706 # bluestore.kv_flush_lat, bluestore.kv_final_lat -->
707 # bluestore: kv_flush_lat, kv_final_lat
708 col_0
, col_1
= collection
.split('.')
710 # Debug log for empty keys. This initially was a problem for prioritycache
711 # perf counters, where the col_0 was empty for certain mon counters:
713 # "mon.a": { instead of "mon.a": {
714 # "": { "prioritycache": {
715 # "cache_bytes": {...}, "cache_bytes": {...},
717 # This log is here to detect any future instances of a similar issue.
718 if (daemon
== "") or (col_0
== "") or (col_1
== ""):
719 self
.log
.debug("Instance of an empty key: {}{}".format(daemon
, collection
))
721 if mode
== 'separated':
722 # Add value to result
723 result
[daemon
][col_0
][col_1
]['value'] = \
724 all_perf_counters
[daemon
][collection
]['value']
726 # Check that 'count' exists, as not all counters have a count field.
727 if 'count' in all_perf_counters
[daemon
][collection
]:
728 result
[daemon
][col_0
][col_1
]['count'] = \
729 all_perf_counters
[daemon
][collection
]['count']
730 elif mode
== 'aggregated':
731 # Not every rgw daemon has the same schema. Specifically, each rgw daemon
732 # has a uniquely-named collection that starts off identically (i.e.
733 # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
734 # This bit of code combines these unique counters all under one rgw instance.
735 # Without this check, the schema would remain separeted out in the final report.
736 if col_0
[0:11] == "objecter-0x":
737 col_0
= "objecter-0x"
739 # Check that the value can be incremented. In some cases,
740 # the files are of type 'pair' (real-integer-pair, integer-integer pair).
741 # In those cases, the value is a dictionary, and not a number.
742 # i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
743 if isinstance(all_perf_counters
[daemon
][collection
]['value'], numbers
.Number
):
744 result
[daemon_type
][col_0
][col_1
]['value'] += \
745 all_perf_counters
[daemon
][collection
]['value']
747 # Check that 'count' exists, as not all counters have a count field.
748 if 'count' in all_perf_counters
[daemon
][collection
]:
749 result
[daemon_type
][col_0
][col_1
]['count'] += \
750 all_perf_counters
[daemon
][collection
]['count']
752 self
.log
.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode
))
757 def get_active_channels(self
) -> List
[str]:
759 if self
.channel_basic
:
761 if self
.channel_crash
:
763 if self
.channel_device
:
765 if self
.channel_ident
:
767 if self
.channel_perf
:
771 def gather_device_report(self
) -> Dict
[str, Dict
[str, Dict
[str, str]]]:
773 time_format
= self
.remote('devicehealth', 'get_time_format')
774 except Exception as e
:
775 self
.log
.debug('Unable to format time: {}'.format(e
))
777 cutoff
= datetime
.utcnow() - timedelta(hours
=self
.interval
* 2)
778 min_sample
= cutoff
.strftime(time_format
)
780 devices
= self
.get('devices')['devices']
782 self
.log
.debug('Unable to get device info from the mgr.')
785 # anon-host-id -> anon-devid -> { timestamp -> record }
786 res
: Dict
[str, Dict
[str, Dict
[str, str]]] = {}
790 # this is a map of stamp -> {device info}
791 m
= self
.remote('devicehealth', 'get_recent_device_metrics',
793 except Exception as e
:
794 self
.log
.debug('Unable to get recent metrics from device with id "{}": {}'.format(devid
, e
))
799 host
= d
['location'][0]['host']
800 except (KeyError, IndexError) as e
:
801 self
.log
.debug('Unable to get host from device with id "{}": {}'.format(devid
, e
))
803 anon_host
= self
.get_store('host-id/%s' % host
)
805 anon_host
= str(uuid
.uuid1())
806 self
.set_store('host-id/%s' % host
, anon_host
)
808 for dev
, rep
in m
.items():
809 rep
['host_id'] = anon_host
810 if serial
is None and 'serial_number' in rep
:
811 serial
= rep
['serial_number']
813 # anonymize device id
814 anon_devid
= self
.get_store('devid-id/%s' % devid
)
816 # ideally devid is 'vendor_model_serial',
817 # but can also be 'model_serial', 'serial'
819 anon_devid
= f
"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
821 anon_devid
= str(uuid
.uuid1())
822 self
.set_store('devid-id/%s' % devid
, anon_devid
)
823 self
.log
.info('devid %s / %s, host %s / %s' % (devid
, anon_devid
,
826 # anonymize the smartctl report itself
828 m_str
= json
.dumps(m
)
829 m
= json
.loads(m_str
.replace(serial
, 'deleted'))
831 if anon_host
not in res
:
833 res
[anon_host
][anon_devid
] = m
836 def get_latest(self
, daemon_type
: str, daemon_name
: str, stat
: str) -> int:
837 data
= self
.get_counter(daemon_type
, daemon_name
, stat
)[stat
]
843 def compile_report(self
, channels
: Optional
[List
[str]] = None) -> Dict
[str, Any
]:
845 channels
= self
.get_active_channels()
847 'leaderboard': self
.leaderboard
,
849 'report_timestamp': datetime
.utcnow().isoformat(),
850 'report_id': self
.report_id
,
851 'channels': channels
,
852 'channels_available': ALL_CHANNELS
,
854 'collections_available': [c
['name'].name
for c
in MODULE_COLLECTION
],
855 'collections_opted_in': [c
['name'].name
for c
in MODULE_COLLECTION
if self
.is_enabled_collection(c
['name'])],
858 if 'ident' in channels
:
859 for option
in ['description', 'contact', 'organization']:
860 report
[option
] = getattr(self
, option
)
862 if 'basic' in channels
:
863 mon_map
= self
.get('mon_map')
864 osd_map
= self
.get('osd_map')
865 service_map
= self
.get('service_map')
866 fs_map
= self
.get('fs_map')
868 df_pools
= {pool
['id']: pool
for pool
in df
['pools']}
870 report
['created'] = mon_map
['created']
877 for mon
in mon_map
['mons']:
878 for a
in mon
['public_addrs']['addrvec']:
879 if a
['type'] == 'v2':
881 elif a
['type'] == 'v1':
883 if a
['addr'].startswith('['):
888 'count': len(mon_map
['mons']),
889 'features': mon_map
['features'],
890 'min_mon_release': mon_map
['min_mon_release'],
891 'v1_addr_mons': v1_mons
,
892 'v2_addr_mons': v2_mons
,
893 'ipv4_addr_mons': ipv4_mons
,
894 'ipv6_addr_mons': ipv6_mons
,
897 report
['config'] = self
.gather_configs()
902 rbd_num_images_by_pool
= []
903 rbd_mirroring_by_pool
= []
905 report
['pools'] = list()
906 for pool
in osd_map
['pools']:
907 num_pg
+= pool
['pg_num']
909 if pool
['erasure_code_profile']:
910 orig
= osd_map
['erasure_code_profiles'].get(
911 pool
['erasure_code_profile'], {})
913 k
: orig
[k
] for k
in orig
.keys()
914 if k
in ['k', 'm', 'plugin', 'technique',
915 'crush-failure-domain', 'l']
918 'pool': pool
['pool'],
919 'pg_num': pool
['pg_num'],
920 'pgp_num': pool
['pg_placement_num'],
921 'size': pool
['size'],
922 'min_size': pool
['min_size'],
923 'pg_autoscale_mode': pool
['pg_autoscale_mode'],
924 'target_max_bytes': pool
['target_max_bytes'],
925 'target_max_objects': pool
['target_max_objects'],
926 'type': ['', 'replicated', '', 'erasure'][pool
['type']],
927 'erasure_code_profile': ec_profile
,
928 'cache_mode': pool
['cache_mode'],
931 # basic_pool_usage collection
932 if self
.is_enabled_collection(Collection
.basic_pool_usage
):
933 pool_data
['application'] = []
934 for application
in pool
['application_metadata']:
935 # Only include default applications
936 if application
in ['cephfs', 'mgr', 'rbd', 'rgw']:
937 pool_data
['application'].append(application
)
938 pool_stats
= df_pools
[pool
['pool']]['stats']
939 pool_data
['stats'] = { # filter out kb_used
940 'avail_raw': pool_stats
['avail_raw'],
941 'bytes_used': pool_stats
['bytes_used'],
942 'compress_bytes_used': pool_stats
['compress_bytes_used'],
943 'compress_under_bytes': pool_stats
['compress_under_bytes'],
944 'data_bytes_used': pool_stats
['data_bytes_used'],
945 'dirty': pool_stats
['dirty'],
946 'max_avail': pool_stats
['max_avail'],
947 'objects': pool_stats
['objects'],
948 'omap_bytes_used': pool_stats
['omap_bytes_used'],
949 'percent_used': pool_stats
['percent_used'],
950 'quota_bytes': pool_stats
['quota_bytes'],
951 'quota_objects': pool_stats
['quota_objects'],
952 'rd': pool_stats
['rd'],
953 'rd_bytes': pool_stats
['rd_bytes'],
954 'stored': pool_stats
['stored'],
955 'stored_data': pool_stats
['stored_data'],
956 'stored_omap': pool_stats
['stored_omap'],
957 'stored_raw': pool_stats
['stored_raw'],
958 'wr': pool_stats
['wr'],
959 'wr_bytes': pool_stats
['wr_bytes']
962 cast(List
[Dict
[str, Any
]], report
['pools']).append(pool_data
)
963 if 'rbd' in pool
['application_metadata']:
965 ioctx
= self
.rados
.open_ioctx(pool
['pool_name'])
966 rbd_num_images_by_pool
.append(
967 sum(1 for _
in rbd
.RBD().list2(ioctx
)))
968 rbd_mirroring_by_pool
.append(
969 rbd
.RBD().mirror_mode_get(ioctx
) != rbd
.RBD_MIRROR_MODE_DISABLED
)
971 'num_pools': rbd_num_pools
,
972 'num_images_by_pool': rbd_num_images_by_pool
,
973 'mirroring_by_pool': rbd_mirroring_by_pool
}
976 cluster_network
= False
977 for osd
in osd_map
['osds']:
978 if osd
['up'] and not cluster_network
:
979 front_ip
= osd
['public_addrs']['addrvec'][0]['addr'].split(':')[0]
980 back_ip
= osd
['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
981 if front_ip
!= back_ip
:
982 cluster_network
= True
984 'count': len(osd_map
['osds']),
985 'require_osd_release': osd_map
['require_osd_release'],
986 'require_min_compat_client': osd_map
['require_min_compat_client'],
987 'cluster_network': cluster_network
,
991 report
['crush'] = self
.gather_crush_info()
995 'count': len(fs_map
['filesystems']),
996 'feature_flags': fs_map
['feature_flags'],
997 'num_standby_mds': len(fs_map
['standbys']),
1000 num_mds
= len(fs_map
['standbys'])
1001 for fsm
in fs_map
['filesystems']:
1011 for gid
, mds
in fs
['info'].items():
1012 num_sessions
+= self
.get_latest('mds', mds
['name'],
1013 'mds_sessions.session_count')
1014 cached_ino
+= self
.get_latest('mds', mds
['name'],
1016 cached_dn
+= self
.get_latest('mds', mds
['name'],
1018 cached_cap
+= self
.get_latest('mds', mds
['name'],
1020 subtrees
+= self
.get_latest('mds', mds
['name'],
1022 if mds
['rank'] == 0:
1023 rfiles
= self
.get_latest('mds', mds
['name'],
1025 rbytes
= self
.get_latest('mds', mds
['name'],
1027 rsnaps
= self
.get_latest('mds', mds
['name'],
1029 report
['fs']['filesystems'].append({ # type: ignore
1030 'max_mds': fs
['max_mds'],
1031 'ever_allowed_features': fs
['ever_allowed_features'],
1032 'explicitly_allowed_features': fs
['explicitly_allowed_features'],
1033 'num_in': len(fs
['in']),
1034 'num_up': len(fs
['up']),
1035 'num_standby_replay': len(
1036 [mds
for gid
, mds
in fs
['info'].items()
1037 if mds
['state'] == 'up:standby-replay']),
1038 'num_mds': len(fs
['info']),
1039 'num_sessions': num_sessions
,
1040 'cached_inos': cached_ino
,
1041 'cached_dns': cached_dn
,
1042 'cached_caps': cached_cap
,
1043 'cached_subtrees': subtrees
,
1044 'balancer_enabled': len(fs
['balancer']) > 0,
1045 'num_data_pools': len(fs
['data_pools']),
1046 'standby_count_wanted': fs
['standby_count_wanted'],
1047 'approx_ctime': fs
['created'][0:7],
1052 num_mds
+= len(fs
['info'])
1053 report
['fs']['total_num_mds'] = num_mds
# type: ignore
1056 report
['metadata'] = dict(osd
=self
.gather_osd_metadata(osd_map
),
1057 mon
=self
.gather_mon_metadata(mon_map
))
1059 if self
.is_enabled_collection(Collection
.basic_mds_metadata
):
1060 report
['metadata']['mds'] = self
.gather_mds_metadata() # type: ignore
1063 servers
= self
.list_servers()
1064 self
.log
.debug('servers %s' % servers
)
1066 'num': len([h
for h
in servers
if h
['hostname']]),
1068 for t
in ['mon', 'mds', 'osd', 'mgr']:
1069 nr_services
= sum(1 for host
in servers
if
1070 any(service
for service
in cast(List
[ServiceInfoT
],
1072 if service
['type'] == t
))
1073 hosts
['num_with_' + t
] = nr_services
1074 report
['hosts'] = hosts
1077 'pools': len(df
['pools']),
1079 'total_used_bytes': df
['stats']['total_used_bytes'],
1080 'total_bytes': df
['stats']['total_bytes'],
1081 'total_avail_bytes': df
['stats']['total_avail_bytes']
1083 # basic_usage_by_class collection
1084 if self
.is_enabled_collection(Collection
.basic_usage_by_class
):
1085 report
['usage']['stats_by_class'] = {} # type: ignore
1086 for device_class
in df
['stats_by_class']:
1087 if device_class
in ['hdd', 'ssd', 'nvme']:
1088 report
['usage']['stats_by_class'][device_class
] = df
['stats_by_class'][device_class
] # type: ignore
1090 services
: DefaultDict
[str, int] = defaultdict(int)
1091 for key
, value
in service_map
['services'].items():
1099 d
= value
.get('daemons', dict())
1100 for k
, v
in d
.items():
1101 if k
== 'summary' and v
:
1103 elif isinstance(v
, dict) and 'metadata' in v
:
1105 zones
.add(v
['metadata']['zone_id'])
1106 zonegroups
.add(v
['metadata']['zonegroup_id'])
1107 frontends
.add(v
['metadata']['frontend_type#0'])
1109 # we could actually iterate over all the keys of
1110 # the dict and check for how many frontends there
1111 # are, but it is unlikely that one would be running
1112 # more than 2 supported ones
1113 f2
= v
['metadata'].get('frontend_type#1', None)
1117 rgw
['count'] = count
1118 rgw
['zones'] = len(zones
)
1119 rgw
['zonegroups'] = len(zonegroups
)
1120 rgw
['frontends'] = list(frontends
) # sets aren't json-serializable
1122 report
['services'] = services
1125 report
['balancer'] = self
.remote('balancer', 'gather_telemetry')
1127 report
['balancer'] = {
1131 if 'crash' in channels
:
1132 report
['crashes'] = self
.gather_crashinfo()
1134 if 'perf' in channels
:
1135 if self
.is_enabled_collection(Collection
.perf_perf
):
1136 report
['perf_counters'] = self
.gather_perf_counters('separated')
1137 report
['stats_per_pool'] = self
.get_stats_per_pool()
1138 report
['stats_per_pg'] = self
.get_stats_per_pg()
1139 report
['io_rate'] = self
.get_io_rate()
1140 report
['osd_perf_histograms'] = self
.get_osd_histograms('separated')
1141 report
['mempool'] = self
.get_mempool('separated')
1142 report
['heap_stats'] = self
.get_heap_stats()
1143 report
['rocksdb_stats'] = self
.get_rocksdb_stats()
1145 # NOTE: We do not include the 'device' channel in this report; it is
1146 # sent to a different endpoint.
def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
    """
    PUT ``report`` as JSON to ``url``, honoring the configured HTTP(S) proxy.

    :param what: human-readable label of what is being sent (used in logs)
    :param url: destination endpoint
    :param report: the JSON-serializable report payload
    :return: None on success, or a failure-description string (also logged)
             that the caller can surface to the user
    """
    self.log.info('Sending %s to: %s' % (what, url))
    proxies = dict()
    if self.proxy:
        # same proxy is used for both schemes
        self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
        proxies['http'] = self.proxy
        proxies['https'] = self.proxy
    try:
        resp = requests.put(url=url, json=report, proxies=proxies)
        # turn 4xx/5xx responses into exceptions so they take the failure path
        resp.raise_for_status()
    except Exception as e:
        fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
        self.log.error(fail_reason)
        return fail_reason
    return None
1166 class EndPoint(enum
.Enum
):
def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
    '''
    Find collections that are available in the module, but are not in the db.

    Returns None when the db state is unknown or a requested channel name is
    invalid; otherwise returns the (possibly empty) list of collections the
    user has not opted-in to yet, filtered to the given channels
    (all channels when none are given).
    '''
    if self.db_collection is None:
        # db state not initialized yet; cannot compute a delta
        return None

    if not channels:
        channels = ALL_CHANNELS
    else:
        for ch in channels:
            if ch not in ALL_CHANNELS:
                self.log.debug(f"invalid channel name: {ch}")
                return None

    new_collection: List[Collection] = []

    for c in MODULE_COLLECTION:
        # compare by enum member name; db stores collection names as strings
        if c['name'].name not in self.db_collection:
            if c['channel'] in channels:
                new_collection.append(c['name'])

    return new_collection
def is_major_upgrade(self) -> bool:
    '''
    Returns True only if the user last opted-in to an older major
    Ceph release than the cluster's current minimum mon release.
    '''
    if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0:
        # we do not know what Ceph version was when the user last opted-in,
        # thus we do not wish to nag in case of a major upgrade
        return False

    mon_map = self.get('mon_map')
    # min_mon_release is the lowest major release among the monitors
    mon_min = mon_map.get("min_mon_release", 0)

    if mon_min - self.last_opted_in_ceph_version > 0:
        self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
        return True

    return False
def is_opted_in(self) -> bool:
    """Return True when the user is currently opted-in to telemetry.

    An empty collection list counts as opted-out: the user either never
    opted in (or invoked `telemetry off`), or upgraded from telemetry
    revision 1 or 2, which requires re-opting-in to revision 3.
    """
    opted = self.db_collection
    if opted is None:
        return False
    return bool(opted)
def should_nag(self) -> bool:
    """
    Decide whether the TELEMETRY_CHANGED health warning should be raised.

    Find delta between opted-in collections and module collections;
    nag only if the module has a collection which is not in the db and
    that collection is flagged for nagging, or if this is a major upgrade
    with new collections available.
    """
    # We currently do not nag if the user is opted-out (or never opted-in).
    # If we wish to do this in the future, we need to have a tri-mode state
    # (opted in, opted out, no action yet), and it needs to be guarded by a
    # config option (so that nagging can be turned off via config).
    # We also need to add a last_opted_out_ceph_version variable, for the
    # major upgrade check.

    # check if there are collections the user is not opted-in to
    # that we should nag about
    if self.db_collection is not None:
        for c in MODULE_COLLECTION:
            if c['name'].name not in self.db_collection:
                # idiomatic truthiness test (was `c['nag'] == True`)
                if c['nag']:
                    self.log.debug(f"The collection: {c['name']} is not reported")
                    return True

    # user might be opted-in to the most recent collection, or there is no
    # new collection which requires nagging about; thus nag in case it's a
    # major upgrade and there are new collections
    # (which their own nag == False):
    new_collections = False
    col_delta = self.collection_delta()
    if col_delta is not None and len(col_delta) > 0:
        new_collections = True

    return self.is_major_upgrade() and new_collections
def init_collection(self) -> None:
    """
    Initialize ``self.db_collection`` from the persisted 'collection' key,
    handling the one-time upgrade transition from older telemetry revisions.
    """
    # We fetch from db the collections the user had already opted-in to.
    # During the transition the results will be empty, but the user might
    # be opted-in to an older version (e.g. revision = 3)
    collection = self.get_store('collection')

    if collection is not None:
        self.db_collection = json.loads(collection)

    if self.db_collection is None:
        # happens once on upgrade
        if not self.enabled:
            # user is not opted-in
            self.set_store('collection', json.dumps([]))
            self.log.debug("user is not opted-in")
        else:
            # user is opted-in, verify the revision:
            if self.last_opt_revision == REVISION:
                self.log.debug(f"telemetry revision is {REVISION}")
                base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
                self.set_store('collection', json.dumps(base_collection))
            else:
                # user is opted-in to an older version, meaning they need
                # to re-opt in regardless
                self.set_store('collection', json.dumps([]))
                self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")

        # reload collection after setting
        collection = self.get_store('collection')
        if collection is not None:
            self.db_collection = json.loads(collection)
        else:
            raise RuntimeError('collection is None after initial setting')
    else:
        # user has already upgraded
        self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
def is_enabled_collection(self, collection: Collection) -> bool:
    """Return True when `collection` is among the opted-in collections in the db."""
    opted = self.db_collection
    return opted is not None and collection.name in opted
def opt_in_all_collections(self) -> None:
    """
    Opt-in to all collections; update the db with every collection
    currently available in the module.
    """
    opted = self.db_collection
    if opted is None:
        raise RuntimeError('db_collection is None after initial setting')

    # Append only the collections not already recorded in the db.
    for entry in MODULE_COLLECTION:
        collection = entry['name']
        if collection.name not in opted:
            opted.append(collection)

    self.set_store('collection', json.dumps(opted))
def send(self,
         report: Dict[str, Dict[str, str]],
         endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
    """
    Send the compiled report to the configured endpoints.

    :param report: the compiled telemetry report
    :param endpoint: which endpoints to send to; defaults to both the
                     ceph report endpoint and the device endpoint
    :return: (retval, stdout, stderr)-style tuple; retval 1 when any
             endpoint failed, 0 otherwise
    """
    if not endpoint:
        endpoint = [self.EndPoint.ceph, self.EndPoint.device]
    failed = []
    success = []
    self.log.debug('Send endpoints %s' % endpoint)
    for e in endpoint:
        if e == self.EndPoint.ceph:
            fail_reason = self._try_post('ceph report', self.url, report)
            if fail_reason:
                failed.append(fail_reason)
            else:
                # record the upload time so serve() can honor the interval
                now = int(time.time())
                self.last_upload = now
                self.set_store('last_upload', str(now))
                success.append('Ceph report sent to {0}'.format(self.url))
                self.log.info('Sent report to {0}'.format(self.url))
        elif e == self.EndPoint.device:
            if 'device' in self.get_active_channels():
                devices = self.gather_device_report()
                if devices:
                    # NOTE(review): per-host accounting reconstructed from the
                    # summary message below — confirm against upstream history.
                    num_devs = 0
                    num_hosts = 0
                    for host, ls in devices.items():
                        self.log.debug('host %s devices %s' % (host, ls))
                        if not len(ls):
                            continue
                        fail_reason = self._try_post('devices', self.device_url,
                                                     ls)
                        if fail_reason:
                            failed.append(fail_reason)
                        else:
                            num_devs += len(ls)
                            num_hosts += 1
                    if num_devs:
                        success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
                            num_devs, num_hosts, len(devices)))
                else:
                    fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
                    failed.append(fail_reason)
                    self.log.error(fail_reason)
    if failed:
        return 1, '', '\n'.join(success + failed)
    return 0, '', '\n'.join(success)
def format_perf_histogram(self, report: Dict[str, Any]) -> None:
    """
    Make the perf histograms in `report` human-readable, in place.

    The ranges and values are stored as lists; converting every entry to
    its string form makes them display horizontally instead of vertically
    when the report is JSON-dumped.
    """
    try:
        # Formatting ranges and values in osd_perf_histograms
        mode = 'osd_perf_histograms'
        for config in report[mode]:
            for histogram in config:
                # Stringify each axis range of this histogram, in place.
                for axis in config[histogram]['axes']:
                    for idx, rng in enumerate(axis['ranges']):
                        axis['ranges'][idx] = str(rng)
                # Stringify each per-OSD value list, in place.
                for osd in config[histogram]['osds']:
                    for idx, val in enumerate(osd['values']):
                        osd['values'][idx] = str(val)
    except KeyError:
        # If the perf channel is not enabled, 'osd_perf_histograms' is
        # absent from the report and a KeyError is raised; in that case
        # showing the report proceeds without formatting the histograms.
        pass
def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    '''
    Enable or disable a list of channels.

    :param action: 'enable' or 'disable'
    :param channels: channel names to toggle; invalid names are reported
                     in the output message rather than raising
    '''
    if not self.enabled:
        # telemetry should be on for channels to be toggled
        msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample reports with `ceph telemetry preview`.'
        return 0, msg, ''

    if channels is None:
        msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
        return 0, msg, ''

    state = action == 'enable'
    msg = ''
    for c in channels:
        if c not in ALL_CHANNELS:
            msg = f"{msg}{c} is not a valid channel name. "\
                  f"Available channels: {ALL_CHANNELS}.\n"
        else:
            # persist and reflect the new state on the module instance
            self.set_module_option(f"channel_{c}", state)
            setattr(self,
                    f"channel_{c}",
                    state)
            msg = f"{msg}channel_{c} is {action}d\n"

    return 0, msg, ''
@CLIReadCommand('telemetry status')
def status(self) -> Tuple[int, str, str]:
    '''
    Show current configuration.

    Dumps every module option value, plus a human-readable last-upload
    timestamp (the raw falsy value is passed through when no upload has
    happened yet).
    '''
    config = {opt['name']: getattr(self, opt['name'])
              for opt in self.MODULE_OPTIONS}
    last = self.last_upload
    config['last_upload'] = time.ctime(last) if last else last
    return 0, json.dumps(config, indent=4, sort_keys=True), ''
@CLIReadCommand('telemetry diff')
def diff(self) -> Tuple[int, str, str]:
    '''
    Show the diff between opted-in collection and available collection
    '''
    diff = []
    # internal-only keys we do not want to expose in the output
    keys = ['nag']

    for c in MODULE_COLLECTION:
        if not self.is_enabled_collection(c['name']):
            diff.append({key: val for key, val in c.items() if key not in keys})

    r = None
    if diff == []:
        r = "Telemetry is up to date"
    else:
        r = json.dumps(diff, indent=4, sort_keys=True)

    return 0, r, ''
@CLICommand('telemetry on')
def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
    '''
    Enable telemetry reports from this cluster.

    Requires explicit acknowledgement of the data-sharing license via
    `--license`; opts the user in to all currently available collections.
    '''
    if license != LICENSE:
        return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
    else:
        self.set_module_option('enabled', True)
        # NOTE(review): setting self.enabled here reconstructed by symmetry
        # with `off` below — confirm against upstream.
        self.enabled = True
        self.opt_in_all_collections()

        # for major releases upgrade nagging
        mon_map = self.get('mon_map')
        mon_min = mon_map.get("min_mon_release", 0)
        self.set_store('last_opted_in_ceph_version', str(mon_min))
        self.last_opted_in_ceph_version = mon_min

        msg = 'Telemetry is on.'
        disabled_channels = ''
        active_channels = self.get_active_channels()
        for c in ALL_CHANNELS:
            # 'ident' is deliberately excluded from the enable hint
            if c not in active_channels and c != 'ident':
                disabled_channels = f"{disabled_channels} {c}"

        if len(disabled_channels) > 0:
            msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
                  f"`ceph telemetry enable channel{disabled_channels}`"

        return 0, msg, ''
@CLICommand('telemetry off')
def off(self) -> Tuple[int, str, str]:
    '''
    Disable telemetry reports from this cluster.

    Also clears the opted-in collection list, so re-enabling requires a
    fresh opt-in.
    '''
    if not self.enabled:
        # telemetry is already off
        msg = 'Telemetry is currently not enabled, nothing to turn off. '\
              'Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample reports with `ceph telemetry preview`.'
        return 0, msg, ''

    self.set_module_option('enabled', False)
    self.enabled = False
    # clearing the collection list marks the user as opted-out
    self.set_store('collection', json.dumps([]))
    self.db_collection = []

    # we might need this info in the future, in case
    # of nagging when user is opted-out
    mon_map = self.get('mon_map')
    mon_min = mon_map.get("min_mon_release", 0)
    self.set_store('last_opted_out_ceph_version', str(mon_min))
    self.last_opted_out_ceph_version = mon_min

    msg = 'Telemetry is now disabled.'
    return 0, msg, ''
@CLIReadCommand('telemetry enable channel all')
def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
    """Enable every telemetry channel (thin wrapper over toggle_channel)."""
    return self.toggle_channel('enable', channels)
@CLIReadCommand('telemetry enable channel')
def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    """Enable the given list of telemetry channels."""
    return self.toggle_channel('enable', channels)
@CLIReadCommand('telemetry disable channel all')
def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
    """Disable every telemetry channel (thin wrapper over toggle_channel)."""
    return self.toggle_channel('disable', channels)
@CLIReadCommand('telemetry disable channel')
def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    """Disable the given list of telemetry channels."""
    return self.toggle_channel('disable', channels)
@CLIReadCommand('telemetry channel ls')
def channel_ls(self) -> Tuple[int, str, str]:
    '''
    List all channels with their current state, default, and description.
    '''
    table = PrettyTable(
        [
            'NAME', 'ENABLED', 'DEFAULT', 'DESC',
        ],
        border=False)
    table.align['NAME'] = 'l'
    table.align['ENABLED'] = 'l'
    table.align['DEFAULT'] = 'l'
    table.align['DESC'] = 'l'
    table.left_padding_width = 0
    table.right_padding_width = 4

    for c in ALL_CHANNELS:
        enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
        # look up the matching module option for default and description
        for o in self.MODULE_OPTIONS:
            if o['name'] == f"channel_{c}":
                default = "ON" if o.get('default', None) else "OFF"
                desc = o.get('desc', None)

        table.add_row((
            c,
            enabled,
            default,
            desc,
        ))

    return 0, table.get_string(sortby="NAME"), ''
@CLIReadCommand('telemetry collection ls')
def collection_ls(self) -> Tuple[int, str, str]:
    '''
    List all collections with their reporting status and description.
    '''
    col_delta = self.collection_delta()
    msg = ''
    if col_delta is not None and len(col_delta) > 0:
        msg = f"New collections are available:\n" \
              f"{sorted([c.name for c in col_delta])}\n" \
              f"Run `ceph telemetry on` to opt-in to these collections.\n"

    table = PrettyTable(
        [
            'NAME', 'STATUS', 'DESC',
        ],
        border=False)
    table.align['NAME'] = 'l'
    table.align['STATUS'] = 'l'
    table.align['DESC'] = 'l'
    table.left_padding_width = 0
    table.right_padding_width = 4

    for c in MODULE_COLLECTION:
        name = c['name']
        opted_in = self.is_enabled_collection(name)
        channel_enabled = getattr(self, f"channel_{c['channel']}")

        status = ''
        if channel_enabled and opted_in:
            status = "REPORTING"
        else:
            # explain why the collection is not reported
            why = ''
            delimiter = ''

            if not opted_in:
                why += "NOT OPTED-IN"
                delimiter = ', '
            if not channel_enabled:
                why += f"{delimiter}CHANNEL {c['channel']} IS OFF"

            status = f"NOT REPORTING: {why}"

        desc = c['description']

        table.add_row((
            name,
            status,
            desc,
        ))

    if len(msg):
        # add a new line between message and table output
        msg = f"{msg} \n"

    return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
@CLICommand('telemetry send')
def do_send(self,
            endpoint: Optional[List[EndPoint]] = None,
            license: Optional[str] = None) -> Tuple[int, str, str]:
    '''
    Send a sample report.

    When opted-out, the license must be acknowledged explicitly via
    `--license` for a manual send.
    '''
    if not self.is_opted_in() and license != LICENSE:
        self.log.debug(('A telemetry send attempt while opted-out. '
                        'Asking for license agreement'))
        return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
Please consider enabling the telemetry module with 'ceph telemetry on'.'''
    else:
        self.last_report = self.compile_report()
        return self.send(self.last_report, endpoint)
@CLIReadCommand('telemetry show')
def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    '''
    Show a sample report of opted-in collections (except for 'device').

    The perf histograms are reformatted to be human-readable before the
    report is dumped as JSON.
    '''
    if not self.enabled:
        # if telemetry is off, no report is being sent, hence nothing to show
        return (0,
                'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n'
                'Preview sample reports with `ceph telemetry preview`.',
                '')

    sample = self.get_report_locked(channels=channels)
    self.format_perf_histogram(sample)
    rendered = json.dumps(sample, indent=4, sort_keys=True)

    if self.channel_device:
        rendered += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''

    return 0, rendered, ''
@CLIReadCommand('telemetry preview')
def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    '''
    Preview a sample report of the most recent collections available (except for 'device')
    '''
    report = {}

    # We use a lock to prevent a scenario where the user wishes to preview
    # the report, and at the same time the module hits the interval of
    # sending a report with the opted-in collection, which has less data
    # than in the preview report.
    col_delta = self.collection_delta()
    with self.get_report_lock:
        if col_delta is not None and len(col_delta) == 0:
            # user is already opted-in to the most recent collection
            msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
            return 0, msg, ''

        # there are collections the user is not opted-in to
        next_collection = []

        for c in MODULE_COLLECTION:
            next_collection.append(c['name'].name)

        # temporarily swap in the full collection list to build the preview
        opted_in_collection = self.db_collection
        self.db_collection = next_collection
        report = self.get_report(channels=channels)
        self.db_collection = opted_in_collection

    self.format_perf_histogram(report)
    report = json.dumps(report, indent=4, sort_keys=True)

    if self.channel_device:
        report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''

    return 0, report, ''
@CLIReadCommand('telemetry show-device')
def show_device(self) -> Tuple[int, str, str]:
    '''
    Show a sample device report
    '''
    if not self.enabled:
        # if telemetry is off, no report is being sent, hence nothing to show
        msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample device reports with `ceph telemetry preview-device`.'
        return 0, msg, ''

    if not self.channel_device:
        # if device channel is off, device report is not being sent, hence nothing to show
        msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
              'Preview sample device reports with `ceph telemetry preview-device`.'
        return 0, msg, ''

    return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
@CLIReadCommand('telemetry preview-device')
def preview_device(self) -> Tuple[int, str, str]:
    '''
    Preview a sample device report of the most recent device collection
    '''
    report = {}

    device_col_delta = self.collection_delta(['device'])
    with self.get_report_lock:
        if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
            # user is already opted-in to the most recent device collection,
            # and device channel is on, thus `show-device` should be called
            msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
            return 0, msg, ''

        # either the user is not opted-in at all, or there are collections
        # they are not opted-in to
        next_collection = []

        for c in MODULE_COLLECTION:
            next_collection.append(c['name'].name)

        # temporarily swap in the full collection list to build the preview
        opted_in_collection = self.db_collection
        self.db_collection = next_collection
        report = self.get_report('device')
        self.db_collection = opted_in_collection

    report = json.dumps(report, indent=4, sort_keys=True)
    return 0, report, ''
@CLIReadCommand('telemetry show-all')
def show_all(self) -> Tuple[int, str, str]:
    '''
    Show a sample report of all enabled channels (including 'device' channel)
    '''
    if not self.enabled:
        # if telemetry is off, no report is being sent, hence nothing to show
        msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample reports with `ceph telemetry preview`.'
        return 0, msg, ''

    if not self.channel_device:
        # device channel is off, no need to display its report
        return 0, json.dumps(self.get_report_locked('default'), indent=4, sort_keys=True), ''

    # telemetry is on and device channel is enabled, show both
    return 0, json.dumps(self.get_report_locked('all'), indent=4, sort_keys=True), ''
@CLIReadCommand('telemetry preview-all')
def preview_all(self) -> Tuple[int, str, str]:
    '''
    Preview a sample report of the most recent collections available of all channels (including 'device')
    '''
    report = {}

    col_delta = self.collection_delta()
    with self.get_report_lock:
        if col_delta is not None and len(col_delta) == 0:
            # user is already opted-in to the most recent collection
            msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
            return 0, msg, ''

        # there are collections the user is not opted-in to
        next_collection = []

        for c in MODULE_COLLECTION:
            next_collection.append(c['name'].name)

        # temporarily swap in the full collection list to build the preview
        opted_in_collection = self.db_collection
        self.db_collection = next_collection
        report = self.get_report('all')
        self.db_collection = opted_in_collection

    self.format_perf_histogram(report)
    report = json.dumps(report, indent=4, sort_keys=True)

    return 0, report, ''
def get_report_locked(self,
                      report_type: str = 'default',
                      channels: Optional[List[str]] = None) -> Dict[str, Any]:
    """
    Thread-safe wrapper around get_report().

    Serializes report compilation so previews and the periodic sender
    cannot build reports concurrently against swapped collection state.
    """
    with self.get_report_lock:
        return self.get_report(report_type, channels)
def get_report(self,
               report_type: str = 'default',
               channels: Optional[List[str]] = None) -> Dict[str, Any]:
    """
    Build the requested report.

    :param report_type: 'default' (cluster report), 'device' (device
                        report), or 'all' (both, nested under 'report'
                        and 'device_report')
    :param channels: optional channel filter, forwarded to compile_report
    :return: the report dict; empty dict for an unknown report_type
    """
    if report_type == 'device':
        return self.gather_device_report()
    if report_type == 'default':
        return self.compile_report(channels=channels)
    if report_type == 'all':
        return {'report': self.compile_report(channels=channels),
                'device_report': self.gather_device_report()}
    return {}
def self_test(self) -> None:
    """
    Sanity-check report compilation; raises RuntimeError on failure.
    """
    compiled = self.compile_report()
    if not compiled:
        raise RuntimeError('Report is empty')

    if 'report_id' not in compiled:
        raise RuntimeError('report_id not found in report')
def shutdown(self) -> None:
    """Stop the serve() loop: clear the run flag and wake its event wait."""
    # NOTE(review): body reconstructed — only the signature is visible in
    # this chunk; confirm against the original module.
    self.run = False
    self.event.set()
def refresh_health_checks(self) -> None:
    """
    Raise (or clear) the TELEMETRY_CHANGED health warning, nagging the
    user to re-opt-in when new collections are available.
    """
    health_checks = {}
    # TODO do we want to nag also in case the user is not opted-in?
    if self.enabled and self.should_nag():
        health_checks['TELEMETRY_CHANGED'] = {
            'severity': 'warning',
            'summary': 'Telemetry requires re-opt-in',
            'count': 1,
            'detail': [
                'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
            ]
        }

    # passing an empty dict clears any previously raised warning
    self.set_health_checks(health_checks)
def serve(self) -> None:
    """
    Module main loop: periodically compile and send the telemetry report
    while opted-in and enabled, honoring the configured send interval.
    """
    # NOTE(review): several control-flow lines were elided in this chunk
    # and are reconstructed here — confirm against the original module.
    self.load()
    self.run = True

    self.log.debug('Waiting for mgr to warm up')
    time.sleep(10)

    while self.run:
        self.event.clear()

        self.refresh_health_checks()

        if not self.is_opted_in():
            self.log.debug('Not sending report until user re-opts-in')
            self.event.wait(1800)
            continue

        if not self.enabled:
            self.log.debug('Not sending report until configured to do so')
            self.event.wait(1800)
            continue

        # only send a new report once the configured interval has elapsed
        now = int(time.time())
        if not self.last_upload or \
           (now - self.last_upload) > self.interval * 3600:
            self.log.info('Compiling and sending report to %s',
                          self.url)

            try:
                self.last_report = self.compile_report()
            except Exception:
                self.log.exception('Exception while compiling report:')

            self.send(self.last_report)
        else:
            self.log.debug('Interval for sending new report has not expired')

        sleep = 3600
        self.log.debug('Sleeping for %d seconds', sleep)
        self.event.wait(sleep)
1875 def can_run() -> Tuple
[bool, str]: