"""
Telemetry module for ceph-mgr

Collect statistics from the Ceph cluster and send them back to the Ceph project.
"""
import enum
import hashlib
import json
import numbers
import uuid
from collections import defaultdict
from datetime import datetime, timedelta
from threading import Event, Lock
from typing import (TYPE_CHECKING, Any, DefaultDict, Dict, List, Optional,
                    Tuple, TypeVar, Union, cast)

import requests
from prettytable import PrettyTable

import rbd
from mgr_module import (CLICommand, CLIReadCommand, MgrModule, Option,
                        OptionValue, ServiceInfoT)
# Every channel the telemetry module knows about; a subset of these is
# active at any time (see Module.get_active_channels()).
ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']

# License under which the telemetry report is shared; the user must
# explicitly acknowledge it when opting in.
LICENSE = 'sharing-1-0'
LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL = 'https://cdla.io/sharing-1-0/'
33 # Latest revision of the telemetry report. Bump this each time we make
37 # History of revisions
38 # --------------------
41 # Mimic and/or nautilus are lumped together here, since
42 # we didn't track revisions yet.
45 # - added revision tracking, nagging, etc.
46 # - added config option changes
48 # - added explicit license acknowledgement to the opt-in process
51 # - added device health metrics (i.e., SMART data, minus serial number)
53 # - added CephFS metadata (how many MDSs, fs features, how many data pools,
54 # how much metadata is cached, rfiles, rbytes, rsnapshots)
55 # - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
56 # - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
57 # - whether an OSD cluster network is in use
58 # - rbd pool and image count, and rbd mirror mode (pool-level)
59 # - rgw daemons, zones, zonegroups; which rgw frontends
class Collection(str, enum.Enum):
    """Named telemetry collections; each belongs to one channel.

    Inherits from str so members compare/serialize as plain strings.
    """
    basic_base = 'basic_base'
    device_base = 'device_base'
    crash_base = 'crash_base'
    ident_base = 'ident_base'
    perf_perf = 'perf_perf'
    basic_mds_metadata = 'basic_mds_metadata'
    basic_pool_usage = 'basic_pool_usage'
    basic_usage_by_class = 'basic_usage_by_class'
# Registry of all collections with human-readable descriptions.
# NOTE(review): only the 'name' and 'description' keys are visible in the
# corrupted source; the 'channel'/'nag' keys are restored from context
# (channel inferred from the collection-name prefix) — TODO confirm
# against upstream.
MODULE_COLLECTION: List[Dict] = [
    {
        "name": Collection.basic_base,
        "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.device_base,
        "description": "Information about device health metrics",
        "channel": "device",
        "nag": False
    },
    {
        "name": Collection.crash_base,
        "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
        "channel": "crash",
        "nag": False
    },
    {
        "name": Collection.ident_base,
        "description": "User-provided identifying information about the cluster",
        "channel": "ident",
        "nag": False
    },
    {
        "name": Collection.perf_perf,
        "description": "Information about performance counters of the cluster",
        "channel": "perf",
        "nag": True
    },
    {
        "name": Collection.basic_mds_metadata,
        "description": "MDS metadata",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_pool_usage,
        "description": "Default pool application and usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_usage_by_class,
        "description": "Default device class usage statistics",
        "channel": "basic",
        "nag": False
    },
]
class Module(MgrModule):
    # Daemon metadata keys whose value distribution is reported per daemon
    # type (see gather_osd_metadata / gather_mon_metadata).
    # NOTE(review): only "kernel_description" and "distro_description" are
    # visible in the corrupted source; the remaining keys are restored from
    # context — TODO confirm against upstream.
    metadata_keys = [
            "arch",
            "ceph_version",
            "os",
            "cpu",
            "kernel_description",
            "kernel_version",
            "distro_description",
            "distro"
    ]
138 default
='https://telemetry.ceph.com/report'),
139 Option(name
='device_url',
141 default
='https://telemetry.ceph.com/device'),
142 Option(name
='enabled',
145 Option(name
='last_opt_revision',
148 Option(name
='leaderboard',
151 Option(name
='description',
154 Option(name
='contact',
157 Option(name
='organization',
163 Option(name
='interval',
167 Option(name
='channel_basic',
170 desc
='Share basic cluster information (size, version)'),
171 Option(name
='channel_ident',
174 desc
='Share a user-provided description and/or contact email for the cluster'),
175 Option(name
='channel_crash',
178 desc
='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
179 Option(name
='channel_device',
182 desc
=('Share device health metrics '
183 '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
184 Option(name
='channel_perf',
187 desc
='Share various performance metrics of a cluster'),
191 def config_keys(self
) -> Dict
[str, OptionValue
]:
192 return dict((o
['name'], o
.get('default', None)) for o
in self
.MODULE_OPTIONS
)
194 def __init__(self
, *args
: Any
, **kwargs
: Any
) -> None:
195 super(Module
, self
).__init
__(*args
, **kwargs
)
198 self
.db_collection
: Optional
[List
[str]] = None
199 self
.last_opted_in_ceph_version
: Optional
[int] = None
200 self
.last_opted_out_ceph_version
: Optional
[int] = None
201 self
.last_upload
: Optional
[int] = None
202 self
.last_report
: Dict
[str, Any
] = dict()
203 self
.report_id
: Optional
[str] = None
204 self
.salt
: Optional
[str] = None
205 self
.get_report_lock
= Lock()
206 self
.config_update_module_option()
207 # for mypy which does not run the code
212 self
.last_opt_revision
= 0
213 self
.leaderboard
= ''
216 self
.channel_basic
= True
217 self
.channel_ident
= False
218 self
.channel_crash
= True
219 self
.channel_device
= True
220 self
.channel_perf
= False
221 self
.db_collection
= ['basic_base', 'device_base']
222 self
.last_opted_in_ceph_version
= 17
223 self
.last_opted_out_ceph_version
= 0
225 def config_update_module_option(self
) -> None:
226 for opt
in self
.MODULE_OPTIONS
:
229 self
.get_module_option(opt
['name']))
230 self
.log
.debug(' %s = %s', opt
['name'], getattr(self
, opt
['name']))
232 def config_notify(self
) -> None:
233 self
.config_update_module_option()
234 # wake up serve() thread
237 def load(self
) -> None:
238 last_upload
= self
.get_store('last_upload', None)
239 if last_upload
is None:
240 self
.last_upload
= None
242 self
.last_upload
= int(last_upload
)
244 report_id
= self
.get_store('report_id', None)
245 if report_id
is None:
246 self
.report_id
= str(uuid
.uuid4())
247 self
.set_store('report_id', self
.report_id
)
249 self
.report_id
= report_id
251 salt
= self
.get_store('salt', None)
253 self
.salt
= str(uuid
.uuid4())
254 self
.set_store('salt', self
.salt
)
258 self
.init_collection()
260 last_opted_in_ceph_version
= self
.get_store('last_opted_in_ceph_version', None)
261 if last_opted_in_ceph_version
is None:
262 self
.last_opted_in_ceph_version
= None
264 self
.last_opted_in_ceph_version
= int(last_opted_in_ceph_version
)
266 last_opted_out_ceph_version
= self
.get_store('last_opted_out_ceph_version', None)
267 if last_opted_out_ceph_version
is None:
268 self
.last_opted_out_ceph_version
= None
270 self
.last_opted_out_ceph_version
= int(last_opted_out_ceph_version
)
272 def gather_osd_metadata(self
,
273 osd_map
: Dict
[str, List
[Dict
[str, int]]]) -> Dict
[str, Dict
[str, int]]:
274 keys
= ["osd_objectstore", "rotational"]
275 keys
+= self
.metadata_keys
277 metadata
: Dict
[str, Dict
[str, int]] = dict()
279 metadata
[key
] = defaultdict(int)
281 for osd
in osd_map
['osds']:
282 res
= self
.get_metadata('osd', str(osd
['osd']))
284 self
.log
.debug('Could not get metadata for osd.%s' % str(osd
['osd']))
286 for k
, v
in res
.items():
294 def gather_mon_metadata(self
,
295 mon_map
: Dict
[str, List
[Dict
[str, str]]]) -> Dict
[str, Dict
[str, int]]:
297 keys
+= self
.metadata_keys
299 metadata
: Dict
[str, Dict
[str, int]] = dict()
301 metadata
[key
] = defaultdict(int)
303 for mon
in mon_map
['mons']:
304 res
= self
.get_metadata('mon', mon
['name'])
306 self
.log
.debug('Could not get metadata for mon.%s' % (mon
['name']))
308 for k
, v
in res
.items():
316 def gather_mds_metadata(self
) -> Dict
[str, Dict
[str, int]]:
317 metadata
: Dict
[str, Dict
[str, int]] = dict()
319 res
= self
.get('mds_metadata') # metadata of *all* mds daemons
320 if res
is None or not res
:
321 self
.log
.debug('Could not get metadata for mds daemons')
325 keys
+= self
.metadata_keys
328 metadata
[key
] = defaultdict(int)
330 for mds
in res
.values():
331 for k
, v
in mds
.items():
339 def gather_crush_info(self
) -> Dict
[str, Union
[int,
344 osdmap
= self
.get_osdmap()
345 crush_raw
= osdmap
.get_crush()
346 crush
= crush_raw
.dump()
348 BucketKeyT
= TypeVar('BucketKeyT', int, str)
350 def inc(d
: Dict
[BucketKeyT
, int], k
: BucketKeyT
) -> None:
356 device_classes
: Dict
[str, int] = {}
357 for dev
in crush
['devices']:
358 inc(device_classes
, dev
.get('class', ''))
360 bucket_algs
: Dict
[str, int] = {}
361 bucket_types
: Dict
[str, int] = {}
362 bucket_sizes
: Dict
[int, int] = {}
363 for bucket
in crush
['buckets']:
364 if '~' in bucket
['name']: # ignore shadow buckets
366 inc(bucket_algs
, bucket
['alg'])
367 inc(bucket_types
, bucket
['type_id'])
368 inc(bucket_sizes
, len(bucket
['items']))
371 'num_devices': len(crush
['devices']),
372 'num_types': len(crush
['types']),
373 'num_buckets': len(crush
['buckets']),
374 'num_rules': len(crush
['rules']),
375 'device_classes': list(device_classes
.values()),
376 'tunables': crush
['tunables'],
377 'compat_weight_set': '-1' in crush
['choose_args'],
378 'num_weight_sets': len(crush
['choose_args']),
379 'bucket_algs': bucket_algs
,
380 'bucket_sizes': bucket_sizes
,
381 'bucket_types': bucket_types
,
384 def gather_configs(self
) -> Dict
[str, List
[str]]:
385 # cluster config options
387 r
, outb
, outs
= self
.mon_command({
388 'prefix': 'config dump',
394 dump
= json
.loads(outb
)
395 except json
.decoder
.JSONDecodeError
:
398 name
= opt
.get('name')
401 # daemon-reported options (which may include ceph.conf)
403 ls
= self
.get("modified_config_options")
404 for opt
in ls
.get('options', {}):
407 'cluster_changed': sorted(list(cluster
)),
408 'active_changed': sorted(list(active
)),
411 def anonymize_entity_name(self
, entity_name
:str) -> str:
412 if '.' not in entity_name
:
413 self
.log
.debug(f
"Cannot split entity name ({entity_name}), no '.' is found")
416 (etype
, eid
) = entity_name
.split('.', 1)
419 if self
.salt
is not None:
421 # avoid asserting that salt exists
423 # do not set self.salt to a temp value
424 salt
= f
"no_salt_found_{NO_SALT_CNT}"
426 self
.log
.debug(f
"No salt found, created a temp one: {salt}")
427 m
.update(salt
.encode('utf-8'))
428 m
.update(eid
.encode('utf-8'))
429 m
.update(salt
.encode('utf-8'))
431 return etype
+ '.' + m
.hexdigest()
433 def get_heap_stats(self
) -> Dict
[str, dict]:
434 # Initialize result dict
435 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(int))
437 # Get list of osd ids from the metadata
438 osd_metadata
= self
.get('osd_metadata')
440 # Grab output from the "osd.x heap stats" command
441 for osd_id
in osd_metadata
:
447 r
, outb
, outs
= self
.osd_command(cmd_dict
)
449 self
.log
.debug("Invalid command dictionary.")
452 if 'tcmalloc heap stats' in outs
:
453 values
= [int(i
) for i
in outs
.split() if i
.isdigit()]
454 # `categories` must be ordered this way for the correct output to be parsed
455 categories
= ['use_by_application',
456 'page_heap_freelist',
457 'central_cache_freelist',
458 'transfer_cache_freelist',
459 'thread_cache_freelists',
461 'actual_memory_used',
463 'virtual_address_space_used',
465 'thread_heaps_in_use',
466 'tcmalloc_page_size']
467 if len(values
) != len(categories
):
468 self
.log
.debug('Received unexpected output from osd.{}; number of values should match the number of expected categories:\n' \
469 'values: len={} {} ~ categories: len={} {} ~ outs: {}'.format(osd_id
, len(values
), values
, len(categories
), categories
, outs
))
471 osd
= 'osd.' + str(osd_id
)
472 result
[osd
] = dict(zip(categories
, values
))
474 self
.log
.debug('No heap stats available on osd.{}: {}'.format(osd_id
, outs
))
479 def get_mempool(self
, mode
: str = 'separated') -> Dict
[str, dict]:
480 # Initialize result dict
481 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(int))
483 # Get list of osd ids from the metadata
484 osd_metadata
= self
.get('osd_metadata')
486 # Grab output from the "osd.x dump_mempools" command
487 for osd_id
in osd_metadata
:
489 'prefix': 'dump_mempools',
493 r
, outb
, outs
= self
.osd_command(cmd_dict
)
495 self
.log
.debug("Invalid command dictionary.")
499 # This is where the mempool will land.
500 dump
= json
.loads(outb
)
501 if mode
== 'separated':
502 result
["osd." + str(osd_id
)] = dump
['mempool']['by_pool']
503 elif mode
== 'aggregated':
504 for mem_type
in dump
['mempool']['by_pool']:
505 result
[mem_type
]['bytes'] += dump
['mempool']['by_pool'][mem_type
]['bytes']
506 result
[mem_type
]['items'] += dump
['mempool']['by_pool'][mem_type
]['items']
508 self
.log
.debug("Incorrect mode specified in get_mempool")
509 except (json
.decoder
.JSONDecodeError
, KeyError) as e
:
510 self
.log
.debug("Error caught on osd.{}: {}".format(osd_id
, e
))
515 def get_osd_histograms(self
, mode
: str = 'separated') -> List
[Dict
[str, dict]]:
516 # Initialize result dict
517 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(
521 lambda: defaultdict(int))))))
523 # Get list of osd ids from the metadata
524 osd_metadata
= self
.get('osd_metadata')
526 # Grab output from the "osd.x perf histogram dump" command
527 for osd_id
in osd_metadata
:
529 'prefix': 'perf histogram dump',
533 r
, outb
, outs
= self
.osd_command(cmd_dict
)
534 # Check for invalid calls
536 self
.log
.debug("Invalid command dictionary.")
540 # This is where the histograms will land if there are any.
541 dump
= json
.loads(outb
)
543 for histogram
in dump
['osd']:
544 # Log axis information. There are two axes, each represented
545 # as a dictionary. Both dictionaries are contained inside a
546 # list called 'axes'.
548 for axis
in dump
['osd'][histogram
]['axes']:
550 # This is the dict that contains information for an individual
551 # axis. It will be appended to the 'axes' list at the end.
552 axis_dict
: Dict
[str, Any
] = defaultdict()
554 # Collecting information for buckets, min, name, etc.
555 axis_dict
['buckets'] = axis
['buckets']
556 axis_dict
['min'] = axis
['min']
557 axis_dict
['name'] = axis
['name']
558 axis_dict
['quant_size'] = axis
['quant_size']
559 axis_dict
['scale_type'] = axis
['scale_type']
561 # Collecting ranges; placing them in lists to
562 # improve readability later on.
564 for _range
in axis
['ranges']:
565 _max
, _min
= None, None
570 ranges
.append([_min
, _max
])
571 axis_dict
['ranges'] = ranges
573 # Now that 'axis_dict' contains all the appropriate
574 # information for the current axis, append it to the 'axes' list.
575 # There will end up being two axes in the 'axes' list, since the
577 axes
.append(axis_dict
)
579 # Add the 'axes' list, containing both axes, to result.
580 # At this point, you will see that the name of the key is the string
581 # form of our axes list (str(axes)). This is there so that histograms
582 # with different axis configs will not be combined.
583 # These key names are later dropped when only the values are returned.
584 result
[str(axes
)][histogram
]['axes'] = axes
586 # Collect current values and make sure they are in
589 for value_list
in dump
['osd'][histogram
]['values']:
590 values
.append([int(v
) for v
in value_list
])
592 if mode
== 'separated':
593 if 'osds' not in result
[str(axes
)][histogram
]:
594 result
[str(axes
)][histogram
]['osds'] = []
595 result
[str(axes
)][histogram
]['osds'].append({'osd_id': int(osd_id
), 'values': values
})
597 elif mode
== 'aggregated':
598 # Aggregate values. If 'values' have already been initialized,
600 if 'values' in result
[str(axes
)][histogram
]:
601 for i
in range (0, len(values
)):
602 for j
in range (0, len(values
[i
])):
603 values
[i
][j
] += result
[str(axes
)][histogram
]['values'][i
][j
]
605 # Add the values to result.
606 result
[str(axes
)][histogram
]['values'] = values
608 # Update num_combined_osds
609 if 'num_combined_osds' not in result
[str(axes
)][histogram
]:
610 result
[str(axes
)][histogram
]['num_combined_osds'] = 1
612 result
[str(axes
)][histogram
]['num_combined_osds'] += 1
614 self
.log
.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode
))
617 # Sometimes, json errors occur if you give it an empty string.
618 # I am also putting in a catch for a KeyError since it could
619 # happen where the code is assuming that a key exists in the
620 # schema when it doesn't. In either case, we'll handle that
621 # by continuing and collecting what we can from other osds.
622 except (json
.decoder
.JSONDecodeError
, KeyError) as e
:
623 self
.log
.debug("Error caught on osd.{}: {}".format(osd_id
, e
))
626 return list(result
.values())
628 def get_io_rate(self
) -> dict:
629 return self
.get('io_rate')
631 def get_stats_per_pool(self
) -> dict:
632 result
= self
.get('pg_dump')['pool_stats']
634 # collect application metadata from osd_map
635 osd_map
= self
.get('osd_map')
636 application_metadata
= {pool
['pool']: pool
['application_metadata'] for pool
in osd_map
['pools']}
638 # add application to each pool from pg_dump
640 pool
['application'] = []
641 # Only include default applications
642 for application
in application_metadata
[pool
['poolid']]:
643 if application
in ['cephfs', 'mgr', 'rbd', 'rgw']:
644 pool
['application'].append(application
)
648 def get_stats_per_pg(self
) -> dict:
649 return self
.get('pg_dump')['pg_stats']
651 def get_rocksdb_stats(self
) -> Dict
[str, str]:
653 result
: Dict
[str, str] = defaultdict()
654 version
= self
.get_rocksdb_version()
657 result
['version'] = version
661 def gather_crashinfo(self
) -> List
[Dict
[str, str]]:
662 crashlist
: List
[Dict
[str, str]] = list()
663 errno
, crashids
, err
= self
.remote('crash', 'ls')
666 for crashid
in crashids
.split():
667 errno
, crashinfo
, err
= self
.remote('crash', 'do_info', crashid
)
670 c
= json
.loads(crashinfo
)
673 del c
['utsname_hostname']
675 # entity_name might have more than one '.', beware
676 (etype
, eid
) = c
.get('entity_name', '').split('.', 1)
679 m
.update(self
.salt
.encode('utf-8'))
680 m
.update(eid
.encode('utf-8'))
681 m
.update(self
.salt
.encode('utf-8'))
682 c
['entity_name'] = etype
+ '.' + m
.hexdigest()
684 # redact final line of python tracebacks, as the exception
685 # payload may contain identifying information
686 if 'mgr_module' in c
and 'backtrace' in c
:
687 # backtrace might be empty
688 if len(c
['backtrace']) > 0:
689 c
['backtrace'][-1] = '<redacted>'
694 def gather_perf_counters(self
, mode
: str = 'separated') -> Dict
[str, dict]:
695 # Extract perf counter data with get_all_perf_counters(), a method
696 # from mgr/mgr_module.py. This method returns a nested dictionary that
697 # looks a lot like perf schema, except with some additional fields.
699 # Example of output, a snapshot of a mon daemon:
701 # "bluestore.kv_flush_lat": {
703 # "description": "Average kv_thread flush latency",
711 all_perf_counters
= self
.get_all_perf_counters()
713 # Initialize 'result' dict
714 result
: Dict
[str, dict] = defaultdict(lambda: defaultdict(
715 lambda: defaultdict(lambda: defaultdict(int))))
718 anonymized_daemon_dict
= {}
720 for daemon
, all_perf_counters_by_daemon
in all_perf_counters
.items():
721 daemon_type
= daemon
[0:3] # i.e. 'mds', 'osd', 'rgw'
723 if mode
== 'separated':
724 # anonymize individual daemon names except osds
725 if (daemon_type
!= 'osd'):
726 anonymized_daemon
= self
.anonymize_entity_name(daemon
)
727 anonymized_daemon_dict
[anonymized_daemon
] = daemon
728 daemon
= anonymized_daemon
730 # Calculate num combined daemon types if in aggregated mode
731 if mode
== 'aggregated':
732 if 'num_combined_daemons' not in result
[daemon_type
]:
733 result
[daemon_type
]['num_combined_daemons'] = 1
735 result
[daemon_type
]['num_combined_daemons'] += 1
737 for collection
in all_perf_counters_by_daemon
:
738 # Split the collection to avoid redundancy in final report; i.e.:
739 # bluestore.kv_flush_lat, bluestore.kv_final_lat -->
740 # bluestore: kv_flush_lat, kv_final_lat
741 col_0
, col_1
= collection
.split('.')
743 # Debug log for empty keys. This initially was a problem for prioritycache
744 # perf counters, where the col_0 was empty for certain mon counters:
746 # "mon.a": { instead of "mon.a": {
747 # "": { "prioritycache": {
748 # "cache_bytes": {...}, "cache_bytes": {...},
750 # This log is here to detect any future instances of a similar issue.
751 if (daemon
== "") or (col_0
== "") or (col_1
== ""):
752 self
.log
.debug("Instance of an empty key: {}{}".format(daemon
, collection
))
754 if mode
== 'separated':
755 # Add value to result
756 result
[daemon
][col_0
][col_1
]['value'] = \
757 all_perf_counters_by_daemon
[collection
]['value']
759 # Check that 'count' exists, as not all counters have a count field.
760 if 'count' in all_perf_counters_by_daemon
[collection
]:
761 result
[daemon
][col_0
][col_1
]['count'] = \
762 all_perf_counters_by_daemon
[collection
]['count']
763 elif mode
== 'aggregated':
764 # Not every rgw daemon has the same schema. Specifically, each rgw daemon
765 # has a uniquely-named collection that starts off identically (i.e.
766 # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
767 # This bit of code combines these unique counters all under one rgw instance.
768 # Without this check, the schema would remain separeted out in the final report.
769 if col_0
[0:11] == "objecter-0x":
770 col_0
= "objecter-0x"
772 # Check that the value can be incremented. In some cases,
773 # the files are of type 'pair' (real-integer-pair, integer-integer pair).
774 # In those cases, the value is a dictionary, and not a number.
775 # i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
776 if isinstance(all_perf_counters_by_daemon
[collection
]['value'], numbers
.Number
):
777 result
[daemon_type
][col_0
][col_1
]['value'] += \
778 all_perf_counters_by_daemon
[collection
]['value']
780 # Check that 'count' exists, as not all counters have a count field.
781 if 'count' in all_perf_counters_by_daemon
[collection
]:
782 result
[daemon_type
][col_0
][col_1
]['count'] += \
783 all_perf_counters_by_daemon
[collection
]['count']
785 self
.log
.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode
))
788 if mode
== 'separated':
789 # for debugging purposes only, this data is never reported
790 self
.log
.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict
))
794 def get_active_channels(self
) -> List
[str]:
796 if self
.channel_basic
:
798 if self
.channel_crash
:
800 if self
.channel_device
:
802 if self
.channel_ident
:
804 if self
.channel_perf
:
808 def gather_device_report(self
) -> Dict
[str, Dict
[str, Dict
[str, str]]]:
810 time_format
= self
.remote('devicehealth', 'get_time_format')
811 except Exception as e
:
812 self
.log
.debug('Unable to format time: {}'.format(e
))
814 cutoff
= datetime
.utcnow() - timedelta(hours
=self
.interval
* 2)
815 min_sample
= cutoff
.strftime(time_format
)
817 devices
= self
.get('devices')['devices']
819 self
.log
.debug('Unable to get device info from the mgr.')
822 # anon-host-id -> anon-devid -> { timestamp -> record }
823 res
: Dict
[str, Dict
[str, Dict
[str, str]]] = {}
827 # this is a map of stamp -> {device info}
828 m
= self
.remote('devicehealth', 'get_recent_device_metrics',
830 except Exception as e
:
831 self
.log
.debug('Unable to get recent metrics from device with id "{}": {}'.format(devid
, e
))
836 host
= d
['location'][0]['host']
837 except (KeyError, IndexError) as e
:
838 self
.log
.debug('Unable to get host from device with id "{}": {}'.format(devid
, e
))
840 anon_host
= self
.get_store('host-id/%s' % host
)
842 anon_host
= str(uuid
.uuid1())
843 self
.set_store('host-id/%s' % host
, anon_host
)
845 for dev
, rep
in m
.items():
846 rep
['host_id'] = anon_host
847 if serial
is None and 'serial_number' in rep
:
848 serial
= rep
['serial_number']
850 # anonymize device id
851 anon_devid
= self
.get_store('devid-id/%s' % devid
)
853 # ideally devid is 'vendor_model_serial',
854 # but can also be 'model_serial', 'serial'
856 anon_devid
= f
"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
858 anon_devid
= str(uuid
.uuid1())
859 self
.set_store('devid-id/%s' % devid
, anon_devid
)
860 self
.log
.info('devid %s / %s, host %s / %s' % (devid
, anon_devid
,
863 # anonymize the smartctl report itself
865 m_str
= json
.dumps(m
)
866 m
= json
.loads(m_str
.replace(serial
, 'deleted'))
868 if anon_host
not in res
:
870 res
[anon_host
][anon_devid
] = m
873 def get_latest(self
, daemon_type
: str, daemon_name
: str, stat
: str) -> int:
874 data
= self
.get_counter(daemon_type
, daemon_name
, stat
)[stat
]
880 def compile_report(self
, channels
: Optional
[List
[str]] = None) -> Dict
[str, Any
]:
882 channels
= self
.get_active_channels()
884 'leaderboard': self
.leaderboard
,
886 'report_timestamp': datetime
.utcnow().isoformat(),
887 'report_id': self
.report_id
,
888 'channels': channels
,
889 'channels_available': ALL_CHANNELS
,
891 'collections_available': [c
['name'].name
for c
in MODULE_COLLECTION
],
892 'collections_opted_in': [c
['name'].name
for c
in MODULE_COLLECTION
if self
.is_enabled_collection(c
['name'])],
895 if 'ident' in channels
:
896 for option
in ['description', 'contact', 'organization']:
897 report
[option
] = getattr(self
, option
)
899 if 'basic' in channels
:
900 mon_map
= self
.get('mon_map')
901 osd_map
= self
.get('osd_map')
902 service_map
= self
.get('service_map')
903 fs_map
= self
.get('fs_map')
905 df_pools
= {pool
['id']: pool
for pool
in df
['pools']}
907 report
['created'] = mon_map
['created']
914 for mon
in mon_map
['mons']:
915 for a
in mon
['public_addrs']['addrvec']:
916 if a
['type'] == 'v2':
918 elif a
['type'] == 'v1':
920 if a
['addr'].startswith('['):
925 'count': len(mon_map
['mons']),
926 'features': mon_map
['features'],
927 'min_mon_release': mon_map
['min_mon_release'],
928 'v1_addr_mons': v1_mons
,
929 'v2_addr_mons': v2_mons
,
930 'ipv4_addr_mons': ipv4_mons
,
931 'ipv6_addr_mons': ipv6_mons
,
934 report
['config'] = self
.gather_configs()
939 rbd_num_images_by_pool
= []
940 rbd_mirroring_by_pool
= []
942 report
['pools'] = list()
943 for pool
in osd_map
['pools']:
944 num_pg
+= pool
['pg_num']
946 if pool
['erasure_code_profile']:
947 orig
= osd_map
['erasure_code_profiles'].get(
948 pool
['erasure_code_profile'], {})
950 k
: orig
[k
] for k
in orig
.keys()
951 if k
in ['k', 'm', 'plugin', 'technique',
952 'crush-failure-domain', 'l']
955 'pool': pool
['pool'],
956 'pg_num': pool
['pg_num'],
957 'pgp_num': pool
['pg_placement_num'],
958 'size': pool
['size'],
959 'min_size': pool
['min_size'],
960 'pg_autoscale_mode': pool
['pg_autoscale_mode'],
961 'target_max_bytes': pool
['target_max_bytes'],
962 'target_max_objects': pool
['target_max_objects'],
963 'type': ['', 'replicated', '', 'erasure'][pool
['type']],
964 'erasure_code_profile': ec_profile
,
965 'cache_mode': pool
['cache_mode'],
968 # basic_pool_usage collection
969 if self
.is_enabled_collection(Collection
.basic_pool_usage
):
970 pool_data
['application'] = []
971 for application
in pool
['application_metadata']:
972 # Only include default applications
973 if application
in ['cephfs', 'mgr', 'rbd', 'rgw']:
974 pool_data
['application'].append(application
)
975 pool_stats
= df_pools
[pool
['pool']]['stats']
976 pool_data
['stats'] = { # filter out kb_used
977 'avail_raw': pool_stats
['avail_raw'],
978 'bytes_used': pool_stats
['bytes_used'],
979 'compress_bytes_used': pool_stats
['compress_bytes_used'],
980 'compress_under_bytes': pool_stats
['compress_under_bytes'],
981 'data_bytes_used': pool_stats
['data_bytes_used'],
982 'dirty': pool_stats
['dirty'],
983 'max_avail': pool_stats
['max_avail'],
984 'objects': pool_stats
['objects'],
985 'omap_bytes_used': pool_stats
['omap_bytes_used'],
986 'percent_used': pool_stats
['percent_used'],
987 'quota_bytes': pool_stats
['quota_bytes'],
988 'quota_objects': pool_stats
['quota_objects'],
989 'rd': pool_stats
['rd'],
990 'rd_bytes': pool_stats
['rd_bytes'],
991 'stored': pool_stats
['stored'],
992 'stored_data': pool_stats
['stored_data'],
993 'stored_omap': pool_stats
['stored_omap'],
994 'stored_raw': pool_stats
['stored_raw'],
995 'wr': pool_stats
['wr'],
996 'wr_bytes': pool_stats
['wr_bytes']
999 cast(List
[Dict
[str, Any
]], report
['pools']).append(pool_data
)
1000 if 'rbd' in pool
['application_metadata']:
1002 ioctx
= self
.rados
.open_ioctx(pool
['pool_name'])
1003 rbd_num_images_by_pool
.append(
1004 sum(1 for _
in rbd
.RBD().list2(ioctx
)))
1005 rbd_mirroring_by_pool
.append(
1006 rbd
.RBD().mirror_mode_get(ioctx
) != rbd
.RBD_MIRROR_MODE_DISABLED
)
1008 'num_pools': rbd_num_pools
,
1009 'num_images_by_pool': rbd_num_images_by_pool
,
1010 'mirroring_by_pool': rbd_mirroring_by_pool
}
1013 cluster_network
= False
1014 for osd
in osd_map
['osds']:
1015 if osd
['up'] and not cluster_network
:
1016 front_ip
= osd
['public_addrs']['addrvec'][0]['addr'].split(':')[0]
1017 back_ip
= osd
['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
1018 if front_ip
!= back_ip
:
1019 cluster_network
= True
1021 'count': len(osd_map
['osds']),
1022 'require_osd_release': osd_map
['require_osd_release'],
1023 'require_min_compat_client': osd_map
['require_min_compat_client'],
1024 'cluster_network': cluster_network
,
1028 report
['crush'] = self
.gather_crush_info()
1032 'count': len(fs_map
['filesystems']),
1033 'feature_flags': fs_map
['feature_flags'],
1034 'num_standby_mds': len(fs_map
['standbys']),
1037 num_mds
= len(fs_map
['standbys'])
1038 for fsm
in fs_map
['filesystems']:
1048 for gid
, mds
in fs
['info'].items():
1049 num_sessions
+= self
.get_latest('mds', mds
['name'],
1050 'mds_sessions.session_count')
1051 cached_ino
+= self
.get_latest('mds', mds
['name'],
1053 cached_dn
+= self
.get_latest('mds', mds
['name'],
1055 cached_cap
+= self
.get_latest('mds', mds
['name'],
1057 subtrees
+= self
.get_latest('mds', mds
['name'],
1059 if mds
['rank'] == 0:
1060 rfiles
= self
.get_latest('mds', mds
['name'],
1062 rbytes
= self
.get_latest('mds', mds
['name'],
1064 rsnaps
= self
.get_latest('mds', mds
['name'],
1066 report
['fs']['filesystems'].append({ # type: ignore
1067 'max_mds': fs
['max_mds'],
1068 'ever_allowed_features': fs
['ever_allowed_features'],
1069 'explicitly_allowed_features': fs
['explicitly_allowed_features'],
1070 'num_in': len(fs
['in']),
1071 'num_up': len(fs
['up']),
1072 'num_standby_replay': len(
1073 [mds
for gid
, mds
in fs
['info'].items()
1074 if mds
['state'] == 'up:standby-replay']),
1075 'num_mds': len(fs
['info']),
1076 'num_sessions': num_sessions
,
1077 'cached_inos': cached_ino
,
1078 'cached_dns': cached_dn
,
1079 'cached_caps': cached_cap
,
1080 'cached_subtrees': subtrees
,
1081 'balancer_enabled': len(fs
['balancer']) > 0,
1082 'num_data_pools': len(fs
['data_pools']),
1083 'standby_count_wanted': fs
['standby_count_wanted'],
1084 'approx_ctime': fs
['created'][0:7],
1089 num_mds
+= len(fs
['info'])
1090 report
['fs']['total_num_mds'] = num_mds
# type: ignore
1093 report
['metadata'] = dict(osd
=self
.gather_osd_metadata(osd_map
),
1094 mon
=self
.gather_mon_metadata(mon_map
))
1096 if self
.is_enabled_collection(Collection
.basic_mds_metadata
):
1097 report
['metadata']['mds'] = self
.gather_mds_metadata() # type: ignore
1100 servers
= self
.list_servers()
1101 self
.log
.debug('servers %s' % servers
)
1103 'num': len([h
for h
in servers
if h
['hostname']]),
1105 for t
in ['mon', 'mds', 'osd', 'mgr']:
1106 nr_services
= sum(1 for host
in servers
if
1107 any(service
for service
in cast(List
[ServiceInfoT
],
1109 if service
['type'] == t
))
1110 hosts
['num_with_' + t
] = nr_services
1111 report
['hosts'] = hosts
1114 'pools': len(df
['pools']),
1116 'total_used_bytes': df
['stats']['total_used_bytes'],
1117 'total_bytes': df
['stats']['total_bytes'],
1118 'total_avail_bytes': df
['stats']['total_avail_bytes']
1120 # basic_usage_by_class collection
1121 if self
.is_enabled_collection(Collection
.basic_usage_by_class
):
1122 report
['usage']['stats_by_class'] = {} # type: ignore
1123 for device_class
in df
['stats_by_class']:
1124 if device_class
in ['hdd', 'ssd', 'nvme']:
1125 report
['usage']['stats_by_class'][device_class
] = df
['stats_by_class'][device_class
] # type: ignore
1127 services
: DefaultDict
[str, int] = defaultdict(int)
1128 for key
, value
in service_map
['services'].items():
1136 d
= value
.get('daemons', dict())
1137 for k
, v
in d
.items():
1138 if k
== 'summary' and v
:
1140 elif isinstance(v
, dict) and 'metadata' in v
:
1142 zones
.add(v
['metadata']['zone_id'])
1143 zonegroups
.add(v
['metadata']['zonegroup_id'])
1144 frontends
.add(v
['metadata']['frontend_type#0'])
1146 # we could actually iterate over all the keys of
1147 # the dict and check for how many frontends there
1148 # are, but it is unlikely that one would be running
1149 # more than 2 supported ones
1150 f2
= v
['metadata'].get('frontend_type#1', None)
1154 rgw
['count'] = count
1155 rgw
['zones'] = len(zones
)
1156 rgw
['zonegroups'] = len(zonegroups
)
1157 rgw
['frontends'] = list(frontends
) # sets aren't json-serializable
1159 report
['services'] = services
1162 report
['balancer'] = self
.remote('balancer', 'gather_telemetry')
1164 report
['balancer'] = {
1168 if 'crash' in channels
:
1169 report
['crashes'] = self
.gather_crashinfo()
1171 if 'perf' in channels
:
1172 if self
.is_enabled_collection(Collection
.perf_perf
):
1173 report
['perf_counters'] = self
.gather_perf_counters('separated')
1174 report
['stats_per_pool'] = self
.get_stats_per_pool()
1175 report
['stats_per_pg'] = self
.get_stats_per_pg()
1176 report
['io_rate'] = self
.get_io_rate()
1177 report
['osd_perf_histograms'] = self
.get_osd_histograms('separated')
1178 report
['mempool'] = self
.get_mempool('separated')
1179 report
['heap_stats'] = self
.get_heap_stats()
1180 report
['rocksdb_stats'] = self
.get_rocksdb_stats()
1182 # NOTE: We do not include the 'device' channel in this report; it is
1183 # sent to a different endpoint.
def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
    """
    PUT *report* as JSON to *url*, honoring the module's HTTP(S) proxy.

    Returns None on success, or a human-readable failure string (also
    logged) on any error. Broad `except` is deliberate: sending telemetry
    is best-effort and must never crash the caller.
    """
    self.log.info('Sending %s to: %s' % (what, url))
    proxies = dict()
    if self.proxy:
        self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
        proxies['http'] = self.proxy
        proxies['https'] = self.proxy
    try:
        resp = requests.put(url=url, json=report, proxies=proxies)
        resp.raise_for_status()
    except Exception as e:
        fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
        self.log.error(fail_reason)
        return fail_reason
    return None
class EndPoint(enum.Enum):
    # Telemetry upload destinations; send() dispatches on these members.
    # NOTE(review): member values reconstructed from the `self.EndPoint.ceph`
    # / `self.EndPoint.device` usage in send() — confirm against upstream.
    ceph = 'ceph'
    device = 'device'
def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
    """
    Find collections that are available in the module, but are not in the db.

    Returns None when the user is not opted-in (db_collection is None) or
    when an invalid channel name is supplied; otherwise the list of module
    collections (restricted to *channels*) missing from the db.
    """
    if self.db_collection is None:
        return None

    if not channels:
        channels = ALL_CHANNELS
    else:
        for ch in channels:
            if ch not in ALL_CHANNELS:
                self.log.debug(f"invalid channel name: {ch}")
                return None

    new_collection: List[Collection] = []

    for c in MODULE_COLLECTION:
        if c['name'].name not in self.db_collection:
            if c['channel'] in channels:
                new_collection.append(c['name'])

    return new_collection
def is_major_upgrade(self) -> bool:
    """
    Returns True only if the user last opted-in to an older major
    release than the monitors' current minimum release.
    """
    if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0:
        # we do not know what Ceph version was when the user last opted-in,
        # thus we do not wish to nag in case of a major upgrade
        return False

    mon_map = self.get('mon_map')
    mon_min = mon_map.get("min_mon_release", 0)

    # Fix: direct comparison instead of the roundabout `mon_min - x > 0`.
    if mon_min > self.last_opted_in_ceph_version:
        self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
        return True

    return False
def is_opted_in(self) -> bool:
    """
    Report whether the user is currently opted-in to telemetry.

    If db_collection is None or empty the user is either opted-out (never
    opted-in, or invoked `telemetry off`), or they upgraded from telemetry
    revision 1 or 2, which required re-opt-in to revision 3 — either way,
    considered opted-out.
    """
    # Idiom: a single truthiness check covers both None and the empty list,
    # replacing the explicit `is None` branch plus `len(...) > 0`.
    return bool(self.db_collection)
def should_nag(self) -> bool:
    """
    Decide whether to raise the 're-opt-in required' health warning.

    Nag if the module ships a collection not in the db whose 'nag' flag is
    set, or — failing that — if this is a major upgrade and any new
    collections exist.
    """
    # We currently do not nag if the user is opted-out (or never opted-in).
    # If we wish to do this in the future, we need to have a tri-mode state
    # (opted in, opted out, no action yet), and it needs to be guarded by a
    # config option (so that nagging can be turned off via config).
    # We also need to add a last_opted_out_ceph_version variable, for the
    # major upgrade check.

    # check if there are collections the user is not opted-in to
    # that we should nag about
    if self.db_collection is not None:
        for c in MODULE_COLLECTION:
            if c['name'].name not in self.db_collection:
                # Fix: truthiness instead of the `== True` anti-idiom.
                if c['nag']:
                    self.log.debug(f"The collection: {c['name']} is not reported")
                    return True

    # user might be opted-in to the most recent collection, or there is no
    # new collection which requires nagging about; thus nag in case it's a
    # major upgrade and there are new collections
    # (which their own nag == False):
    col_delta = self.collection_delta()
    new_collections = col_delta is not None and len(col_delta) > 0

    return self.is_major_upgrade() and new_collections
def init_collection(self) -> None:
    """
    Load the opted-in collections from the mgr key-value store, seeding it
    once on upgrade from the pre-collection (revision-based) scheme.
    """
    # We fetch from db the collections the user had already opted-in to.
    # During the transition the results will be empty, but the user might
    # be opted-in to an older version (e.g. revision = 3)
    collection = self.get_store('collection')

    if collection is not None:
        self.db_collection = json.loads(collection)

    if self.db_collection is None:
        # happens once on upgrade
        if not self.enabled:
            # user is not opted-in
            self.set_store('collection', json.dumps([]))
            self.log.debug("user is not opted-in")
        else:
            # user is opted-in, verify the revision:
            if self.last_opt_revision == REVISION:
                self.log.debug(f"telemetry revision is {REVISION}")
                base_collection = [Collection.basic_base.name,
                                   Collection.device_base.name,
                                   Collection.crash_base.name,
                                   Collection.ident_base.name]
                self.set_store('collection', json.dumps(base_collection))
            else:
                # user is opted-in to an older version, meaning they need
                # to re-opt in regardless
                self.set_store('collection', json.dumps([]))
                self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")

        # reload collection after setting
        collection = self.get_store('collection')
        if collection is not None:
            self.db_collection = json.loads(collection)
        else:
            raise RuntimeError('collection is None after initial setting')
    else:
        # user has already upgraded
        self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
def is_enabled_collection(self, collection: Collection) -> bool:
    """Return True when *collection* is among the opted-in collections."""
    if self.db_collection is None:
        return False
    return collection.name in self.db_collection
def opt_in_all_collections(self) -> None:
    """
    Opt-in to all collections; update the db with the collections
    currently available in the module.
    """
    if self.db_collection is None:
        raise RuntimeError('db_collection is None after initial setting')

    for c in MODULE_COLLECTION:
        if c['name'].name not in self.db_collection:
            self.db_collection.append(c['name'])

    self.set_store('collection', json.dumps(self.db_collection))
def send(self,
         report: Dict[str, Dict[str, str]],
         endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
    """
    Upload *report* to the requested endpoints (both by default): the main
    ceph telemetry endpoint and the device-metrics endpoint.

    Returns a (retval, stdout, stderr)-style tuple: 1 and the combined
    messages when any post failed, 0 otherwise.
    """
    if not endpoint:
        endpoint = [self.EndPoint.ceph, self.EndPoint.device]

    failed = []
    success = []
    self.log.debug('Send endpoints %s' % endpoint)
    for e in endpoint:
        if e == self.EndPoint.ceph:
            fail_reason = self._try_post('ceph report', self.url, report)
            if fail_reason:
                failed.append(fail_reason)
            else:
                now = int(time.time())
                self.last_upload = now
                self.set_store('last_upload', str(now))
                success.append('Ceph report sent to {0}'.format(self.url))
                self.log.info('Sent report to {0}'.format(self.url))
        elif e == self.EndPoint.device:
            if 'device' in self.get_active_channels():
                devices = self.gather_device_report()
                if devices:
                    num_devs = 0
                    num_hosts = 0
                    for host, ls in devices.items():
                        self.log.debug('host %s devices %s' % (host, ls))
                        num_devs += len(ls)
                        # NOTE(review): accumulating len(ls) into num_hosts
                        # looks like it may intend `+= 1`; confirm intent.
                        num_hosts += len(ls)
                    fail_reason = self._try_post('devices', self.device_url,
                                                 devices)
                    if fail_reason:
                        failed.append(fail_reason)
                    else:
                        success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
                            num_devs, num_hosts, len(devices)))
                else:
                    fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
                    failed.append(fail_reason)
                    self.log.error(fail_reason)
    if failed:
        return 1, '', '\n'.join(success + failed)
    return 0, '', '\n'.join(success)
def format_perf_histogram(self, report: Dict[str, Any]) -> None:
    """
    Make the perf histograms in *report* human-readable, in place.

    Ranges and values are stored as lists; converting each entry to its
    string form displays them horizontally instead of vertically when the
    report is dumped.
    """
    try:
        # Formatting ranges and values in osd_perf_histograms
        mode = 'osd_perf_histograms'
        for config in report[mode]:
            for histogram in config:
                # Adjust ranges by converting lists into strings.
                # Idiom fix: in-place slice assignment with a comprehension
                # replaces the manual range(len(...)) index loops.
                for axis in config[histogram]['axes']:
                    axis['ranges'][:] = [str(r) for r in axis['ranges']]
                for osd in config[histogram]['osds']:
                    osd['values'][:] = [str(v) for v in osd['values']]
    except KeyError:
        # If the perf channel is not enabled, there should be a KeyError since
        # 'osd_perf_histograms' would not be present in the report. In that case,
        # the show function should pass as usual without trying to format the
        # histograms.
        pass
def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    """
    Enable or disable a list of channels.

    *action* is 'enable' or 'disable'; invalid channel names are reported
    in the returned message rather than raising.
    """
    if not self.enabled:
        # telemetry should be on for channels to be toggled
        msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample reports with `ceph telemetry preview`.'
        return 0, msg, ''

    if channels is None:
        msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
        return 0, msg, ''

    state = action == 'enable'
    msg = ''
    for c in channels:
        if c not in ALL_CHANNELS:
            msg = f"{msg}{c} is not a valid channel name. "\
                  f"Available channels: {ALL_CHANNELS}.\n"
        else:
            self.set_module_option(f"channel_{c}", state)
            setattr(self, f"channel_{c}", state)
            msg = f"{msg}channel_{c} is {action}d\n"

    return 0, msg, ''
@CLIReadCommand('telemetry status')
def status(self) -> Tuple[int, str, str]:
    '''
    Show current configuration
    '''
    # Dump every module option plus a human-readable last-upload timestamp.
    opts = {opt['name']: getattr(self, opt['name']) for opt in self.MODULE_OPTIONS}
    opts['last_upload'] = time.ctime(self.last_upload) if self.last_upload else self.last_upload
    return 0, json.dumps(opts, indent=4, sort_keys=True), ''
@CLIReadCommand('telemetry diff')
def diff(self) -> Tuple[int, str, str]:
    '''
    Show the diff between opted-in collection and available collection
    '''
    # keys stripped from the output (internal bookkeeping only)
    hidden_keys = ['nag']

    delta = []
    for c in MODULE_COLLECTION:
        if not self.is_enabled_collection(c['name']):
            delta.append({key: val for key, val in c.items() if key not in hidden_keys})

    if not delta:
        r = "Telemetry is up to date"
    else:
        r = json.dumps(delta, indent=4, sort_keys=True)

    return 0, r, ''
@CLICommand('telemetry on')
def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
    '''
    Enable telemetry reports from this cluster
    '''
    if license != LICENSE:
        return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''

    self.set_module_option('enabled', True)
    self.enabled = True
    self.opt_in_all_collections()

    # for major releases upgrade nagging
    mon_map = self.get('mon_map')
    mon_min = mon_map.get("min_mon_release", 0)
    self.set_store('last_opted_in_ceph_version', str(mon_min))
    self.last_opted_in_ceph_version = mon_min

    msg = 'Telemetry is on.'
    disabled_channels = ''
    active_channels = self.get_active_channels()
    for c in ALL_CHANNELS:
        # 'ident' is off by default by design, so don't suggest enabling it
        if c not in active_channels and c != 'ident':
            disabled_channels = f"{disabled_channels} {c}"

    if len(disabled_channels) > 0:
        msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
              f"`ceph telemetry enable channel{disabled_channels}`"

    return 0, msg, ''
@CLICommand('telemetry off')
def off(self) -> Tuple[int, str, str]:
    '''
    Disable telemetry reports from this cluster
    '''
    if not self.enabled:
        # telemetry is already off
        msg = 'Telemetry is currently not enabled, nothing to turn off. '\
              'Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample reports with `ceph telemetry preview`.'
        return 0, msg, ''

    self.set_module_option('enabled', False)
    self.enabled = False
    self.set_store('collection', json.dumps([]))
    self.db_collection = []

    # we might need this info in the future, in case
    # of nagging when user is opted-out
    mon_map = self.get('mon_map')
    mon_min = mon_map.get("min_mon_release", 0)
    self.set_store('last_opted_out_ceph_version', str(mon_min))
    self.last_opted_out_ceph_version = mon_min

    msg = 'Telemetry is now disabled.'
    return 0, msg, ''
@CLIReadCommand('telemetry enable channel all')
def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
    '''
    Enable all channels
    '''
    # NOTE(review): mutable default (module-level list) — safe only while
    # toggle_channel never mutates its argument; confirm before changing.
    return self.toggle_channel('enable', channels)
@CLIReadCommand('telemetry enable channel')
def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    '''
    Enable a list of channels
    '''
    return self.toggle_channel('enable', channels)
@CLIReadCommand('telemetry disable channel all')
def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
    '''
    Disable all channels
    '''
    # NOTE(review): mutable default (module-level list) — safe only while
    # toggle_channel never mutates its argument; confirm before changing.
    return self.toggle_channel('disable', channels)
@CLIReadCommand('telemetry disable channel')
def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    '''
    Disable a list of channels
    '''
    return self.toggle_channel('disable', channels)
@CLIReadCommand('telemetry channel ls')
def channel_ls(self) -> Tuple[int, str, str]:
    '''
    List all channels
    '''
    table = PrettyTable(
        [
            'NAME', 'ENABLED', 'DEFAULT', 'DESC',
        ],
        border=False)
    table.align['NAME'] = 'l'
    table.align['ENABLED'] = 'l'
    table.align['DEFAULT'] = 'l'
    table.align['DESC'] = 'l'
    table.left_padding_width = 0
    table.right_padding_width = 4

    for c in ALL_CHANNELS:
        enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
        # look up the matching module option for its default and description;
        # every channel is assumed to have a `channel_<name>` option
        for o in self.MODULE_OPTIONS:
            if o['name'] == f"channel_{c}":
                default = "ON" if o.get('default', None) else "OFF"
                desc = o.get('desc', None)

        table.add_row((
            c,
            enabled,
            default,
            desc,
        ))

    return 0, table.get_string(sortby="NAME"), ''
@CLIReadCommand('telemetry collection ls')
def collection_ls(self) -> Tuple[int, str, str]:
    '''
    List all collections
    '''
    msg = ''
    col_delta = self.collection_delta()
    if col_delta is not None and len(col_delta) > 0:
        msg = f"New collections are available:\n" \
              f"{sorted([c.name for c in col_delta])}\n" \
              f"Run `ceph telemetry on` to opt-in to these collections.\n"

    table = PrettyTable(
        [
            'NAME', 'STATUS', 'DESC',
        ],
        border=False)
    table.align['NAME'] = 'l'
    table.align['STATUS'] = 'l'
    table.align['DESC'] = 'l'
    table.left_padding_width = 0
    table.right_padding_width = 4

    for c in MODULE_COLLECTION:
        name = c['name']
        opted_in = self.is_enabled_collection(name)
        channel_enabled = getattr(self, f"channel_{c['channel']}")

        # a collection is reported only when its channel is on AND the user
        # opted-in to it; otherwise explain which condition failed
        if channel_enabled and opted_in:
            status = "REPORTING"
        else:
            why = ''
            delimiter = ''
            if not opted_in:
                why += "NOT OPTED-IN"
                delimiter = ', '
            if not channel_enabled:
                why += f"{delimiter}CHANNEL {c['channel']} IS OFF"
            status = f"NOT REPORTING: {why}"

        desc = c['description']

        table.add_row((
            name,
            status,
            desc,
        ))

    if len(msg):
        # add a new line between message and table output
        msg = f"{msg}\n"

    return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
@CLICommand('telemetry send')
def do_send(self,
            endpoint: Optional[List[EndPoint]] = None,
            license: Optional[str] = None) -> Tuple[int, str, str]:
    '''
    Send a sample report
    '''
    # a manual send while opted-out still requires explicit license consent
    if not self.is_opted_in() and license != LICENSE:
        self.log.debug(('A telemetry send attempt while opted-out. '
                        'Asking for license agreement'))
        return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
Please consider enabling the telemetry module with 'ceph telemetry on'.'''

    self.last_report = self.compile_report()
    return self.send(self.last_report, endpoint)
@CLIReadCommand('telemetry show')
def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    '''
    Show a sample report of opted-in collections (except for 'device')
    '''
    if not self.enabled:
        # if telemetry is off, no report is being sent, hence nothing to show
        msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample reports with `ceph telemetry preview`.'
        return 0, msg, ''

    report = self.get_report_locked(channels=channels)
    self.format_perf_histogram(report)
    report = json.dumps(report, indent=4, sort_keys=True)

    if self.channel_device:
        report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''

    return 0, report, ''
@CLIReadCommand('telemetry preview')
def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
    '''
    Preview a sample report of the most recent collections available (except for 'device')
    '''
    # We use a lock to prevent a scenario where the user wishes to preview
    # the report, and at the same time the module hits the interval of
    # sending a report with the opted-in collection, which has less data
    # than in the preview report.
    col_delta = self.collection_delta()
    with self.get_report_lock:
        if col_delta is not None and len(col_delta) == 0:
            # user is already opted-in to the most recent collection
            msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
            return 0, msg, ''

        # there are collections the user is not opted-in to;
        # temporarily swap in the full module collection to build the preview
        next_collection = [c['name'].name for c in MODULE_COLLECTION]

        opted_in_collection = self.db_collection
        self.db_collection = next_collection
        report = self.get_report(channels=channels)
        self.db_collection = opted_in_collection

    self.format_perf_histogram(report)
    report = json.dumps(report, indent=4, sort_keys=True)

    if self.channel_device:
        report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''

    return 0, report, ''
@CLIReadCommand('telemetry show-device')
def show_device(self) -> Tuple[int, str, str]:
    '''
    Show a sample device report
    '''
    if not self.enabled:
        # if telemetry is off, no report is being sent, hence nothing to show
        msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample device reports with `ceph telemetry preview-device`.'
        return 0, msg, ''

    if not self.channel_device:
        # if device channel is off, device report is not being sent, hence nothing to show
        msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
              'Preview sample device reports with `ceph telemetry preview-device`.'
        return 0, msg, ''

    return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
@CLIReadCommand('telemetry preview-device')
def preview_device(self) -> Tuple[int, str, str]:
    '''
    Preview a sample device report of the most recent device collection
    '''
    device_col_delta = self.collection_delta(['device'])
    with self.get_report_lock:
        if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
            # user is already opted-in to the most recent device collection,
            # and device channel is on, thus `show-device` should be called
            msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
            return 0, msg, ''

        # either the user is not opted-in at all, or there are collections
        # they are not opted-in to; temporarily swap in the full module
        # collection to build the preview
        next_collection = [c['name'].name for c in MODULE_COLLECTION]

        opted_in_collection = self.db_collection
        self.db_collection = next_collection
        report = self.get_report('device')
        self.db_collection = opted_in_collection

    report = json.dumps(report, indent=4, sort_keys=True)
    return 0, report, ''
@CLIReadCommand('telemetry show-all')
def show_all(self) -> Tuple[int, str, str]:
    '''
    Show a sample report of all enabled channels (including 'device' channel)
    '''
    if not self.enabled:
        # if telemetry is off, no report is being sent, hence nothing to show
        msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
              'Preview sample reports with `ceph telemetry preview`.'
        return 0, msg, ''

    if not self.channel_device:
        # device channel is off, no need to display its report
        return 0, json.dumps(self.get_report_locked('default'), indent=4, sort_keys=True), ''

    # telemetry is on and device channel is enabled, show both
    return 0, json.dumps(self.get_report_locked('all'), indent=4, sort_keys=True), ''
@CLIReadCommand('telemetry preview-all')
def preview_all(self) -> Tuple[int, str, str]:
    '''
    Preview a sample report of the most recent collections available of all channels (including 'device')
    '''
    col_delta = self.collection_delta()
    with self.get_report_lock:
        if col_delta is not None and len(col_delta) == 0:
            # user is already opted-in to the most recent collection
            msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
            return 0, msg, ''

        # there are collections the user is not opted-in to;
        # temporarily swap in the full module collection to build the preview
        next_collection = [c['name'].name for c in MODULE_COLLECTION]

        opted_in_collection = self.db_collection
        self.db_collection = next_collection
        report = self.get_report('all')
        self.db_collection = opted_in_collection

    self.format_perf_histogram(report)
    report = json.dumps(report, indent=4, sort_keys=True)

    return 0, report, ''
def get_report_locked(self,
                      report_type: str = 'default',
                      channels: Optional[List[str]] = None) -> Dict[str, Any]:
    '''
    A wrapper around get_report to allow for compiling a report of the most recent module collections
    '''
    # serialize report generation against the periodic sender
    with self.get_report_lock:
        return self.get_report(report_type, channels)
def get_report(self,
               report_type: str = 'default',
               channels: Optional[List[str]] = None) -> Dict[str, Any]:
    """
    Build a report: 'default' → main telemetry report, 'device' → device
    metrics, 'all' → both combined under 'report'/'device_report' keys.
    """
    if report_type == 'default':
        return self.compile_report(channels=channels)
    elif report_type == 'device':
        return self.gather_device_report()
    elif report_type == 'all':
        return {'report': self.compile_report(channels=channels),
                'device_report': self.gather_device_report()}
    return {}
def self_test(self) -> None:
    """
    Sanity-check report generation; raises RuntimeError when the compiled
    report is empty or lacks a report_id.
    """
    report = self.compile_report()
    if len(report) == 0:
        raise RuntimeError('Report is empty')

    if 'report_id' not in report:
        raise RuntimeError('report_id not found in report')
def shutdown(self) -> None:
    # Stop the serve() loop and wake it immediately.
    # NOTE(review): body reconstructed from serve()'s use of self.run /
    # self.event — confirm against upstream.
    self.run = False
    self.event.set()
def refresh_health_checks(self) -> None:
    """
    Raise (or clear) the TELEMETRY_CHANGED health warning depending on
    whether the user needs to re-opt-in to new collections.
    """
    health_checks = {}
    # TODO do we want to nag also in case the user is not opted-in?
    if self.enabled and self.should_nag():
        health_checks['TELEMETRY_CHANGED'] = {
            'severity': 'warning',
            'summary': 'Telemetry requires re-opt-in',
            'count': 1,
            'detail': [
                'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
            ]
        }
    # passing an empty dict clears any previously-raised warning
    self.set_health_checks(health_checks)
def serve(self) -> None:
    """
    Main module loop: periodically compile and send a telemetry report
    while the user is opted-in and the module is enabled.
    """
    self.load()
    self.run = True

    self.log.debug('Waiting for mgr to warm up')
    self.event.wait(10)

    while self.run:
        self.event.clear()

        self.refresh_health_checks()

        if not self.is_opted_in():
            self.log.debug('Not sending report until user re-opts-in')
            self.event.wait(1800)
            continue

        if not self.enabled:
            self.log.debug('Not sending report until configured to do so')
            self.event.wait(1800)
            continue

        now = int(time.time())
        if not self.last_upload or \
           (now - self.last_upload) > self.interval * 3600:
            self.log.info('Compiling and sending report to %s',
                          self.url)

            try:
                self.last_report = self.compile_report()
            except Exception:
                self.log.exception('Exception while compiling report:')

            self.send(self.last_report)
        else:
            self.log.debug('Interval for sending new report has not expired')

        sleep = 3600
        self.log.debug('Sleeping for %d seconds', sleep)
        self.event.wait(sleep)
1912 def can_run() -> Tuple
[bool, str]: