]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/telemetry/module.py
import ceph quincy 17.2.5
[ceph.git] / ceph / src / pybind / mgr / telemetry / module.py
1 """
2 Telemetry module for ceph-mgr
3
4 Collect statistics from Ceph cluster and send this back to the Ceph project
5 when user has opted-in
6 """
7 import logging
8 import numbers
9 import enum
10 import errno
11 import hashlib
12 import json
13 import rbd
14 import requests
15 import uuid
16 import time
17 from datetime import datetime, timedelta
18 from prettytable import PrettyTable
19 from threading import Event, Lock
20 from collections import defaultdict
21 from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union
22
23 from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, ServiceInfoT
24
25
# All channels a cluster can opt in to.
ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']

# License users must acknowledge when opting in: telemetry data is shared
# under CDLA-Sharing-1.0 terms.
LICENSE = 'sharing-1-0'
LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL = 'https://cdla.io/sharing-1-0/'
# Counter used to build unique throwaway salts when no persistent salt is
# available (see anonymize_entity_name()).
NO_SALT_CNT = 0

# Latest revision of the telemetry report. Bump this each time we make
# *any* change.
REVISION = 3

# History of revisions
# --------------------
#
# Version 1:
#   Mimic and/or nautilus are lumped together here, since
#   we didn't track revisions yet.
#
# Version 2:
#   - added revision tracking, nagging, etc.
#   - added config option changes
#   - added channels
#   - added explicit license acknowledgement to the opt-in process
#
# Version 3:
#   - added device health metrics (i.e., SMART data, minus serial number)
#   - remove crush_rule
#   - added CephFS metadata (how many MDSs, fs features, how many data pools,
#     how much metadata is cached, rfiles, rbytes, rsnapshots)
#   - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
#   - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
#   - whether an OSD cluster network is in use
#   - rbd pool and image count, and rbd mirror mode (pool-level)
#   - rgw daemons, zones, zonegroups; which rgw frontends
#   - crush map stats
class Collection(str, enum.Enum):
    """Named telemetry data collections.

    The member value doubles as the on-the-wire collection name; the name
    prefix matches the channel the collection reports under (basic, device,
    crash, ident, perf), as declared in MODULE_COLLECTION below.
    """
    basic_base = 'basic_base'
    device_base = 'device_base'
    crash_base = 'crash_base'
    ident_base = 'ident_base'
    perf_perf = 'perf_perf'
    basic_mds_metadata = 'basic_mds_metadata'
    basic_pool_usage = 'basic_pool_usage'
    basic_usage_by_class = 'basic_usage_by_class'
    basic_rook_v01 = 'basic_rook_v01'
    perf_memory_metrics = 'perf_memory_metrics'
73
# Every collection this module can report.  Each entry records the channel
# the collection belongs to and whether users should be nagged (prompted to
# re-opt-in) when the collection is introduced.
MODULE_COLLECTION : List[Dict] = [
    {
        "name": Collection.basic_base,
        "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.device_base,
        "description": "Information about device health metrics",
        "channel": "device",
        "nag": False
    },
    {
        "name": Collection.crash_base,
        "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
        "channel": "crash",
        "nag": False
    },
    {
        "name": Collection.ident_base,
        "description": "User-provided identifying information about the cluster",
        "channel": "ident",
        "nag": False
    },
    {
        "name": Collection.perf_perf,
        "description": "Information about performance counters of the cluster",
        "channel": "perf",
        "nag": True
    },
    {
        "name": Collection.basic_mds_metadata,
        "description": "MDS metadata",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_pool_usage,
        "description": "Default pool application and usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_usage_by_class,
        "description": "Default device class usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_rook_v01,
        "description": "Basic Rook deployment data",
        "channel": "basic",
        "nag": True
    },
    {
        "name": Collection.perf_memory_metrics,
        "description": "Heap stats and mempools for mon and mds",
        "channel": "perf",
        "nag": False
    },
]
136
# Rook-related key/value store keys harvested for the basic_rook_v01
# collection (key path, owning collection).
ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [
    # Note: a key cannot be both a node and a leaf, e.g.
    # "rook/a/b"
    # "rook/a/b/c"
    ("rook/version", Collection.basic_rook_v01),
    ("rook/kubernetes/version", Collection.basic_rook_v01),
    ("rook/csi/version", Collection.basic_rook_v01),
    ("rook/node/count/kubernetes-total", Collection.basic_rook_v01),
    ("rook/node/count/with-ceph-daemons", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-rbd-plugin", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-cephfs-plugin", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-nfs-plugin", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/total", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/rbd", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/cephfs", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/nfs", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/bucket", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/total", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/portable", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/non-portable", Collection.basic_rook_v01),
    ("rook/cluster/mon/count", Collection.basic_rook_v01),
    ("rook/cluster/mon/allow-multiple-per-node", Collection.basic_rook_v01),
    ("rook/cluster/mon/max-id", Collection.basic_rook_v01),
    ("rook/cluster/mon/pvc/enabled", Collection.basic_rook_v01),
    ("rook/cluster/mon/stretch/enabled", Collection.basic_rook_v01),
    ("rook/cluster/network/provider", Collection.basic_rook_v01),
    ("rook/cluster/external-mode", Collection.basic_rook_v01),
]
165
166 class Module(MgrModule):
    # Daemon metadata fields histogrammed across osd/mon/mds daemons by the
    # gather_*_metadata() helpers.
    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]
177
178 MODULE_OPTIONS = [
179 Option(name='url',
180 type='str',
181 default='https://telemetry.ceph.com/report'),
182 Option(name='device_url',
183 type='str',
184 default='https://telemetry.ceph.com/device'),
185 Option(name='enabled',
186 type='bool',
187 default=False),
188 Option(name='last_opt_revision',
189 type='int',
190 default=1),
191 Option(name='leaderboard',
192 type='bool',
193 default=False),
194 Option(name='description',
195 type='str',
196 default=None),
197 Option(name='contact',
198 type='str',
199 default=None),
200 Option(name='organization',
201 type='str',
202 default=None),
203 Option(name='proxy',
204 type='str',
205 default=None),
206 Option(name='interval',
207 type='int',
208 default=24,
209 min=8),
210 Option(name='channel_basic',
211 type='bool',
212 default=True,
213 desc='Share basic cluster information (size, version)'),
214 Option(name='channel_ident',
215 type='bool',
216 default=False,
217 desc='Share a user-provided description and/or contact email for the cluster'),
218 Option(name='channel_crash',
219 type='bool',
220 default=True,
221 desc='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
222 Option(name='channel_device',
223 type='bool',
224 default=True,
225 desc=('Share device health metrics '
226 '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
227 Option(name='channel_perf',
228 type='bool',
229 default=False,
230 desc='Share various performance metrics of a cluster'),
231 ]
232
233 @property
234 def config_keys(self) -> Dict[str, OptionValue]:
235 return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
236
237 def __init__(self, *args: Any, **kwargs: Any) -> None:
238 super(Module, self).__init__(*args, **kwargs)
239 self.event = Event()
240 self.run = False
241 self.db_collection: Optional[List[str]] = None
242 self.last_opted_in_ceph_version: Optional[int] = None
243 self.last_opted_out_ceph_version: Optional[int] = None
244 self.last_upload: Optional[int] = None
245 self.last_report: Dict[str, Any] = dict()
246 self.report_id: Optional[str] = None
247 self.salt: Optional[str] = None
248 self.get_report_lock = Lock()
249 self.config_update_module_option()
250 # for mypy which does not run the code
251 if TYPE_CHECKING:
252 self.url = ''
253 self.device_url = ''
254 self.enabled = False
255 self.last_opt_revision = 0
256 self.leaderboard = ''
257 self.interval = 0
258 self.proxy = ''
259 self.channel_basic = True
260 self.channel_ident = False
261 self.channel_crash = True
262 self.channel_device = True
263 self.channel_perf = False
264 self.db_collection = ['basic_base', 'device_base']
265 self.last_opted_in_ceph_version = 17
266 self.last_opted_out_ceph_version = 0
267
268 def config_update_module_option(self) -> None:
269 for opt in self.MODULE_OPTIONS:
270 setattr(self,
271 opt['name'],
272 self.get_module_option(opt['name']))
273 self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
274
275 def config_notify(self) -> None:
276 self.config_update_module_option()
277 # wake up serve() thread
278 self.event.set()
279
280 def load(self) -> None:
281 last_upload = self.get_store('last_upload', None)
282 if last_upload is None:
283 self.last_upload = None
284 else:
285 self.last_upload = int(last_upload)
286
287 report_id = self.get_store('report_id', None)
288 if report_id is None:
289 self.report_id = str(uuid.uuid4())
290 self.set_store('report_id', self.report_id)
291 else:
292 self.report_id = report_id
293
294 salt = self.get_store('salt', None)
295 if salt is None:
296 self.salt = str(uuid.uuid4())
297 self.set_store('salt', self.salt)
298 else:
299 self.salt = salt
300
301 self.init_collection()
302
303 last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None)
304 if last_opted_in_ceph_version is None:
305 self.last_opted_in_ceph_version = None
306 else:
307 self.last_opted_in_ceph_version = int(last_opted_in_ceph_version)
308
309 last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None)
310 if last_opted_out_ceph_version is None:
311 self.last_opted_out_ceph_version = None
312 else:
313 self.last_opted_out_ceph_version = int(last_opted_out_ceph_version)
314
315 def gather_osd_metadata(self,
316 osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]:
317 keys = ["osd_objectstore", "rotational"]
318 keys += self.metadata_keys
319
320 metadata: Dict[str, Dict[str, int]] = dict()
321 for key in keys:
322 metadata[key] = defaultdict(int)
323
324 for osd in osd_map['osds']:
325 res = self.get_metadata('osd', str(osd['osd']))
326 if res is None:
327 self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
328 continue
329 for k, v in res.items():
330 if k not in keys:
331 continue
332
333 metadata[k][v] += 1
334
335 return metadata
336
337 def gather_mon_metadata(self,
338 mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]:
339 keys = list()
340 keys += self.metadata_keys
341
342 metadata: Dict[str, Dict[str, int]] = dict()
343 for key in keys:
344 metadata[key] = defaultdict(int)
345
346 for mon in mon_map['mons']:
347 res = self.get_metadata('mon', mon['name'])
348 if res is None:
349 self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
350 continue
351 for k, v in res.items():
352 if k not in keys:
353 continue
354
355 metadata[k][v] += 1
356
357 return metadata
358
359 def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]:
360 metadata: Dict[str, Dict[str, int]] = dict()
361
362 res = self.get('mds_metadata') # metadata of *all* mds daemons
363 if res is None or not res:
364 self.log.debug('Could not get metadata for mds daemons')
365 return metadata
366
367 keys = list()
368 keys += self.metadata_keys
369
370 for key in keys:
371 metadata[key] = defaultdict(int)
372
373 for mds in res.values():
374 for k, v in mds.items():
375 if k not in keys:
376 continue
377
378 metadata[k][v] += 1
379
380 return metadata
381
382 def gather_crush_info(self) -> Dict[str, Union[int,
383 bool,
384 List[int],
385 Dict[str, int],
386 Dict[int, int]]]:
387 osdmap = self.get_osdmap()
388 crush_raw = osdmap.get_crush()
389 crush = crush_raw.dump()
390
391 BucketKeyT = TypeVar('BucketKeyT', int, str)
392
393 def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None:
394 if k in d:
395 d[k] += 1
396 else:
397 d[k] = 1
398
399 device_classes: Dict[str, int] = {}
400 for dev in crush['devices']:
401 inc(device_classes, dev.get('class', ''))
402
403 bucket_algs: Dict[str, int] = {}
404 bucket_types: Dict[str, int] = {}
405 bucket_sizes: Dict[int, int] = {}
406 for bucket in crush['buckets']:
407 if '~' in bucket['name']: # ignore shadow buckets
408 continue
409 inc(bucket_algs, bucket['alg'])
410 inc(bucket_types, bucket['type_id'])
411 inc(bucket_sizes, len(bucket['items']))
412
413 return {
414 'num_devices': len(crush['devices']),
415 'num_types': len(crush['types']),
416 'num_buckets': len(crush['buckets']),
417 'num_rules': len(crush['rules']),
418 'device_classes': list(device_classes.values()),
419 'tunables': crush['tunables'],
420 'compat_weight_set': '-1' in crush['choose_args'],
421 'num_weight_sets': len(crush['choose_args']),
422 'bucket_algs': bucket_algs,
423 'bucket_sizes': bucket_sizes,
424 'bucket_types': bucket_types,
425 }
426
427 def gather_configs(self) -> Dict[str, List[str]]:
428 # cluster config options
429 cluster = set()
430 r, outb, outs = self.mon_command({
431 'prefix': 'config dump',
432 'format': 'json'
433 })
434 if r != 0:
435 return {}
436 try:
437 dump = json.loads(outb)
438 except json.decoder.JSONDecodeError:
439 return {}
440 for opt in dump:
441 name = opt.get('name')
442 if name:
443 cluster.add(name)
444 # daemon-reported options (which may include ceph.conf)
445 active = set()
446 ls = self.get("modified_config_options")
447 for opt in ls.get('options', {}):
448 active.add(opt)
449 return {
450 'cluster_changed': sorted(list(cluster)),
451 'active_changed': sorted(list(active)),
452 }
453
454 def anonymize_entity_name(self, entity_name:str) -> str:
455 if '.' not in entity_name:
456 self.log.debug(f"Cannot split entity name ({entity_name}), no '.' is found")
457 return entity_name
458
459 (etype, eid) = entity_name.split('.', 1)
460 m = hashlib.sha1()
461 salt = ''
462 if self.salt is not None:
463 salt = self.salt
464 # avoid asserting that salt exists
465 if not self.salt:
466 # do not set self.salt to a temp value
467 salt = f"no_salt_found_{NO_SALT_CNT}"
468 NO_SALT_CNT += 1
469 self.log.debug(f"No salt found, created a temp one: {salt}")
470 m.update(salt.encode('utf-8'))
471 m.update(eid.encode('utf-8'))
472 m.update(salt.encode('utf-8'))
473
474 return etype + '.' + m.hexdigest()
475
476 def get_heap_stats(self) -> Dict[str, dict]:
477 result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
478 anonymized_daemons = {}
479 osd_map = self.get('osd_map')
480
481 # Combine available daemons
482 daemons = []
483 for osd in osd_map['osds']:
484 daemons.append('osd'+'.'+str(osd['osd']))
485 # perf_memory_metrics collection (1/2)
486 if self.is_enabled_collection(Collection.perf_memory_metrics):
487 mon_map = self.get('mon_map')
488 mds_metadata = self.get('mds_metadata')
489 for mon in mon_map['mons']:
490 daemons.append('mon'+'.'+mon['name'])
491 for mds in mds_metadata:
492 daemons.append('mds'+'.'+mds)
493
494 # Grab output from the "daemon.x heap stats" command
495 for daemon in daemons:
496 daemon_type, daemon_id = daemon.split('.', 1)
497 heap_stats = self.parse_heap_stats(daemon_type, daemon_id)
498 if heap_stats:
499 if (daemon_type != 'osd'):
500 # Anonymize mon and mds
501 anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
502 daemon = anonymized_daemons[daemon]
503 result[daemon_type][daemon] = heap_stats
504 else:
505 continue
506
507 if anonymized_daemons:
508 # for debugging purposes only, this data is never reported
509 self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons))
510 return result
511
512 def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]:
513 parsed_output = {}
514
515 cmd_dict = {
516 'prefix': 'heap',
517 'heapcmd': 'stats'
518 }
519 r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict)
520
521 if r != 0:
522 self.log.error("Invalid command dictionary: {}".format(cmd_dict))
523 else:
524 if 'tcmalloc heap stats' in outb:
525 values = [int(i) for i in outb.split() if i.isdigit()]
526 # `categories` must be ordered this way for the correct output to be parsed
527 categories = ['use_by_application',
528 'page_heap_freelist',
529 'central_cache_freelist',
530 'transfer_cache_freelist',
531 'thread_cache_freelists',
532 'malloc_metadata',
533 'actual_memory_used',
534 'released_to_os',
535 'virtual_address_space_used',
536 'spans_in_use',
537 'thread_heaps_in_use',
538 'tcmalloc_page_size']
539 if len(values) != len(categories):
540 self.log.error('Received unexpected output from {}.{}; ' \
541 'number of values should match the number' \
542 'of expected categories:\n values: len={} {} '\
543 '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs))
544 else:
545 parsed_output = dict(zip(categories, values))
546 else:
547 self.log.error('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs))
548
549 return parsed_output
550
551 def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
552 result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
553 anonymized_daemons = {}
554 osd_map = self.get('osd_map')
555
556 # Combine available daemons
557 daemons = []
558 for osd in osd_map['osds']:
559 daemons.append('osd'+'.'+str(osd['osd']))
560 # perf_memory_metrics collection (2/2)
561 if self.is_enabled_collection(Collection.perf_memory_metrics):
562 mon_map = self.get('mon_map')
563 mds_metadata = self.get('mds_metadata')
564 for mon in mon_map['mons']:
565 daemons.append('mon'+'.'+mon['name'])
566 for mds in mds_metadata:
567 daemons.append('mds'+'.'+mds)
568
569 # Grab output from the "dump_mempools" command
570 for daemon in daemons:
571 daemon_type, daemon_id = daemon.split('.', 1)
572 cmd_dict = {
573 'prefix': 'dump_mempools',
574 'format': 'json'
575 }
576 r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict)
577 if r != 0:
578 self.log.error("Invalid command dictionary: {}".format(cmd_dict))
579 continue
580 else:
581 try:
582 # This is where the mempool will land.
583 dump = json.loads(outb)
584 if mode == 'separated':
585 # Anonymize mon and mds
586 if daemon_type != 'osd':
587 anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
588 daemon = anonymized_daemons[daemon]
589 result[daemon_type][daemon] = dump['mempool']['by_pool']
590 elif mode == 'aggregated':
591 for mem_type in dump['mempool']['by_pool']:
592 result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
593 result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
594 else:
595 self.log.error("Incorrect mode specified in get_mempool: {}".format(mode))
596 except (json.decoder.JSONDecodeError, KeyError) as e:
597 self.log.error("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e))
598 continue
599
600 if anonymized_daemons:
601 # for debugging purposes only, this data is never reported
602 self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons))
603
604 return result
605
    def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
        """Collect 2D perf histograms from every OSD via 'perf histogram dump'.

        Histograms are grouped by their axis configuration (the stringified
        axes list serves as the grouping key and is dropped on return) so
        that only histograms with identical axes are combined.

        mode='separated': keep a per-OSD list of value matrices.
        mode='aggregated': element-wise sum of value matrices across OSDs,
        plus a 'num_combined_osds' counter.  Returns [] on an invalid mode.
        """
        # Initialize result dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
                                  lambda: defaultdict(
                                  lambda: defaultdict(
                                  lambda: defaultdict(
                                  lambda: defaultdict(int))))))

        # Get list of osd ids from the metadata
        osd_metadata = self.get('osd_metadata')

        # Grab output from the "osd.x perf histogram dump" command
        for osd_id in osd_metadata:
            cmd_dict = {
                'prefix': 'perf histogram dump',
                'id': str(osd_id),
                'format': 'json'
            }
            r, outb, outs = self.osd_command(cmd_dict)
            # Check for invalid calls
            if r != 0:
                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
                continue
            else:
                try:
                    # This is where the histograms will land if there are any.
                    dump = json.loads(outb)

                    for histogram in dump['osd']:
                        # Log axis information. There are two axes, each represented
                        # as a dictionary. Both dictionaries are contained inside a
                        # list called 'axes'.
                        axes = []
                        for axis in dump['osd'][histogram]['axes']:

                            # This is the dict that contains information for an individual
                            # axis. It will be appended to the 'axes' list at the end.
                            axis_dict: Dict[str, Any] = defaultdict()

                            # Collecting information for buckets, min, name, etc.
                            axis_dict['buckets'] = axis['buckets']
                            axis_dict['min'] = axis['min']
                            axis_dict['name'] = axis['name']
                            axis_dict['quant_size'] = axis['quant_size']
                            axis_dict['scale_type'] = axis['scale_type']

                            # Collecting ranges; placing them in lists to
                            # improve readability later on.
                            ranges = []
                            for _range in axis['ranges']:
                                _max, _min = None, None
                                if 'max' in _range:
                                    _max = _range['max']
                                if 'min' in _range:
                                    _min = _range['min']
                                ranges.append([_min, _max])
                            axis_dict['ranges'] = ranges

                            # Now that 'axis_dict' contains all the appropriate
                            # information for the current axis, append it to the 'axes' list.
                            # There will end up being two axes in the 'axes' list, since the
                            # histograms are 2D.
                            axes.append(axis_dict)

                        # Add the 'axes' list, containing both axes, to result.
                        # At this point, you will see that the name of the key is the string
                        # form of our axes list (str(axes)). This is there so that histograms
                        # with different axis configs will not be combined.
                        # These key names are later dropped when only the values are returned.
                        result[str(axes)][histogram]['axes'] = axes

                        # Collect current values and make sure they are in
                        # integer form.
                        values = []
                        for value_list in dump['osd'][histogram]['values']:
                            values.append([int(v) for v in value_list])

                        if mode == 'separated':
                            if 'osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['osds'] = []
                            result[str(axes)][histogram]['osds'].append({'osd_id': int(osd_id), 'values': values})

                        elif mode == 'aggregated':
                            # Aggregate values. If 'values' have already been initialized,
                            # we can safely add.
                            if 'values' in result[str(axes)][histogram]:
                                for i in range (0, len(values)):
                                    for j in range (0, len(values[i])):
                                        values[i][j] += result[str(axes)][histogram]['values'][i][j]

                            # Add the values to result.
                            result[str(axes)][histogram]['values'] = values

                            # Update num_combined_osds
                            if 'num_combined_osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['num_combined_osds'] = 1
                            else:
                                result[str(axes)][histogram]['num_combined_osds'] += 1
                        else:
                            self.log.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode))
                            return list()

                # Sometimes, json errors occur if you give it an empty string.
                # I am also putting in a catch for a KeyError since it could
                # happen where the code is assuming that a key exists in the
                # schema when it doesn't. In either case, we'll handle that
                # by continuing and collecting what we can from other osds.
                except (json.decoder.JSONDecodeError, KeyError) as e:
                    self.log.error("Error caught on osd.{}: {}".format(osd_id, e))
                    continue

        return list(result.values())
718
    def get_io_rate(self) -> dict:
        """Return cluster-wide IO rate statistics ('io_rate') from the mgr."""
        return self.get('io_rate')
721
722 def get_stats_per_pool(self) -> dict:
723 result = self.get('pg_dump')['pool_stats']
724
725 # collect application metadata from osd_map
726 osd_map = self.get('osd_map')
727 application_metadata = {pool['pool']: pool['application_metadata'] for pool in osd_map['pools']}
728
729 # add application to each pool from pg_dump
730 for pool in result:
731 pool['application'] = []
732 # Only include default applications
733 for application in application_metadata[pool['poolid']]:
734 if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
735 pool['application'].append(application)
736
737 return result
738
    def get_stats_per_pg(self) -> dict:
        """Return per-PG statistics ('pg_stats') from the mgr's pg_dump."""
        return self.get('pg_dump')['pg_stats']
741
742 def get_rocksdb_stats(self) -> Dict[str, str]:
743 # Initalizers
744 result: Dict[str, str] = defaultdict()
745 version = self.get_rocksdb_version()
746
747 # Update result
748 result['version'] = version
749
750 return result
751
752 def gather_crashinfo(self) -> List[Dict[str, str]]:
753 crashlist: List[Dict[str, str]] = list()
754 errno, crashids, err = self.remote('crash', 'ls')
755 if errno:
756 return crashlist
757 for crashid in crashids.split():
758 errno, crashinfo, err = self.remote('crash', 'do_info', crashid)
759 if errno:
760 continue
761 c = json.loads(crashinfo)
762
763 # redact hostname
764 del c['utsname_hostname']
765
766 # entity_name might have more than one '.', beware
767 (etype, eid) = c.get('entity_name', '').split('.', 1)
768 m = hashlib.sha1()
769 assert self.salt
770 m.update(self.salt.encode('utf-8'))
771 m.update(eid.encode('utf-8'))
772 m.update(self.salt.encode('utf-8'))
773 c['entity_name'] = etype + '.' + m.hexdigest()
774
775 # redact final line of python tracebacks, as the exception
776 # payload may contain identifying information
777 if 'mgr_module' in c and 'backtrace' in c:
778 # backtrace might be empty
779 if len(c['backtrace']) > 0:
780 c['backtrace'][-1] = '<redacted>'
781
782 crashlist.append(c)
783 return crashlist
784
    def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
        """Collect perf counters for all daemons.

        mode='separated': values keyed per daemon; mon/mds/rgw names are
        anonymized (OSD ids are kept as-is).
        mode='aggregated': numeric values summed per daemon type, with a
        'num_combined_daemons' counter.  Returns {} on an invalid mode.
        """
        # Extract perf counter data with get_all_perf_counters(), a method
        # from mgr/mgr_module.py. This method returns a nested dictionary that
        # looks a lot like perf schema, except with some additional fields.
        #
        # Example of output, a snapshot of a mon daemon:
        #   "mon.b": {
        #       "bluestore.kv_flush_lat": {
        #           "count": 2431,
        #           "description": "Average kv_thread flush latency",
        #           "nick": "fl_l",
        #           "priority": 8,
        #           "type": 5,
        #           "units": 1,
        #           "value": 88814109
        #       },
        #   },
        all_perf_counters = self.get_all_perf_counters()

        # Initialize 'result' dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
            lambda: defaultdict(lambda: defaultdict(int))))

        # 'separated' mode
        anonymized_daemon_dict = {}

        for daemon, all_perf_counters_by_daemon in all_perf_counters.items():
            daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'

            if mode == 'separated':
                # anonymize individual daemon names except osds
                if (daemon_type != 'osd'):
                    anonymized_daemon = self.anonymize_entity_name(daemon)
                    anonymized_daemon_dict[anonymized_daemon] = daemon
                    daemon = anonymized_daemon

            # Calculate num combined daemon types if in aggregated mode
            if mode == 'aggregated':
                if 'num_combined_daemons' not in result[daemon_type]:
                    result[daemon_type]['num_combined_daemons'] = 1
                else:
                    result[daemon_type]['num_combined_daemons'] += 1

            for collection in all_perf_counters_by_daemon:
                # Split the collection to avoid redundancy in final report; i.e.:
                #   bluestore.kv_flush_lat, bluestore.kv_final_lat -->
                #   bluestore: kv_flush_lat, kv_final_lat
                col_0, col_1 = collection.split('.')

                # Debug log for empty keys. This initially was a problem for prioritycache
                # perf counters, where the col_0 was empty for certain mon counters:
                #
                # "mon.a": {                 instead of    "mon.a": {
                #      "": {                                   "prioritycache": {
                #        "cache_bytes": {...},                     "cache_bytes": {...},
                #
                # This log is here to detect any future instances of a similar issue.
                if (daemon == "") or (col_0 == "") or (col_1 == ""):
                    self.log.debug("Instance of an empty key: {}{}".format(daemon, collection))

                if mode == 'separated':
                    # Add value to result
                    result[daemon][col_0][col_1]['value'] = \
                            all_perf_counters_by_daemon[collection]['value']

                    # Check that 'count' exists, as not all counters have a count field.
                    if 'count' in all_perf_counters_by_daemon[collection]:
                        result[daemon][col_0][col_1]['count'] = \
                                all_perf_counters_by_daemon[collection]['count']
                elif mode == 'aggregated':
                    # Not every rgw daemon has the same schema. Specifically, each rgw daemon
                    # has a uniquely-named collection that starts off identically (i.e.
                    # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
                    # This bit of code combines these unique counters all under one rgw instance.
                    # Without this check, the schema would remain separated out in the final report.
                    if col_0[0:11] == "objecter-0x":
                        col_0 = "objecter-0x"

                    # Check that the value can be incremented. In some cases,
                    # the files are of type 'pair' (real-integer-pair, integer-integer pair).
                    # In those cases, the value is a dictionary, and not a number.
                    # i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
                    if isinstance(all_perf_counters_by_daemon[collection]['value'], numbers.Number):
                        result[daemon_type][col_0][col_1]['value'] += \
                                all_perf_counters_by_daemon[collection]['value']

                        # Check that 'count' exists, as not all counters have a count field.
                        if 'count' in all_perf_counters_by_daemon[collection]:
                            result[daemon_type][col_0][col_1]['count'] += \
                                    all_perf_counters_by_daemon[collection]['count']
                else:
                    self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
                    return {}

        if mode == 'separated':
            # for debugging purposes only, this data is never reported
            self.log.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict))

        return result
884
885 def get_active_channels(self) -> List[str]:
886 r = []
887 if self.channel_basic:
888 r.append('basic')
889 if self.channel_crash:
890 r.append('crash')
891 if self.channel_device:
892 r.append('device')
893 if self.channel_ident:
894 r.append('ident')
895 if self.channel_perf:
896 r.append('perf')
897 return r
898
    def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
        """Collect recent device health (SMART) metrics, anonymized.

        Returns {anon-host-id: {anon-devid: {timestamp: record}}}.  Host and
        device ids are replaced with persisted UUID aliases, and the device
        serial number is scrubbed from the reports.  Returns {} when the
        devicehealth module or device list is unavailable.
        """
        try:
            time_format = self.remote('devicehealth', 'get_time_format')
        except Exception as e:
            self.log.debug('Unable to format time: {}'.format(e))
            return {}
        # only look back twice the reporting interval for recent samples
        cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
        min_sample = cutoff.strftime(time_format)

        devices = self.get('devices')['devices']
        if not devices:
            self.log.debug('Unable to get device info from the mgr.')
            return {}

        # anon-host-id -> anon-devid -> { timestamp -> record }
        res: Dict[str, Dict[str, Dict[str, str]]] = {}
        for d in devices:
            devid = d['devid']
            try:
                # this is a map of stamp -> {device info}
                m = self.remote('devicehealth', 'get_recent_device_metrics',
                                devid, min_sample)
            except Exception as e:
                self.log.error('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
                continue

            # anonymize host id
            try:
                host = d['location'][0]['host']
            except (KeyError, IndexError) as e:
                self.log.error('Unable to get host from device with id "{}": {}'.format(devid, e))
                continue
            # host aliases are persisted so the same host maps to the same
            # anonymous id across reports
            anon_host = self.get_store('host-id/%s' % host)
            if not anon_host:
                anon_host = str(uuid.uuid1())
                self.set_store('host-id/%s' % host, anon_host)
            serial = None
            for dev, rep in m.items():
                rep['host_id'] = anon_host
                if serial is None and 'serial_number' in rep:
                    serial = rep['serial_number']

            # anonymize device id
            anon_devid = self.get_store('devid-id/%s' % devid)
            if not anon_devid:
                # ideally devid is 'vendor_model_serial',
                # but can also be 'model_serial', 'serial'
                if '_' in devid:
                    # keep the vendor/model prefix, replace only the serial part
                    anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
                else:
                    anon_devid = str(uuid.uuid1())
                self.set_store('devid-id/%s' % devid, anon_devid)
            self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
                                                           host, anon_host))

            # anonymize the smartctl report itself
            if serial:
                m_str = json.dumps(m)
                m = json.loads(m_str.replace(serial, 'deleted'))

            if anon_host not in res:
                res[anon_host] = {}
            res[anon_host][anon_devid] = m
        return res
963
964 def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
965 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
966 if data:
967 return data[-1][1]
968 else:
969 return 0
970
    def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
        """Assemble the telemetry report dict for *channels* (defaults to the
        currently active channels).

        The 'device' channel is never included here; it is reported to a
        separate endpoint (see gather_device_report()).
        """
        if not channels:
            channels = self.get_active_channels()
        report = {
            'leaderboard': self.leaderboard,
            'report_version': 1,
            'report_timestamp': datetime.utcnow().isoformat(),
            'report_id': self.report_id,
            'channels': channels,
            'channels_available': ALL_CHANNELS,
            'license': LICENSE,
            'collections_available': [c['name'].name for c in MODULE_COLLECTION],
            'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])],
        }

        # ident channel: free-form cluster owner contact details
        if 'ident' in channels:
            for option in ['description', 'contact', 'organization']:
                report[option] = getattr(self, option)

        if 'basic' in channels:
            mon_map = self.get('mon_map')
            osd_map = self.get('osd_map')
            service_map = self.get('service_map')
            fs_map = self.get('fs_map')
            df = self.get('df')
            df_pools = {pool['id']: pool for pool in df['pools']}

            report['created'] = mon_map['created']

            # mons: count address types (v1/v2 protocol, IPv4/IPv6)
            v1_mons = 0
            v2_mons = 0
            ipv4_mons = 0
            ipv6_mons = 0
            for mon in mon_map['mons']:
                for a in mon['public_addrs']['addrvec']:
                    if a['type'] == 'v2':
                        v2_mons += 1
                    elif a['type'] == 'v1':
                        v1_mons += 1
                    # IPv6 addresses are bracketed, e.g. "[::1]:6789"
                    if a['addr'].startswith('['):
                        ipv6_mons += 1
                    else:
                        ipv4_mons += 1
            report['mon'] = {
                'count': len(mon_map['mons']),
                'features': mon_map['features'],
                'min_mon_release': mon_map['min_mon_release'],
                'v1_addr_mons': v1_mons,
                'v2_addr_mons': v2_mons,
                'ipv4_addr_mons': ipv4_mons,
                'ipv6_addr_mons': ipv6_mons,
            }

            report['config'] = self.gather_configs()

            # pools

            rbd_num_pools = 0
            rbd_num_images_by_pool = []
            rbd_mirroring_by_pool = []
            num_pg = 0
            report['pools'] = list()
            for pool in osd_map['pools']:
                num_pg += pool['pg_num']
                ec_profile = {}
                if pool['erasure_code_profile']:
                    orig = osd_map['erasure_code_profiles'].get(
                        pool['erasure_code_profile'], {})
                    # only report a whitelisted subset of the EC profile keys
                    ec_profile = {
                        k: orig[k] for k in orig.keys()
                        if k in ['k', 'm', 'plugin', 'technique',
                                 'crush-failure-domain', 'l']
                    }
                pool_data = {
                    'pool': pool['pool'],
                    'pg_num': pool['pg_num'],
                    'pgp_num': pool['pg_placement_num'],
                    'size': pool['size'],
                    'min_size': pool['min_size'],
                    'pg_autoscale_mode': pool['pg_autoscale_mode'],
                    'target_max_bytes': pool['target_max_bytes'],
                    'target_max_objects': pool['target_max_objects'],
                    'type': ['', 'replicated', '', 'erasure'][pool['type']],
                    'erasure_code_profile': ec_profile,
                    'cache_mode': pool['cache_mode'],
                }

                # basic_pool_usage collection
                if self.is_enabled_collection(Collection.basic_pool_usage):
                    pool_data['application'] = []
                    for application in pool['application_metadata']:
                        # Only include default applications
                        if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
                            pool_data['application'].append(application)
                    pool_stats = df_pools[pool['pool']]['stats']
                    pool_data['stats'] = { # filter out kb_used
                        'avail_raw': pool_stats['avail_raw'],
                        'bytes_used': pool_stats['bytes_used'],
                        'compress_bytes_used': pool_stats['compress_bytes_used'],
                        'compress_under_bytes': pool_stats['compress_under_bytes'],
                        'data_bytes_used': pool_stats['data_bytes_used'],
                        'dirty': pool_stats['dirty'],
                        'max_avail': pool_stats['max_avail'],
                        'objects': pool_stats['objects'],
                        'omap_bytes_used': pool_stats['omap_bytes_used'],
                        'percent_used': pool_stats['percent_used'],
                        'quota_bytes': pool_stats['quota_bytes'],
                        'quota_objects': pool_stats['quota_objects'],
                        'rd': pool_stats['rd'],
                        'rd_bytes': pool_stats['rd_bytes'],
                        'stored': pool_stats['stored'],
                        'stored_data': pool_stats['stored_data'],
                        'stored_omap': pool_stats['stored_omap'],
                        'stored_raw': pool_stats['stored_raw'],
                        'wr': pool_stats['wr'],
                        'wr_bytes': pool_stats['wr_bytes']
                    }

                cast(List[Dict[str, Any]], report['pools']).append(pool_data)
                # per-pool RBD image/mirroring counts
                if 'rbd' in pool['application_metadata']:
                    rbd_num_pools += 1
                    ioctx = self.rados.open_ioctx(pool['pool_name'])
                    rbd_num_images_by_pool.append(
                        sum(1 for _ in rbd.RBD().list2(ioctx)))
                    rbd_mirroring_by_pool.append(
                        rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
            report['rbd'] = {
                'num_pools': rbd_num_pools,
                'num_images_by_pool': rbd_num_images_by_pool,
                'mirroring_by_pool': rbd_mirroring_by_pool}

            # osds: detect whether a separate cluster (backend) network is used
            cluster_network = False
            for osd in osd_map['osds']:
                if osd['up'] and not cluster_network:
                    front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
                    back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
                    if front_ip != back_ip:
                        cluster_network = True
            report['osd'] = {
                'count': len(osd_map['osds']),
                'require_osd_release': osd_map['require_osd_release'],
                'require_min_compat_client': osd_map['require_min_compat_client'],
                'cluster_network': cluster_network,
            }

            # crush
            report['crush'] = self.gather_crush_info()

            # cephfs
            report['fs'] = {
                'count': len(fs_map['filesystems']),
                'feature_flags': fs_map['feature_flags'],
                'num_standby_mds': len(fs_map['standbys']),
                'filesystems': [],
            }
            num_mds = len(fs_map['standbys'])
            for fsm in fs_map['filesystems']:
                fs = fsm['mdsmap']
                num_sessions = 0
                cached_ino = 0
                cached_dn = 0
                cached_cap = 0
                subtrees = 0
                rfiles = 0
                rbytes = 0
                rsnaps = 0
                for gid, mds in fs['info'].items():
                    num_sessions += self.get_latest('mds', mds['name'],
                                                    'mds_sessions.session_count')
                    cached_ino += self.get_latest('mds', mds['name'],
                                                  'mds_mem.ino')
                    cached_dn += self.get_latest('mds', mds['name'],
                                                 'mds_mem.dn')
                    cached_cap += self.get_latest('mds', mds['name'],
                                                  'mds_mem.cap')
                    subtrees += self.get_latest('mds', mds['name'],
                                                'mds.subtrees')
                    # recursive stats are tracked by rank 0 only
                    if mds['rank'] == 0:
                        rfiles = self.get_latest('mds', mds['name'],
                                                 'mds.root_rfiles')
                        rbytes = self.get_latest('mds', mds['name'],
                                                 'mds.root_rbytes')
                        rsnaps = self.get_latest('mds', mds['name'],
                                                 'mds.root_rsnaps')
                report['fs']['filesystems'].append({ # type: ignore
                    'max_mds': fs['max_mds'],
                    'ever_allowed_features': fs['ever_allowed_features'],
                    'explicitly_allowed_features': fs['explicitly_allowed_features'],
                    'num_in': len(fs['in']),
                    'num_up': len(fs['up']),
                    'num_standby_replay': len(
                        [mds for gid, mds in fs['info'].items()
                         if mds['state'] == 'up:standby-replay']),
                    'num_mds': len(fs['info']),
                    'num_sessions': num_sessions,
                    'cached_inos': cached_ino,
                    'cached_dns': cached_dn,
                    'cached_caps': cached_cap,
                    'cached_subtrees': subtrees,
                    'balancer_enabled': len(fs['balancer']) > 0,
                    'num_data_pools': len(fs['data_pools']),
                    'standby_count_wanted': fs['standby_count_wanted'],
                    'approx_ctime': fs['created'][0:7],
                    'files': rfiles,
                    'bytes': rbytes,
                    'snaps': rsnaps,
                })
                num_mds += len(fs['info'])
            report['fs']['total_num_mds'] = num_mds # type: ignore

            # daemons
            report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
                                      mon=self.gather_mon_metadata(mon_map))

            if self.is_enabled_collection(Collection.basic_mds_metadata):
                report['metadata']['mds'] = self.gather_mds_metadata() # type: ignore

            # host counts
            servers = self.list_servers()
            self.log.debug('servers %s' % servers)
            hosts = {
                'num': len([h for h in servers if h['hostname']]),
            }
            # count hosts running at least one daemon of each type
            for t in ['mon', 'mds', 'osd', 'mgr']:
                nr_services = sum(1 for host in servers if
                                  any(service for service in cast(List[ServiceInfoT],
                                                                  host['services'])
                                      if service['type'] == t))
                hosts['num_with_' + t] = nr_services
            report['hosts'] = hosts

            report['usage'] = {
                'pools': len(df['pools']),
                'pg_num': num_pg,
                'total_used_bytes': df['stats']['total_used_bytes'],
                'total_bytes': df['stats']['total_bytes'],
                'total_avail_bytes': df['stats']['total_avail_bytes']
            }
            # basic_usage_by_class collection
            if self.is_enabled_collection(Collection.basic_usage_by_class):
                report['usage']['stats_by_class'] = {} # type: ignore
                for device_class in df['stats_by_class']:
                    if device_class in ['hdd', 'ssd', 'nvme']:
                        report['usage']['stats_by_class'][device_class] = df['stats_by_class'][device_class] # type: ignore

            services: DefaultDict[str, int] = defaultdict(int)
            for key, value in service_map['services'].items():
                services[key] += 1
                if key == 'rgw':
                    rgw = {}
                    zones = set()
                    zonegroups = set()
                    frontends = set()
                    count = 0
                    d = value.get('daemons', dict())
                    for k, v in d.items():
                        if k == 'summary' and v:
                            rgw[k] = v
                        elif isinstance(v, dict) and 'metadata' in v:
                            count += 1
                            zones.add(v['metadata']['zone_id'])
                            zonegroups.add(v['metadata']['zonegroup_id'])
                            frontends.add(v['metadata']['frontend_type#0'])

                            # we could actually iterate over all the keys of
                            # the dict and check for how many frontends there
                            # are, but it is unlikely that one would be running
                            # more than 2 supported ones
                            f2 = v['metadata'].get('frontend_type#1', None)
                            if f2:
                                frontends.add(f2)

                    rgw['count'] = count
                    rgw['zones'] = len(zones)
                    rgw['zonegroups'] = len(zonegroups)
                    rgw['frontends'] = list(frontends) # sets aren't json-serializable
                    report['rgw'] = rgw
            report['services'] = services

            try:
                report['balancer'] = self.remote('balancer', 'gather_telemetry')
            except ImportError:
                report['balancer'] = {
                    'active': False
                }

            # Rook
            self.get_rook_data(report)

        if 'crash' in channels:
            report['crashes'] = self.gather_crashinfo()

        if 'perf' in channels:
            if self.is_enabled_collection(Collection.perf_perf):
                report['perf_counters'] = self.gather_perf_counters('separated')
                report['stats_per_pool'] = self.get_stats_per_pool()
                report['stats_per_pg'] = self.get_stats_per_pg()
                report['io_rate'] = self.get_io_rate()
                report['osd_perf_histograms'] = self.get_osd_histograms('separated')
                report['mempool'] = self.get_mempool('separated')
                report['heap_stats'] = self.get_heap_stats()
                report['rocksdb_stats'] = self.get_rocksdb_stats()

        # NOTE: We do not include the 'device' channel in this report; it is
        # sent to a different endpoint.

        return report
1280
1281 def get_rook_data(self, report: Dict[str, object]) -> None:
1282 r, outb, outs = self.mon_command({
1283 'prefix': 'config-key dump',
1284 'format': 'json'
1285 })
1286 if r != 0:
1287 return
1288 try:
1289 config_kv_dump = json.loads(outb)
1290 except json.decoder.JSONDecodeError:
1291 return
1292
1293 for elem in ROOK_KEYS_BY_COLLECTION:
1294 # elem[0] is the full key path (e.g. "rook/node/count/with-csi-nfs-plugin")
1295 # elem[1] is the Collection this key belongs to
1296 if self.is_enabled_collection(elem[1]):
1297 self.add_kv_to_report(report, elem[0], config_kv_dump.get(elem[0]))
1298
1299 def add_kv_to_report(self, report: Dict[str, object], key_path: str, value: Any) -> None:
1300 last_node = key_path.split('/')[-1]
1301 for node in key_path.split('/')[0:-1]:
1302 if node not in report:
1303 report[node] = {}
1304 report = report[node] # type: ignore
1305
1306 # sanity check of keys correctness
1307 if not isinstance(report, dict):
1308 self.log.error(f"'{key_path}' is an invalid key, expected type 'dict' but got {type(report)}")
1309 return
1310
1311 if last_node in report:
1312 self.log.error(f"'{key_path}' is an invalid key, last part must not exist at this point")
1313 return
1314
1315 report[last_node] = value
1316
    def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
        """HTTP PUT *report* (serialized as JSON) to *url*.

        Returns None on success, or a human-readable failure reason
        (already logged) on any error."""
        self.log.info('Sending %s to: %s' % (what, url))
        proxies = dict()
        if self.proxy:
            # route both http and https traffic through the configured proxy
            self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
            proxies['http'] = self.proxy
            proxies['https'] = self.proxy
        try:
            resp = requests.put(url=url, json=report, proxies=proxies)
            resp.raise_for_status()
        except Exception as e:
            fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
            self.log.error(fail_reason)
            return fail_reason
        return None
1332
    class EndPoint(enum.Enum):
        # Destination for telemetry uploads: the main ceph report endpoint
        # or the separate device (SMART data) endpoint.
        ceph = 'ceph'
        device = 'device'
1336
1337 def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
1338 '''
1339 Find collections that are available in the module, but are not in the db
1340 '''
1341 if self.db_collection is None:
1342 return None
1343
1344 if not channels:
1345 channels = ALL_CHANNELS
1346 else:
1347 for ch in channels:
1348 if ch not in ALL_CHANNELS:
1349 self.log.debug(f"invalid channel name: {ch}")
1350 return None
1351
1352 new_collection : List[Collection] = []
1353
1354 for c in MODULE_COLLECTION:
1355 if c['name'].name not in self.db_collection:
1356 if c['channel'] in channels:
1357 new_collection.append(c['name'])
1358
1359 return new_collection
1360
1361 def is_major_upgrade(self) -> bool:
1362 '''
1363 Returns True only if the user last opted-in to an older major
1364 '''
1365 if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0:
1366 # we do not know what Ceph version was when the user last opted-in,
1367 # thus we do not wish to nag in case of a major upgrade
1368 return False
1369
1370 mon_map = self.get('mon_map')
1371 mon_min = mon_map.get("min_mon_release", 0)
1372
1373 if mon_min - self.last_opted_in_ceph_version > 0:
1374 self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
1375 return True
1376
1377 return False
1378
1379 def is_opted_in(self) -> bool:
1380 # If len is 0 it means that the user is either opted-out (never
1381 # opted-in, or invoked `telemetry off`), or they upgraded from a
1382 # telemetry revision 1 or 2, which required to re-opt in to revision 3,
1383 # regardless, hence is considered as opted-out
1384 if self.db_collection is None:
1385 return False
1386 return len(self.db_collection) > 0
1387
1388 def should_nag(self) -> bool:
1389 # Find delta between opted-in collections and module collections;
1390 # nag only if module has a collection which is not in db, and nag == True.
1391
1392 # We currently do not nag if the user is opted-out (or never opted-in).
1393 # If we wish to do this in the future, we need to have a tri-mode state
1394 # (opted in, opted out, no action yet), and it needs to be guarded by a
1395 # config option (so that nagging can be turned off via config).
1396 # We also need to add a last_opted_out_ceph_version variable, for the
1397 # major upgrade check.
1398
1399 # check if there are collections the user is not opt-in to
1400 # that we should nag about
1401 if self.db_collection is not None:
1402 for c in MODULE_COLLECTION:
1403 if c['name'].name not in self.db_collection:
1404 if c['nag'] == True:
1405 self.log.debug(f"The collection: {c['name']} is not reported")
1406 return True
1407
1408 # user might be opted-in to the most recent collection, or there is no
1409 # new collection which requires nagging about; thus nag in case it's a
1410 # major upgrade and there are new collections
1411 # (which their own nag == False):
1412 new_collections = False
1413 col_delta = self.collection_delta()
1414 if col_delta is not None and len(col_delta) > 0:
1415 new_collections = True
1416
1417 return self.is_major_upgrade() and new_collections
1418
    def init_collection(self) -> None:
        """Initialize ``self.db_collection`` from the mon store.

        On the first run after an upgrade the 'collection' key is absent; in
        that case the opted-in collections are derived (once) from the legacy
        ``enabled`` / ``last_opt_revision`` state, persisted, and reloaded.
        """
        # We fetch from db the collections the user had already opted-in to.
        # During the transition the results will be empty, but the user might
        # be opted-in to an older version (e.g. revision = 3)

        collection = self.get_store('collection')

        if collection is not None:
            self.db_collection = json.loads(collection)

        if self.db_collection is None:
            # happens once on upgrade
            if not self.enabled:
                # user is not opted-in
                self.set_store('collection', json.dumps([]))
                self.log.debug("user is not opted-in")
            else:
                # user is opted-in, verify the revision:
                if self.last_opt_revision == REVISION:
                    self.log.debug(f"telemetry revision is {REVISION}")
                    base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
                    self.set_store('collection', json.dumps(base_collection))
                else:
                    # user is opted-in to an older version, meaning they need
                    # to re-opt in regardless
                    self.set_store('collection', json.dumps([]))
                    self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")

            # reload collection after setting
            collection = self.get_store('collection')
            if collection is not None:
                self.db_collection = json.loads(collection)
            else:
                raise RuntimeError('collection is None after initial setting')
        else:
            # user has already upgraded
            self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
1456
1457 def is_enabled_collection(self, collection: Collection) -> bool:
1458 if self.db_collection is None:
1459 return False
1460 return collection.name in self.db_collection
1461
1462 def opt_in_all_collections(self) -> None:
1463 """
1464 Opt-in to all collections; Update db with the currently available collections in the module
1465 """
1466 if self.db_collection is None:
1467 raise RuntimeError('db_collection is None after initial setting')
1468
1469 for c in MODULE_COLLECTION:
1470 if c['name'].name not in self.db_collection:
1471 self.db_collection.append(c['name'])
1472
1473 self.set_store('collection', json.dumps(self.db_collection))
1474
    def send(self,
             report: Dict[str, Dict[str, str]],
             endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
        """Upload *report* to the given endpoints (default: both the ceph
        report endpoint and the device endpoint).

        Returns a (retval, stdout, stderr)-style tuple: 0 on full success,
        1 if any upload failed; the last element aggregates per-endpoint
        success/failure messages."""
        if not endpoint:
            endpoint = [self.EndPoint.ceph, self.EndPoint.device]
        failed = []
        success = []
        self.log.debug('Send endpoints %s' % endpoint)
        for e in endpoint:
            if e == self.EndPoint.ceph:
                fail_reason = self._try_post('ceph report', self.url, report)
                if fail_reason:
                    failed.append(fail_reason)
                else:
                    # record the successful upload time so serve() can
                    # schedule the next report
                    now = int(time.time())
                    self.last_upload = now
                    self.set_store('last_upload', str(now))
                    success.append('Ceph report sent to {0}'.format(self.url))
                    self.log.info('Sent report to {0}'.format(self.url))
            elif e == self.EndPoint.device:
                if 'device' in self.get_active_channels():
                    devices = self.gather_device_report()
                    if devices:
                        num_devs = 0
                        num_hosts = 0
                        # one upload per host's device map
                        for host, ls in devices.items():
                            self.log.debug('host %s devices %s' % (host, ls))
                            if not len(ls):
                                continue
                            fail_reason = self._try_post('devices', self.device_url,
                                                         ls)
                            if fail_reason:
                                failed.append(fail_reason)
                            else:
                                num_devs += len(ls)
                                num_hosts += 1
                        if num_devs:
                            success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
                                num_devs, num_hosts, len(devices)))
                    else:
                        fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
                        failed.append(fail_reason)
                        self.log.error(fail_reason)
        if failed:
            return 1, '', '\n'.join(success + failed)
        return 0, '', '\n'.join(success)
1521
1522 def format_perf_histogram(self, report: Dict[str, Any]) -> None:
1523 # Formatting the perf histograms so they are human-readable. This will change the
1524 # ranges and values, which are currently in list form, into strings so that
1525 # they are displayed horizontally instead of vertically.
1526 try:
1527 # Formatting ranges and values in osd_perf_histograms
1528 mode = 'osd_perf_histograms'
1529 for config in report[mode]:
1530 for histogram in config:
1531 # Adjust ranges by converting lists into strings
1532 for axis in config[histogram]['axes']:
1533 for i in range(0, len(axis['ranges'])):
1534 axis['ranges'][i] = str(axis['ranges'][i])
1535
1536 for osd in config[histogram]['osds']:
1537 for i in range(0, len(osd['values'])):
1538 osd['values'][i] = str(osd['values'][i])
1539 except KeyError:
1540 # If the perf channel is not enabled, there should be a KeyError since
1541 # 'osd_perf_histograms' would not be present in the report. In that case,
1542 # the show function should pass as usual without trying to format the
1543 # histograms.
1544 pass
1545
1546 def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1547 '''
1548 Enable or disable a list of channels
1549 '''
1550 if not self.enabled:
1551 # telemetry should be on for channels to be toggled
1552 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1553 'Preview sample reports with `ceph telemetry preview`.'
1554 return 0, msg, ''
1555
1556 if channels is None:
1557 msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
1558 return 0, msg, ''
1559
1560 state = action == 'enable'
1561 msg = ''
1562 for c in channels:
1563 if c not in ALL_CHANNELS:
1564 msg = f"{msg}{c} is not a valid channel name. "\
1565 f"Available channels: {ALL_CHANNELS}.\n"
1566 else:
1567 self.set_module_option(f"channel_{c}", state)
1568 setattr(self,
1569 f"channel_{c}",
1570 state)
1571 msg = f"{msg}channel_{c} is {action}d\n"
1572
1573 return 0, msg, ''
1574
1575 @CLIReadCommand('telemetry status')
1576 def status(self) -> Tuple[int, str, str]:
1577 '''
1578 Show current configuration
1579 '''
1580 r = {}
1581 for opt in self.MODULE_OPTIONS:
1582 r[opt['name']] = getattr(self, opt['name'])
1583 r['last_upload'] = (time.ctime(self.last_upload)
1584 if self.last_upload else self.last_upload)
1585 return 0, json.dumps(r, indent=4, sort_keys=True), ''
1586
1587 @CLIReadCommand('telemetry diff')
1588 def diff(self) -> Tuple[int, str, str]:
1589 '''
1590 Show the diff between opted-in collection and available collection
1591 '''
1592 diff = []
1593 keys = ['nag']
1594
1595 for c in MODULE_COLLECTION:
1596 if not self.is_enabled_collection(c['name']):
1597 diff.append({key: val for key, val in c.items() if key not in keys})
1598
1599 r = None
1600 if diff == []:
1601 r = "Telemetry is up to date"
1602 else:
1603 r = json.dumps(diff, indent=4, sort_keys=True)
1604
1605 return 0, r, ''
1606
    @CLICommand('telemetry on')
    def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
        '''
        Enable telemetry reports from this cluster
        '''
        if license != LICENSE:
            # the user must explicitly accept the data sharing license
            return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
        else:
            self.set_module_option('enabled', True)
            self.enabled = True
            self.opt_in_all_collections()

            # for major releases upgrade nagging
            mon_map = self.get('mon_map')
            mon_min = mon_map.get("min_mon_release", 0)
            self.set_store('last_opted_in_ceph_version', str(mon_min))
            self.last_opted_in_ceph_version = mon_min

            msg = 'Telemetry is on.'
            disabled_channels = ''
            active_channels = self.get_active_channels()
            # 'ident' is excluded: it is off by default and not suggested
            for c in ALL_CHANNELS:
                if c not in active_channels and c != 'ident':
                    disabled_channels = f"{disabled_channels} {c}"

            if len(disabled_channels) > 0:
                msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
                        f"`ceph telemetry enable channel{disabled_channels}`"

            # wake up serve() to reset health warning
            self.event.set()

            return 0, msg, ''
1641
1642 @CLICommand('telemetry off')
1643 def off(self) -> Tuple[int, str, str]:
1644 '''
1645 Disable telemetry reports from this cluster
1646 '''
1647 if not self.enabled:
1648 # telemetry is already off
1649 msg = 'Telemetry is currently not enabled, nothing to turn off. '\
1650 'Please consider opting-in with `ceph telemetry on`.\n' \
1651 'Preview sample reports with `ceph telemetry preview`.'
1652 return 0, msg, ''
1653
1654 self.set_module_option('enabled', False)
1655 self.enabled = False
1656 self.set_store('collection', json.dumps([]))
1657 self.db_collection = []
1658
1659 # we might need this info in the future, in case
1660 # of nagging when user is opted-out
1661 mon_map = self.get('mon_map')
1662 mon_min = mon_map.get("min_mon_release", 0)
1663 self.set_store('last_opted_out_ceph_version', str(mon_min))
1664 self.last_opted_out_ceph_version = mon_min
1665
1666 msg = 'Telemetry is now disabled.'
1667 return 0, msg, ''
1668
    @CLIReadCommand('telemetry enable channel all')
    def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Enable all channels
        '''
        # `channels` defaults to every known channel; toggle_channel
        # validates names and flips the corresponding channel_* options
        return self.toggle_channel('enable', channels)
1675
    @CLIReadCommand('telemetry enable channel')
    def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Enable a list of channels
        '''
        # toggle_channel reports an error message for any unknown name
        return self.toggle_channel('enable', channels)
1682
    @CLIReadCommand('telemetry disable channel all')
    def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Disable all channels
        '''
        # `channels` defaults to every known channel; toggle_channel
        # validates names and flips the corresponding channel_* options
        return self.toggle_channel('disable', channels)
1689
    @CLIReadCommand('telemetry disable channel')
    def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Disable a list of channels
        '''
        # toggle_channel reports an error message for any unknown name
        return self.toggle_channel('disable', channels)
1696
1697 @CLIReadCommand('telemetry channel ls')
1698 def channel_ls(self) -> Tuple[int, str, str]:
1699 '''
1700 List all channels
1701 '''
1702 table = PrettyTable(
1703 [
1704 'NAME', 'ENABLED', 'DEFAULT', 'DESC',
1705 ],
1706 border=False)
1707 table.align['NAME'] = 'l'
1708 table.align['ENABLED'] = 'l'
1709 table.align['DEFAULT'] = 'l'
1710 table.align['DESC'] = 'l'
1711 table.left_padding_width = 0
1712 table.right_padding_width = 4
1713
1714 for c in ALL_CHANNELS:
1715 enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
1716 for o in self.MODULE_OPTIONS:
1717 if o['name'] == f"channel_{c}":
1718 default = "ON" if o.get('default', None) else "OFF"
1719 desc = o.get('desc', None)
1720
1721 table.add_row((
1722 c,
1723 enabled,
1724 default,
1725 desc,
1726 ))
1727
1728 return 0, table.get_string(sortby="NAME"), ''
1729
1730 @CLIReadCommand('telemetry collection ls')
1731 def collection_ls(self) -> Tuple[int, str, str]:
1732 '''
1733 List all collections
1734 '''
1735 col_delta = self.collection_delta()
1736 msg = ''
1737 if col_delta is not None and len(col_delta) > 0:
1738 msg = f"New collections are available:\n" \
1739 f"{sorted([c.name for c in col_delta])}\n" \
1740 f"Run `ceph telemetry on` to opt-in to these collections.\n"
1741
1742 table = PrettyTable(
1743 [
1744 'NAME', 'STATUS', 'DESC',
1745 ],
1746 border=False)
1747 table.align['NAME'] = 'l'
1748 table.align['STATUS'] = 'l'
1749 table.align['DESC'] = 'l'
1750 table.left_padding_width = 0
1751 table.right_padding_width = 4
1752
1753 for c in MODULE_COLLECTION:
1754 name = c['name']
1755 opted_in = self.is_enabled_collection(name)
1756 channel_enabled = getattr(self, f"channel_{c['channel']}")
1757
1758 status = ''
1759 if channel_enabled and opted_in:
1760 status = "REPORTING"
1761 else:
1762 why = ''
1763 delimiter = ''
1764
1765 if not opted_in:
1766 why += "NOT OPTED-IN"
1767 delimiter = ', '
1768 if not channel_enabled:
1769 why += f"{delimiter}CHANNEL {c['channel']} IS OFF"
1770
1771 status = f"NOT REPORTING: {why}"
1772
1773 desc = c['description']
1774
1775 table.add_row((
1776 name,
1777 status,
1778 desc,
1779 ))
1780
1781 if len(msg):
1782 # add a new line between message and table output
1783 msg = f"{msg} \n"
1784
1785 return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
1786
1787 @CLICommand('telemetry send')
1788 def do_send(self,
1789 endpoint: Optional[List[EndPoint]] = None,
1790 license: Optional[str] = None) -> Tuple[int, str, str]:
1791 '''
1792 Send a sample report
1793 '''
1794 if not self.is_opted_in() and license != LICENSE:
1795 self.log.debug(('A telemetry send attempt while opted-out. '
1796 'Asking for license agreement'))
1797 return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
1798 To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
1799 Please consider enabling the telemetry module with 'ceph telemetry on'.'''
1800 else:
1801 self.last_report = self.compile_report()
1802 return self.send(self.last_report, endpoint)
1803
1804 @CLIReadCommand('telemetry show')
1805 def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1806 '''
1807 Show a sample report of opted-in collections (except for 'device')
1808 '''
1809 if not self.enabled:
1810 # if telemetry is off, no report is being sent, hence nothing to show
1811 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1812 'Preview sample reports with `ceph telemetry preview`.'
1813 return 0, msg, ''
1814
1815 report = self.get_report_locked(channels=channels)
1816 self.format_perf_histogram(report)
1817 report = json.dumps(report, indent=4, sort_keys=True)
1818
1819 if self.channel_device:
1820 report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''
1821
1822 return 0, report, ''
1823
    @CLIReadCommand('telemetry preview')
    def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Preview a sample report of the most recent collections available (except for 'device')
        '''
        report = {}

        # We use a lock to prevent a scenario where the user wishes to preview
        # the report, and at the same time the module hits the interval of
        # sending a report with the opted-in collection, which has less data
        # than in the preview report.
        col_delta = self.collection_delta()
        with self.get_report_lock:
            if col_delta is not None and len(col_delta) == 0:
                # user is already opted-in to the most recent collection
                msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
                return 0, msg, ''
            else:
                # there are collections the user is not opted-in to
                next_collection = []

                for c in MODULE_COLLECTION:
                    next_collection.append(c['name'].name)

                # temporarily swap in the full collection list to generate
                # the preview, then restore the user's opted-in list
                opted_in_collection = self.db_collection
                self.db_collection = next_collection
                report = self.get_report(channels=channels)
                self.db_collection = opted_in_collection

        self.format_perf_histogram(report)
        report = json.dumps(report, indent=4, sort_keys=True)

        if self.channel_device:
            report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''

        return 0, report, ''
1860
1861 @CLIReadCommand('telemetry show-device')
1862 def show_device(self) -> Tuple[int, str, str]:
1863 '''
1864 Show a sample device report
1865 '''
1866 if not self.enabled:
1867 # if telemetry is off, no report is being sent, hence nothing to show
1868 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1869 'Preview sample device reports with `ceph telemetry preview-device`.'
1870 return 0, msg, ''
1871
1872 if not self.channel_device:
1873 # if device channel is off, device report is not being sent, hence nothing to show
1874 msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
1875 'Preview sample device reports with `ceph telemetry preview-device`.'
1876 return 0, msg, ''
1877
1878 return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
1879
1880 @CLIReadCommand('telemetry preview-device')
1881 def preview_device(self) -> Tuple[int, str, str]:
1882 '''
1883 Preview a sample device report of the most recent device collection
1884 '''
1885 report = {}
1886
1887 device_col_delta = self.collection_delta(['device'])
1888 with self.get_report_lock:
1889 if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
1890 # user is already opted-in to the most recent device collection,
1891 # and device channel is on, thus `show-device` should be called
1892 msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
1893 return 0, msg, ''
1894
1895 # either the user is not opted-in at all, or there are collections
1896 # they are not opted-in to
1897 next_collection = []
1898
1899 for c in MODULE_COLLECTION:
1900 next_collection.append(c['name'].name)
1901
1902 opted_in_collection = self.db_collection
1903 self.db_collection = next_collection
1904 report = self.get_report('device')
1905 self.db_collection = opted_in_collection
1906
1907 report = json.dumps(report, indent=4, sort_keys=True)
1908 return 0, report, ''
1909
1910 @CLIReadCommand('telemetry show-all')
1911 def show_all(self) -> Tuple[int, str, str]:
1912 '''
1913 Show a sample report of all enabled channels (including 'device' channel)
1914 '''
1915 if not self.enabled:
1916 # if telemetry is off, no report is being sent, hence nothing to show
1917 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1918 'Preview sample reports with `ceph telemetry preview`.'
1919 return 0, msg, ''
1920
1921 if not self.channel_device:
1922 # device channel is off, no need to display its report
1923 return 0, json.dumps(self.get_report_locked('default'), indent=4, sort_keys=True), ''
1924
1925 # telemetry is on and device channel is enabled, show both
1926 return 0, json.dumps(self.get_report_locked('all'), indent=4, sort_keys=True), ''
1927
1928 @CLIReadCommand('telemetry preview-all')
1929 def preview_all(self) -> Tuple[int, str, str]:
1930 '''
1931 Preview a sample report of the most recent collections available of all channels (including 'device')
1932 '''
1933 report = {}
1934
1935 col_delta = self.collection_delta()
1936 with self.get_report_lock:
1937 if col_delta is not None and len(col_delta) == 0:
1938 # user is already opted-in to the most recent collection
1939 msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
1940 return 0, msg, ''
1941
1942 # there are collections the user is not opted-in to
1943 next_collection = []
1944
1945 for c in MODULE_COLLECTION:
1946 next_collection.append(c['name'].name)
1947
1948 opted_in_collection = self.db_collection
1949 self.db_collection = next_collection
1950 report = self.get_report('all')
1951 self.db_collection = opted_in_collection
1952
1953 self.format_perf_histogram(report)
1954 report = json.dumps(report, indent=4, sort_keys=True)
1955
1956 return 0, report, ''
1957
1958 def get_report_locked(self,
1959 report_type: str = 'default',
1960 channels: Optional[List[str]] = None) -> Dict[str, Any]:
1961 '''
1962 A wrapper around get_report to allow for compiling a report of the most recent module collections
1963 '''
1964 with self.get_report_lock:
1965 return self.get_report(report_type, channels)
1966
1967 def get_report(self,
1968 report_type: str = 'default',
1969 channels: Optional[List[str]] = None) -> Dict[str, Any]:
1970 if report_type == 'default':
1971 return self.compile_report(channels=channels)
1972 elif report_type == 'device':
1973 return self.gather_device_report()
1974 elif report_type == 'all':
1975 return {'report': self.compile_report(channels=channels),
1976 'device_report': self.gather_device_report()}
1977 return {}
1978
1979 def self_test(self) -> None:
1980 report = self.compile_report()
1981 if len(report) == 0:
1982 raise RuntimeError('Report is empty')
1983
1984 if 'report_id' not in report:
1985 raise RuntimeError('report_id not found in report')
1986
    def shutdown(self) -> None:
        '''
        Stop the serve() loop.

        Clears the run flag first so the loop condition fails, then sets
        the event to interrupt any in-progress event.wait() sleep.
        '''
        self.run = False
        self.event.set()
1990
1991 def refresh_health_checks(self) -> None:
1992 health_checks = {}
1993 # TODO do we want to nag also in case the user is not opted-in?
1994 if self.enabled and self.should_nag():
1995 health_checks['TELEMETRY_CHANGED'] = {
1996 'severity': 'warning',
1997 'summary': 'Telemetry requires re-opt-in',
1998 'detail': [
1999 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
2000 ]
2001 }
2002 self.set_health_checks(health_checks)
2003
    def serve(self) -> None:
        '''
        Main module loop: periodically compile and send the telemetry report.

        Runs until shutdown() clears self.run; self.event doubles as the
        inter-iteration sleep and as the wake-up signal on shutdown.
        '''
        self.load()
        self.run = True

        self.log.debug('Waiting for mgr to warm up')
        time.sleep(10)

        while self.run:
            self.event.clear()

            # keep the TELEMETRY_CHANGED health warning current each cycle
            self.refresh_health_checks()

            if not self.is_opted_in():
                self.log.debug('Not sending report until user re-opts-in')
                self.event.wait(1800)
                continue
            if not self.enabled:
                self.log.debug('Not sending report until configured to do so')
                self.event.wait(1800)
                continue

            # send a fresh report once per self.interval hours (or on the
            # first pass after a restart, when last_upload is unset)
            now = int(time.time())
            if not self.last_upload or \
               (now - self.last_upload) > self.interval * 3600:
                self.log.info('Compiling and sending report to %s',
                              self.url)

                try:
                    self.last_report = self.compile_report()
                except Exception:
                    self.log.exception('Exception while compiling report:')

                # NOTE(review): if compile_report() raised above, this still
                # sends the previous self.last_report (presumably stale or
                # empty) — confirm whether sending should be skipped on a
                # compilation failure.
                self.send(self.last_report)
            else:
                self.log.debug('Interval for sending new report has not expired')

            sleep = 3600
            self.log.debug('Sleeping for %d seconds', sleep)
            self.event.wait(sleep)
2043
2044 @staticmethod
2045 def can_run() -> Tuple[bool, str]:
2046 return True, ''