]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/telemetry/module.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / pybind / mgr / telemetry / module.py
1 """
2 Telemetry module for ceph-mgr
3
4 Collect statistics from Ceph cluster and send this back to the Ceph project
5 when user has opted-in
6 """
7 import logging
8 import numbers
9 import enum
10 import errno
11 import hashlib
12 import json
13 import rbd
14 import requests
15 import uuid
16 import time
17 from datetime import datetime, timedelta
18 from prettytable import PrettyTable
19 from threading import Event, Lock
20 from collections import defaultdict
21 from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union
22
23 from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, ServiceInfoT
24
25
26 ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']
27
28 LICENSE = 'sharing-1-0'
29 LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
30 LICENSE_URL = 'https://cdla.io/sharing-1-0/'
31 NO_SALT_CNT = 0
32
33 # Latest revision of the telemetry report. Bump this each time we make
34 # *any* change.
35 REVISION = 3
36
37 # History of revisions
38 # --------------------
39 #
40 # Version 1:
41 # Mimic and/or nautilus are lumped together here, since
42 # we didn't track revisions yet.
43 #
44 # Version 2:
45 # - added revision tracking, nagging, etc.
46 # - added config option changes
47 # - added channels
48 # - added explicit license acknowledgement to the opt-in process
49 #
50 # Version 3:
51 # - added device health metrics (i.e., SMART data, minus serial number)
52 # - remove crush_rule
53 # - added CephFS metadata (how many MDSs, fs features, how many data pools,
54 # how much metadata is cached, rfiles, rbytes, rsnapshots)
55 # - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
56 # - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
57 # - whether an OSD cluster network is in use
58 # - rbd pool and image count, and rbd mirror mode (pool-level)
59 # - rgw daemons, zones, zonegroups; which rgw frontends
60 # - crush map stats
61
class Collection(str, enum.Enum):
    # Fine-grained telemetry collections. Each collection belongs to one
    # channel (see MODULE_COLLECTION below for the channel mapping and a
    # human-readable description of each entry). The str mixin makes the
    # members JSON/string friendly.
    basic_base = 'basic_base'
    device_base = 'device_base'
    crash_base = 'crash_base'
    ident_base = 'ident_base'
    perf_perf = 'perf_perf'
    basic_mds_metadata = 'basic_mds_metadata'
    basic_pool_usage = 'basic_pool_usage'
    basic_usage_by_class = 'basic_usage_by_class'
    basic_rook_v01 = 'basic_rook_v01'
    perf_memory_metrics = 'perf_memory_metrics'
    basic_pool_options_bluestore = 'basic_pool_options_bluestore'
74
# Registry describing every Collection: which channel it reports under,
# a user-facing description, and whether users should be nagged to
# opt in to it ("nag": True).
MODULE_COLLECTION : List[Dict] = [
    {
        "name": Collection.basic_base,
        "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.device_base,
        "description": "Information about device health metrics",
        "channel": "device",
        "nag": False
    },
    {
        "name": Collection.crash_base,
        "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
        "channel": "crash",
        "nag": False
    },
    {
        "name": Collection.ident_base,
        "description": "User-provided identifying information about the cluster",
        "channel": "ident",
        "nag": False
    },
    {
        "name": Collection.perf_perf,
        "description": "Information about performance counters of the cluster",
        "channel": "perf",
        "nag": True
    },
    {
        "name": Collection.basic_mds_metadata,
        "description": "MDS metadata",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_pool_usage,
        "description": "Default pool application and usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_usage_by_class,
        "description": "Default device class usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_rook_v01,
        "description": "Basic Rook deployment data",
        "channel": "basic",
        "nag": True
    },
    {
        "name": Collection.perf_memory_metrics,
        "description": "Heap stats and mempools for mon and mds",
        "channel": "perf",
        "nag": False
    },
    {
        "name": Collection.basic_pool_options_bluestore,
        "description": "Per-pool bluestore config options",
        "channel": "basic",
        "nag": False
    },
]
143
# Rook-reported key/value store keys included in the report, each tagged
# with the collection that gates it.
ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [
    # Note: a key cannot be both a node and a leaf, e.g.
    # "rook/a/b"
    # "rook/a/b/c"
    ("rook/version", Collection.basic_rook_v01),
    ("rook/kubernetes/version", Collection.basic_rook_v01),
    ("rook/csi/version", Collection.basic_rook_v01),
    ("rook/node/count/kubernetes-total", Collection.basic_rook_v01),
    ("rook/node/count/with-ceph-daemons", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-rbd-plugin", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-cephfs-plugin", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-nfs-plugin", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/total", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/rbd", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/cephfs", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/nfs", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/bucket", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/total", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/portable", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/non-portable", Collection.basic_rook_v01),
    ("rook/cluster/mon/count", Collection.basic_rook_v01),
    ("rook/cluster/mon/allow-multiple-per-node", Collection.basic_rook_v01),
    ("rook/cluster/mon/max-id", Collection.basic_rook_v01),
    ("rook/cluster/mon/pvc/enabled", Collection.basic_rook_v01),
    ("rook/cluster/mon/stretch/enabled", Collection.basic_rook_v01),
    ("rook/cluster/network/provider", Collection.basic_rook_v01),
    ("rook/cluster/external-mode", Collection.basic_rook_v01),
]
172
173 class Module(MgrModule):
    # Daemon metadata fields that the gather_*_metadata() helpers turn into
    # per-value count histograms (e.g. how many daemons run each ceph_version).
    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]
184
    # Module options; their current values are mirrored onto `self.<name>`
    # attributes by config_update_module_option().
    MODULE_OPTIONS = [
        Option(name='url',
               type='str',
               default='https://telemetry.ceph.com/report'),
        Option(name='device_url',
               type='str',
               default='https://telemetry.ceph.com/device'),
        Option(name='enabled',
               type='bool',
               default=False),
        Option(name='last_opt_revision',
               type='int',
               default=1),
        Option(name='leaderboard',
               type='bool',
               default=False),
        Option(name='leaderboard_description',
               type='str',
               default=None),
        Option(name='description',
               type='str',
               default=None),
        Option(name='contact',
               type='str',
               default=None),
        Option(name='organization',
               type='str',
               default=None),
        Option(name='proxy',
               type='str',
               default=None),
        # Hours between reports; minimum enforced at 8.
        Option(name='interval',
               type='int',
               default=24,
               min=8),
        Option(name='channel_basic',
               type='bool',
               default=True,
               desc='Share basic cluster information (size, version)'),
        Option(name='channel_ident',
               type='bool',
               default=False,
               desc='Share a user-provided description and/or contact email for the cluster'),
        Option(name='channel_crash',
               type='bool',
               default=True,
               desc='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
        Option(name='channel_device',
               type='bool',
               default=True,
               desc=('Share device health metrics '
                     '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
        Option(name='channel_perf',
               type='bool',
               default=False,
               desc='Share various performance metrics of a cluster'),
    ]
242
243 @property
244 def config_keys(self) -> Dict[str, OptionValue]:
245 return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
246
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize in-memory telemetry state and pull in module options."""
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()  # used to wake/stop the serve() loop
        self.run = False
        # Opted-in collections persisted in the mgr store; None until loaded.
        self.db_collection: Optional[List[str]] = None
        self.last_opted_in_ceph_version: Optional[int] = None
        self.last_opted_out_ceph_version: Optional[int] = None
        # Stored as an int in the mgr store; populated by load().
        self.last_upload: Optional[int] = None
        self.last_report: Dict[str, Any] = dict()  # cache of the last compiled report
        self.report_id: Optional[str] = None  # persistent anonymous cluster id
        self.salt: Optional[str] = None  # salt used when hashing entity names
        self.get_report_lock = Lock()
        self.config_update_module_option()
        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.url = ''
            self.device_url = ''
            self.enabled = False
            self.last_opt_revision = 0
            self.leaderboard = ''
            self.leaderboard_description = ''
            self.interval = 0
            self.proxy = ''
            self.channel_basic = True
            self.channel_ident = False
            self.channel_crash = True
            self.channel_device = True
            self.channel_perf = False
            self.db_collection = ['basic_base', 'device_base']
            self.last_opted_in_ceph_version = 17
            self.last_opted_out_ceph_version = 0
278
279 def config_update_module_option(self) -> None:
280 for opt in self.MODULE_OPTIONS:
281 setattr(self,
282 opt['name'],
283 self.get_module_option(opt['name']))
284 self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
285
    def config_notify(self) -> None:
        """Option-change hook: re-read option values and wake the serve()
        thread so changes take effect without waiting for the next interval."""
        self.config_update_module_option()
        # wake up serve() thread
        self.event.set()
290
291 def load(self) -> None:
292 last_upload = self.get_store('last_upload', None)
293 if last_upload is None:
294 self.last_upload = None
295 else:
296 self.last_upload = int(last_upload)
297
298 report_id = self.get_store('report_id', None)
299 if report_id is None:
300 self.report_id = str(uuid.uuid4())
301 self.set_store('report_id', self.report_id)
302 else:
303 self.report_id = report_id
304
305 salt = self.get_store('salt', None)
306 if salt is None:
307 self.salt = str(uuid.uuid4())
308 self.set_store('salt', self.salt)
309 else:
310 self.salt = salt
311
312 self.init_collection()
313
314 last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None)
315 if last_opted_in_ceph_version is None:
316 self.last_opted_in_ceph_version = None
317 else:
318 self.last_opted_in_ceph_version = int(last_opted_in_ceph_version)
319
320 last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None)
321 if last_opted_out_ceph_version is None:
322 self.last_opted_out_ceph_version = None
323 else:
324 self.last_opted_out_ceph_version = int(last_opted_out_ceph_version)
325
326 def gather_osd_metadata(self,
327 osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]:
328 keys = ["osd_objectstore", "rotational"]
329 keys += self.metadata_keys
330
331 metadata: Dict[str, Dict[str, int]] = dict()
332 for key in keys:
333 metadata[key] = defaultdict(int)
334
335 for osd in osd_map['osds']:
336 res = self.get_metadata('osd', str(osd['osd']))
337 if res is None:
338 self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
339 continue
340 for k, v in res.items():
341 if k not in keys:
342 continue
343
344 metadata[k][v] += 1
345
346 return metadata
347
348 def gather_mon_metadata(self,
349 mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]:
350 keys = list()
351 keys += self.metadata_keys
352
353 metadata: Dict[str, Dict[str, int]] = dict()
354 for key in keys:
355 metadata[key] = defaultdict(int)
356
357 for mon in mon_map['mons']:
358 res = self.get_metadata('mon', mon['name'])
359 if res is None:
360 self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
361 continue
362 for k, v in res.items():
363 if k not in keys:
364 continue
365
366 metadata[k][v] += 1
367
368 return metadata
369
370 def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]:
371 metadata: Dict[str, Dict[str, int]] = dict()
372
373 res = self.get('mds_metadata') # metadata of *all* mds daemons
374 if res is None or not res:
375 self.log.debug('Could not get metadata for mds daemons')
376 return metadata
377
378 keys = list()
379 keys += self.metadata_keys
380
381 for key in keys:
382 metadata[key] = defaultdict(int)
383
384 for mds in res.values():
385 for k, v in mds.items():
386 if k not in keys:
387 continue
388
389 metadata[k][v] += 1
390
391 return metadata
392
393 def gather_crush_info(self) -> Dict[str, Union[int,
394 bool,
395 List[int],
396 Dict[str, int],
397 Dict[int, int]]]:
398 osdmap = self.get_osdmap()
399 crush_raw = osdmap.get_crush()
400 crush = crush_raw.dump()
401
402 BucketKeyT = TypeVar('BucketKeyT', int, str)
403
404 def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None:
405 if k in d:
406 d[k] += 1
407 else:
408 d[k] = 1
409
410 device_classes: Dict[str, int] = {}
411 for dev in crush['devices']:
412 inc(device_classes, dev.get('class', ''))
413
414 bucket_algs: Dict[str, int] = {}
415 bucket_types: Dict[str, int] = {}
416 bucket_sizes: Dict[int, int] = {}
417 for bucket in crush['buckets']:
418 if '~' in bucket['name']: # ignore shadow buckets
419 continue
420 inc(bucket_algs, bucket['alg'])
421 inc(bucket_types, bucket['type_id'])
422 inc(bucket_sizes, len(bucket['items']))
423
424 return {
425 'num_devices': len(crush['devices']),
426 'num_types': len(crush['types']),
427 'num_buckets': len(crush['buckets']),
428 'num_rules': len(crush['rules']),
429 'device_classes': list(device_classes.values()),
430 'tunables': crush['tunables'],
431 'compat_weight_set': '-1' in crush['choose_args'],
432 'num_weight_sets': len(crush['choose_args']),
433 'bucket_algs': bucket_algs,
434 'bucket_sizes': bucket_sizes,
435 'bucket_types': bucket_types,
436 }
437
438 def gather_configs(self) -> Dict[str, List[str]]:
439 # cluster config options
440 cluster = set()
441 r, outb, outs = self.mon_command({
442 'prefix': 'config dump',
443 'format': 'json'
444 })
445 if r != 0:
446 return {}
447 try:
448 dump = json.loads(outb)
449 except json.decoder.JSONDecodeError:
450 return {}
451 for opt in dump:
452 name = opt.get('name')
453 if name:
454 cluster.add(name)
455 # daemon-reported options (which may include ceph.conf)
456 active = set()
457 ls = self.get("modified_config_options")
458 for opt in ls.get('options', {}):
459 active.add(opt)
460 return {
461 'cluster_changed': sorted(list(cluster)),
462 'active_changed': sorted(list(active)),
463 }
464
465 def anonymize_entity_name(self, entity_name:str) -> str:
466 if '.' not in entity_name:
467 self.log.debug(f"Cannot split entity name ({entity_name}), no '.' is found")
468 return entity_name
469
470 (etype, eid) = entity_name.split('.', 1)
471 m = hashlib.sha1()
472 salt = ''
473 if self.salt is not None:
474 salt = self.salt
475 # avoid asserting that salt exists
476 if not self.salt:
477 # do not set self.salt to a temp value
478 salt = f"no_salt_found_{NO_SALT_CNT}"
479 NO_SALT_CNT += 1
480 self.log.debug(f"No salt found, created a temp one: {salt}")
481 m.update(salt.encode('utf-8'))
482 m.update(eid.encode('utf-8'))
483 m.update(salt.encode('utf-8'))
484
485 return etype + '.' + m.hexdigest()
486
487 def get_heap_stats(self) -> Dict[str, dict]:
488 result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
489 anonymized_daemons = {}
490 osd_map = self.get('osd_map')
491
492 # Combine available daemons
493 daemons = []
494 for osd in osd_map['osds']:
495 daemons.append('osd'+'.'+str(osd['osd']))
496 # perf_memory_metrics collection (1/2)
497 if self.is_enabled_collection(Collection.perf_memory_metrics):
498 mon_map = self.get('mon_map')
499 mds_metadata = self.get('mds_metadata')
500 for mon in mon_map['mons']:
501 daemons.append('mon'+'.'+mon['name'])
502 for mds in mds_metadata:
503 daemons.append('mds'+'.'+mds)
504
505 # Grab output from the "daemon.x heap stats" command
506 for daemon in daemons:
507 daemon_type, daemon_id = daemon.split('.', 1)
508 heap_stats = self.parse_heap_stats(daemon_type, daemon_id)
509 if heap_stats:
510 if (daemon_type != 'osd'):
511 # Anonymize mon and mds
512 anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
513 daemon = anonymized_daemons[daemon]
514 result[daemon_type][daemon] = heap_stats
515 else:
516 continue
517
518 if anonymized_daemons:
519 # for debugging purposes only, this data is never reported
520 self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons))
521 return result
522
523 def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]:
524 parsed_output = {}
525
526 cmd_dict = {
527 'prefix': 'heap',
528 'heapcmd': 'stats'
529 }
530 r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict)
531
532 if r != 0:
533 self.log.error("Invalid command dictionary: {}".format(cmd_dict))
534 else:
535 if 'tcmalloc heap stats' in outb:
536 values = [int(i) for i in outb.split() if i.isdigit()]
537 # `categories` must be ordered this way for the correct output to be parsed
538 categories = ['use_by_application',
539 'page_heap_freelist',
540 'central_cache_freelist',
541 'transfer_cache_freelist',
542 'thread_cache_freelists',
543 'malloc_metadata',
544 'actual_memory_used',
545 'released_to_os',
546 'virtual_address_space_used',
547 'spans_in_use',
548 'thread_heaps_in_use',
549 'tcmalloc_page_size']
550 if len(values) != len(categories):
551 self.log.error('Received unexpected output from {}.{}; ' \
552 'number of values should match the number' \
553 'of expected categories:\n values: len={} {} '\
554 '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs))
555 else:
556 parsed_output = dict(zip(categories, values))
557 else:
558 self.log.error('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs))
559
560 return parsed_output
561
    def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
        """Collect mempool dumps per daemon via "tell ... dump_mempools".

        mode='separated' keeps one entry per daemon (mon/mds names are
        anonymized); mode='aggregated' sums bytes/items per mempool type
        across all daemons of the same type.  OSDs are always included;
        mons and mds only when the perf_memory_metrics collection is enabled.
        """
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
        anonymized_daemons = {}
        osd_map = self.get('osd_map')

        # Combine available daemons
        daemons = []
        for osd in osd_map['osds']:
            daemons.append('osd'+'.'+str(osd['osd']))
        # perf_memory_metrics collection (2/2)
        if self.is_enabled_collection(Collection.perf_memory_metrics):
            mon_map = self.get('mon_map')
            mds_metadata = self.get('mds_metadata')
            for mon in mon_map['mons']:
                daemons.append('mon'+'.'+mon['name'])
            for mds in mds_metadata:
                daemons.append('mds'+'.'+mds)

        # Grab output from the "dump_mempools" command
        for daemon in daemons:
            daemon_type, daemon_id = daemon.split('.', 1)
            cmd_dict = {
                'prefix': 'dump_mempools',
                'format': 'json'
            }
            r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict)
            if r != 0:
                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
                continue
            else:
                try:
                    # This is where the mempool will land.
                    dump = json.loads(outb)
                    if mode == 'separated':
                        # Anonymize mon and mds
                        if daemon_type != 'osd':
                            anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
                            daemon = anonymized_daemons[daemon]
                        result[daemon_type][daemon] = dump['mempool']['by_pool']
                    elif mode == 'aggregated':
                        for mem_type in dump['mempool']['by_pool']:
                            result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
                            result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
                    else:
                        self.log.error("Incorrect mode specified in get_mempool: {}".format(mode))
                except (json.decoder.JSONDecodeError, KeyError) as e:
                    # Tolerate empty/invalid JSON or schema surprises from a
                    # single daemon; keep collecting from the others.
                    self.log.exception("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e))
                    continue

        if anonymized_daemons:
            # for debugging purposes only, this data is never reported
            self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons))

        return result
616
    def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
        """Collect 2D perf histograms from every OSD.

        Histograms are grouped by their axis configuration (keyed on
        str(axes)) so that histograms with different axis configs are never
        combined.  mode='separated' keeps per-OSD value matrices;
        mode='aggregated' sums value matrices element-wise across OSDs.
        Returns only the grouped values (the str(axes) keys are dropped).
        """
        # Initialize result dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
            lambda: defaultdict(
                lambda: defaultdict(
                    lambda: defaultdict(
                        lambda: defaultdict(int))))))

        # Get list of osd ids from the metadata
        osd_metadata = self.get('osd_metadata')

        # Grab output from the "osd.x perf histogram dump" command
        for osd_id in osd_metadata:
            cmd_dict = {
                'prefix': 'perf histogram dump',
                'id': str(osd_id),
                'format': 'json'
            }
            r, outb, outs = self.osd_command(cmd_dict)
            # Check for invalid calls
            if r != 0:
                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
                continue
            else:
                try:
                    # This is where the histograms will land if there are any.
                    dump = json.loads(outb)

                    for histogram in dump['osd']:
                        # Log axis information. There are two axes, each represented
                        # as a dictionary. Both dictionaries are contained inside a
                        # list called 'axes'.
                        axes = []
                        for axis in dump['osd'][histogram]['axes']:

                            # This is the dict that contains information for an individual
                            # axis. It will be appended to the 'axes' list at the end.
                            axis_dict: Dict[str, Any] = defaultdict()

                            # Collecting information for buckets, min, name, etc.
                            axis_dict['buckets'] = axis['buckets']
                            axis_dict['min'] = axis['min']
                            axis_dict['name'] = axis['name']
                            axis_dict['quant_size'] = axis['quant_size']
                            axis_dict['scale_type'] = axis['scale_type']

                            # Collecting ranges; placing them in lists to
                            # improve readability later on.
                            ranges = []
                            for _range in axis['ranges']:
                                _max, _min = None, None
                                if 'max' in _range:
                                    _max = _range['max']
                                if 'min' in _range:
                                    _min = _range['min']
                                ranges.append([_min, _max])
                            axis_dict['ranges'] = ranges

                            # Now that 'axis_dict' contains all the appropriate
                            # information for the current axis, append it to the 'axes' list.
                            # There will end up being two axes in the 'axes' list, since the
                            # histograms are 2D.
                            axes.append(axis_dict)

                        # Add the 'axes' list, containing both axes, to result.
                        # At this point, you will see that the name of the key is the string
                        # form of our axes list (str(axes)). This is there so that histograms
                        # with different axis configs will not be combined.
                        # These key names are later dropped when only the values are returned.
                        result[str(axes)][histogram]['axes'] = axes

                        # Collect current values and make sure they are in
                        # integer form.
                        values = []
                        for value_list in dump['osd'][histogram]['values']:
                            values.append([int(v) for v in value_list])

                        if mode == 'separated':
                            if 'osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['osds'] = []
                            result[str(axes)][histogram]['osds'].append({'osd_id': int(osd_id), 'values': values})

                        elif mode == 'aggregated':
                            # Aggregate values. If 'values' have already been initialized,
                            # we can safely add.
                            if 'values' in result[str(axes)][histogram]:
                                for i in range (0, len(values)):
                                    for j in range (0, len(values[i])):
                                        values[i][j] += result[str(axes)][histogram]['values'][i][j]

                            # Add the values to result.
                            result[str(axes)][histogram]['values'] = values

                            # Update num_combined_osds
                            if 'num_combined_osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['num_combined_osds'] = 1
                            else:
                                result[str(axes)][histogram]['num_combined_osds'] += 1
                        else:
                            self.log.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode))
                            return list()

                # Sometimes, json errors occur if you give it an empty string.
                # I am also putting in a catch for a KeyError since it could
                # happen where the code is assuming that a key exists in the
                # schema when it doesn't. In either case, we'll handle that
                # by continuing and collecting what we can from other osds.
                except (json.decoder.JSONDecodeError, KeyError) as e:
                    self.log.exception("Error caught on osd.{}: {}".format(osd_id, e))
                    continue

        return list(result.values())
729
    def get_io_rate(self) -> dict:
        """Return the cluster 'io_rate' data from the mgr."""
        return self.get('io_rate')
732
733 def get_stats_per_pool(self) -> dict:
734 result = self.get('pg_dump')['pool_stats']
735
736 # collect application metadata from osd_map
737 osd_map = self.get('osd_map')
738 application_metadata = {pool['pool']: pool['application_metadata'] for pool in osd_map['pools']}
739
740 # add application to each pool from pg_dump
741 for pool in result:
742 pool['application'] = []
743 # Only include default applications
744 for application in application_metadata[pool['poolid']]:
745 if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
746 pool['application'].append(application)
747
748 return result
749
    def get_stats_per_pg(self) -> dict:
        """Return the per-PG statistics from pg_dump."""
        return self.get('pg_dump')['pg_stats']
752
753 def get_rocksdb_stats(self) -> Dict[str, str]:
754 # Initalizers
755 result: Dict[str, str] = defaultdict()
756 version = self.get_rocksdb_version()
757
758 # Update result
759 result['version'] = version
760
761 return result
762
763 def gather_crashinfo(self) -> List[Dict[str, str]]:
764 crashlist: List[Dict[str, str]] = list()
765 errno, crashids, err = self.remote('crash', 'ls')
766 if errno:
767 return crashlist
768 for crashid in crashids.split():
769 errno, crashinfo, err = self.remote('crash', 'do_info', crashid)
770 if errno:
771 continue
772 c = json.loads(crashinfo)
773
774 # redact hostname
775 del c['utsname_hostname']
776
777 # entity_name might have more than one '.', beware
778 (etype, eid) = c.get('entity_name', '').split('.', 1)
779 m = hashlib.sha1()
780 assert self.salt
781 m.update(self.salt.encode('utf-8'))
782 m.update(eid.encode('utf-8'))
783 m.update(self.salt.encode('utf-8'))
784 c['entity_name'] = etype + '.' + m.hexdigest()
785
786 # redact final line of python tracebacks, as the exception
787 # payload may contain identifying information
788 if 'mgr_module' in c and 'backtrace' in c:
789 # backtrace might be empty
790 if len(c['backtrace']) > 0:
791 c['backtrace'][-1] = '<redacted>'
792
793 crashlist.append(c)
794 return crashlist
795
    def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
        """Collect unlabeled perf counters from all daemons.

        mode='separated' keeps one entry per daemon (mon/mds/rgw names are
        anonymized; osds are not); mode='aggregated' sums counter values per
        daemon type.  Collection names like "bluestore.kv_flush_lat" are
        split into nested "bluestore" -> "kv_flush_lat" keys.
        """
        # Extract perf counter data with get_unlabeled_perf_counters(), a method
        # from mgr/mgr_module.py. This method returns a nested dictionary that
        # looks a lot like perf schema, except with some additional fields.
        #
        # Example of output, a snapshot of a mon daemon:
        #   "mon.b": {
        #       "bluestore.kv_flush_lat": {
        #           "count": 2431,
        #           "description": "Average kv_thread flush latency",
        #           "nick": "fl_l",
        #           "priority": 8,
        #           "type": 5,
        #           "units": 1,
        #           "value": 88814109
        #       },
        #   },
        perf_counters = self.get_unlabeled_perf_counters()

        # Initialize 'result' dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
            lambda: defaultdict(lambda: defaultdict(int))))

        # 'separated' mode
        anonymized_daemon_dict = {}

        for daemon, perf_counters_by_daemon in perf_counters.items():
            daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'

            if mode == 'separated':
                # anonymize individual daemon names except osds
                if (daemon_type != 'osd'):
                    anonymized_daemon = self.anonymize_entity_name(daemon)
                    anonymized_daemon_dict[anonymized_daemon] = daemon
                    daemon = anonymized_daemon

            # Calculate num combined daemon types if in aggregated mode
            if mode == 'aggregated':
                if 'num_combined_daemons' not in result[daemon_type]:
                    result[daemon_type]['num_combined_daemons'] = 1
                else:
                    result[daemon_type]['num_combined_daemons'] += 1

            for collection in perf_counters_by_daemon:
                # Split the collection to avoid redundancy in final report; i.e.:
                #   bluestore.kv_flush_lat, bluestore.kv_final_lat -->
                #   bluestore: kv_flush_lat, kv_final_lat
                col_0, col_1 = collection.split('.')

                # Debug log for empty keys. This initially was a problem for prioritycache
                # perf counters, where the col_0 was empty for certain mon counters:
                #
                # "mon.a": {                  instead of    "mon.a": {
                #      "": {                                     "prioritycache": {
                #        "cache_bytes": {...},                          "cache_bytes": {...},
                #
                # This log is here to detect any future instances of a similar issue.
                if (daemon == "") or (col_0 == "") or (col_1 == ""):
                    self.log.debug("Instance of an empty key: {}{}".format(daemon, collection))

                if mode == 'separated':
                    # Add value to result
                    result[daemon][col_0][col_1]['value'] = \
                            perf_counters_by_daemon[collection]['value']

                    # Check that 'count' exists, as not all counters have a count field.
                    if 'count' in perf_counters_by_daemon[collection]:
                        result[daemon][col_0][col_1]['count'] = \
                                perf_counters_by_daemon[collection]['count']
                elif mode == 'aggregated':
                    # Not every rgw daemon has the same schema. Specifically, each rgw daemon
                    # has a uniquely-named collection that starts off identically (i.e.
                    # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
                    # This bit of code combines these unique counters all under one rgw instance.
                    # Without this check, the schema would remain separeted out in the final report.
                    if col_0[0:11] == "objecter-0x":
                        col_0 = "objecter-0x"

                    # Check that the value can be incremented. In some cases,
                    # the files are of type 'pair' (real-integer-pair, integer-integer pair).
                    # In those cases, the value is a dictionary, and not a number.
                    #   i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
                    if isinstance(perf_counters_by_daemon[collection]['value'], numbers.Number):
                        result[daemon_type][col_0][col_1]['value'] += \
                                perf_counters_by_daemon[collection]['value']

                    # Check that 'count' exists, as not all counters have a count field.
                    if 'count' in perf_counters_by_daemon[collection]:
                        result[daemon_type][col_0][col_1]['count'] += \
                                perf_counters_by_daemon[collection]['count']
                else:
                    self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
                    return {}

        if mode == 'separated':
            # for debugging purposes only, this data is never reported
            self.log.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict))

        return result
895
896 def get_active_channels(self) -> List[str]:
897 r = []
898 if self.channel_basic:
899 r.append('basic')
900 if self.channel_crash:
901 r.append('crash')
902 if self.channel_device:
903 r.append('device')
904 if self.channel_ident:
905 r.append('ident')
906 if self.channel_perf:
907 r.append('perf')
908 return r
909
    def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
        """Collect recent device health metrics, anonymized.

        Returns a nested mapping: anonymized-host-id -> anonymized-device-id
        -> {timestamp -> metrics record}.  Host and device ids are replaced
        with persisted random UUIDs, and the device serial number is scrubbed
        from the metric payloads.  Returns {} if device info is unavailable.
        """
        try:
            time_format = self.remote('devicehealth', 'get_time_format')
        except Exception as e:
            self.log.debug('Unable to format time: {}'.format(e))
            return {}
        # Only metrics newer than twice the reporting interval are included.
        # NOTE(review): utcnow() is naive/deprecated; presumably the
        # devicehealth time format is UTC as well -- confirm before changing.
        cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
        min_sample = cutoff.strftime(time_format)

        devices = self.get('devices')['devices']
        if not devices:
            self.log.debug('Unable to get device info from the mgr.')
            return {}

        # anon-host-id -> anon-devid -> { timestamp -> record }
        res: Dict[str, Dict[str, Dict[str, str]]] = {}
        for d in devices:
            devid = d['devid']
            try:
                # this is a map of stamp -> {device info}
                m = self.remote('devicehealth', 'get_recent_device_metrics',
                                devid, min_sample)
            except Exception as e:
                self.log.error('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
                continue

            # anonymize host id
            try:
                host = d['location'][0]['host']
            except (KeyError, IndexError) as e:
                self.log.exception('Unable to get host from device with id "{}": {}'.format(devid, e))
                continue
            anon_host = self.get_store('host-id/%s' % host)
            if not anon_host:
                # first time we see this host: mint and persist an anonymous id
                anon_host = str(uuid.uuid1())
                self.set_store('host-id/%s' % host, anon_host)
            serial = None
            for dev, rep in m.items():
                rep['host_id'] = anon_host
                if serial is None and 'serial_number' in rep:
                    serial = rep['serial_number']

            # anonymize device id
            anon_devid = self.get_store('devid-id/%s' % devid)
            if not anon_devid:
                # ideally devid is 'vendor_model_serial',
                # but can also be 'model_serial', 'serial'
                if '_' in devid:
                    anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
                else:
                    anon_devid = str(uuid.uuid1())
                self.set_store('devid-id/%s' % devid, anon_devid)
            self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
                                                           host, anon_host))

            # anonymize the smartctl report itself
            if serial:
                m_str = json.dumps(m)
                m = json.loads(m_str.replace(serial, 'deleted'))

            if anon_host not in res:
                res[anon_host] = {}
            res[anon_host][anon_devid] = m
        return res
974
975 def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
976 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
977 if data:
978 return data[-1][1]
979 else:
980 return 0
981
    def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Assemble the telemetry report for the given channels (defaults to all
        active channels). The 'device' channel is intentionally excluded here;
        device data is compiled by gather_device_report() and sent to a
        separate endpoint.
        """
        if not channels:
            channels = self.get_active_channels()
        # envelope fields common to every report, regardless of channels
        report: Dict[str, Any] = {
            'leaderboard': self.leaderboard,
            'leaderboard_description': self.leaderboard_description,
            'report_version': 1,
            'report_timestamp': datetime.utcnow().isoformat(),
            'report_id': self.report_id,
            'channels': channels,
            'channels_available': ALL_CHANNELS,
            'license': LICENSE,
            'collections_available': [c['name'].name for c in MODULE_COLLECTION],
            'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])],
        }

        if 'ident' in channels:
            for option in ['description', 'contact', 'organization']:
                report[option] = getattr(self, option)

        if 'basic' in channels:
            mon_map = self.get('mon_map')
            osd_map = self.get('osd_map')
            service_map = self.get('service_map')
            fs_map = self.get('fs_map')
            df = self.get('df')
            df_pools = {pool['id']: pool for pool in df['pools']}

            report['created'] = mon_map['created']

            # mons
            v1_mons = 0
            v2_mons = 0
            ipv4_mons = 0
            ipv6_mons = 0
            for mon in mon_map['mons']:
                for a in mon['public_addrs']['addrvec']:
                    if a['type'] == 'v2':
                        v2_mons += 1
                    elif a['type'] == 'v1':
                        v1_mons += 1
                    # bracketed address means IPv6
                    if a['addr'].startswith('['):
                        ipv6_mons += 1
                    else:
                        ipv4_mons += 1
            report['mon'] = {
                'count': len(mon_map['mons']),
                'features': mon_map['features'],
                'min_mon_release': mon_map['min_mon_release'],
                'v1_addr_mons': v1_mons,
                'v2_addr_mons': v2_mons,
                'ipv4_addr_mons': ipv4_mons,
                'ipv6_addr_mons': ipv6_mons,
            }

            report['config'] = self.gather_configs()

            # pools

            rbd_num_pools = 0
            rbd_num_images_by_pool = []
            rbd_mirroring_by_pool = []
            num_pg = 0
            report['pools'] = list()
            for pool in osd_map['pools']:
                num_pg += pool['pg_num']
                ec_profile = {}
                if pool['erasure_code_profile']:
                    orig = osd_map['erasure_code_profiles'].get(
                        pool['erasure_code_profile'], {})
                    # report only the whitelisted EC profile keys
                    ec_profile = {
                        k: orig[k] for k in orig.keys()
                        if k in ['k', 'm', 'plugin', 'technique',
                                 'crush-failure-domain', 'l']
                    }
                pool_data = {
                    'pool': pool['pool'],
                    'pg_num': pool['pg_num'],
                    'pgp_num': pool['pg_placement_num'],
                    'size': pool['size'],
                    'min_size': pool['min_size'],
                    'pg_autoscale_mode': pool['pg_autoscale_mode'],
                    'target_max_bytes': pool['target_max_bytes'],
                    'target_max_objects': pool['target_max_objects'],
                    'type': ['', 'replicated', '', 'erasure'][pool['type']],
                    'erasure_code_profile': ec_profile,
                    'cache_mode': pool['cache_mode'],
                }

                # basic_pool_usage collection
                if self.is_enabled_collection(Collection.basic_pool_usage):
                    pool_data['application'] = []
                    for application in pool['application_metadata']:
                        # Only include default applications
                        if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
                            pool_data['application'].append(application)
                    pool_stats = df_pools[pool['pool']]['stats']
                    pool_data['stats'] = {  # filter out kb_used
                        'avail_raw': pool_stats['avail_raw'],
                        'bytes_used': pool_stats['bytes_used'],
                        'compress_bytes_used': pool_stats['compress_bytes_used'],
                        'compress_under_bytes': pool_stats['compress_under_bytes'],
                        'data_bytes_used': pool_stats['data_bytes_used'],
                        'dirty': pool_stats['dirty'],
                        'max_avail': pool_stats['max_avail'],
                        'objects': pool_stats['objects'],
                        'omap_bytes_used': pool_stats['omap_bytes_used'],
                        'percent_used': pool_stats['percent_used'],
                        'quota_bytes': pool_stats['quota_bytes'],
                        'quota_objects': pool_stats['quota_objects'],
                        'rd': pool_stats['rd'],
                        'rd_bytes': pool_stats['rd_bytes'],
                        'stored': pool_stats['stored'],
                        'stored_data': pool_stats['stored_data'],
                        'stored_omap': pool_stats['stored_omap'],
                        'stored_raw': pool_stats['stored_raw'],
                        'wr': pool_stats['wr'],
                        'wr_bytes': pool_stats['wr_bytes']
                    }
                    pool_data['options'] = {}
                    # basic_pool_options_bluestore collection
                    if self.is_enabled_collection(Collection.basic_pool_options_bluestore):
                        bluestore_options = ['compression_algorithm',
                                             'compression_mode',
                                             'compression_required_ratio',
                                             'compression_min_blob_size',
                                             'compression_max_blob_size']
                        for option in bluestore_options:
                            if option in pool['options']:
                                pool_data['options'][option] = pool['options'][option]
                cast(List[Dict[str, Any]], report['pools']).append(pool_data)
                if 'rbd' in pool['application_metadata']:
                    rbd_num_pools += 1
                    ioctx = self.rados.open_ioctx(pool['pool_name'])
                    rbd_num_images_by_pool.append(
                        sum(1 for _ in rbd.RBD().list2(ioctx)))
                    rbd_mirroring_by_pool.append(
                        rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
            report['rbd'] = {
                'num_pools': rbd_num_pools,
                'num_images_by_pool': rbd_num_images_by_pool,
                'mirroring_by_pool': rbd_mirroring_by_pool}

            # osds
            # detect whether a separate cluster (back) network is in use:
            # any up OSD whose public and cluster addresses differ
            cluster_network = False
            for osd in osd_map['osds']:
                if osd['up'] and not cluster_network:
                    front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
                    back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
                    if front_ip != back_ip:
                        cluster_network = True
            report['osd'] = {
                'count': len(osd_map['osds']),
                'require_osd_release': osd_map['require_osd_release'],
                'require_min_compat_client': osd_map['require_min_compat_client'],
                'cluster_network': cluster_network,
            }

            # crush
            report['crush'] = self.gather_crush_info()

            # cephfs
            report['fs'] = {
                'count': len(fs_map['filesystems']),
                'feature_flags': fs_map['feature_flags'],
                'num_standby_mds': len(fs_map['standbys']),
                'filesystems': [],
            }
            num_mds = len(fs_map['standbys'])
            for fsm in fs_map['filesystems']:
                fs = fsm['mdsmap']
                num_sessions = 0
                cached_ino = 0
                cached_dn = 0
                cached_cap = 0
                subtrees = 0
                rfiles = 0
                rbytes = 0
                rsnaps = 0
                for gid, mds in fs['info'].items():
                    num_sessions += self.get_latest('mds', mds['name'],
                                                    'mds_sessions.session_count')
                    cached_ino += self.get_latest('mds', mds['name'],
                                                  'mds_mem.ino')
                    cached_dn += self.get_latest('mds', mds['name'],
                                                 'mds_mem.dn')
                    cached_cap += self.get_latest('mds', mds['name'],
                                                  'mds_mem.cap')
                    subtrees += self.get_latest('mds', mds['name'],
                                                'mds.subtrees')
                    # recursive stats are tracked by rank 0 only
                    if mds['rank'] == 0:
                        rfiles = self.get_latest('mds', mds['name'],
                                                 'mds.root_rfiles')
                        rbytes = self.get_latest('mds', mds['name'],
                                                 'mds.root_rbytes')
                        rsnaps = self.get_latest('mds', mds['name'],
                                                 'mds.root_rsnaps')
                report['fs']['filesystems'].append({  # type: ignore
                    'max_mds': fs['max_mds'],
                    'ever_allowed_features': fs['ever_allowed_features'],
                    'explicitly_allowed_features': fs['explicitly_allowed_features'],
                    'num_in': len(fs['in']),
                    'num_up': len(fs['up']),
                    'num_standby_replay': len(
                        [mds for gid, mds in fs['info'].items()
                         if mds['state'] == 'up:standby-replay']),
                    'num_mds': len(fs['info']),
                    'num_sessions': num_sessions,
                    'cached_inos': cached_ino,
                    'cached_dns': cached_dn,
                    'cached_caps': cached_cap,
                    'cached_subtrees': subtrees,
                    'balancer_enabled': len(fs['balancer']) > 0,
                    'num_data_pools': len(fs['data_pools']),
                    'standby_count_wanted': fs['standby_count_wanted'],
                    'approx_ctime': fs['created'][0:7],
                    'files': rfiles,
                    'bytes': rbytes,
                    'snaps': rsnaps,
                })
                num_mds += len(fs['info'])
            report['fs']['total_num_mds'] = num_mds  # type: ignore

            # daemons
            report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
                                      mon=self.gather_mon_metadata(mon_map))

            if self.is_enabled_collection(Collection.basic_mds_metadata):
                report['metadata']['mds'] = self.gather_mds_metadata()  # type: ignore

            # host counts
            servers = self.list_servers()
            self.log.debug('servers %s' % servers)
            hosts = {
                'num': len([h for h in servers if h['hostname']]),
            }
            for t in ['mon', 'mds', 'osd', 'mgr']:
                nr_services = sum(1 for host in servers if
                                  any(service for service in cast(List[ServiceInfoT],
                                                                  host['services'])
                                      if service['type'] == t))
                hosts['num_with_' + t] = nr_services
            report['hosts'] = hosts

            report['usage'] = {
                'pools': len(df['pools']),
                'pg_num': num_pg,
                'total_used_bytes': df['stats']['total_used_bytes'],
                'total_bytes': df['stats']['total_bytes'],
                'total_avail_bytes': df['stats']['total_avail_bytes']
            }
            # basic_usage_by_class collection
            if self.is_enabled_collection(Collection.basic_usage_by_class):
                report['usage']['stats_by_class'] = {}  # type: ignore
                for device_class in df['stats_by_class']:
                    if device_class in ['hdd', 'ssd', 'nvme']:
                        report['usage']['stats_by_class'][device_class] = df['stats_by_class'][device_class]  # type: ignore

            services: DefaultDict[str, int] = defaultdict(int)
            for key, value in service_map['services'].items():
                services[key] += 1
                if key == 'rgw':
                    rgw = {}
                    zones = set()
                    zonegroups = set()
                    frontends = set()
                    count = 0
                    d = value.get('daemons', dict())
                    for k, v in d.items():
                        if k == 'summary' and v:
                            rgw[k] = v
                        elif isinstance(v, dict) and 'metadata' in v:
                            count += 1
                            zones.add(v['metadata']['zone_id'])
                            zonegroups.add(v['metadata']['zonegroup_id'])
                            frontends.add(v['metadata']['frontend_type#0'])

                            # we could actually iterate over all the keys of
                            # the dict and check for how many frontends there
                            # are, but it is unlikely that one would be running
                            # more than 2 supported ones
                            f2 = v['metadata'].get('frontend_type#1', None)
                            if f2:
                                frontends.add(f2)

                    rgw['count'] = count
                    rgw['zones'] = len(zones)
                    rgw['zonegroups'] = len(zonegroups)
                    rgw['frontends'] = list(frontends)  # sets aren't json-serializable
                    report['rgw'] = rgw
            report['services'] = services

            try:
                report['balancer'] = self.remote('balancer', 'gather_telemetry')
            except ImportError:
                # balancer module not present/loaded
                report['balancer'] = {
                    'active': False
                }

            # Rook
            self.get_rook_data(report)

        if 'crash' in channels:
            report['crashes'] = self.gather_crashinfo()

        if 'perf' in channels:
            if self.is_enabled_collection(Collection.perf_perf):
                report['perf_counters'] = self.gather_perf_counters('separated')
                report['stats_per_pool'] = self.get_stats_per_pool()
                report['stats_per_pg'] = self.get_stats_per_pg()
                report['io_rate'] = self.get_io_rate()
                report['osd_perf_histograms'] = self.get_osd_histograms('separated')
                report['mempool'] = self.get_mempool('separated')
                report['heap_stats'] = self.get_heap_stats()
                report['rocksdb_stats'] = self.get_rocksdb_stats()

        # NOTE: We do not include the 'device' channel in this report; it is
        # sent to a different endpoint.

        return report
1302
1303 def get_rook_data(self, report: Dict[str, object]) -> None:
1304 r, outb, outs = self.mon_command({
1305 'prefix': 'config-key dump',
1306 'format': 'json'
1307 })
1308 if r != 0:
1309 return
1310 try:
1311 config_kv_dump = json.loads(outb)
1312 except json.decoder.JSONDecodeError:
1313 return
1314
1315 for elem in ROOK_KEYS_BY_COLLECTION:
1316 # elem[0] is the full key path (e.g. "rook/node/count/with-csi-nfs-plugin")
1317 # elem[1] is the Collection this key belongs to
1318 if self.is_enabled_collection(elem[1]):
1319 self.add_kv_to_report(report, elem[0], config_kv_dump.get(elem[0]))
1320
1321 def add_kv_to_report(self, report: Dict[str, object], key_path: str, value: Any) -> None:
1322 last_node = key_path.split('/')[-1]
1323 for node in key_path.split('/')[0:-1]:
1324 if node not in report:
1325 report[node] = {}
1326 report = report[node] # type: ignore
1327
1328 # sanity check of keys correctness
1329 if not isinstance(report, dict):
1330 self.log.error(f"'{key_path}' is an invalid key, expected type 'dict' but got {type(report)}")
1331 return
1332
1333 if last_node in report:
1334 self.log.error(f"'{key_path}' is an invalid key, last part must not exist at this point")
1335 return
1336
1337 report[last_node] = value
1338
1339 def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
1340 self.log.info('Sending %s to: %s' % (what, url))
1341 proxies = dict()
1342 if self.proxy:
1343 self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
1344 proxies['http'] = self.proxy
1345 proxies['https'] = self.proxy
1346 try:
1347 resp = requests.put(url=url, json=report, proxies=proxies)
1348 resp.raise_for_status()
1349 except Exception as e:
1350 fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
1351 self.log.error(fail_reason)
1352 return fail_reason
1353 return None
1354
    # Destinations a telemetry report can be sent to: the main Ceph telemetry
    # endpoint ('ceph') and the separate device/SMART endpoint ('device').
    class EndPoint(enum.Enum):
        ceph = 'ceph'
        device = 'device'
1358
1359 def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
1360 '''
1361 Find collections that are available in the module, but are not in the db
1362 '''
1363 if self.db_collection is None:
1364 return None
1365
1366 if not channels:
1367 channels = ALL_CHANNELS
1368 else:
1369 for ch in channels:
1370 if ch not in ALL_CHANNELS:
1371 self.log.debug(f"invalid channel name: {ch}")
1372 return None
1373
1374 new_collection : List[Collection] = []
1375
1376 for c in MODULE_COLLECTION:
1377 if c['name'].name not in self.db_collection:
1378 if c['channel'] in channels:
1379 new_collection.append(c['name'])
1380
1381 return new_collection
1382
1383 def is_major_upgrade(self) -> bool:
1384 '''
1385 Returns True only if the user last opted-in to an older major
1386 '''
1387 if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0:
1388 # we do not know what Ceph version was when the user last opted-in,
1389 # thus we do not wish to nag in case of a major upgrade
1390 return False
1391
1392 mon_map = self.get('mon_map')
1393 mon_min = mon_map.get("min_mon_release", 0)
1394
1395 if mon_min - self.last_opted_in_ceph_version > 0:
1396 self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
1397 return True
1398
1399 return False
1400
1401 def is_opted_in(self) -> bool:
1402 # If len is 0 it means that the user is either opted-out (never
1403 # opted-in, or invoked `telemetry off`), or they upgraded from a
1404 # telemetry revision 1 or 2, which required to re-opt in to revision 3,
1405 # regardless, hence is considered as opted-out
1406 if self.db_collection is None:
1407 return False
1408 return len(self.db_collection) > 0
1409
1410 def should_nag(self) -> bool:
1411 # Find delta between opted-in collections and module collections;
1412 # nag only if module has a collection which is not in db, and nag == True.
1413
1414 # We currently do not nag if the user is opted-out (or never opted-in).
1415 # If we wish to do this in the future, we need to have a tri-mode state
1416 # (opted in, opted out, no action yet), and it needs to be guarded by a
1417 # config option (so that nagging can be turned off via config).
1418 # We also need to add a last_opted_out_ceph_version variable, for the
1419 # major upgrade check.
1420
1421 # check if there are collections the user is not opt-in to
1422 # that we should nag about
1423 if self.db_collection is not None:
1424 for c in MODULE_COLLECTION:
1425 if c['name'].name not in self.db_collection:
1426 if c['nag'] == True:
1427 self.log.debug(f"The collection: {c['name']} is not reported")
1428 return True
1429
1430 # user might be opted-in to the most recent collection, or there is no
1431 # new collection which requires nagging about; thus nag in case it's a
1432 # major upgrade and there are new collections
1433 # (which their own nag == False):
1434 new_collections = False
1435 col_delta = self.collection_delta()
1436 if col_delta is not None and len(col_delta) > 0:
1437 new_collections = True
1438
1439 return self.is_major_upgrade() and new_collections
1440
    def init_collection(self) -> None:
        """
        Load the opted-in collection list from the mgr store into
        self.db_collection, seeding it on first run after an upgrade based on
        the legacy 'enabled'/'last_opt_revision' options.
        """
        # We fetch from db the collections the user had already opted-in to.
        # During the transition the results will be empty, but the user might
        # be opted-in to an older version (e.g. revision = 3)

        collection = self.get_store('collection')

        if collection is not None:
            self.db_collection = json.loads(collection)

        if self.db_collection is None:
            # happens once on upgrade
            if not self.enabled:
                # user is not opted-in
                self.set_store('collection', json.dumps([]))
                self.log.debug("user is not opted-in")
            else:
                # user is opted-in, verify the revision:
                if self.last_opt_revision == REVISION:
                    # current revision: seed with the base collections
                    self.log.debug(f"telemetry revision is {REVISION}")
                    base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
                    self.set_store('collection', json.dumps(base_collection))
                else:
                    # user is opted-in to an older version, meaning they need
                    # to re-opt in regardless
                    self.set_store('collection', json.dumps([]))
                    self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")

            # reload collection after setting
            collection = self.get_store('collection')
            if collection is not None:
                self.db_collection = json.loads(collection)
            else:
                raise RuntimeError('collection is None after initial setting')
        else:
            # user has already upgraded
            self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
1478
1479 def is_enabled_collection(self, collection: Collection) -> bool:
1480 if self.db_collection is None:
1481 return False
1482 return collection.name in self.db_collection
1483
1484 def opt_in_all_collections(self) -> None:
1485 """
1486 Opt-in to all collections; Update db with the currently available collections in the module
1487 """
1488 if self.db_collection is None:
1489 raise RuntimeError('db_collection is None after initial setting')
1490
1491 for c in MODULE_COLLECTION:
1492 if c['name'].name not in self.db_collection:
1493 self.db_collection.append(c['name'])
1494
1495 self.set_store('collection', json.dumps(self.db_collection))
1496
    def send(self,
             report: Dict[str, Dict[str, str]],
             endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
        """
        Upload *report* to the given endpoints (both 'ceph' and 'device' by
        default). Returns a (retval, stdout, stderr)-style tuple: 0 with a
        success summary, or 1 with success + failure lines joined together.
        """
        if not endpoint:
            endpoint = [self.EndPoint.ceph, self.EndPoint.device]
        failed = []
        success = []
        self.log.debug('Send endpoints %s' % endpoint)
        for e in endpoint:
            if e == self.EndPoint.ceph:
                fail_reason = self._try_post('ceph report', self.url, report)
                if fail_reason:
                    failed.append(fail_reason)
                else:
                    # record the upload time so serve()/status can report it
                    now = int(time.time())
                    self.last_upload = now
                    self.set_store('last_upload', str(now))
                    success.append('Ceph report sent to {0}'.format(self.url))
                    self.log.info('Sent report to {0}'.format(self.url))
            elif e == self.EndPoint.device:
                if 'device' in self.get_active_channels():
                    devices = self.gather_device_report()
                    if devices:
                        num_devs = 0
                        num_hosts = 0
                        # device reports are posted per host
                        for host, ls in devices.items():
                            self.log.debug('host %s devices %s' % (host, ls))
                            if not len(ls):
                                continue
                            fail_reason = self._try_post('devices', self.device_url,
                                                         ls)
                            if fail_reason:
                                failed.append(fail_reason)
                            else:
                                num_devs += len(ls)
                                num_hosts += 1
                        if num_devs:
                            success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
                                num_devs, num_hosts, len(devices)))
                    else:
                        fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
                        failed.append(fail_reason)
                        self.log.error(fail_reason)
        if failed:
            return 1, '', '\n'.join(success + failed)
        return 0, '', '\n'.join(success)
1543
1544 def format_perf_histogram(self, report: Dict[str, Any]) -> None:
1545 # Formatting the perf histograms so they are human-readable. This will change the
1546 # ranges and values, which are currently in list form, into strings so that
1547 # they are displayed horizontally instead of vertically.
1548 if 'report' in report:
1549 report = report['report']
1550 try:
1551 # Formatting ranges and values in osd_perf_histograms
1552 mode = 'osd_perf_histograms'
1553 for config in report[mode]:
1554 for histogram in config:
1555 # Adjust ranges by converting lists into strings
1556 for axis in config[histogram]['axes']:
1557 for i in range(0, len(axis['ranges'])):
1558 axis['ranges'][i] = str(axis['ranges'][i])
1559
1560 for osd in config[histogram]['osds']:
1561 for i in range(0, len(osd['values'])):
1562 osd['values'][i] = str(osd['values'][i])
1563 except KeyError:
1564 # If the perf channel is not enabled, there should be a KeyError since
1565 # 'osd_perf_histograms' would not be present in the report. In that case,
1566 # the show function should pass as usual without trying to format the
1567 # histograms.
1568 pass
1569
1570 def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1571 '''
1572 Enable or disable a list of channels
1573 '''
1574 if not self.enabled:
1575 # telemetry should be on for channels to be toggled
1576 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1577 'Preview sample reports with `ceph telemetry preview`.'
1578 return 0, msg, ''
1579
1580 if channels is None:
1581 msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
1582 return 0, msg, ''
1583
1584 state = action == 'enable'
1585 msg = ''
1586 for c in channels:
1587 if c not in ALL_CHANNELS:
1588 msg = f"{msg}{c} is not a valid channel name. "\
1589 f"Available channels: {ALL_CHANNELS}.\n"
1590 else:
1591 self.set_module_option(f"channel_{c}", state)
1592 setattr(self,
1593 f"channel_{c}",
1594 state)
1595 msg = f"{msg}channel_{c} is {action}d\n"
1596
1597 return 0, msg, ''
1598
1599 @CLIReadCommand('telemetry status')
1600 def status(self) -> Tuple[int, str, str]:
1601 '''
1602 Show current configuration
1603 '''
1604 r = {}
1605 for opt in self.MODULE_OPTIONS:
1606 r[opt['name']] = getattr(self, opt['name'])
1607 r['last_upload'] = (time.ctime(self.last_upload)
1608 if self.last_upload else self.last_upload)
1609 return 0, json.dumps(r, indent=4, sort_keys=True), ''
1610
1611 @CLIReadCommand('telemetry diff')
1612 def diff(self) -> Tuple[int, str, str]:
1613 '''
1614 Show the diff between opted-in collection and available collection
1615 '''
1616 diff = []
1617 keys = ['nag']
1618
1619 for c in MODULE_COLLECTION:
1620 if not self.is_enabled_collection(c['name']):
1621 diff.append({key: val for key, val in c.items() if key not in keys})
1622
1623 r = None
1624 if diff == []:
1625 r = "Telemetry is up to date"
1626 else:
1627 r = json.dumps(diff, indent=4, sort_keys=True)
1628
1629 return 0, r, ''
1630
1631 @CLICommand('telemetry on')
1632 def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
1633 '''
1634 Enable telemetry reports from this cluster
1635 '''
1636 if license != LICENSE:
1637 return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
1638 To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
1639 else:
1640 self.set_module_option('enabled', True)
1641 self.enabled = True
1642 self.opt_in_all_collections()
1643
1644 # for major releases upgrade nagging
1645 mon_map = self.get('mon_map')
1646 mon_min = mon_map.get("min_mon_release", 0)
1647 self.set_store('last_opted_in_ceph_version', str(mon_min))
1648 self.last_opted_in_ceph_version = mon_min
1649
1650 msg = 'Telemetry is on.'
1651 disabled_channels = ''
1652 active_channels = self.get_active_channels()
1653 for c in ALL_CHANNELS:
1654 if c not in active_channels and c != 'ident':
1655 disabled_channels = f"{disabled_channels} {c}"
1656
1657 if len(disabled_channels) > 0:
1658 msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
1659 f"`ceph telemetry enable channel{disabled_channels}`"
1660
1661 # wake up serve() to reset health warning
1662 self.event.set()
1663
1664 return 0, msg, ''
1665
1666 @CLICommand('telemetry off')
1667 def off(self) -> Tuple[int, str, str]:
1668 '''
1669 Disable telemetry reports from this cluster
1670 '''
1671 if not self.enabled:
1672 # telemetry is already off
1673 msg = 'Telemetry is currently not enabled, nothing to turn off. '\
1674 'Please consider opting-in with `ceph telemetry on`.\n' \
1675 'Preview sample reports with `ceph telemetry preview`.'
1676 return 0, msg, ''
1677
1678 self.set_module_option('enabled', False)
1679 self.enabled = False
1680 self.set_store('collection', json.dumps([]))
1681 self.db_collection = []
1682
1683 # we might need this info in the future, in case
1684 # of nagging when user is opted-out
1685 mon_map = self.get('mon_map')
1686 mon_min = mon_map.get("min_mon_release", 0)
1687 self.set_store('last_opted_out_ceph_version', str(mon_min))
1688 self.last_opted_out_ceph_version = mon_min
1689
1690 msg = 'Telemetry is now disabled.'
1691 return 0, msg, ''
1692
    @CLIReadCommand('telemetry enable channel all')
    def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Enable all channels
        '''
        # thin wrapper: delegates to toggle_channel with every channel selected
        return self.toggle_channel('enable', channels)
1699
    @CLIReadCommand('telemetry enable channel')
    def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Enable a list of channels
        '''
        # thin wrapper: toggle_channel validates the names and reports results
        return self.toggle_channel('enable', channels)
1706
    @CLIReadCommand('telemetry disable channel all')
    def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Disable all channels
        '''
        # thin wrapper: delegates to toggle_channel with every channel selected
        return self.toggle_channel('disable', channels)
1713
    @CLIReadCommand('telemetry disable channel')
    def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Disable a list of channels
        '''
        # thin wrapper: toggle_channel validates the names and reports results
        return self.toggle_channel('disable', channels)
1720
1721 @CLIReadCommand('telemetry channel ls')
1722 def channel_ls(self) -> Tuple[int, str, str]:
1723 '''
1724 List all channels
1725 '''
1726 table = PrettyTable(
1727 [
1728 'NAME', 'ENABLED', 'DEFAULT', 'DESC',
1729 ],
1730 border=False)
1731 table.align['NAME'] = 'l'
1732 table.align['ENABLED'] = 'l'
1733 table.align['DEFAULT'] = 'l'
1734 table.align['DESC'] = 'l'
1735 table.left_padding_width = 0
1736 table.right_padding_width = 4
1737
1738 for c in ALL_CHANNELS:
1739 enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
1740 for o in self.MODULE_OPTIONS:
1741 if o['name'] == f"channel_{c}":
1742 default = "ON" if o.get('default', None) else "OFF"
1743 desc = o.get('desc', None)
1744
1745 table.add_row((
1746 c,
1747 enabled,
1748 default,
1749 desc,
1750 ))
1751
1752 return 0, table.get_string(sortby="NAME"), ''
1753
1754 @CLIReadCommand('telemetry collection ls')
1755 def collection_ls(self) -> Tuple[int, str, str]:
1756 '''
1757 List all collections
1758 '''
1759 col_delta = self.collection_delta()
1760 msg = ''
1761 if col_delta is not None and len(col_delta) > 0:
1762 msg = f"New collections are available:\n" \
1763 f"{sorted([c.name for c in col_delta])}\n" \
1764 f"Run `ceph telemetry on` to opt-in to these collections.\n"
1765
1766 table = PrettyTable(
1767 [
1768 'NAME', 'STATUS', 'DESC',
1769 ],
1770 border=False)
1771 table.align['NAME'] = 'l'
1772 table.align['STATUS'] = 'l'
1773 table.align['DESC'] = 'l'
1774 table.left_padding_width = 0
1775 table.right_padding_width = 4
1776
1777 for c in MODULE_COLLECTION:
1778 name = c['name']
1779 opted_in = self.is_enabled_collection(name)
1780 channel_enabled = getattr(self, f"channel_{c['channel']}")
1781
1782 status = ''
1783 if channel_enabled and opted_in:
1784 status = "REPORTING"
1785 else:
1786 why = ''
1787 delimiter = ''
1788
1789 if not opted_in:
1790 why += "NOT OPTED-IN"
1791 delimiter = ', '
1792 if not channel_enabled:
1793 why += f"{delimiter}CHANNEL {c['channel']} IS OFF"
1794
1795 status = f"NOT REPORTING: {why}"
1796
1797 desc = c['description']
1798
1799 table.add_row((
1800 name,
1801 status,
1802 desc,
1803 ))
1804
1805 if len(msg):
1806 # add a new line between message and table output
1807 msg = f"{msg} \n"
1808
1809 return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
1810
1811 @CLICommand('telemetry send')
1812 def do_send(self,
1813 endpoint: Optional[List[EndPoint]] = None,
1814 license: Optional[str] = None) -> Tuple[int, str, str]:
1815 '''
1816 Send a sample report
1817 '''
1818 if not self.is_opted_in() and license != LICENSE:
1819 self.log.debug(('A telemetry send attempt while opted-out. '
1820 'Asking for license agreement'))
1821 return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
1822 To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
1823 Please consider enabling the telemetry module with 'ceph telemetry on'.'''
1824 else:
1825 self.last_report = self.compile_report()
1826 return self.send(self.last_report, endpoint)
1827
1828 @CLIReadCommand('telemetry show')
1829 def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1830 '''
1831 Show a sample report of opted-in collections (except for 'device')
1832 '''
1833 if not self.enabled:
1834 # if telemetry is off, no report is being sent, hence nothing to show
1835 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1836 'Preview sample reports with `ceph telemetry preview`.'
1837 return 0, msg, ''
1838
1839 report = self.get_report_locked(channels=channels)
1840 self.format_perf_histogram(report)
1841 report = json.dumps(report, indent=4, sort_keys=True)
1842
1843 if self.channel_device:
1844 report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''
1845
1846 return 0, report, ''
1847
    @CLIReadCommand('telemetry preview')
    def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Preview a sample report of the most recent collections available (except for 'device')
        '''
        report = {}

        # We use a lock to prevent a scenario where the user wishes to preview
        # the report, and at the same time the module hits the interval of
        # sending a report with the opted-in collection, which has less data
        # than in the preview report.
        col_delta = self.collection_delta()
        with self.get_report_lock:
            if col_delta is not None and len(col_delta) == 0:
                # user is already opted-in to the most recent collection
                msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
                return 0, msg, ''
            else:
                # there are collections the user is not opted-in to
                next_collection = []

                for c in MODULE_COLLECTION:
                    next_collection.append(c['name'].name)

                # temporarily pretend all collections are opted-in while
                # compiling the report, then restore the real list; the lock
                # held above keeps this swap invisible to concurrent senders
                opted_in_collection = self.db_collection
                self.db_collection = next_collection
                report = self.get_report(channels=channels)
                self.db_collection = opted_in_collection

        self.format_perf_histogram(report)
        report = json.dumps(report, indent=4, sort_keys=True)

        if self.channel_device:
            report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''

        return 0, report, ''
1884
1885 @CLIReadCommand('telemetry show-device')
1886 def show_device(self) -> Tuple[int, str, str]:
1887 '''
1888 Show a sample device report
1889 '''
1890 if not self.enabled:
1891 # if telemetry is off, no report is being sent, hence nothing to show
1892 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1893 'Preview sample device reports with `ceph telemetry preview-device`.'
1894 return 0, msg, ''
1895
1896 if not self.channel_device:
1897 # if device channel is off, device report is not being sent, hence nothing to show
1898 msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
1899 'Preview sample device reports with `ceph telemetry preview-device`.'
1900 return 0, msg, ''
1901
1902 return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
1903
1904 @CLIReadCommand('telemetry preview-device')
1905 def preview_device(self) -> Tuple[int, str, str]:
1906 '''
1907 Preview a sample device report of the most recent device collection
1908 '''
1909 report = {}
1910
1911 device_col_delta = self.collection_delta(['device'])
1912 with self.get_report_lock:
1913 if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
1914 # user is already opted-in to the most recent device collection,
1915 # and device channel is on, thus `show-device` should be called
1916 msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
1917 return 0, msg, ''
1918
1919 # either the user is not opted-in at all, or there are collections
1920 # they are not opted-in to
1921 next_collection = []
1922
1923 for c in MODULE_COLLECTION:
1924 next_collection.append(c['name'].name)
1925
1926 opted_in_collection = self.db_collection
1927 self.db_collection = next_collection
1928 report = self.get_report('device')
1929 self.db_collection = opted_in_collection
1930
1931 report = json.dumps(report, indent=4, sort_keys=True)
1932 return 0, report, ''
1933
1934 @CLIReadCommand('telemetry show-all')
1935 def show_all(self) -> Tuple[int, str, str]:
1936 '''
1937 Show a sample report of all enabled channels (including 'device' channel)
1938 '''
1939 if not self.enabled:
1940 # if telemetry is off, no report is being sent, hence nothing to show
1941 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1942 'Preview sample reports with `ceph telemetry preview`.'
1943 return 0, msg, ''
1944
1945 if not self.channel_device:
1946 # device channel is off, no need to display its report
1947 report = self.get_report_locked('default')
1948 else:
1949 # telemetry is on and device channel is enabled, show both
1950 report = self.get_report_locked('all')
1951
1952 self.format_perf_histogram(report)
1953 return 0, json.dumps(report, indent=4, sort_keys=True), ''
1954
1955 @CLIReadCommand('telemetry preview-all')
1956 def preview_all(self) -> Tuple[int, str, str]:
1957 '''
1958 Preview a sample report of the most recent collections available of all channels (including 'device')
1959 '''
1960 report = {}
1961
1962 col_delta = self.collection_delta()
1963 with self.get_report_lock:
1964 if col_delta is not None and len(col_delta) == 0:
1965 # user is already opted-in to the most recent collection
1966 msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
1967 return 0, msg, ''
1968
1969 # there are collections the user is not opted-in to
1970 next_collection = []
1971
1972 for c in MODULE_COLLECTION:
1973 next_collection.append(c['name'].name)
1974
1975 opted_in_collection = self.db_collection
1976 self.db_collection = next_collection
1977 report = self.get_report('all')
1978 self.db_collection = opted_in_collection
1979
1980 self.format_perf_histogram(report)
1981 report = json.dumps(report, indent=4, sort_keys=True)
1982
1983 return 0, report, ''
1984
    def get_report_locked(self,
                          report_type: str = 'default',
                          channels: Optional[List[str]] = None) -> Dict[str, Any]:
        '''
        Thread-safe wrapper around get_report().

        Takes self.get_report_lock so report compilation cannot race with the
        preview commands, which temporarily swap self.db_collection while
        holding the same lock.
        '''
        with self.get_report_lock:
            return self.get_report(report_type, channels)
1993
1994 def get_report(self,
1995 report_type: str = 'default',
1996 channels: Optional[List[str]] = None) -> Dict[str, Any]:
1997 if report_type == 'default':
1998 return self.compile_report(channels=channels)
1999 elif report_type == 'device':
2000 return self.gather_device_report()
2001 elif report_type == 'all':
2002 return {'report': self.compile_report(channels=channels),
2003 'device_report': self.gather_device_report()}
2004 return {}
2005
2006 def self_test(self) -> None:
2007 self.opt_in_all_collections()
2008 report = self.compile_report(channels=ALL_CHANNELS)
2009 if len(report) == 0:
2010 raise RuntimeError('Report is empty')
2011
2012 if 'report_id' not in report:
2013 raise RuntimeError('report_id not found in report')
2014
    def shutdown(self) -> None:
        '''
        Stop the serve() loop.
        '''
        # Clear the run flag and wake the event so a serve() thread that is
        # sleeping in event.wait() observes the flag and exits promptly.
        self.run = False
        self.event.set()
2018
2019 def refresh_health_checks(self) -> None:
2020 health_checks = {}
2021 # TODO do we want to nag also in case the user is not opted-in?
2022 if self.enabled and self.should_nag():
2023 health_checks['TELEMETRY_CHANGED'] = {
2024 'severity': 'warning',
2025 'summary': 'Telemetry requires re-opt-in',
2026 'detail': [
2027 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
2028 ]
2029 }
2030 self.set_health_checks(health_checks)
2031
2032 def serve(self) -> None:
2033 self.load()
2034 self.run = True
2035
2036 self.log.debug('Waiting for mgr to warm up')
2037 time.sleep(10)
2038
2039 while self.run:
2040 self.event.clear()
2041
2042 self.refresh_health_checks()
2043
2044 if not self.is_opted_in():
2045 self.log.debug('Not sending report until user re-opts-in')
2046 self.event.wait(1800)
2047 continue
2048 if not self.enabled:
2049 self.log.debug('Not sending report until configured to do so')
2050 self.event.wait(1800)
2051 continue
2052
2053 now = int(time.time())
2054 if not self.last_upload or \
2055 (now - self.last_upload) > self.interval * 3600:
2056 self.log.info('Compiling and sending report to %s',
2057 self.url)
2058
2059 try:
2060 self.last_report = self.compile_report()
2061 except Exception:
2062 self.log.exception('Exception while compiling report:')
2063
2064 self.send(self.last_report)
2065 else:
2066 self.log.debug('Interval for sending new report has not expired')
2067
2068 sleep = 3600
2069 self.log.debug('Sleeping for %d seconds', sleep)
2070 self.event.wait(sleep)
2071
    @staticmethod
    def can_run() -> Tuple[bool, str]:
        # The telemetry module has no special prerequisites: always runnable,
        # with no explanatory message needed.
        return True, ''