]> git.proxmox.com Git - ceph.git/blame_incremental - ceph/src/pybind/mgr/telemetry/module.py
bump version to 19.2.0-pve1
[ceph.git] / ceph / src / pybind / mgr / telemetry / module.py
... / ...
CommitLineData
1"""
2Telemetry module for ceph-mgr
3
4Collect statistics from Ceph cluster and send this back to the Ceph project
5when user has opted-in
6"""
7import logging
8import numbers
9import enum
10import errno
11import hashlib
12import json
13import rbd
14import requests
15import uuid
16import time
17from datetime import datetime, timedelta
18from prettytable import PrettyTable
19from threading import Event, Lock
20from collections import defaultdict
21from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union
22
23from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, ServiceInfoT
24
25
26ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']
27
28LICENSE = 'sharing-1-0'
29LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
30LICENSE_URL = 'https://cdla.io/sharing-1-0/'
31NO_SALT_CNT = 0
32
33# Latest revision of the telemetry report. Bump this each time we make
34# *any* change.
35REVISION = 3
36
37# History of revisions
38# --------------------
39#
40# Version 1:
41# Mimic and/or nautilus are lumped together here, since
42# we didn't track revisions yet.
43#
44# Version 2:
45# - added revision tracking, nagging, etc.
46# - added config option changes
47# - added channels
48# - added explicit license acknowledgement to the opt-in process
49#
50# Version 3:
51# - added device health metrics (i.e., SMART data, minus serial number)
52# - remove crush_rule
53# - added CephFS metadata (how many MDSs, fs features, how many data pools,
54# how much metadata is cached, rfiles, rbytes, rsnapshots)
55# - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
56# - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
57# - whether an OSD cluster network is in use
58# - rbd pool and image count, and rbd mirror mode (pool-level)
59# - rgw daemons, zones, zonegroups; which rgw frontends
60# - crush map stats
61
class Collection(str, enum.Enum):
    """Opt-in-able telemetry data collections.

    Each member names a group of related data points; the value equals the
    member name so it serializes cleanly (str mixin).  The member names are
    prefixed by the channel they belong to (basic_*, device_*, crash_*,
    ident_*, perf_*) — see MODULE_COLLECTION for the channel mapping.
    """
    basic_base = 'basic_base'
    device_base = 'device_base'
    crash_base = 'crash_base'
    ident_base = 'ident_base'
    perf_perf = 'perf_perf'
    basic_mds_metadata = 'basic_mds_metadata'
    basic_pool_usage = 'basic_pool_usage'
    basic_usage_by_class = 'basic_usage_by_class'
    # Rook deployment metadata (see ROOK_KEYS_BY_COLLECTION)
    basic_rook_v01 = 'basic_rook_v01'
    perf_memory_metrics = 'perf_memory_metrics'
    basic_pool_options_bluestore = 'basic_pool_options_bluestore'
    basic_pool_flags = 'basic_pool_flags'
75
# Registry of all collections. Each entry has:
#   name:        the Collection enum member
#   description: human-readable summary (shown to users via CLI)
#   channel:     the telemetry channel the collection belongs to
#   nag:         presumably whether users should be re-prompted to opt in to
#                this collection — usage not visible in this file; verify
#                against the opt-in/nag logic before relying on it
MODULE_COLLECTION : List[Dict] = [
    {
        "name": Collection.basic_base,
        "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.device_base,
        "description": "Information about device health metrics",
        "channel": "device",
        "nag": False
    },
    {
        "name": Collection.crash_base,
        "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
        "channel": "crash",
        "nag": False
    },
    {
        "name": Collection.ident_base,
        "description": "User-provided identifying information about the cluster",
        "channel": "ident",
        "nag": False
    },
    {
        "name": Collection.perf_perf,
        "description": "Information about performance counters of the cluster",
        "channel": "perf",
        "nag": True
    },
    {
        "name": Collection.basic_mds_metadata,
        "description": "MDS metadata",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_pool_usage,
        "description": "Default pool application and usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_usage_by_class,
        "description": "Default device class usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_rook_v01,
        "description": "Basic Rook deployment data",
        "channel": "basic",
        "nag": True
    },
    {
        "name": Collection.perf_memory_metrics,
        "description": "Heap stats and mempools for mon and mds",
        "channel": "perf",
        "nag": False
    },
    {
        "name": Collection.basic_pool_options_bluestore,
        "description": "Per-pool bluestore config options",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_pool_flags,
        "description": "Per-pool flags",
        "channel": "basic",
        "nag": False
    },
]
150
# Rook-related keys and the collection each is reported under.  The keys look
# like KV-store paths; the code that reads them is not visible in this chunk —
# presumably they are fetched from the mgr KV store when building the report.
ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [
    # Note: a key cannot be both a node and a leaf, e.g.
    # "rook/a/b"
    # "rook/a/b/c"
    ("rook/version", Collection.basic_rook_v01),
    ("rook/kubernetes/version", Collection.basic_rook_v01),
    ("rook/csi/version", Collection.basic_rook_v01),
    ("rook/node/count/kubernetes-total", Collection.basic_rook_v01),
    ("rook/node/count/with-ceph-daemons", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-rbd-plugin", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-cephfs-plugin", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-nfs-plugin", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/total", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/rbd", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/cephfs", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/nfs", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/bucket", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/total", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/portable", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/non-portable", Collection.basic_rook_v01),
    ("rook/cluster/mon/count", Collection.basic_rook_v01),
    ("rook/cluster/mon/allow-multiple-per-node", Collection.basic_rook_v01),
    ("rook/cluster/mon/max-id", Collection.basic_rook_v01),
    ("rook/cluster/mon/pvc/enabled", Collection.basic_rook_v01),
    ("rook/cluster/mon/stretch/enabled", Collection.basic_rook_v01),
    ("rook/cluster/network/provider", Collection.basic_rook_v01),
    ("rook/cluster/external-mode", Collection.basic_rook_v01),
]
179
class Module(MgrModule):
    """Telemetry mgr module: collects statistics from the Ceph cluster and
    reports them to the Ceph project when the user has opted in.
    """

    # Daemon metadata fields that the gather_*_metadata() helpers aggregate
    # into per-value counters.
    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]

    # Module options; each becomes an attribute of the same name via
    # config_update_module_option().
    MODULE_OPTIONS = [
        Option(name='url',
               type='str',
               default='https://telemetry.ceph.com/report'),
        Option(name='device_url',
               type='str',
               default='https://telemetry.ceph.com/device'),
        Option(name='enabled',
               type='bool',
               default=False),
        Option(name='last_opt_revision',
               type='int',
               default=1),
        Option(name='leaderboard',
               type='bool',
               default=False),
        Option(name='leaderboard_description',
               type='str',
               default=None),
        Option(name='description',
               type='str',
               default=None),
        Option(name='contact',
               type='str',
               default=None),
        Option(name='organization',
               type='str',
               default=None),
        Option(name='proxy',
               type='str',
               default=None),
        # Reporting interval in hours; gather_device_report() uses 2x this as
        # its sample cutoff window.
        Option(name='interval',
               type='int',
               default=24,
               min=8),
        Option(name='channel_basic',
               type='bool',
               default=True,
               desc='Share basic cluster information (size, version)'),
        Option(name='channel_ident',
               type='bool',
               default=False,
               desc='Share a user-provided description and/or contact email for the cluster'),
        Option(name='channel_crash',
               type='bool',
               default=True,
               desc='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
        Option(name='channel_device',
               type='bool',
               default=True,
               desc=('Share device health metrics '
                     '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
        Option(name='channel_perf',
               type='bool',
               default=False,
               desc='Share various performance metrics of a cluster'),
    ]
249
250 @property
251 def config_keys(self) -> Dict[str, OptionValue]:
252 return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
253
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize module state and pull current option values."""
        super(Module, self).__init__(*args, **kwargs)
        # used by config_notify() to wake the serve() thread (not in view here)
        self.event = Event()
        self.run = False
        # collections the user has opted in to, persisted in the mgr store
        self.db_collection: Optional[List[str]] = None
        self.last_opted_in_ceph_version: Optional[int] = None
        self.last_opted_out_ceph_version: Optional[int] = None
        # epoch of the last successful upload, if any
        self.last_upload: Optional[int] = None
        self.last_report: Dict[str, Any] = dict()
        # stable anonymous cluster id, created on first load()
        self.report_id: Optional[str] = None
        # salt used to anonymize entity names (see anonymize_entity_name)
        self.salt: Optional[str] = None
        self.get_report_lock = Lock()
        self.config_update_module_option()
        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.url = ''
            self.device_url = ''
            self.enabled = False
            self.last_opt_revision = 0
            self.leaderboard = ''
            self.leaderboard_description = ''
            self.interval = 0
            self.proxy = ''
            self.channel_basic = True
            self.channel_ident = False
            self.channel_crash = True
            self.channel_device = True
            self.channel_perf = False
            self.db_collection = ['basic_base', 'device_base']
            self.last_opted_in_ceph_version = 17
            self.last_opted_out_ceph_version = 0
285
286 def config_update_module_option(self) -> None:
287 for opt in self.MODULE_OPTIONS:
288 setattr(self,
289 opt['name'],
290 self.get_module_option(opt['name']))
291 self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
292
    def config_notify(self) -> None:
        """Mgr callback invoked when a module option changes: re-read options."""
        self.config_update_module_option()
        # wake up serve() thread
        self.event.set()
297
298 def load(self) -> None:
299 last_upload = self.get_store('last_upload', None)
300 if last_upload is None:
301 self.last_upload = None
302 else:
303 self.last_upload = int(last_upload)
304
305 report_id = self.get_store('report_id', None)
306 if report_id is None:
307 self.report_id = str(uuid.uuid4())
308 self.set_store('report_id', self.report_id)
309 else:
310 self.report_id = report_id
311
312 salt = self.get_store('salt', None)
313 if salt is None:
314 self.salt = str(uuid.uuid4())
315 self.set_store('salt', self.salt)
316 else:
317 self.salt = salt
318
319 self.init_collection()
320
321 last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None)
322 if last_opted_in_ceph_version is None:
323 self.last_opted_in_ceph_version = None
324 else:
325 self.last_opted_in_ceph_version = int(last_opted_in_ceph_version)
326
327 last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None)
328 if last_opted_out_ceph_version is None:
329 self.last_opted_out_ceph_version = None
330 else:
331 self.last_opted_out_ceph_version = int(last_opted_out_ceph_version)
332
333 def gather_osd_metadata(self,
334 osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]:
335 keys = ["osd_objectstore", "rotational"]
336 keys += self.metadata_keys
337
338 metadata: Dict[str, Dict[str, int]] = dict()
339 for key in keys:
340 metadata[key] = defaultdict(int)
341
342 for osd in osd_map['osds']:
343 res = self.get_metadata('osd', str(osd['osd']))
344 if res is None:
345 self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
346 continue
347 for k, v in res.items():
348 if k not in keys:
349 continue
350
351 metadata[k][v] += 1
352
353 return metadata
354
355 def gather_mon_metadata(self,
356 mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]:
357 keys = list()
358 keys += self.metadata_keys
359
360 metadata: Dict[str, Dict[str, int]] = dict()
361 for key in keys:
362 metadata[key] = defaultdict(int)
363
364 for mon in mon_map['mons']:
365 res = self.get_metadata('mon', mon['name'])
366 if res is None:
367 self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
368 continue
369 for k, v in res.items():
370 if k not in keys:
371 continue
372
373 metadata[k][v] += 1
374
375 return metadata
376
377 def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]:
378 metadata: Dict[str, Dict[str, int]] = dict()
379
380 res = self.get('mds_metadata') # metadata of *all* mds daemons
381 if res is None or not res:
382 self.log.debug('Could not get metadata for mds daemons')
383 return metadata
384
385 keys = list()
386 keys += self.metadata_keys
387
388 for key in keys:
389 metadata[key] = defaultdict(int)
390
391 for mds in res.values():
392 for k, v in mds.items():
393 if k not in keys:
394 continue
395
396 metadata[k][v] += 1
397
398 return metadata
399
400 def gather_crush_info(self) -> Dict[str, Union[int,
401 bool,
402 List[int],
403 Dict[str, int],
404 Dict[int, int]]]:
405 osdmap = self.get_osdmap()
406 crush_raw = osdmap.get_crush()
407 crush = crush_raw.dump()
408
409 BucketKeyT = TypeVar('BucketKeyT', int, str)
410
411 def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None:
412 if k in d:
413 d[k] += 1
414 else:
415 d[k] = 1
416
417 device_classes: Dict[str, int] = {}
418 for dev in crush['devices']:
419 inc(device_classes, dev.get('class', ''))
420
421 bucket_algs: Dict[str, int] = {}
422 bucket_types: Dict[str, int] = {}
423 bucket_sizes: Dict[int, int] = {}
424 for bucket in crush['buckets']:
425 if '~' in bucket['name']: # ignore shadow buckets
426 continue
427 inc(bucket_algs, bucket['alg'])
428 inc(bucket_types, bucket['type_id'])
429 inc(bucket_sizes, len(bucket['items']))
430
431 return {
432 'num_devices': len(crush['devices']),
433 'num_types': len(crush['types']),
434 'num_buckets': len(crush['buckets']),
435 'num_rules': len(crush['rules']),
436 'device_classes': list(device_classes.values()),
437 'tunables': crush['tunables'],
438 'compat_weight_set': '-1' in crush['choose_args'],
439 'num_weight_sets': len(crush['choose_args']),
440 'bucket_algs': bucket_algs,
441 'bucket_sizes': bucket_sizes,
442 'bucket_types': bucket_types,
443 }
444
445 def gather_configs(self) -> Dict[str, List[str]]:
446 # cluster config options
447 cluster = set()
448 r, outb, outs = self.mon_command({
449 'prefix': 'config dump',
450 'format': 'json'
451 })
452 if r != 0:
453 return {}
454 try:
455 dump = json.loads(outb)
456 except json.decoder.JSONDecodeError:
457 return {}
458 for opt in dump:
459 name = opt.get('name')
460 if name:
461 cluster.add(name)
462 # daemon-reported options (which may include ceph.conf)
463 active = set()
464 ls = self.get("modified_config_options")
465 for opt in ls.get('options', {}):
466 active.add(opt)
467 return {
468 'cluster_changed': sorted(list(cluster)),
469 'active_changed': sorted(list(active)),
470 }
471
472 def anonymize_entity_name(self, entity_name:str) -> str:
473 if '.' not in entity_name:
474 self.log.debug(f"Cannot split entity name ({entity_name}), no '.' is found")
475 return entity_name
476
477 (etype, eid) = entity_name.split('.', 1)
478 m = hashlib.sha1()
479 salt = ''
480 if self.salt is not None:
481 salt = self.salt
482 # avoid asserting that salt exists
483 if not self.salt:
484 # do not set self.salt to a temp value
485 salt = f"no_salt_found_{NO_SALT_CNT}"
486 NO_SALT_CNT += 1
487 self.log.debug(f"No salt found, created a temp one: {salt}")
488 m.update(salt.encode('utf-8'))
489 m.update(eid.encode('utf-8'))
490 m.update(salt.encode('utf-8'))
491
492 return etype + '.' + m.hexdigest()
493
494 def get_heap_stats(self) -> Dict[str, dict]:
495 result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
496 anonymized_daemons = {}
497 osd_map = self.get('osd_map')
498
499 # Combine available daemons
500 daemons = []
501 for osd in osd_map['osds']:
502 daemons.append('osd'+'.'+str(osd['osd']))
503 # perf_memory_metrics collection (1/2)
504 if self.is_enabled_collection(Collection.perf_memory_metrics):
505 mon_map = self.get('mon_map')
506 mds_metadata = self.get('mds_metadata')
507 for mon in mon_map['mons']:
508 daemons.append('mon'+'.'+mon['name'])
509 for mds in mds_metadata:
510 daemons.append('mds'+'.'+mds)
511
512 # Grab output from the "daemon.x heap stats" command
513 for daemon in daemons:
514 daemon_type, daemon_id = daemon.split('.', 1)
515 heap_stats = self.parse_heap_stats(daemon_type, daemon_id)
516 if heap_stats:
517 if (daemon_type != 'osd'):
518 # Anonymize mon and mds
519 anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
520 daemon = anonymized_daemons[daemon]
521 result[daemon_type][daemon] = heap_stats
522 else:
523 continue
524
525 if anonymized_daemons:
526 # for debugging purposes only, this data is never reported
527 self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons))
528 return result
529
530 def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]:
531 parsed_output = {}
532
533 cmd_dict = {
534 'prefix': 'heap',
535 'heapcmd': 'stats'
536 }
537 r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict)
538
539 if r != 0:
540 self.log.error("Invalid command dictionary: {}".format(cmd_dict))
541 else:
542 if 'tcmalloc heap stats' in outb:
543 values = [int(i) for i in outb.split() if i.isdigit()]
544 # `categories` must be ordered this way for the correct output to be parsed
545 categories = ['use_by_application',
546 'page_heap_freelist',
547 'central_cache_freelist',
548 'transfer_cache_freelist',
549 'thread_cache_freelists',
550 'malloc_metadata',
551 'actual_memory_used',
552 'released_to_os',
553 'virtual_address_space_used',
554 'spans_in_use',
555 'thread_heaps_in_use',
556 'tcmalloc_page_size']
557 if len(values) != len(categories):
558 self.log.error('Received unexpected output from {}.{}; ' \
559 'number of values should match the number' \
560 'of expected categories:\n values: len={} {} '\
561 '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs))
562 else:
563 parsed_output = dict(zip(categories, values))
564 else:
565 self.log.error('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs))
566
567 return parsed_output
568
569 def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
570 result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
571 anonymized_daemons = {}
572 osd_map = self.get('osd_map')
573
574 # Combine available daemons
575 daemons = []
576 for osd in osd_map['osds']:
577 daemons.append('osd'+'.'+str(osd['osd']))
578 # perf_memory_metrics collection (2/2)
579 if self.is_enabled_collection(Collection.perf_memory_metrics):
580 mon_map = self.get('mon_map')
581 mds_metadata = self.get('mds_metadata')
582 for mon in mon_map['mons']:
583 daemons.append('mon'+'.'+mon['name'])
584 for mds in mds_metadata:
585 daemons.append('mds'+'.'+mds)
586
587 # Grab output from the "dump_mempools" command
588 for daemon in daemons:
589 daemon_type, daemon_id = daemon.split('.', 1)
590 cmd_dict = {
591 'prefix': 'dump_mempools',
592 'format': 'json'
593 }
594 r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict)
595 if r != 0:
596 self.log.error("Invalid command dictionary: {}".format(cmd_dict))
597 continue
598 else:
599 try:
600 # This is where the mempool will land.
601 dump = json.loads(outb)
602 if mode == 'separated':
603 # Anonymize mon and mds
604 if daemon_type != 'osd':
605 anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
606 daemon = anonymized_daemons[daemon]
607 result[daemon_type][daemon] = dump['mempool']['by_pool']
608 elif mode == 'aggregated':
609 for mem_type in dump['mempool']['by_pool']:
610 result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
611 result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
612 else:
613 self.log.error("Incorrect mode specified in get_mempool: {}".format(mode))
614 except (json.decoder.JSONDecodeError, KeyError) as e:
615 self.log.exception("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e))
616 continue
617
618 if anonymized_daemons:
619 # for debugging purposes only, this data is never reported
620 self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons))
621
622 return result
623
    def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
        """Collect 2D perf histograms from every OSD.

        Histograms are grouped by their axis configuration (keyed internally
        by str(axes) so differently-configured histograms are never merged).
        'separated' keeps per-OSD value matrices; 'aggregated' sums them and
        tracks num_combined_osds.  Returns the grouped values as a list; an
        invalid mode returns an empty list.
        """
        # Initialize result dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
            lambda: defaultdict(
                lambda: defaultdict(
                    lambda: defaultdict(
                        lambda: defaultdict(int))))))

        # Get list of osd ids from the metadata
        osd_metadata = self.get('osd_metadata')

        # Grab output from the "osd.x perf histogram dump" command
        for osd_id in osd_metadata:
            cmd_dict = {
                'prefix': 'perf histogram dump',
                'id': str(osd_id),
                'format': 'json'
            }
            r, outb, outs = self.osd_command(cmd_dict)
            # Check for invalid calls
            if r != 0:
                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
                continue
            else:
                try:
                    # This is where the histograms will land if there are any.
                    dump = json.loads(outb)

                    for histogram in dump['osd']:
                        # Log axis information. There are two axes, each represented
                        # as a dictionary. Both dictionaries are contained inside a
                        # list called 'axes'.
                        axes = []
                        for axis in dump['osd'][histogram]['axes']:

                            # This is the dict that contains information for an individual
                            # axis. It will be appended to the 'axes' list at the end.
                            axis_dict: Dict[str, Any] = defaultdict()

                            # Collecting information for buckets, min, name, etc.
                            axis_dict['buckets'] = axis['buckets']
                            axis_dict['min'] = axis['min']
                            axis_dict['name'] = axis['name']
                            axis_dict['quant_size'] = axis['quant_size']
                            axis_dict['scale_type'] = axis['scale_type']

                            # Collecting ranges; placing them in lists to
                            # improve readability later on.
                            ranges = []
                            for _range in axis['ranges']:
                                _max, _min = None, None
                                if 'max' in _range:
                                    _max = _range['max']
                                if 'min' in _range:
                                    _min = _range['min']
                                ranges.append([_min, _max])
                            axis_dict['ranges'] = ranges

                            # Now that 'axis_dict' contains all the appropriate
                            # information for the current axis, append it to the 'axes' list.
                            # There will end up being two axes in the 'axes' list, since the
                            # histograms are 2D.
                            axes.append(axis_dict)

                        # Add the 'axes' list, containing both axes, to result.
                        # At this point, you will see that the name of the key is the string
                        # form of our axes list (str(axes)). This is there so that histograms
                        # with different axis configs will not be combined.
                        # These key names are later dropped when only the values are returned.
                        result[str(axes)][histogram]['axes'] = axes

                        # Collect current values and make sure they are in
                        # integer form.
                        values = []
                        for value_list in dump['osd'][histogram]['values']:
                            values.append([int(v) for v in value_list])

                        if mode == 'separated':
                            if 'osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['osds'] = []
                            result[str(axes)][histogram]['osds'].append({'osd_id': int(osd_id), 'values': values})

                        elif mode == 'aggregated':
                            # Aggregate values. If 'values' have already been initialized,
                            # we can safely add.
                            if 'values' in result[str(axes)][histogram]:
                                for i in range (0, len(values)):
                                    for j in range (0, len(values[i])):
                                        values[i][j] += result[str(axes)][histogram]['values'][i][j]

                            # Add the values to result.
                            result[str(axes)][histogram]['values'] = values

                            # Update num_combined_osds
                            if 'num_combined_osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['num_combined_osds'] = 1
                            else:
                                result[str(axes)][histogram]['num_combined_osds'] += 1
                        else:
                            self.log.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode))
                            return list()

                # Sometimes, json errors occur if you give it an empty string.
                # I am also putting in a catch for a KeyError since it could
                # happen where the code is assuming that a key exists in the
                # schema when it doesn't. In either case, we'll handle that
                # by continuing and collecting what we can from other osds.
                except (json.decoder.JSONDecodeError, KeyError) as e:
                    self.log.exception("Error caught on osd.{}: {}".format(osd_id, e))
                    continue

        return list(result.values())
736
    def get_io_rate(self) -> dict:
        """Return the mgr's cluster-wide 'io_rate' data as-is."""
        return self.get('io_rate')
739
740 def get_stats_per_pool(self) -> dict:
741 result = self.get('pg_dump')['pool_stats']
742
743 # collect application metadata from osd_map
744 osd_map = self.get('osd_map')
745 application_metadata = {pool['pool']: pool['application_metadata'] for pool in osd_map['pools']}
746
747 # add application to each pool from pg_dump
748 for pool in result:
749 pool['application'] = []
750 # Only include default applications
751 for application in application_metadata[pool['poolid']]:
752 if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
753 pool['application'].append(application)
754
755 return result
756
    def get_stats_per_pg(self) -> dict:
        """Return per-PG stats ('pg_stats') from the mgr's pg_dump."""
        return self.get('pg_dump')['pg_stats']
759
760 def get_rocksdb_stats(self) -> Dict[str, str]:
761 # Initalizers
762 result: Dict[str, str] = defaultdict()
763 version = self.get_rocksdb_version()
764
765 # Update result
766 result['version'] = version
767
768 return result
769
770 def gather_crashinfo(self) -> List[Dict[str, str]]:
771 crashlist: List[Dict[str, str]] = list()
772 errno, crashids, err = self.remote('crash', 'ls')
773 if errno:
774 return crashlist
775 for crashid in crashids.split():
776 errno, crashinfo, err = self.remote('crash', 'do_info', crashid)
777 if errno:
778 continue
779 c = json.loads(crashinfo)
780
781 # redact hostname
782 del c['utsname_hostname']
783
784 # entity_name might have more than one '.', beware
785 (etype, eid) = c.get('entity_name', '').split('.', 1)
786 m = hashlib.sha1()
787 assert self.salt
788 m.update(self.salt.encode('utf-8'))
789 m.update(eid.encode('utf-8'))
790 m.update(self.salt.encode('utf-8'))
791 c['entity_name'] = etype + '.' + m.hexdigest()
792
793 # redact final line of python tracebacks, as the exception
794 # payload may contain identifying information
795 if 'mgr_module' in c and 'backtrace' in c:
796 # backtrace might be empty
797 if len(c['backtrace']) > 0:
798 c['backtrace'][-1] = '<redacted>'
799
800 crashlist.append(c)
801 return crashlist
802
    def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
        """Collect unlabeled perf counters for all daemons.

        'separated' keeps one entry per daemon (non-OSD names anonymized);
        'aggregated' sums numeric values per daemon type and tracks
        num_combined_daemons.  An invalid mode returns {}.
        """
        # Extract perf counter data with get_unlabeled_perf_counters(), a method
        # from mgr/mgr_module.py. This method returns a nested dictionary that
        # looks a lot like perf schema, except with some additional fields.
        #
        # Example of output, a snapshot of a mon daemon:
        #   "mon.b": {
        #       "bluestore.kv_flush_lat": {
        #           "count": 2431,
        #           "description": "Average kv_thread flush latency",
        #           "nick": "fl_l",
        #           "priority": 8,
        #           "type": 5,
        #           "units": 1,
        #           "value": 88814109
        #       },
        #   },
        perf_counters = self.get_unlabeled_perf_counters()

        # Initialize 'result' dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
            lambda: defaultdict(lambda: defaultdict(int))))

        # 'separated' mode: anonymized -> real daemon name (debug only)
        anonymized_daemon_dict = {}

        for daemon, perf_counters_by_daemon in perf_counters.items():
            daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'

            if mode == 'separated':
                # anonymize individual daemon names except osds
                if (daemon_type != 'osd'):
                    anonymized_daemon = self.anonymize_entity_name(daemon)
                    anonymized_daemon_dict[anonymized_daemon] = daemon
                    daemon = anonymized_daemon

            # Calculate num combined daemon types if in aggregated mode
            if mode == 'aggregated':
                if 'num_combined_daemons' not in result[daemon_type]:
                    result[daemon_type]['num_combined_daemons'] = 1
                else:
                    result[daemon_type]['num_combined_daemons'] += 1

            for collection in perf_counters_by_daemon:
                # Split the collection to avoid redundancy in final report; i.e.:
                #   bluestore.kv_flush_lat, bluestore.kv_final_lat -->
                #   bluestore: kv_flush_lat, kv_final_lat
                col_0, col_1 = collection.split('.')

                # Debug log for empty keys. This initially was a problem for prioritycache
                # perf counters, where the col_0 was empty for certain mon counters:
                #
                # "mon.a": {           instead of    "mon.a": {
                #      "": {                             "prioritycache": {
                #        "cache_bytes": {...},               "cache_bytes": {...},
                #
                # This log is here to detect any future instances of a similar issue.
                if (daemon == "") or (col_0 == "") or (col_1 == ""):
                    self.log.debug("Instance of an empty key: {}{}".format(daemon, collection))

                if mode == 'separated':
                    # Add value to result
                    result[daemon][col_0][col_1]['value'] = \
                        perf_counters_by_daemon[collection]['value']

                    # Check that 'count' exists, as not all counters have a count field.
                    if 'count' in perf_counters_by_daemon[collection]:
                        result[daemon][col_0][col_1]['count'] = \
                            perf_counters_by_daemon[collection]['count']
                elif mode == 'aggregated':
                    # Not every rgw daemon has the same schema. Specifically, each rgw daemon
                    # has a uniquely-named collection that starts off identically (i.e.
                    # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
                    # This bit of code combines these unique counters all under one rgw instance.
                    # Without this check, the schema would remain separeted out in the final report.
                    if col_0[0:11] == "objecter-0x":
                        col_0 = "objecter-0x"

                    # Check that the value can be incremented. In some cases,
                    # the files are of type 'pair' (real-integer-pair, integer-integer pair).
                    # In those cases, the value is a dictionary, and not a number.
                    #   i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
                    if isinstance(perf_counters_by_daemon[collection]['value'], numbers.Number):
                        result[daemon_type][col_0][col_1]['value'] += \
                            perf_counters_by_daemon[collection]['value']

                        # Check that 'count' exists, as not all counters have a count field.
                        if 'count' in perf_counters_by_daemon[collection]:
                            result[daemon_type][col_0][col_1]['count'] += \
                                perf_counters_by_daemon[collection]['count']
                else:
                    self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
                    return {}

        if mode == 'separated':
            # for debugging purposes only, this data is never reported
            self.log.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict))

        return result
902
903 def get_active_channels(self) -> List[str]:
904 r = []
905 if self.channel_basic:
906 r.append('basic')
907 if self.channel_crash:
908 r.append('crash')
909 if self.channel_device:
910 r.append('device')
911 if self.channel_ident:
912 r.append('ident')
913 if self.channel_perf:
914 r.append('perf')
915 return r
916
    def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
        """Collect recent device health (SMART) metrics, anonymized.

        Host names and device ids are replaced by persisted random UUIDs,
        and the device serial number is scrubbed from the report body.
        Returns {anon-host-id: {anon-devid: {timestamp: record}}}; empty on
        failure to reach the devicehealth module or when no devices exist.
        """
        try:
            time_format = self.remote('devicehealth', 'get_time_format')
        except Exception as e:
            self.log.debug('Unable to format time: {}'.format(e))
            return {}
        # look back twice the reporting interval
        # NOTE(review): datetime.utcnow() is naive; assumes the devicehealth
        # timestamps are also naive UTC — confirm against that module
        cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
        min_sample = cutoff.strftime(time_format)

        devices = self.get('devices')['devices']
        if not devices:
            self.log.debug('Unable to get device info from the mgr.')
            return {}

        # anon-host-id -> anon-devid -> { timestamp -> record }
        res: Dict[str, Dict[str, Dict[str, str]]] = {}
        for d in devices:
            devid = d['devid']
            try:
                # this is a map of stamp -> {device info}
                m = self.remote('devicehealth', 'get_recent_device_metrics',
                                devid, min_sample)
            except Exception as e:
                self.log.error('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
                continue

            # anonymize host id
            try:
                host = d['location'][0]['host']
            except (KeyError, IndexError) as e:
                self.log.exception('Unable to get host from device with id "{}": {}'.format(devid, e))
                continue
            # host -> anon mapping is persisted so the same host keeps the
            # same anonymous id across reports
            anon_host = self.get_store('host-id/%s' % host)
            if not anon_host:
                anon_host = str(uuid.uuid1())
                self.set_store('host-id/%s' % host, anon_host)
            serial = None
            for dev, rep in m.items():
                rep['host_id'] = anon_host
                if serial is None and 'serial_number' in rep:
                    serial = rep['serial_number']

            # anonymize device id (also persisted for stability)
            anon_devid = self.get_store('devid-id/%s' % devid)
            if not anon_devid:
                # ideally devid is 'vendor_model_serial',
                # but can also be 'model_serial', 'serial'
                if '_' in devid:
                    # keep vendor/model prefix, replace only the serial part
                    anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
                else:
                    anon_devid = str(uuid.uuid1())
                self.set_store('devid-id/%s' % devid, anon_devid)
            self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
                                                           host, anon_host))

            # anonymize the smartctl report itself
            if serial:
                m_str = json.dumps(m)
                m = json.loads(m_str.replace(serial, 'deleted'))

            if anon_host not in res:
                res[anon_host] = {}
            res[anon_host][anon_devid] = m
        return res
981
982 def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
983 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
984 if data:
985 return data[-1][1]
986 else:
987 return 0
988
    def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Assemble the telemetry report dict for the given channels.

        If *channels* is None or empty, the currently active channels are
        used. Each channel contributes its own sections to the report. The
        'device' channel is intentionally absent here: its report is built by
        gather_device_report() and sent to a separate endpoint.
        """
        if not channels:
            channels = self.get_active_channels()
        # report header: identity, versioning and opt-in metadata
        report = {
            'leaderboard': self.leaderboard,
            'leaderboard_description': self.leaderboard_description,
            'report_version': 1,
            'report_timestamp': datetime.utcnow().isoformat(),
            'report_id': self.report_id,
            'channels': channels,
            'channels_available': ALL_CHANNELS,
            'license': LICENSE,
            'collections_available': [c['name'].name for c in MODULE_COLLECTION],
            'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])],
        }

        # ident channel: free-form cluster identification supplied by the admin
        if 'ident' in channels:
            for option in ['description', 'contact', 'organization']:
                report[option] = getattr(self, option)

        if 'basic' in channels:
            mon_map = self.get('mon_map')
            osd_map = self.get('osd_map')
            service_map = self.get('service_map')
            fs_map = self.get('fs_map')
            df = self.get('df')
            # index df pool stats by pool id for quick lookup below
            df_pools = {pool['id']: pool for pool in df['pools']}

            report['created'] = mon_map['created']

            # mons: count address families and protocol versions in use
            v1_mons = 0
            v2_mons = 0
            ipv4_mons = 0
            ipv6_mons = 0
            for mon in mon_map['mons']:
                for a in mon['public_addrs']['addrvec']:
                    if a['type'] == 'v2':
                        v2_mons += 1
                    elif a['type'] == 'v1':
                        v1_mons += 1
                    # IPv6 addresses are bracketed, e.g. '[::1]:6789'
                    if a['addr'].startswith('['):
                        ipv6_mons += 1
                    else:
                        ipv4_mons += 1
            report['mon'] = {
                'count': len(mon_map['mons']),
                'features': mon_map['features'],
                'min_mon_release': mon_map['min_mon_release'],
                'v1_addr_mons': v1_mons,
                'v2_addr_mons': v2_mons,
                'ipv4_addr_mons': ipv4_mons,
                'ipv6_addr_mons': ipv6_mons,
            }

            report['config'] = self.gather_configs()

            # pools

            rbd_num_pools = 0
            rbd_num_images_by_pool = []
            rbd_mirroring_by_pool = []
            num_pg = 0
            report['pools'] = list()
            for pool in osd_map['pools']:
                num_pg += pool['pg_num']
                ec_profile = {}
                if pool['erasure_code_profile']:
                    orig = osd_map['erasure_code_profiles'].get(
                        pool['erasure_code_profile'], {})
                    # only report a whitelisted subset of EC profile keys
                    ec_profile = {
                        k: orig[k] for k in orig.keys()
                        if k in ['k', 'm', 'plugin', 'technique',
                                 'crush-failure-domain', 'l']
                    }
                pool_data = {
                    'pool': pool['pool'],
                    'pg_num': pool['pg_num'],
                    'pgp_num': pool['pg_placement_num'],
                    'size': pool['size'],
                    'min_size': pool['min_size'],
                    'pg_autoscale_mode': pool['pg_autoscale_mode'],
                    'target_max_bytes': pool['target_max_bytes'],
                    'target_max_objects': pool['target_max_objects'],
                    'type': ['', 'replicated', '', 'erasure'][pool['type']],
                    'erasure_code_profile': ec_profile,
                    'cache_mode': pool['cache_mode'],
                }

                # basic_pool_usage collection
                if self.is_enabled_collection(Collection.basic_pool_usage):
                    pool_data['application'] = []
                    for application in pool['application_metadata']:
                        # Only include default applications
                        if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
                            pool_data['application'].append(application)
                    pool_stats = df_pools[pool['pool']]['stats']
                    pool_data['stats'] = {  # filter out kb_used
                        'avail_raw': pool_stats['avail_raw'],
                        'bytes_used': pool_stats['bytes_used'],
                        'compress_bytes_used': pool_stats['compress_bytes_used'],
                        'compress_under_bytes': pool_stats['compress_under_bytes'],
                        'data_bytes_used': pool_stats['data_bytes_used'],
                        'dirty': pool_stats['dirty'],
                        'max_avail': pool_stats['max_avail'],
                        'objects': pool_stats['objects'],
                        'omap_bytes_used': pool_stats['omap_bytes_used'],
                        'percent_used': pool_stats['percent_used'],
                        'quota_bytes': pool_stats['quota_bytes'],
                        'quota_objects': pool_stats['quota_objects'],
                        'rd': pool_stats['rd'],
                        'rd_bytes': pool_stats['rd_bytes'],
                        'stored': pool_stats['stored'],
                        'stored_data': pool_stats['stored_data'],
                        'stored_omap': pool_stats['stored_omap'],
                        'stored_raw': pool_stats['stored_raw'],
                        'wr': pool_stats['wr'],
                        'wr_bytes': pool_stats['wr_bytes']
                    }
                pool_data['options'] = {}
                # basic_pool_options_bluestore collection
                if self.is_enabled_collection(Collection.basic_pool_options_bluestore):
                    bluestore_options = ['compression_algorithm',
                                         'compression_mode',
                                         'compression_required_ratio',
                                         'compression_min_blob_size',
                                         'compression_max_blob_size']
                    for option in bluestore_options:
                        if option in pool['options']:
                            pool_data['options'][option] = pool['options'][option]

                # basic_pool_flags collection
                if self.is_enabled_collection(Collection.basic_pool_flags):
                    if 'flags_names' in pool and pool['flags_names'] is not None:
                        # flags are defined in pg_pool_t (src/osd/osd_types.h)
                        flags_to_report = [
                            'hashpspool',
                            'full',
                            'ec_overwrites',
                            'incomplete_clones',
                            'nodelete',
                            'nopgchange',
                            'nosizechange',
                            'write_fadvise_dontneed',
                            'noscrub',
                            'nodeep-scrub',
                            'full_quota',
                            'nearfull',
                            'backfillfull',
                            'selfmanaged_snaps',
                            'pool_snaps',
                            'creating',
                            'eio',
                            'bulk',
                            'crimson',
                        ]

                        pool_data['flags_names'] = [flag for flag in pool['flags_names'].split(',') if flag in flags_to_report]

                cast(List[Dict[str, Any]], report['pools']).append(pool_data)

                # per-pool RBD stats: image count and mirroring state
                if 'rbd' in pool['application_metadata']:
                    rbd_num_pools += 1
                    ioctx = self.rados.open_ioctx(pool['pool_name'])
                    rbd_num_images_by_pool.append(
                        sum(1 for _ in rbd.RBD().list2(ioctx)))
                    rbd_mirroring_by_pool.append(
                        rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
            report['rbd'] = {
                'num_pools': rbd_num_pools,
                'num_images_by_pool': rbd_num_images_by_pool,
                'mirroring_by_pool': rbd_mirroring_by_pool}

            # osds: detect whether a separate cluster (backend) network is used
            cluster_network = False
            for osd in osd_map['osds']:
                if osd['up'] and not cluster_network:
                    front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
                    back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
                    if front_ip != back_ip:
                        cluster_network = True
            report['osd'] = {
                'count': len(osd_map['osds']),
                'require_osd_release': osd_map['require_osd_release'],
                'require_min_compat_client': osd_map['require_min_compat_client'],
                'cluster_network': cluster_network,
            }

            # crush
            report['crush'] = self.gather_crush_info()

            # cephfs
            report['fs'] = {
                'count': len(fs_map['filesystems']),
                'feature_flags': fs_map['feature_flags'],
                'num_standby_mds': len(fs_map['standbys']),
                'filesystems': [],
            }
            num_mds = len(fs_map['standbys'])
            for fsm in fs_map['filesystems']:
                fs = fsm['mdsmap']
                num_sessions = 0
                cached_ino = 0
                cached_dn = 0
                cached_cap = 0
                subtrees = 0
                rfiles = 0
                rbytes = 0
                rsnaps = 0
                # aggregate cache/session counters across all active MDS ranks
                for gid, mds in fs['info'].items():
                    num_sessions += self.get_latest('mds', mds['name'],
                                                    'mds_sessions.session_count')
                    cached_ino += self.get_latest('mds', mds['name'],
                                                  'mds_mem.ino')
                    cached_dn += self.get_latest('mds', mds['name'],
                                                 'mds_mem.dn')
                    cached_cap += self.get_latest('mds', mds['name'],
                                                  'mds_mem.cap')
                    subtrees += self.get_latest('mds', mds['name'],
                                                'mds.subtrees')
                    # rank 0 holds the filesystem root, so its recursive
                    # stats describe the whole fs
                    if mds['rank'] == 0:
                        rfiles = self.get_latest('mds', mds['name'],
                                                 'mds.root_rfiles')
                        rbytes = self.get_latest('mds', mds['name'],
                                                 'mds.root_rbytes')
                        rsnaps = self.get_latest('mds', mds['name'],
                                                 'mds.root_rsnaps')
                report['fs']['filesystems'].append({  # type: ignore
                    'max_mds': fs['max_mds'],
                    'ever_allowed_features': fs['ever_allowed_features'],
                    'explicitly_allowed_features': fs['explicitly_allowed_features'],
                    'num_in': len(fs['in']),
                    'num_up': len(fs['up']),
                    'num_standby_replay': len(
                        [mds for gid, mds in fs['info'].items()
                         if mds['state'] == 'up:standby-replay']),
                    'num_mds': len(fs['info']),
                    'num_sessions': num_sessions,
                    'cached_inos': cached_ino,
                    'cached_dns': cached_dn,
                    'cached_caps': cached_cap,
                    'cached_subtrees': subtrees,
                    'balancer_enabled': len(fs['balancer']) > 0,
                    'num_data_pools': len(fs['data_pools']),
                    'standby_count_wanted': fs['standby_count_wanted'],
                    # only year-month, to avoid a unique fingerprint
                    'approx_ctime': fs['created'][0:7],
                    'files': rfiles,
                    'bytes': rbytes,
                    'snaps': rsnaps,
                })
                num_mds += len(fs['info'])
            report['fs']['total_num_mds'] = num_mds  # type: ignore

            # daemons
            report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
                                      mon=self.gather_mon_metadata(mon_map))

            if self.is_enabled_collection(Collection.basic_mds_metadata):
                report['metadata']['mds'] = self.gather_mds_metadata()  # type: ignore

            # host counts
            servers = self.list_servers()
            self.log.debug('servers %s' % servers)
            hosts = {
                'num': len([h for h in servers if h['hostname']]),
            }
            # per daemon type, count how many hosts run at least one of it
            for t in ['mon', 'mds', 'osd', 'mgr']:
                nr_services = sum(1 for host in servers if
                                  any(service for service in cast(List[ServiceInfoT],
                                                                  host['services'])
                                      if service['type'] == t))
                hosts['num_with_' + t] = nr_services
            report['hosts'] = hosts

            report['usage'] = {
                'pools': len(df['pools']),
                'pg_num': num_pg,
                'total_used_bytes': df['stats']['total_used_bytes'],
                'total_bytes': df['stats']['total_bytes'],
                'total_avail_bytes': df['stats']['total_avail_bytes']
            }
            # basic_usage_by_class collection
            if self.is_enabled_collection(Collection.basic_usage_by_class):
                report['usage']['stats_by_class'] = {}  # type: ignore
                for device_class in df['stats_by_class']:
                    # only the well-known device classes, to avoid leaking
                    # custom class names
                    if device_class in ['hdd', 'ssd', 'nvme']:
                        report['usage']['stats_by_class'][device_class] = df['stats_by_class'][device_class]  # type: ignore

            # services: count instances per type; RGW gets extra detail
            services: DefaultDict[str, int] = defaultdict(int)
            for key, value in service_map['services'].items():
                services[key] += 1
                if key == 'rgw':
                    rgw = {}
                    zones = set()
                    zonegroups = set()
                    frontends = set()
                    count = 0
                    d = value.get('daemons', dict())
                    for k, v in d.items():
                        if k == 'summary' and v:
                            rgw[k] = v
                        elif isinstance(v, dict) and 'metadata' in v:
                            count += 1
                            zones.add(v['metadata']['zone_id'])
                            zonegroups.add(v['metadata']['zonegroup_id'])
                            frontends.add(v['metadata']['frontend_type#0'])

                            # we could actually iterate over all the keys of
                            # the dict and check for how many frontends there
                            # are, but it is unlikely that one would be running
                            # more than 2 supported ones
                            f2 = v['metadata'].get('frontend_type#1', None)
                            if f2:
                                frontends.add(f2)

                    rgw['count'] = count
                    rgw['zones'] = len(zones)
                    rgw['zonegroups'] = len(zonegroups)
                    rgw['frontends'] = list(frontends)  # sets aren't json-serializable
                    report['rgw'] = rgw
            report['services'] = services

            try:
                report['balancer'] = self.remote('balancer', 'gather_telemetry')
            except ImportError:
                # balancer module not available/loaded
                report['balancer'] = {
                    'active': False
                }

            # Rook
            self.get_rook_data(report)

        if 'crash' in channels:
            report['crashes'] = self.gather_crashinfo()

        if 'perf' in channels:
            if self.is_enabled_collection(Collection.perf_perf):
                report['perf_counters'] = self.gather_perf_counters('separated')
                report['stats_per_pool'] = self.get_stats_per_pool()
                report['stats_per_pg'] = self.get_stats_per_pg()
                report['io_rate'] = self.get_io_rate()
                report['osd_perf_histograms'] = self.get_osd_histograms('separated')
                report['mempool'] = self.get_mempool('separated')
                report['heap_stats'] = self.get_heap_stats()
                report['rocksdb_stats'] = self.get_rocksdb_stats()

        # NOTE: We do not include the 'device' channel in this report; it is
        # sent to a different endpoint.

        return report
1339
1340 def get_rook_data(self, report: Dict[str, object]) -> None:
1341 r, outb, outs = self.mon_command({
1342 'prefix': 'config-key dump',
1343 'format': 'json'
1344 })
1345 if r != 0:
1346 return
1347 try:
1348 config_kv_dump = json.loads(outb)
1349 except json.decoder.JSONDecodeError:
1350 return
1351
1352 for elem in ROOK_KEYS_BY_COLLECTION:
1353 # elem[0] is the full key path (e.g. "rook/node/count/with-csi-nfs-plugin")
1354 # elem[1] is the Collection this key belongs to
1355 if self.is_enabled_collection(elem[1]):
1356 self.add_kv_to_report(report, elem[0], config_kv_dump.get(elem[0]))
1357
1358 def add_kv_to_report(self, report: Dict[str, object], key_path: str, value: Any) -> None:
1359 last_node = key_path.split('/')[-1]
1360 for node in key_path.split('/')[0:-1]:
1361 if node not in report:
1362 report[node] = {}
1363 report = report[node] # type: ignore
1364
1365 # sanity check of keys correctness
1366 if not isinstance(report, dict):
1367 self.log.error(f"'{key_path}' is an invalid key, expected type 'dict' but got {type(report)}")
1368 return
1369
1370 if last_node in report:
1371 self.log.error(f"'{key_path}' is an invalid key, last part must not exist at this point")
1372 return
1373
1374 report[last_node] = value
1375
1376 def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
1377 self.log.info('Sending %s to: %s' % (what, url))
1378 proxies = dict()
1379 if self.proxy:
1380 self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
1381 proxies['http'] = self.proxy
1382 proxies['https'] = self.proxy
1383 try:
1384 resp = requests.put(url=url, json=report, proxies=proxies)
1385 resp.raise_for_status()
1386 except Exception as e:
1387 fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
1388 self.log.error(fail_reason)
1389 return fail_reason
1390 return None
1391
    class EndPoint(enum.Enum):
        # Telemetry upload destinations: 'ceph' is the main cluster-report
        # endpoint; 'device' is the separate endpoint for SMART device data.
        ceph = 'ceph'
        device = 'device'
1395
1396 def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
1397 '''
1398 Find collections that are available in the module, but are not in the db
1399 '''
1400 if self.db_collection is None:
1401 return None
1402
1403 if not channels:
1404 channels = ALL_CHANNELS
1405 else:
1406 for ch in channels:
1407 if ch not in ALL_CHANNELS:
1408 self.log.debug(f"invalid channel name: {ch}")
1409 return None
1410
1411 new_collection : List[Collection] = []
1412
1413 for c in MODULE_COLLECTION:
1414 if c['name'].name not in self.db_collection:
1415 if c['channel'] in channels:
1416 new_collection.append(c['name'])
1417
1418 return new_collection
1419
1420 def is_major_upgrade(self) -> bool:
1421 '''
1422 Returns True only if the user last opted-in to an older major
1423 '''
1424 if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0:
1425 # we do not know what Ceph version was when the user last opted-in,
1426 # thus we do not wish to nag in case of a major upgrade
1427 return False
1428
1429 mon_map = self.get('mon_map')
1430 mon_min = mon_map.get("min_mon_release", 0)
1431
1432 if mon_min - self.last_opted_in_ceph_version > 0:
1433 self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
1434 return True
1435
1436 return False
1437
1438 def is_opted_in(self) -> bool:
1439 # If len is 0 it means that the user is either opted-out (never
1440 # opted-in, or invoked `telemetry off`), or they upgraded from a
1441 # telemetry revision 1 or 2, which required to re-opt in to revision 3,
1442 # regardless, hence is considered as opted-out
1443 if self.db_collection is None:
1444 return False
1445 return len(self.db_collection) > 0
1446
1447 def should_nag(self) -> bool:
1448 # Find delta between opted-in collections and module collections;
1449 # nag only if module has a collection which is not in db, and nag == True.
1450
1451 # We currently do not nag if the user is opted-out (or never opted-in).
1452 # If we wish to do this in the future, we need to have a tri-mode state
1453 # (opted in, opted out, no action yet), and it needs to be guarded by a
1454 # config option (so that nagging can be turned off via config).
1455 # We also need to add a last_opted_out_ceph_version variable, for the
1456 # major upgrade check.
1457
1458 # check if there are collections the user is not opt-in to
1459 # that we should nag about
1460 if self.db_collection is not None:
1461 for c in MODULE_COLLECTION:
1462 if c['name'].name not in self.db_collection:
1463 if c['nag'] == True:
1464 self.log.debug(f"The collection: {c['name']} is not reported")
1465 return True
1466
1467 # user might be opted-in to the most recent collection, or there is no
1468 # new collection which requires nagging about; thus nag in case it's a
1469 # major upgrade and there are new collections
1470 # (which their own nag == False):
1471 new_collections = False
1472 col_delta = self.collection_delta()
1473 if col_delta is not None and len(col_delta) > 0:
1474 new_collections = True
1475
1476 return self.is_major_upgrade() and new_collections
1477
    def init_collection(self) -> None:
        """
        Initialize self.db_collection from the mgr kv store, migrating users
        that upgraded from a pre-collection telemetry module.
        """
        # We fetch from db the collections the user had already opted-in to.
        # During the transition the results will be empty, but the user might
        # be opted-in to an older version (e.g. revision = 3)

        collection = self.get_store('collection')

        if collection is not None:
            self.db_collection = json.loads(collection)

        if self.db_collection is None:
            # happens once on upgrade: no 'collection' key in the store yet
            if not self.enabled:
                # user is not opted-in
                self.set_store('collection', json.dumps([]))
                self.log.debug("user is not opted-in")
            else:
                # user is opted-in, verify the revision:
                if self.last_opt_revision == REVISION:
                    # current revision: grandfather the user into the base
                    # collections of each channel
                    self.log.debug(f"telemetry revision is {REVISION}")
                    base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
                    self.set_store('collection', json.dumps(base_collection))
                else:
                    # user is opted-in to an older version, meaning they need
                    # to re-opt in regardless
                    self.set_store('collection', json.dumps([]))
                    self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")

            # reload collection after setting, so the in-memory state always
            # mirrors what was persisted above
            collection = self.get_store('collection')
            if collection is not None:
                self.db_collection = json.loads(collection)
            else:
                raise RuntimeError('collection is None after initial setting')
        else:
            # user has already upgraded
            self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
1515
1516 def is_enabled_collection(self, collection: Collection) -> bool:
1517 if self.db_collection is None:
1518 return False
1519 return collection.name in self.db_collection
1520
1521 def opt_in_all_collections(self) -> None:
1522 """
1523 Opt-in to all collections; Update db with the currently available collections in the module
1524 """
1525 if self.db_collection is None:
1526 raise RuntimeError('db_collection is None after initial setting')
1527
1528 for c in MODULE_COLLECTION:
1529 if c['name'].name not in self.db_collection:
1530 self.db_collection.append(c['name'])
1531
1532 self.set_store('collection', json.dumps(self.db_collection))
1533
    def send(self,
             report: Dict[str, Dict[str, str]],
             endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
        """
        Upload *report* to the requested endpoints (both by default).

        The 'ceph' endpoint receives the main report; the 'device' endpoint
        receives per-host device reports (only when the device channel is on).
        Returns a (retval, stdout, stderr)-style tuple: 0 with the joined
        success messages, or 1 with both success and failure messages when
        any upload failed.
        """
        if not endpoint:
            endpoint = [self.EndPoint.ceph, self.EndPoint.device]
        failed: List[str] = []
        success: List[str] = []
        self.log.debug('Send endpoints %s' % endpoint)
        for e in endpoint:
            if e == self.EndPoint.ceph:
                fail_reason = self._try_post('ceph report', self.url, report)
                if fail_reason:
                    failed.append(fail_reason)
                else:
                    # record the upload time so serve()/status can report it
                    now = int(time.time())
                    self.last_upload = now
                    self.set_store('last_upload', str(now))
                    success.append('Ceph report sent to {0}'.format(self.url))
                    self.log.info('Sent report to {0}'.format(self.url))
            elif e == self.EndPoint.device:
                if 'device' in self.get_active_channels():
                    devices = self.gather_device_report()
                    if devices:
                        num_devs = 0
                        num_hosts = 0
                        # one POST per host's device map
                        for host, ls in devices.items():
                            self.log.debug('host %s devices %s' % (host, ls))
                            if not len(ls):
                                continue
                            fail_reason = self._try_post('devices', self.device_url,
                                                         ls)
                            if fail_reason:
                                failed.append(fail_reason)
                            else:
                                num_devs += len(ls)
                                num_hosts += 1
                        if num_devs:
                            success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
                                num_devs, num_hosts, len(devices)))
                    else:
                        fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
                        failed.append(fail_reason)
                        self.log.error(fail_reason)
        if failed:
            return 1, '', '\n'.join(success + failed)
        return 0, '', '\n'.join(success)
1580
1581 def format_perf_histogram(self, report: Dict[str, Any]) -> None:
1582 # Formatting the perf histograms so they are human-readable. This will change the
1583 # ranges and values, which are currently in list form, into strings so that
1584 # they are displayed horizontally instead of vertically.
1585 if 'report' in report:
1586 report = report['report']
1587 try:
1588 # Formatting ranges and values in osd_perf_histograms
1589 mode = 'osd_perf_histograms'
1590 for config in report[mode]:
1591 for histogram in config:
1592 # Adjust ranges by converting lists into strings
1593 for axis in config[histogram]['axes']:
1594 for i in range(0, len(axis['ranges'])):
1595 axis['ranges'][i] = str(axis['ranges'][i])
1596
1597 for osd in config[histogram]['osds']:
1598 for i in range(0, len(osd['values'])):
1599 osd['values'][i] = str(osd['values'][i])
1600 except KeyError:
1601 # If the perf channel is not enabled, there should be a KeyError since
1602 # 'osd_perf_histograms' would not be present in the report. In that case,
1603 # the show function should pass as usual without trying to format the
1604 # histograms.
1605 pass
1606
1607 def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1608 '''
1609 Enable or disable a list of channels
1610 '''
1611 if not self.enabled:
1612 # telemetry should be on for channels to be toggled
1613 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1614 'Preview sample reports with `ceph telemetry preview`.'
1615 return 0, msg, ''
1616
1617 if channels is None:
1618 msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
1619 return 0, msg, ''
1620
1621 state = action == 'enable'
1622 msg = ''
1623 for c in channels:
1624 if c not in ALL_CHANNELS:
1625 msg = f"{msg}{c} is not a valid channel name. "\
1626 f"Available channels: {ALL_CHANNELS}.\n"
1627 else:
1628 self.set_module_option(f"channel_{c}", state)
1629 setattr(self,
1630 f"channel_{c}",
1631 state)
1632 msg = f"{msg}channel_{c} is {action}d\n"
1633
1634 return 0, msg, ''
1635
1636 @CLIReadCommand('telemetry status')
1637 def status(self) -> Tuple[int, str, str]:
1638 '''
1639 Show current configuration
1640 '''
1641 r = {}
1642 for opt in self.MODULE_OPTIONS:
1643 r[opt['name']] = getattr(self, opt['name'])
1644 r['last_upload'] = (time.ctime(self.last_upload)
1645 if self.last_upload else self.last_upload)
1646 return 0, json.dumps(r, indent=4, sort_keys=True), ''
1647
1648 @CLIReadCommand('telemetry diff')
1649 def diff(self) -> Tuple[int, str, str]:
1650 '''
1651 Show the diff between opted-in collection and available collection
1652 '''
1653 diff = []
1654 keys = ['nag']
1655
1656 for c in MODULE_COLLECTION:
1657 if not self.is_enabled_collection(c['name']):
1658 diff.append({key: val for key, val in c.items() if key not in keys})
1659
1660 r = None
1661 if diff == []:
1662 r = "Telemetry is up to date"
1663 else:
1664 r = json.dumps(diff, indent=4, sort_keys=True)
1665
1666 return 0, r, ''
1667
    @CLICommand('telemetry on')
    def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
        '''
        Enable telemetry reports from this cluster
        '''
        # opting in requires explicit acknowledgement of the data license
        if license != LICENSE:
            return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
        else:
            self.set_module_option('enabled', True)
            self.enabled = True
            # opting in always means opting in to all currently-known collections
            self.opt_in_all_collections()

            # for major releases upgrade nagging
            mon_map = self.get('mon_map')
            mon_min = mon_map.get("min_mon_release", 0)
            self.set_store('last_opted_in_ceph_version', str(mon_min))
            self.last_opted_in_ceph_version = mon_min

            msg = 'Telemetry is on.'
            disabled_channels = ''
            active_channels = self.get_active_channels()
            # list the channels still off (ident is deliberately excluded
            # from the suggestion, as it carries identifying information)
            for c in ALL_CHANNELS:
                if c not in active_channels and c != 'ident':
                    disabled_channels = f"{disabled_channels} {c}"

            if len(disabled_channels) > 0:
                msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
                      f"`ceph telemetry enable channel{disabled_channels}`"

            # wake up serve() to reset health warning
            self.event.set()

            return 0, msg, ''
1702
1703 @CLICommand('telemetry off')
1704 def off(self) -> Tuple[int, str, str]:
1705 '''
1706 Disable telemetry reports from this cluster
1707 '''
1708 if not self.enabled:
1709 # telemetry is already off
1710 msg = 'Telemetry is currently not enabled, nothing to turn off. '\
1711 'Please consider opting-in with `ceph telemetry on`.\n' \
1712 'Preview sample reports with `ceph telemetry preview`.'
1713 return 0, msg, ''
1714
1715 self.set_module_option('enabled', False)
1716 self.enabled = False
1717 self.set_store('collection', json.dumps([]))
1718 self.db_collection = []
1719
1720 # we might need this info in the future, in case
1721 # of nagging when user is opted-out
1722 mon_map = self.get('mon_map')
1723 mon_min = mon_map.get("min_mon_release", 0)
1724 self.set_store('last_opted_out_ceph_version', str(mon_min))
1725 self.last_opted_out_ceph_version = mon_min
1726
1727 msg = 'Telemetry is now disabled.'
1728 return 0, msg, ''
1729
    @CLIReadCommand('telemetry enable channel all')
    def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Enable all channels
        '''
        # NOTE: ALL_CHANNELS as a default is a shared module-level list;
        # toggle_channel only reads it, so the mutable default is harmless here.
        return self.toggle_channel('enable', channels)
1736
    @CLIReadCommand('telemetry enable channel')
    def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Enable a list of channels
        '''
        # toggle_channel handles validation and the channels=None help message
        return self.toggle_channel('enable', channels)
1743
    @CLIReadCommand('telemetry disable channel all')
    def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Disable all channels
        '''
        # NOTE: ALL_CHANNELS as a default is a shared module-level list;
        # toggle_channel only reads it, so the mutable default is harmless here.
        return self.toggle_channel('disable', channels)
1750
    @CLIReadCommand('telemetry disable channel')
    def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Disable a list of channels
        '''
        # toggle_channel handles validation and the channels=None help message
        return self.toggle_channel('disable', channels)
1757
1758 @CLIReadCommand('telemetry channel ls')
1759 def channel_ls(self) -> Tuple[int, str, str]:
1760 '''
1761 List all channels
1762 '''
1763 table = PrettyTable(
1764 [
1765 'NAME', 'ENABLED', 'DEFAULT', 'DESC',
1766 ],
1767 border=False)
1768 table.align['NAME'] = 'l'
1769 table.align['ENABLED'] = 'l'
1770 table.align['DEFAULT'] = 'l'
1771 table.align['DESC'] = 'l'
1772 table.left_padding_width = 0
1773 table.right_padding_width = 4
1774
1775 for c in ALL_CHANNELS:
1776 enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
1777 for o in self.MODULE_OPTIONS:
1778 if o['name'] == f"channel_{c}":
1779 default = "ON" if o.get('default', None) else "OFF"
1780 desc = o.get('desc', None)
1781
1782 table.add_row((
1783 c,
1784 enabled,
1785 default,
1786 desc,
1787 ))
1788
1789 return 0, table.get_string(sortby="NAME"), ''
1790
1791 @CLIReadCommand('telemetry collection ls')
1792 def collection_ls(self) -> Tuple[int, str, str]:
1793 '''
1794 List all collections
1795 '''
1796 col_delta = self.collection_delta()
1797 msg = ''
1798 if col_delta is not None and len(col_delta) > 0:
1799 msg = f"New collections are available:\n" \
1800 f"{sorted([c.name for c in col_delta])}\n" \
1801 f"Run `ceph telemetry on` to opt-in to these collections.\n"
1802
1803 table = PrettyTable(
1804 [
1805 'NAME', 'STATUS', 'DESC',
1806 ],
1807 border=False)
1808 table.align['NAME'] = 'l'
1809 table.align['STATUS'] = 'l'
1810 table.align['DESC'] = 'l'
1811 table.left_padding_width = 0
1812 table.right_padding_width = 4
1813
1814 for c in MODULE_COLLECTION:
1815 name = c['name']
1816 opted_in = self.is_enabled_collection(name)
1817 channel_enabled = getattr(self, f"channel_{c['channel']}")
1818
1819 status = ''
1820 if channel_enabled and opted_in:
1821 status = "REPORTING"
1822 else:
1823 why = ''
1824 delimiter = ''
1825
1826 if not opted_in:
1827 why += "NOT OPTED-IN"
1828 delimiter = ', '
1829 if not channel_enabled:
1830 why += f"{delimiter}CHANNEL {c['channel']} IS OFF"
1831
1832 status = f"NOT REPORTING: {why}"
1833
1834 desc = c['description']
1835
1836 table.add_row((
1837 name,
1838 status,
1839 desc,
1840 ))
1841
1842 if len(msg):
1843 # add a new line between message and table output
1844 msg = f"{msg} \n"
1845
1846 return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
1847
1848 @CLICommand('telemetry send')
1849 def do_send(self,
1850 endpoint: Optional[List[EndPoint]] = None,
1851 license: Optional[str] = None) -> Tuple[int, str, str]:
1852 '''
1853 Send a sample report
1854 '''
1855 if not self.is_opted_in() and license != LICENSE:
1856 self.log.debug(('A telemetry send attempt while opted-out. '
1857 'Asking for license agreement'))
1858 return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
1859To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
1860Please consider enabling the telemetry module with 'ceph telemetry on'.'''
1861 else:
1862 self.last_report = self.compile_report()
1863 return self.send(self.last_report, endpoint)
1864
1865 @CLIReadCommand('telemetry show')
1866 def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1867 '''
1868 Show a sample report of opted-in collections (except for 'device')
1869 '''
1870 if not self.enabled:
1871 # if telemetry is off, no report is being sent, hence nothing to show
1872 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1873 'Preview sample reports with `ceph telemetry preview`.'
1874 return 0, msg, ''
1875
1876 report = self.get_report_locked(channels=channels)
1877 self.format_perf_histogram(report)
1878 report = json.dumps(report, indent=4, sort_keys=True)
1879
1880 if self.channel_device:
1881 report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''
1882
1883 return 0, report, ''
1884
1885 @CLIReadCommand('telemetry preview')
1886 def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1887 '''
1888 Preview a sample report of the most recent collections available (except for 'device')
1889 '''
1890 report = {}
1891
1892 # We use a lock to prevent a scenario where the user wishes to preview
1893 # the report, and at the same time the module hits the interval of
1894 # sending a report with the opted-in collection, which has less data
1895 # than in the preview report.
1896 col_delta = self.collection_delta()
1897 with self.get_report_lock:
1898 if col_delta is not None and len(col_delta) == 0:
1899 # user is already opted-in to the most recent collection
1900 msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
1901 return 0, msg, ''
1902 else:
1903 # there are collections the user is not opted-in to
1904 next_collection = []
1905
1906 for c in MODULE_COLLECTION:
1907 next_collection.append(c['name'].name)
1908
1909 opted_in_collection = self.db_collection
1910 self.db_collection = next_collection
1911 report = self.get_report(channels=channels)
1912 self.db_collection = opted_in_collection
1913
1914 self.format_perf_histogram(report)
1915 report = json.dumps(report, indent=4, sort_keys=True)
1916
1917 if self.channel_device:
1918 report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''
1919
1920 return 0, report, ''
1921
1922 @CLIReadCommand('telemetry show-device')
1923 def show_device(self) -> Tuple[int, str, str]:
1924 '''
1925 Show a sample device report
1926 '''
1927 if not self.enabled:
1928 # if telemetry is off, no report is being sent, hence nothing to show
1929 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1930 'Preview sample device reports with `ceph telemetry preview-device`.'
1931 return 0, msg, ''
1932
1933 if not self.channel_device:
1934 # if device channel is off, device report is not being sent, hence nothing to show
1935 msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
1936 'Preview sample device reports with `ceph telemetry preview-device`.'
1937 return 0, msg, ''
1938
1939 return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
1940
1941 @CLIReadCommand('telemetry preview-device')
1942 def preview_device(self) -> Tuple[int, str, str]:
1943 '''
1944 Preview a sample device report of the most recent device collection
1945 '''
1946 report = {}
1947
1948 device_col_delta = self.collection_delta(['device'])
1949 with self.get_report_lock:
1950 if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
1951 # user is already opted-in to the most recent device collection,
1952 # and device channel is on, thus `show-device` should be called
1953 msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
1954 return 0, msg, ''
1955
1956 # either the user is not opted-in at all, or there are collections
1957 # they are not opted-in to
1958 next_collection = []
1959
1960 for c in MODULE_COLLECTION:
1961 next_collection.append(c['name'].name)
1962
1963 opted_in_collection = self.db_collection
1964 self.db_collection = next_collection
1965 report = self.get_report('device')
1966 self.db_collection = opted_in_collection
1967
1968 report = json.dumps(report, indent=4, sort_keys=True)
1969 return 0, report, ''
1970
1971 @CLIReadCommand('telemetry show-all')
1972 def show_all(self) -> Tuple[int, str, str]:
1973 '''
1974 Show a sample report of all enabled channels (including 'device' channel)
1975 '''
1976 if not self.enabled:
1977 # if telemetry is off, no report is being sent, hence nothing to show
1978 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1979 'Preview sample reports with `ceph telemetry preview`.'
1980 return 0, msg, ''
1981
1982 if not self.channel_device:
1983 # device channel is off, no need to display its report
1984 report = self.get_report_locked('default')
1985 else:
1986 # telemetry is on and device channel is enabled, show both
1987 report = self.get_report_locked('all')
1988
1989 self.format_perf_histogram(report)
1990 return 0, json.dumps(report, indent=4, sort_keys=True), ''
1991
1992 @CLIReadCommand('telemetry preview-all')
1993 def preview_all(self) -> Tuple[int, str, str]:
1994 '''
1995 Preview a sample report of the most recent collections available of all channels (including 'device')
1996 '''
1997 report = {}
1998
1999 col_delta = self.collection_delta()
2000 with self.get_report_lock:
2001 if col_delta is not None and len(col_delta) == 0:
2002 # user is already opted-in to the most recent collection
2003 msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
2004 return 0, msg, ''
2005
2006 # there are collections the user is not opted-in to
2007 next_collection = []
2008
2009 for c in MODULE_COLLECTION:
2010 next_collection.append(c['name'].name)
2011
2012 opted_in_collection = self.db_collection
2013 self.db_collection = next_collection
2014 report = self.get_report('all')
2015 self.db_collection = opted_in_collection
2016
2017 self.format_perf_histogram(report)
2018 report = json.dumps(report, indent=4, sort_keys=True)
2019
2020 return 0, report, ''
2021
2022 def get_report_locked(self,
2023 report_type: str = 'default',
2024 channels: Optional[List[str]] = None) -> Dict[str, Any]:
2025 '''
2026 A wrapper around get_report to allow for compiling a report of the most recent module collections
2027 '''
2028 with self.get_report_lock:
2029 return self.get_report(report_type, channels)
2030
2031 def get_report(self,
2032 report_type: str = 'default',
2033 channels: Optional[List[str]] = None) -> Dict[str, Any]:
2034 if report_type == 'default':
2035 return self.compile_report(channels=channels)
2036 elif report_type == 'device':
2037 return self.gather_device_report()
2038 elif report_type == 'all':
2039 return {'report': self.compile_report(channels=channels),
2040 'device_report': self.gather_device_report()}
2041 return {}
2042
2043 def self_test(self) -> None:
2044 self.opt_in_all_collections()
2045 report = self.compile_report(channels=ALL_CHANNELS)
2046 if len(report) == 0:
2047 raise RuntimeError('Report is empty')
2048
2049 if 'report_id' not in report:
2050 raise RuntimeError('report_id not found in report')
2051
    def shutdown(self) -> None:
        # Stop the serve() loop: clear the run flag first, then wake the
        # loop out of its event.wait() so it observes run == False and exits.
        # The order matters; setting the event first could let the loop
        # re-enter wait() before the flag is cleared.
        self.run = False
        self.event.set()
2055
2056 def refresh_health_checks(self) -> None:
2057 health_checks = {}
2058 # TODO do we want to nag also in case the user is not opted-in?
2059 if self.enabled and self.should_nag():
2060 health_checks['TELEMETRY_CHANGED'] = {
2061 'severity': 'warning',
2062 'summary': 'Telemetry requires re-opt-in',
2063 'detail': [
2064 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
2065 ]
2066 }
2067 self.set_health_checks(health_checks)
2068
2069 def serve(self) -> None:
2070 self.load()
2071 self.run = True
2072
2073 self.log.debug('Waiting for mgr to warm up')
2074 time.sleep(10)
2075
2076 while self.run:
2077 self.event.clear()
2078
2079 self.refresh_health_checks()
2080
2081 if not self.is_opted_in():
2082 self.log.debug('Not sending report until user re-opts-in')
2083 self.event.wait(1800)
2084 continue
2085 if not self.enabled:
2086 self.log.debug('Not sending report until configured to do so')
2087 self.event.wait(1800)
2088 continue
2089
2090 now = int(time.time())
2091 if not self.last_upload or \
2092 (now - self.last_upload) > self.interval * 3600:
2093 self.log.info('Compiling and sending report to %s',
2094 self.url)
2095
2096 try:
2097 self.last_report = self.compile_report()
2098 except Exception:
2099 self.log.exception('Exception while compiling report:')
2100
2101 self.send(self.last_report)
2102 else:
2103 self.log.debug('Interval for sending new report has not expired')
2104
2105 sleep = 3600
2106 self.log.debug('Sleeping for %d seconds', sleep)
2107 self.event.wait(sleep)
2108
2109 @staticmethod
2110 def can_run() -> Tuple[bool, str]:
2111 return True, ''