]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/telemetry/module.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / pybind / mgr / telemetry / module.py
CommitLineData
11fdf7f2
TL
1"""
2Telemetry module for ceph-mgr
3
4Collect statistics from Ceph cluster and send this back to the Ceph project
5when user has opted-in
6"""
20effc67
TL
7import logging
8import numbers
9import enum
11fdf7f2 10import errno
eafe8130 11import hashlib
11fdf7f2 12import json
eafe8130 13import rbd
11fdf7f2
TL
14import requests
15import uuid
16import time
eafe8130 17from datetime import datetime, timedelta
20effc67
TL
18from prettytable import PrettyTable
19from threading import Event, Lock
11fdf7f2 20from collections import defaultdict
20effc67 21from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union
11fdf7f2 22
20effc67 23from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, ServiceInfoT
11fdf7f2
TL
24
25
20effc67 26ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']
eafe8130 27
20effc67
TL
28LICENSE = 'sharing-1-0'
29LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
30LICENSE_URL = 'https://cdla.io/sharing-1-0/'
1d09f67e 31NO_SALT_CNT = 0
eafe8130
TL
32
33# Latest revision of the telemetry report. Bump this each time we make
34# *any* change.
35REVISION = 3
36
37# History of revisions
38# --------------------
39#
40# Version 1:
41# Mimic and/or nautilus are lumped together here, since
42# we didn't track revisions yet.
43#
44# Version 2:
45# - added revision tracking, nagging, etc.
46# - added config option changes
47# - added channels
48# - added explicit license acknowledgement to the opt-in process
49#
50# Version 3:
51# - added device health metrics (i.e., SMART data, minus serial number)
52# - remove crush_rule
53# - added CephFS metadata (how many MDSs, fs features, how many data pools,
54# how much metadata is cached, rfiles, rbytes, rsnapshots)
55# - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
56# - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
57# - whether an OSD cluster network is in use
58# - rbd pool and image count, and rbd mirror mode (pool-level)
59# - rgw daemons, zones, zonegroups; which rgw frontends
60# - crush map stats
61
20effc67
TL
class Collection(str, enum.Enum):
    # Named data collections a cluster can opt in to. Each collection
    # belongs to exactly one channel (see MODULE_COLLECTION below).
    # Inheriting from str makes members JSON-serializable and directly
    # comparable to the stored collection-name strings.
    basic_base = 'basic_base'
    device_base = 'device_base'
    crash_base = 'crash_base'
    ident_base = 'ident_base'
    perf_perf = 'perf_perf'
    basic_mds_metadata = 'basic_mds_metadata'
    basic_pool_usage = 'basic_pool_usage'
    basic_usage_by_class = 'basic_usage_by_class'
    basic_rook_v01 = 'basic_rook_v01'
    perf_memory_metrics = 'perf_memory_metrics'
    basic_pool_options_bluestore = 'basic_pool_options_bluestore'
20effc67
TL
74
# Registry describing every Collection member: which channel it reports
# under, a human-readable description (shown in `telemetry collection ls`),
# and whether enabling it should nag the admin for re-opt-in ("nag": True
# means the collection is significant enough to require explicit consent).
MODULE_COLLECTION : List[Dict] = [
    {
        "name": Collection.basic_base,
        "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.device_base,
        "description": "Information about device health metrics",
        "channel": "device",
        "nag": False
    },
    {
        "name": Collection.crash_base,
        "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
        "channel": "crash",
        "nag": False
    },
    {
        "name": Collection.ident_base,
        "description": "User-provided identifying information about the cluster",
        "channel": "ident",
        "nag": False
    },
    {
        "name": Collection.perf_perf,
        "description": "Information about performance counters of the cluster",
        "channel": "perf",
        "nag": True
    },
    {
        "name": Collection.basic_mds_metadata,
        "description": "MDS metadata",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_pool_usage,
        "description": "Default pool application and usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_usage_by_class,
        "description": "Default device class usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_rook_v01,
        "description": "Basic Rook deployment data",
        "channel": "basic",
        "nag": True
    },
    {
        "name": Collection.perf_memory_metrics,
        "description": "Heap stats and mempools for mon and mds",
        "channel": "perf",
        "nag": False
    },
    {
        "name": Collection.basic_pool_options_bluestore,
        "description": "Per-pool bluestore config options",
        "channel": "basic",
        "nag": False
    },
]
143
# Rook-reported KV-store keys included in the report, each tagged with the
# collection that gates it. Keys form a tree of '/'-separated paths.
ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [
    # Note: a key cannot be both a node and a leaf, e.g.
    # "rook/a/b"
    # "rook/a/b/c"
    ("rook/version", Collection.basic_rook_v01),
    ("rook/kubernetes/version", Collection.basic_rook_v01),
    ("rook/csi/version", Collection.basic_rook_v01),
    ("rook/node/count/kubernetes-total", Collection.basic_rook_v01),
    ("rook/node/count/with-ceph-daemons", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-rbd-plugin", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-cephfs-plugin", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-nfs-plugin", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/total", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/rbd", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/cephfs", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/nfs", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/bucket", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/total", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/portable", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/non-portable", Collection.basic_rook_v01),
    ("rook/cluster/mon/count", Collection.basic_rook_v01),
    ("rook/cluster/mon/allow-multiple-per-node", Collection.basic_rook_v01),
    ("rook/cluster/mon/max-id", Collection.basic_rook_v01),
    ("rook/cluster/mon/pvc/enabled", Collection.basic_rook_v01),
    ("rook/cluster/mon/stretch/enabled", Collection.basic_rook_v01),
    ("rook/cluster/network/provider", Collection.basic_rook_v01),
    ("rook/cluster/external-mode", Collection.basic_rook_v01),
]
11fdf7f2 172
class Module(MgrModule):
    # Daemon metadata fields that are aggregated into count-by-value
    # histograms for the report (see gather_osd/mon/mds_metadata()).
    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]
184
    # Persisted module configuration. Each Option becomes an attribute of
    # the module via config_update_module_option(); channel_* flags gate
    # which report sections are sent (see get_active_channels()).
    MODULE_OPTIONS = [
        # Endpoints the reports are POSTed to.
        Option(name='url',
               type='str',
               default='https://telemetry.ceph.com/report'),
        Option(name='device_url',
               type='str',
               default='https://telemetry.ceph.com/device'),
        # Master opt-in switch.
        Option(name='enabled',
               type='bool',
               default=False),
        # Report REVISION the user last consented to (for re-opt-in nags).
        Option(name='last_opt_revision',
               type='int',
               default=1),
        Option(name='leaderboard',
               type='bool',
               default=False),
        Option(name='leaderboard_description',
               type='str',
               default=None),
        # Optional identifying info, only sent on the 'ident' channel.
        Option(name='description',
               type='str',
               default=None),
        Option(name='contact',
               type='str',
               default=None),
        Option(name='organization',
               type='str',
               default=None),
        Option(name='proxy',
               type='str',
               default=None),
        # Hours between uploads; minimum enforced at 8.
        Option(name='interval',
               type='int',
               default=24,
               min=8),
        Option(name='channel_basic',
               type='bool',
               default=True,
               desc='Share basic cluster information (size, version)'),
        Option(name='channel_ident',
               type='bool',
               default=False,
               desc='Share a user-provided description and/or contact email for the cluster'),
        Option(name='channel_crash',
               type='bool',
               default=True,
               desc='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
        Option(name='channel_device',
               type='bool',
               default=True,
               desc=('Share device health metrics '
                     '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
        Option(name='channel_perf',
               type='bool',
               default=False,
               desc='Share various performance metrics of a cluster'),
    ]
242
243 @property
20effc67 244 def config_keys(self) -> Dict[str, OptionValue]:
11fdf7f2
TL
245 return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
246
20effc67 247 def __init__(self, *args: Any, **kwargs: Any) -> None:
11fdf7f2
TL
248 super(Module, self).__init__(*args, **kwargs)
249 self.event = Event()
250 self.run = False
20effc67
TL
251 self.db_collection: Optional[List[str]] = None
252 self.last_opted_in_ceph_version: Optional[int] = None
253 self.last_opted_out_ceph_version: Optional[int] = None
254 self.last_upload: Optional[int] = None
255 self.last_report: Dict[str, Any] = dict()
256 self.report_id: Optional[str] = None
257 self.salt: Optional[str] = None
258 self.get_report_lock = Lock()
259 self.config_update_module_option()
260 # for mypy which does not run the code
261 if TYPE_CHECKING:
262 self.url = ''
263 self.device_url = ''
264 self.enabled = False
265 self.last_opt_revision = 0
266 self.leaderboard = ''
1e59de90 267 self.leaderboard_description = ''
20effc67
TL
268 self.interval = 0
269 self.proxy = ''
270 self.channel_basic = True
271 self.channel_ident = False
272 self.channel_crash = True
273 self.channel_device = True
274 self.channel_perf = False
275 self.db_collection = ['basic_base', 'device_base']
276 self.last_opted_in_ceph_version = 17
277 self.last_opted_out_ceph_version = 0
278
279 def config_update_module_option(self) -> None:
11fdf7f2
TL
280 for opt in self.MODULE_OPTIONS:
281 setattr(self,
282 opt['name'],
283 self.get_module_option(opt['name']))
284 self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
20effc67
TL
285
    def config_notify(self) -> None:
        # Called by the mgr whenever a module option changes: re-read all
        # options so attributes stay in sync, then nudge the serve() loop.
        self.config_update_module_option()
        # wake up serve() thread
        self.event.set()
11fdf7f2 290
20effc67
TL
291 def load(self) -> None:
292 last_upload = self.get_store('last_upload', None)
293 if last_upload is None:
294 self.last_upload = None
295 else:
296 self.last_upload = int(last_upload)
11fdf7f2 297
20effc67
TL
298 report_id = self.get_store('report_id', None)
299 if report_id is None:
11fdf7f2
TL
300 self.report_id = str(uuid.uuid4())
301 self.set_store('report_id', self.report_id)
20effc67
TL
302 else:
303 self.report_id = report_id
11fdf7f2 304
20effc67
TL
305 salt = self.get_store('salt', None)
306 if salt is None:
eafe8130
TL
307 self.salt = str(uuid.uuid4())
308 self.set_store('salt', self.salt)
20effc67
TL
309 else:
310 self.salt = salt
311
312 self.init_collection()
313
314 last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None)
315 if last_opted_in_ceph_version is None:
316 self.last_opted_in_ceph_version = None
317 else:
318 self.last_opted_in_ceph_version = int(last_opted_in_ceph_version)
319
320 last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None)
321 if last_opted_out_ceph_version is None:
322 self.last_opted_out_ceph_version = None
323 else:
324 self.last_opted_out_ceph_version = int(last_opted_out_ceph_version)
eafe8130 325
20effc67
TL
326 def gather_osd_metadata(self,
327 osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]:
11fdf7f2
TL
328 keys = ["osd_objectstore", "rotational"]
329 keys += self.metadata_keys
330
20effc67 331 metadata: Dict[str, Dict[str, int]] = dict()
11fdf7f2
TL
332 for key in keys:
333 metadata[key] = defaultdict(int)
334
335 for osd in osd_map['osds']:
20effc67 336 res = self.get_metadata('osd', str(osd['osd']))
92f5a8d4
TL
337 if res is None:
338 self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
339 continue
20effc67 340 for k, v in res.items():
11fdf7f2
TL
341 if k not in keys:
342 continue
343
344 metadata[k][v] += 1
345
346 return metadata
347
20effc67
TL
348 def gather_mon_metadata(self,
349 mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]:
11fdf7f2
TL
350 keys = list()
351 keys += self.metadata_keys
352
20effc67 353 metadata: Dict[str, Dict[str, int]] = dict()
11fdf7f2
TL
354 for key in keys:
355 metadata[key] = defaultdict(int)
356
357 for mon in mon_map['mons']:
20effc67 358 res = self.get_metadata('mon', mon['name'])
92f5a8d4
TL
359 if res is None:
360 self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
361 continue
20effc67
TL
362 for k, v in res.items():
363 if k not in keys:
364 continue
365
366 metadata[k][v] += 1
367
368 return metadata
369
370 def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]:
371 metadata: Dict[str, Dict[str, int]] = dict()
372
373 res = self.get('mds_metadata') # metadata of *all* mds daemons
374 if res is None or not res:
375 self.log.debug('Could not get metadata for mds daemons')
376 return metadata
377
378 keys = list()
379 keys += self.metadata_keys
380
381 for key in keys:
382 metadata[key] = defaultdict(int)
383
384 for mds in res.values():
385 for k, v in mds.items():
11fdf7f2
TL
386 if k not in keys:
387 continue
388
389 metadata[k][v] += 1
390
391 return metadata
392
20effc67
TL
393 def gather_crush_info(self) -> Dict[str, Union[int,
394 bool,
395 List[int],
396 Dict[str, int],
397 Dict[int, int]]]:
eafe8130
TL
398 osdmap = self.get_osdmap()
399 crush_raw = osdmap.get_crush()
400 crush = crush_raw.dump()
401
20effc67
TL
402 BucketKeyT = TypeVar('BucketKeyT', int, str)
403
404 def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None:
eafe8130
TL
405 if k in d:
406 d[k] += 1
407 else:
408 d[k] = 1
409
20effc67 410 device_classes: Dict[str, int] = {}
eafe8130
TL
411 for dev in crush['devices']:
412 inc(device_classes, dev.get('class', ''))
413
20effc67
TL
414 bucket_algs: Dict[str, int] = {}
415 bucket_types: Dict[str, int] = {}
416 bucket_sizes: Dict[int, int] = {}
eafe8130
TL
417 for bucket in crush['buckets']:
418 if '~' in bucket['name']: # ignore shadow buckets
419 continue
420 inc(bucket_algs, bucket['alg'])
421 inc(bucket_types, bucket['type_id'])
422 inc(bucket_sizes, len(bucket['items']))
423
424 return {
425 'num_devices': len(crush['devices']),
426 'num_types': len(crush['types']),
427 'num_buckets': len(crush['buckets']),
428 'num_rules': len(crush['rules']),
429 'device_classes': list(device_classes.values()),
430 'tunables': crush['tunables'],
431 'compat_weight_set': '-1' in crush['choose_args'],
432 'num_weight_sets': len(crush['choose_args']),
433 'bucket_algs': bucket_algs,
434 'bucket_sizes': bucket_sizes,
435 'bucket_types': bucket_types,
436 }
437
20effc67 438 def gather_configs(self) -> Dict[str, List[str]]:
eafe8130
TL
439 # cluster config options
440 cluster = set()
441 r, outb, outs = self.mon_command({
442 'prefix': 'config dump',
443 'format': 'json'
20effc67 444 })
eafe8130
TL
445 if r != 0:
446 return {}
447 try:
448 dump = json.loads(outb)
449 except json.decoder.JSONDecodeError:
450 return {}
451 for opt in dump:
452 name = opt.get('name')
453 if name:
454 cluster.add(name)
455 # daemon-reported options (which may include ceph.conf)
456 active = set()
20effc67 457 ls = self.get("modified_config_options")
eafe8130
TL
458 for opt in ls.get('options', {}):
459 active.add(opt)
460 return {
461 'cluster_changed': sorted(list(cluster)),
462 'active_changed': sorted(list(active)),
463 }
464
1d09f67e
TL
465 def anonymize_entity_name(self, entity_name:str) -> str:
466 if '.' not in entity_name:
467 self.log.debug(f"Cannot split entity name ({entity_name}), no '.' is found")
468 return entity_name
469
470 (etype, eid) = entity_name.split('.', 1)
471 m = hashlib.sha1()
472 salt = ''
473 if self.salt is not None:
474 salt = self.salt
475 # avoid asserting that salt exists
476 if not self.salt:
477 # do not set self.salt to a temp value
478 salt = f"no_salt_found_{NO_SALT_CNT}"
479 NO_SALT_CNT += 1
480 self.log.debug(f"No salt found, created a temp one: {salt}")
481 m.update(salt.encode('utf-8'))
482 m.update(eid.encode('utf-8'))
483 m.update(salt.encode('utf-8'))
484
485 return etype + '.' + m.hexdigest()
486
20effc67 487 def get_heap_stats(self) -> Dict[str, dict]:
2a845540
TL
488 result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
489 anonymized_daemons = {}
490 osd_map = self.get('osd_map')
20effc67 491
2a845540
TL
492 # Combine available daemons
493 daemons = []
494 for osd in osd_map['osds']:
495 daemons.append('osd'+'.'+str(osd['osd']))
496 # perf_memory_metrics collection (1/2)
497 if self.is_enabled_collection(Collection.perf_memory_metrics):
498 mon_map = self.get('mon_map')
499 mds_metadata = self.get('mds_metadata')
500 for mon in mon_map['mons']:
501 daemons.append('mon'+'.'+mon['name'])
502 for mds in mds_metadata:
503 daemons.append('mds'+'.'+mds)
504
505 # Grab output from the "daemon.x heap stats" command
506 for daemon in daemons:
e3986515 507 daemon_type, daemon_id = daemon.split('.', 1)
2a845540
TL
508 heap_stats = self.parse_heap_stats(daemon_type, daemon_id)
509 if heap_stats:
510 if (daemon_type != 'osd'):
511 # Anonymize mon and mds
512 anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
513 daemon = anonymized_daemons[daemon]
514 result[daemon_type][daemon] = heap_stats
20effc67 515 else:
2a845540 516 continue
20effc67 517
2a845540
TL
518 if anonymized_daemons:
519 # for debugging purposes only, this data is never reported
520 self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons))
20effc67
TL
521 return result
522
2a845540
TL
523 def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]:
524 parsed_output = {}
525
526 cmd_dict = {
527 'prefix': 'heap',
528 'heapcmd': 'stats'
529 }
530 r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict)
531
532 if r != 0:
533 self.log.error("Invalid command dictionary: {}".format(cmd_dict))
534 else:
535 if 'tcmalloc heap stats' in outb:
536 values = [int(i) for i in outb.split() if i.isdigit()]
537 # `categories` must be ordered this way for the correct output to be parsed
538 categories = ['use_by_application',
539 'page_heap_freelist',
540 'central_cache_freelist',
541 'transfer_cache_freelist',
542 'thread_cache_freelists',
543 'malloc_metadata',
544 'actual_memory_used',
545 'released_to_os',
546 'virtual_address_space_used',
547 'spans_in_use',
548 'thread_heaps_in_use',
549 'tcmalloc_page_size']
550 if len(values) != len(categories):
551 self.log.error('Received unexpected output from {}.{}; ' \
552 'number of values should match the number' \
553 'of expected categories:\n values: len={} {} '\
554 '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs))
555 else:
556 parsed_output = dict(zip(categories, values))
557 else:
558 self.log.error('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs))
559
560 return parsed_output
561
    def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
        """Collect mempool data (via 'dump_mempools') from daemons.

        mode='separated': one entry per daemon (mon/mds names anonymized).
        mode='aggregated': per-pool byte/item totals summed per daemon type.
        OSDs are always queried; mons/mds only when the perf_memory_metrics
        collection is enabled. Daemons that fail the command or return
        unparsable output are skipped.
        """
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
        anonymized_daemons = {}
        osd_map = self.get('osd_map')

        # Combine available daemons
        daemons = []
        for osd in osd_map['osds']:
            daemons.append('osd'+'.'+str(osd['osd']))
        # perf_memory_metrics collection (2/2)
        if self.is_enabled_collection(Collection.perf_memory_metrics):
            mon_map = self.get('mon_map')
            mds_metadata = self.get('mds_metadata')
            for mon in mon_map['mons']:
                daemons.append('mon'+'.'+mon['name'])
            for mds in mds_metadata:
                daemons.append('mds'+'.'+mds)

        # Grab output from the "dump_mempools" command
        for daemon in daemons:
            daemon_type, daemon_id = daemon.split('.', 1)
            cmd_dict = {
                'prefix': 'dump_mempools',
                'format': 'json'
            }
            r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict)
            if r != 0:
                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
                continue
            else:
                try:
                    # This is where the mempool will land.
                    dump = json.loads(outb)
                    if mode == 'separated':
                        # Anonymize mon and mds
                        if daemon_type != 'osd':
                            anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
                            daemon = anonymized_daemons[daemon]
                        result[daemon_type][daemon] = dump['mempool']['by_pool']
                    elif mode == 'aggregated':
                        for mem_type in dump['mempool']['by_pool']:
                            result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
                            result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
                    else:
                        self.log.error("Incorrect mode specified in get_mempool: {}".format(mode))
                except (json.decoder.JSONDecodeError, KeyError) as e:
                    self.log.exception("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e))
                    continue

        if anonymized_daemons:
            # for debugging purposes only, this data is never reported
            self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons))

        return result
616
    def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
        """Collect 2D perf histograms from every OSD.

        Histograms are grouped by their axis configuration (str(axes) is
        used as a grouping key and later dropped). mode='separated' keeps
        per-OSD values; mode='aggregated' element-wise sums the values of
        OSDs sharing an axis config and counts how many were combined.
        Returns a list of the grouped histogram dicts.
        """
        # Initialize result dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
            lambda: defaultdict(
                lambda: defaultdict(
                    lambda: defaultdict(
                        lambda: defaultdict(int))))))

        # Get list of osd ids from the metadata
        osd_metadata = self.get('osd_metadata')

        # Grab output from the "osd.x perf histogram dump" command
        for osd_id in osd_metadata:
            cmd_dict = {
                'prefix': 'perf histogram dump',
                'id': str(osd_id),
                'format': 'json'
            }
            r, outb, outs = self.osd_command(cmd_dict)
            # Check for invalid calls
            if r != 0:
                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
                continue
            else:
                try:
                    # This is where the histograms will land if there are any.
                    dump = json.loads(outb)

                    for histogram in dump['osd']:
                        # Log axis information. There are two axes, each represented
                        # as a dictionary. Both dictionaries are contained inside a
                        # list called 'axes'.
                        axes = []
                        for axis in dump['osd'][histogram]['axes']:

                            # This is the dict that contains information for an individual
                            # axis. It will be appended to the 'axes' list at the end.
                            axis_dict: Dict[str, Any] = defaultdict()

                            # Collecting information for buckets, min, name, etc.
                            axis_dict['buckets'] = axis['buckets']
                            axis_dict['min'] = axis['min']
                            axis_dict['name'] = axis['name']
                            axis_dict['quant_size'] = axis['quant_size']
                            axis_dict['scale_type'] = axis['scale_type']

                            # Collecting ranges; placing them in lists to
                            # improve readability later on.
                            ranges = []
                            for _range in axis['ranges']:
                                _max, _min = None, None
                                if 'max' in _range:
                                    _max = _range['max']
                                if 'min' in _range:
                                    _min = _range['min']
                                ranges.append([_min, _max])
                            axis_dict['ranges'] = ranges

                            # Now that 'axis_dict' contains all the appropriate
                            # information for the current axis, append it to the 'axes' list.
                            # There will end up being two axes in the 'axes' list, since the
                            # histograms are 2D.
                            axes.append(axis_dict)

                        # Add the 'axes' list, containing both axes, to result.
                        # At this point, you will see that the name of the key is the string
                        # form of our axes list (str(axes)). This is there so that histograms
                        # with different axis configs will not be combined.
                        # These key names are later dropped when only the values are returned.
                        result[str(axes)][histogram]['axes'] = axes

                        # Collect current values and make sure they are in
                        # integer form.
                        values = []
                        for value_list in dump['osd'][histogram]['values']:
                            values.append([int(v) for v in value_list])

                        if mode == 'separated':
                            if 'osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['osds'] = []
                            result[str(axes)][histogram]['osds'].append({'osd_id': int(osd_id), 'values': values})

                        elif mode == 'aggregated':
                            # Aggregate values. If 'values' have already been initialized,
                            # we can safely add.
                            if 'values' in result[str(axes)][histogram]:
                                for i in range (0, len(values)):
                                    for j in range (0, len(values[i])):
                                        values[i][j] += result[str(axes)][histogram]['values'][i][j]

                            # Add the values to result.
                            result[str(axes)][histogram]['values'] = values

                            # Update num_combined_osds
                            if 'num_combined_osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['num_combined_osds'] = 1
                            else:
                                result[str(axes)][histogram]['num_combined_osds'] += 1
                        else:
                            self.log.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode))
                            return list()

                # Sometimes, json errors occur if you give it an empty string.
                # I am also putting in a catch for a KeyError since it could
                # happen where the code is assuming that a key exists in the
                # schema when it doesn't. In either case, we'll handle that
                # by continuing and collecting what we can from other osds.
                except (json.decoder.JSONDecodeError, KeyError) as e:
                    self.log.exception("Error caught on osd.{}: {}".format(osd_id, e))
                    continue

        return list(result.values())
729
    def get_io_rate(self) -> dict:
        # Cluster-wide IO rate blob as exposed by the mgr under 'io_rate'.
        return self.get('io_rate')
732
733 def get_stats_per_pool(self) -> dict:
734 result = self.get('pg_dump')['pool_stats']
735
736 # collect application metadata from osd_map
737 osd_map = self.get('osd_map')
738 application_metadata = {pool['pool']: pool['application_metadata'] for pool in osd_map['pools']}
739
740 # add application to each pool from pg_dump
741 for pool in result:
742 pool['application'] = []
743 # Only include default applications
744 for application in application_metadata[pool['poolid']]:
745 if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
746 pool['application'].append(application)
747
748 return result
749
    def get_stats_per_pg(self) -> dict:
        # Raw per-PG stats straight from pg_dump.
        return self.get('pg_dump')['pg_stats']
752
753 def get_rocksdb_stats(self) -> Dict[str, str]:
754 # Initalizers
755 result: Dict[str, str] = defaultdict()
756 version = self.get_rocksdb_version()
757
758 # Update result
759 result['version'] = version
760
761 return result
762
763 def gather_crashinfo(self) -> List[Dict[str, str]]:
764 crashlist: List[Dict[str, str]] = list()
eafe8130 765 errno, crashids, err = self.remote('crash', 'ls')
11fdf7f2 766 if errno:
20effc67 767 return crashlist
11fdf7f2 768 for crashid in crashids.split():
20effc67 769 errno, crashinfo, err = self.remote('crash', 'do_info', crashid)
11fdf7f2
TL
770 if errno:
771 continue
772 c = json.loads(crashinfo)
20effc67
TL
773
774 # redact hostname
11fdf7f2 775 del c['utsname_hostname']
20effc67 776
92f5a8d4
TL
777 # entity_name might have more than one '.', beware
778 (etype, eid) = c.get('entity_name', '').split('.', 1)
eafe8130 779 m = hashlib.sha1()
20effc67 780 assert self.salt
eafe8130
TL
781 m.update(self.salt.encode('utf-8'))
782 m.update(eid.encode('utf-8'))
783 m.update(self.salt.encode('utf-8'))
784 c['entity_name'] = etype + '.' + m.hexdigest()
20effc67
TL
785
786 # redact final line of python tracebacks, as the exception
787 # payload may contain identifying information
788 if 'mgr_module' in c and 'backtrace' in c:
789 # backtrace might be empty
790 if len(c['backtrace']) > 0:
791 c['backtrace'][-1] = '<redacted>'
792
11fdf7f2
TL
793 crashlist.append(c)
794 return crashlist
795
    def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
        """Collect perf counters from all daemons.

        mode='separated': per-daemon values (non-OSD daemon names are
        anonymized). mode='aggregated': values/counts summed per daemon
        type, plus a 'num_combined_daemons' tally. Counter names like
        'bluestore.kv_flush_lat' are split into nested {bluestore:
        {kv_flush_lat: ...}} form. Returns {} on an invalid mode.
        """
        # Extract perf counter data with get_unlabeled_perf_counters(), a method
        # from mgr/mgr_module.py. This method returns a nested dictionary that
        # looks a lot like perf schema, except with some additional fields.
        #
        # Example of output, a snapshot of a mon daemon:
        #   "mon.b": {
        #       "bluestore.kv_flush_lat": {
        #           "count": 2431,
        #           "description": "Average kv_thread flush latency",
        #           "nick": "fl_l",
        #           "priority": 8,
        #           "type": 5,
        #           "units": 1,
        #           "value": 88814109
        #       },
        #   },
        perf_counters = self.get_unlabeled_perf_counters()

        # Initialize 'result' dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
            lambda: defaultdict(lambda: defaultdict(int))))

        # 'separated' mode
        anonymized_daemon_dict = {}

        for daemon, perf_counters_by_daemon in perf_counters.items():
            daemon_type = daemon[0:3]  # i.e. 'mds', 'osd', 'rgw'

            if mode == 'separated':
                # anonymize individual daemon names except osds
                if (daemon_type != 'osd'):
                    anonymized_daemon = self.anonymize_entity_name(daemon)
                    anonymized_daemon_dict[anonymized_daemon] = daemon
                    daemon = anonymized_daemon

            # Calculate num combined daemon types if in aggregated mode
            if mode == 'aggregated':
                if 'num_combined_daemons' not in result[daemon_type]:
                    result[daemon_type]['num_combined_daemons'] = 1
                else:
                    result[daemon_type]['num_combined_daemons'] += 1

            for collection in perf_counters_by_daemon:
                # Split the collection to avoid redundancy in final report; i.e.:
                #   bluestore.kv_flush_lat, bluestore.kv_final_lat -->
                #   bluestore: kv_flush_lat, kv_final_lat
                col_0, col_1 = collection.split('.')

                # Debug log for empty keys. This initially was a problem for prioritycache
                # perf counters, where the col_0 was empty for certain mon counters:
                #
                # "mon.a": {       instead of    "mon.a": {
                #      "": {                         "prioritycache": {
                #        "cache_bytes": {...},          "cache_bytes": {...},
                #
                # This log is here to detect any future instances of a similar issue.
                if (daemon == "") or (col_0 == "") or (col_1 == ""):
                    self.log.debug("Instance of an empty key: {}{}".format(daemon, collection))

                if mode == 'separated':
                    # Add value to result
                    result[daemon][col_0][col_1]['value'] = \
                        perf_counters_by_daemon[collection]['value']

                    # Check that 'count' exists, as not all counters have a count field.
                    if 'count' in perf_counters_by_daemon[collection]:
                        result[daemon][col_0][col_1]['count'] = \
                            perf_counters_by_daemon[collection]['count']
                elif mode == 'aggregated':
                    # Not every rgw daemon has the same schema. Specifically, each rgw daemon
                    # has a uniquely-named collection that starts off identically (i.e.
                    # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
                    # This bit of code combines these unique counters all under one rgw instance.
                    # Without this check, the schema would remain separeted out in the final report.
                    if col_0[0:11] == "objecter-0x":
                        col_0 = "objecter-0x"

                    # Check that the value can be incremented. In some cases,
                    # the files are of type 'pair' (real-integer-pair, integer-integer pair).
                    # In those cases, the value is a dictionary, and not a number.
                    #   i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
                    if isinstance(perf_counters_by_daemon[collection]['value'], numbers.Number):
                        result[daemon_type][col_0][col_1]['value'] += \
                            perf_counters_by_daemon[collection]['value']

                    # Check that 'count' exists, as not all counters have a count field.
                    if 'count' in perf_counters_by_daemon[collection]:
                        result[daemon_type][col_0][col_1]['count'] += \
                            perf_counters_by_daemon[collection]['count']
                else:
                    self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
                    return {}

        if mode == 'separated':
            # for debugging purposes only, this data is never reported
            self.log.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict))

        return result
895
896 def get_active_channels(self) -> List[str]:
eafe8130
TL
897 r = []
898 if self.channel_basic:
899 r.append('basic')
900 if self.channel_crash:
901 r.append('crash')
902 if self.channel_device:
903 r.append('device')
f67539c2
TL
904 if self.channel_ident:
905 r.append('ident')
20effc67
TL
906 if self.channel_perf:
907 r.append('perf')
eafe8130
TL
908 return r
909
    def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
        """
        Build the 'device' channel report: recent device health metrics keyed
        by anonymized host id, then anonymized device id, then sample timestamp.

        Host names, device ids and serial numbers are anonymized before
        inclusion; the anonymization mapping is persisted in the module store
        so repeated reports use stable anonymous ids. Returns {} when the
        devicehealth module or the device list is unavailable.
        """
        try:
            time_format = self.remote('devicehealth', 'get_time_format')
        except Exception as e:
            self.log.debug('Unable to format time: {}'.format(e))
            return {}
        # only include samples from the last two reporting intervals
        cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
        min_sample = cutoff.strftime(time_format)

        devices = self.get('devices')['devices']
        if not devices:
            self.log.debug('Unable to get device info from the mgr.')
            return {}

        # anon-host-id -> anon-devid -> { timestamp -> record }
        res: Dict[str, Dict[str, Dict[str, str]]] = {}
        for d in devices:
            devid = d['devid']
            try:
                # this is a map of stamp -> {device info}
                m = self.remote('devicehealth', 'get_recent_device_metrics',
                                devid, min_sample)
            except Exception as e:
                self.log.error('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
                continue

            # anonymize host id
            try:
                host = d['location'][0]['host']
            except (KeyError, IndexError) as e:
                self.log.exception('Unable to get host from device with id "{}": {}'.format(devid, e))
                continue
            # reuse a previously assigned anonymous host id if one exists
            anon_host = self.get_store('host-id/%s' % host)
            if not anon_host:
                anon_host = str(uuid.uuid1())
                self.set_store('host-id/%s' % host, anon_host)
            serial = None
            for dev, rep in m.items():
                rep['host_id'] = anon_host
                # remember the device serial so it can be scrubbed below
                if serial is None and 'serial_number' in rep:
                    serial = rep['serial_number']

            # anonymize device id
            anon_devid = self.get_store('devid-id/%s' % devid)
            if not anon_devid:
                # ideally devid is 'vendor_model_serial',
                # but can also be 'model_serial', 'serial'
                if '_' in devid:
                    # keep the vendor/model prefix, replace only the serial part
                    anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
                else:
                    anon_devid = str(uuid.uuid1())
                self.set_store('devid-id/%s' % devid, anon_devid)
            self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
                                                           host, anon_host))

            # anonymize the smartctl report itself
            if serial:
                m_str = json.dumps(m)
                m = json.loads(m_str.replace(serial, 'deleted'))

            if anon_host not in res:
                res[anon_host] = {}
            res[anon_host][anon_devid] = m
        return res
20effc67 975 def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
eafe8130 976 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
eafe8130
TL
977 if data:
978 return data[-1][1]
979 else:
980 return 0
981
    def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Assemble the telemetry report for the given channels (all currently
        active channels by default).

        The 'device' channel is deliberately excluded here: device metrics are
        gathered by gather_device_report() and sent to a separate endpoint.
        Collection-gated sections are included only when the corresponding
        Collection has been opted in to.
        """
        if not channels:
            channels = self.get_active_channels()
        # report header: always present regardless of channels
        report = {
            'leaderboard': self.leaderboard,
            'leaderboard_description': self.leaderboard_description,
            'report_version': 1,
            'report_timestamp': datetime.utcnow().isoformat(),
            'report_id': self.report_id,
            'channels': channels,
            'channels_available': ALL_CHANNELS,
            'license': LICENSE,
            'collections_available': [c['name'].name for c in MODULE_COLLECTION],
            'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])],
        }

        if 'ident' in channels:
            # optional cluster-owner-provided identification strings
            for option in ['description', 'contact', 'organization']:
                report[option] = getattr(self, option)

        if 'basic' in channels:
            mon_map = self.get('mon_map')
            osd_map = self.get('osd_map')
            service_map = self.get('service_map')
            fs_map = self.get('fs_map')
            df = self.get('df')
            df_pools = {pool['id']: pool for pool in df['pools']}

            report['created'] = mon_map['created']

            # mons
            v1_mons = 0
            v2_mons = 0
            ipv4_mons = 0
            ipv6_mons = 0
            for mon in mon_map['mons']:
                for a in mon['public_addrs']['addrvec']:
                    if a['type'] == 'v2':
                        v2_mons += 1
                    elif a['type'] == 'v1':
                        v1_mons += 1
                    # bracketed addresses are IPv6
                    if a['addr'].startswith('['):
                        ipv6_mons += 1
                    else:
                        ipv4_mons += 1
            report['mon'] = {
                'count': len(mon_map['mons']),
                'features': mon_map['features'],
                'min_mon_release': mon_map['min_mon_release'],
                'v1_addr_mons': v1_mons,
                'v2_addr_mons': v2_mons,
                'ipv4_addr_mons': ipv4_mons,
                'ipv6_addr_mons': ipv6_mons,
            }

            report['config'] = self.gather_configs()

            # pools

            rbd_num_pools = 0
            rbd_num_images_by_pool = []
            rbd_mirroring_by_pool = []
            num_pg = 0
            report['pools'] = list()
            for pool in osd_map['pools']:
                num_pg += pool['pg_num']
                ec_profile = {}
                if pool['erasure_code_profile']:
                    orig = osd_map['erasure_code_profiles'].get(
                        pool['erasure_code_profile'], {})
                    # only report a safe subset of the EC profile keys
                    ec_profile = {
                        k: orig[k] for k in orig.keys()
                        if k in ['k', 'm', 'plugin', 'technique',
                                 'crush-failure-domain', 'l']
                    }
                pool_data = {
                    'pool': pool['pool'],
                    'pg_num': pool['pg_num'],
                    'pgp_num': pool['pg_placement_num'],
                    'size': pool['size'],
                    'min_size': pool['min_size'],
                    'pg_autoscale_mode': pool['pg_autoscale_mode'],
                    'target_max_bytes': pool['target_max_bytes'],
                    'target_max_objects': pool['target_max_objects'],
                    'type': ['', 'replicated', '', 'erasure'][pool['type']],
                    'erasure_code_profile': ec_profile,
                    'cache_mode': pool['cache_mode'],
                }

                # basic_pool_usage collection
                if self.is_enabled_collection(Collection.basic_pool_usage):
                    pool_data['application'] = []
                    for application in pool['application_metadata']:
                        # Only include default applications
                        if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
                            pool_data['application'].append(application)
                    pool_stats = df_pools[pool['pool']]['stats']
                    pool_data['stats'] = {  # filter out kb_used
                        'avail_raw': pool_stats['avail_raw'],
                        'bytes_used': pool_stats['bytes_used'],
                        'compress_bytes_used': pool_stats['compress_bytes_used'],
                        'compress_under_bytes': pool_stats['compress_under_bytes'],
                        'data_bytes_used': pool_stats['data_bytes_used'],
                        'dirty': pool_stats['dirty'],
                        'max_avail': pool_stats['max_avail'],
                        'objects': pool_stats['objects'],
                        'omap_bytes_used': pool_stats['omap_bytes_used'],
                        'percent_used': pool_stats['percent_used'],
                        'quota_bytes': pool_stats['quota_bytes'],
                        'quota_objects': pool_stats['quota_objects'],
                        'rd': pool_stats['rd'],
                        'rd_bytes': pool_stats['rd_bytes'],
                        'stored': pool_stats['stored'],
                        'stored_data': pool_stats['stored_data'],
                        'stored_omap': pool_stats['stored_omap'],
                        'stored_raw': pool_stats['stored_raw'],
                        'wr': pool_stats['wr'],
                        'wr_bytes': pool_stats['wr_bytes']
                    }
                pool_data['options'] = {}
                # basic_pool_options_bluestore collection
                if self.is_enabled_collection(Collection.basic_pool_options_bluestore):
                    bluestore_options = ['compression_algorithm',
                                         'compression_mode',
                                         'compression_required_ratio',
                                         'compression_min_blob_size',
                                         'compression_max_blob_size']
                    for option in bluestore_options:
                        if option in pool['options']:
                            pool_data['options'][option] = pool['options'][option]
                cast(List[Dict[str, Any]], report['pools']).append(pool_data)
                if 'rbd' in pool['application_metadata']:
                    rbd_num_pools += 1
                    ioctx = self.rados.open_ioctx(pool['pool_name'])
                    rbd_num_images_by_pool.append(
                        sum(1 for _ in rbd.RBD().list2(ioctx)))
                    rbd_mirroring_by_pool.append(
                        rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
            report['rbd'] = {
                'num_pools': rbd_num_pools,
                'num_images_by_pool': rbd_num_images_by_pool,
                'mirroring_by_pool': rbd_mirroring_by_pool}

            # osds
            # a separate cluster network exists if any up OSD's public and
            # cluster addresses differ
            cluster_network = False
            for osd in osd_map['osds']:
                if osd['up'] and not cluster_network:
                    front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
                    back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
                    if front_ip != back_ip:
                        cluster_network = True
            report['osd'] = {
                'count': len(osd_map['osds']),
                'require_osd_release': osd_map['require_osd_release'],
                'require_min_compat_client': osd_map['require_min_compat_client'],
                'cluster_network': cluster_network,
            }

            # crush
            report['crush'] = self.gather_crush_info()

            # cephfs
            report['fs'] = {
                'count': len(fs_map['filesystems']),
                'feature_flags': fs_map['feature_flags'],
                'num_standby_mds': len(fs_map['standbys']),
                'filesystems': [],
            }
            num_mds = len(fs_map['standbys'])
            for fsm in fs_map['filesystems']:
                fs = fsm['mdsmap']
                num_sessions = 0
                cached_ino = 0
                cached_dn = 0
                cached_cap = 0
                subtrees = 0
                rfiles = 0
                rbytes = 0
                rsnaps = 0
                for gid, mds in fs['info'].items():
                    num_sessions += self.get_latest('mds', mds['name'],
                                                    'mds_sessions.session_count')
                    cached_ino += self.get_latest('mds', mds['name'],
                                                  'mds_mem.ino')
                    cached_dn += self.get_latest('mds', mds['name'],
                                                 'mds_mem.dn')
                    cached_cap += self.get_latest('mds', mds['name'],
                                                  'mds_mem.cap')
                    subtrees += self.get_latest('mds', mds['name'],
                                                'mds.subtrees')
                    # recursive stats are only meaningful on rank 0 (the root)
                    if mds['rank'] == 0:
                        rfiles = self.get_latest('mds', mds['name'],
                                                 'mds.root_rfiles')
                        rbytes = self.get_latest('mds', mds['name'],
                                                 'mds.root_rbytes')
                        rsnaps = self.get_latest('mds', mds['name'],
                                                 'mds.root_rsnaps')
                report['fs']['filesystems'].append({  # type: ignore
                    'max_mds': fs['max_mds'],
                    'ever_allowed_features': fs['ever_allowed_features'],
                    'explicitly_allowed_features': fs['explicitly_allowed_features'],
                    'num_in': len(fs['in']),
                    'num_up': len(fs['up']),
                    'num_standby_replay': len(
                        [mds for gid, mds in fs['info'].items()
                         if mds['state'] == 'up:standby-replay']),
                    'num_mds': len(fs['info']),
                    'num_sessions': num_sessions,
                    'cached_inos': cached_ino,
                    'cached_dns': cached_dn,
                    'cached_caps': cached_cap,
                    'cached_subtrees': subtrees,
                    'balancer_enabled': len(fs['balancer']) > 0,
                    'num_data_pools': len(fs['data_pools']),
                    'standby_count_wanted': fs['standby_count_wanted'],
                    # truncate creation time to YYYY-MM for anonymity
                    'approx_ctime': fs['created'][0:7],
                    'files': rfiles,
                    'bytes': rbytes,
                    'snaps': rsnaps,
                })
                num_mds += len(fs['info'])
            report['fs']['total_num_mds'] = num_mds  # type: ignore

            # daemons
            report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
                                      mon=self.gather_mon_metadata(mon_map))

            if self.is_enabled_collection(Collection.basic_mds_metadata):
                report['metadata']['mds'] = self.gather_mds_metadata()  # type: ignore

            # host counts
            servers = self.list_servers()
            self.log.debug('servers %s' % servers)
            hosts = {
                'num': len([h for h in servers if h['hostname']]),
            }
            for t in ['mon', 'mds', 'osd', 'mgr']:
                # count hosts running at least one daemon of type t
                nr_services = sum(1 for host in servers if
                                  any(service for service in cast(List[ServiceInfoT],
                                                                  host['services'])
                                      if service['type'] == t))
                hosts['num_with_' + t] = nr_services
            report['hosts'] = hosts

            report['usage'] = {
                'pools': len(df['pools']),
                'pg_num': num_pg,
                'total_used_bytes': df['stats']['total_used_bytes'],
                'total_bytes': df['stats']['total_bytes'],
                'total_avail_bytes': df['stats']['total_avail_bytes']
            }
            # basic_usage_by_class collection
            if self.is_enabled_collection(Collection.basic_usage_by_class):
                report['usage']['stats_by_class'] = {}  # type: ignore
                for device_class in df['stats_by_class']:
                    if device_class in ['hdd', 'ssd', 'nvme']:
                        report['usage']['stats_by_class'][device_class] = df['stats_by_class'][device_class]  # type: ignore

            services: DefaultDict[str, int] = defaultdict(int)
            for key, value in service_map['services'].items():
                services[key] += 1
                if key == 'rgw':
                    rgw = {}
                    zones = set()
                    zonegroups = set()
                    frontends = set()
                    count = 0
                    d = value.get('daemons', dict())
                    for k, v in d.items():
                        if k == 'summary' and v:
                            rgw[k] = v
                        elif isinstance(v, dict) and 'metadata' in v:
                            count += 1
                            zones.add(v['metadata']['zone_id'])
                            zonegroups.add(v['metadata']['zonegroup_id'])
                            frontends.add(v['metadata']['frontend_type#0'])

                            # we could actually iterate over all the keys of
                            # the dict and check for how many frontends there
                            # are, but it is unlikely that one would be running
                            # more than 2 supported ones
                            f2 = v['metadata'].get('frontend_type#1', None)
                            if f2:
                                frontends.add(f2)

                    rgw['count'] = count
                    rgw['zones'] = len(zones)
                    rgw['zonegroups'] = len(zonegroups)
                    rgw['frontends'] = list(frontends)  # sets aren't json-serializable
                    report['rgw'] = rgw
            report['services'] = services

            try:
                report['balancer'] = self.remote('balancer', 'gather_telemetry')
            except ImportError:
                # balancer module not loaded
                report['balancer'] = {
                    'active': False
                }

            # Rook
            self.get_rook_data(report)

        if 'crash' in channels:
            report['crashes'] = self.gather_crashinfo()

        if 'perf' in channels:
            if self.is_enabled_collection(Collection.perf_perf):
                report['perf_counters'] = self.gather_perf_counters('separated')
                report['stats_per_pool'] = self.get_stats_per_pool()
                report['stats_per_pg'] = self.get_stats_per_pg()
                report['io_rate'] = self.get_io_rate()
                report['osd_perf_histograms'] = self.get_osd_histograms('separated')
                report['mempool'] = self.get_mempool('separated')
                report['heap_stats'] = self.get_heap_stats()
                report['rocksdb_stats'] = self.get_rocksdb_stats()

        # NOTE: We do not include the 'device' channel in this report; it is
        # sent to a different endpoint.

        return report
33c7a0ef
TL
1303 def get_rook_data(self, report: Dict[str, object]) -> None:
1304 r, outb, outs = self.mon_command({
1305 'prefix': 'config-key dump',
1306 'format': 'json'
1307 })
1308 if r != 0:
1309 return
1310 try:
1311 config_kv_dump = json.loads(outb)
1312 except json.decoder.JSONDecodeError:
1313 return
1314
1315 for elem in ROOK_KEYS_BY_COLLECTION:
1316 # elem[0] is the full key path (e.g. "rook/node/count/with-csi-nfs-plugin")
1317 # elem[1] is the Collection this key belongs to
1318 if self.is_enabled_collection(elem[1]):
1319 self.add_kv_to_report(report, elem[0], config_kv_dump.get(elem[0]))
1320
1321 def add_kv_to_report(self, report: Dict[str, object], key_path: str, value: Any) -> None:
1322 last_node = key_path.split('/')[-1]
1323 for node in key_path.split('/')[0:-1]:
1324 if node not in report:
1325 report[node] = {}
1326 report = report[node] # type: ignore
1327
1328 # sanity check of keys correctness
1329 if not isinstance(report, dict):
1330 self.log.error(f"'{key_path}' is an invalid key, expected type 'dict' but got {type(report)}")
1331 return
1332
1333 if last_node in report:
1334 self.log.error(f"'{key_path}' is an invalid key, last part must not exist at this point")
1335 return
1336
1337 report[last_node] = value
1338
    def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
        """
        PUT *report* as JSON to *url*, honoring the configured HTTP(S) proxy.

        Returns None on success, or a human-readable failure reason string
        (which has already been logged) on any error.
        """
        self.log.info('Sending %s to: %s' % (what, url))
        proxies = dict()
        if self.proxy:
            self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
            # use the same proxy for both schemes
            proxies['http'] = self.proxy
            proxies['https'] = self.proxy
        try:
            resp = requests.put(url=url, json=report, proxies=proxies)
            # raise on any 4xx/5xx response so it is reported as a failure
            resp.raise_for_status()
        except Exception as e:
            fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
            self.log.error(fail_reason)
            return fail_reason
        return None
20effc67
TL
    class EndPoint(enum.Enum):
        # Upload destinations for telemetry data: the main cluster report and
        # the per-device report are sent to different URLs (see send()).
        ceph = 'ceph'
        device = 'device'
1359 def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
1360 '''
1361 Find collections that are available in the module, but are not in the db
1362 '''
1363 if self.db_collection is None:
1364 return None
1365
1366 if not channels:
1367 channels = ALL_CHANNELS
1368 else:
1369 for ch in channels:
1370 if ch not in ALL_CHANNELS:
1371 self.log.debug(f"invalid channel name: {ch}")
1372 return None
1373
1374 new_collection : List[Collection] = []
1375
1376 for c in MODULE_COLLECTION:
1377 if c['name'].name not in self.db_collection:
1378 if c['channel'] in channels:
1379 new_collection.append(c['name'])
1380
1381 return new_collection
1382
1383 def is_major_upgrade(self) -> bool:
1384 '''
1385 Returns True only if the user last opted-in to an older major
1386 '''
1387 if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0:
1388 # we do not know what Ceph version was when the user last opted-in,
1389 # thus we do not wish to nag in case of a major upgrade
1390 return False
1391
1392 mon_map = self.get('mon_map')
1393 mon_min = mon_map.get("min_mon_release", 0)
1394
1395 if mon_min - self.last_opted_in_ceph_version > 0:
1396 self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
1397 return True
1398
1399 return False
1400
1401 def is_opted_in(self) -> bool:
1402 # If len is 0 it means that the user is either opted-out (never
1403 # opted-in, or invoked `telemetry off`), or they upgraded from a
1404 # telemetry revision 1 or 2, which required to re-opt in to revision 3,
1405 # regardless, hence is considered as opted-out
1406 if self.db_collection is None:
1407 return False
1408 return len(self.db_collection) > 0
1409
1410 def should_nag(self) -> bool:
1411 # Find delta between opted-in collections and module collections;
1412 # nag only if module has a collection which is not in db, and nag == True.
1413
1414 # We currently do not nag if the user is opted-out (or never opted-in).
1415 # If we wish to do this in the future, we need to have a tri-mode state
1416 # (opted in, opted out, no action yet), and it needs to be guarded by a
1417 # config option (so that nagging can be turned off via config).
1418 # We also need to add a last_opted_out_ceph_version variable, for the
1419 # major upgrade check.
1420
1421 # check if there are collections the user is not opt-in to
1422 # that we should nag about
1423 if self.db_collection is not None:
1424 for c in MODULE_COLLECTION:
1425 if c['name'].name not in self.db_collection:
1426 if c['nag'] == True:
1427 self.log.debug(f"The collection: {c['name']} is not reported")
1428 return True
1429
1430 # user might be opted-in to the most recent collection, or there is no
1431 # new collection which requires nagging about; thus nag in case it's a
1432 # major upgrade and there are new collections
1433 # (which their own nag == False):
1434 new_collections = False
1435 col_delta = self.collection_delta()
1436 if col_delta is not None and len(col_delta) > 0:
1437 new_collections = True
1438
1439 return self.is_major_upgrade() and new_collections
1440
    def init_collection(self) -> None:
        """
        Initialize self.db_collection from the persisted 'collection' store
        key, migrating users from the older revision-based opt-in scheme the
        first time this runs after an upgrade.
        """
        # We fetch from db the collections the user had already opted-in to.
        # During the transition the results will be empty, but the user might
        # be opted-in to an older version (e.g. revision = 3)

        collection = self.get_store('collection')

        if collection is not None:
            self.db_collection = json.loads(collection)

        if self.db_collection is None:
            # happens once on upgrade
            if not self.enabled:
                # user is not opted-in
                self.set_store('collection', json.dumps([]))
                self.log.debug("user is not opted-in")
            else:
                # user is opted-in, verify the revision:
                if self.last_opt_revision == REVISION:
                    self.log.debug(f"telemetry revision is {REVISION}")
                    # opted-in at the current revision: carry over the base
                    # collections of every channel
                    base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
                    self.set_store('collection', json.dumps(base_collection))
                else:
                    # user is opted-in to an older version, meaning they need
                    # to re-opt in regardless
                    self.set_store('collection', json.dumps([]))
                    self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")

            # reload collection after setting
            collection = self.get_store('collection')
            if collection is not None:
                self.db_collection = json.loads(collection)
            else:
                raise RuntimeError('collection is None after initial setting')
        else:
            # user has already upgraded
            self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
1479 def is_enabled_collection(self, collection: Collection) -> bool:
1480 if self.db_collection is None:
1481 return False
1482 return collection.name in self.db_collection
1483
1484 def opt_in_all_collections(self) -> None:
1485 """
1486 Opt-in to all collections; Update db with the currently available collections in the module
1487 """
1488 if self.db_collection is None:
1489 raise RuntimeError('db_collection is None after initial setting')
1490
1491 for c in MODULE_COLLECTION:
1492 if c['name'].name not in self.db_collection:
1493 self.db_collection.append(c['name'])
1494
1495 self.set_store('collection', json.dumps(self.db_collection))
1496
    def send(self,
             report: Dict[str, Dict[str, str]],
             endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
        """
        Upload *report* to the requested endpoints (both ceph and device by
        default). The ceph report is PUT to self.url; device metrics are
        gathered separately and PUT to self.device_url.

        Returns a (retval, stdout, stderr)-style tuple: 0 with the success
        messages joined on success, or 1 with success and failure messages
        joined if any upload failed.
        """
        if not endpoint:
            endpoint = [self.EndPoint.ceph, self.EndPoint.device]
        failed = []
        success = []
        self.log.debug('Send endpoints %s' % endpoint)
        for e in endpoint:
            if e == self.EndPoint.ceph:
                fail_reason = self._try_post('ceph report', self.url, report)
                if fail_reason:
                    failed.append(fail_reason)
                else:
                    # record the upload time both in memory and in the store
                    now = int(time.time())
                    self.last_upload = now
                    self.set_store('last_upload', str(now))
                    success.append('Ceph report sent to {0}'.format(self.url))
                    self.log.info('Sent report to {0}'.format(self.url))
            elif e == self.EndPoint.device:
                # device metrics are only sent when the channel is enabled
                if 'device' in self.get_active_channels():
                    devices = self.gather_device_report()
                    if devices:
                        num_devs = 0
                        num_hosts = 0
                        # one POST per host's device map
                        for host, ls in devices.items():
                            self.log.debug('host %s devices %s' % (host, ls))
                            if not len(ls):
                                continue
                            fail_reason = self._try_post('devices', self.device_url,
                                                         ls)
                            if fail_reason:
                                failed.append(fail_reason)
                            else:
                                num_devs += len(ls)
                                num_hosts += 1
                        if num_devs:
                            success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
                                num_devs, num_hosts, len(devices)))
                    else:
                        fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
                        failed.append(fail_reason)
                        self.log.error(fail_reason)
        if failed:
            return 1, '', '\n'.join(success + failed)
        return 0, '', '\n'.join(success)
20effc67
TL
1544 def format_perf_histogram(self, report: Dict[str, Any]) -> None:
1545 # Formatting the perf histograms so they are human-readable. This will change the
1546 # ranges and values, which are currently in list form, into strings so that
1547 # they are displayed horizontally instead of vertically.
1e59de90
TL
1548 if 'report' in report:
1549 report = report['report']
20effc67
TL
1550 try:
1551 # Formatting ranges and values in osd_perf_histograms
1552 mode = 'osd_perf_histograms'
1553 for config in report[mode]:
1554 for histogram in config:
1555 # Adjust ranges by converting lists into strings
1556 for axis in config[histogram]['axes']:
1557 for i in range(0, len(axis['ranges'])):
1558 axis['ranges'][i] = str(axis['ranges'][i])
1559
1560 for osd in config[histogram]['osds']:
1561 for i in range(0, len(osd['values'])):
1562 osd['values'][i] = str(osd['values'][i])
1563 except KeyError:
1564 # If the perf channel is not enabled, there should be a KeyError since
1565 # 'osd_perf_histograms' would not be present in the report. In that case,
1566 # the show function should pass as usual without trying to format the
1567 # histograms.
1568 pass
1569
1570 def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1571 '''
1572 Enable or disable a list of channels
1573 '''
1574 if not self.enabled:
1575 # telemetry should be on for channels to be toggled
1576 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1577 'Preview sample reports with `ceph telemetry preview`.'
1578 return 0, msg, ''
1579
1580 if channels is None:
1581 msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
1582 return 0, msg, ''
1583
1584 state = action == 'enable'
1585 msg = ''
1586 for c in channels:
1587 if c not in ALL_CHANNELS:
1588 msg = f"{msg}{c} is not a valid channel name. "\
1589 f"Available channels: {ALL_CHANNELS}.\n"
1590 else:
1591 self.set_module_option(f"channel_{c}", state)
1592 setattr(self,
1593 f"channel_{c}",
1594 state)
1595 msg = f"{msg}channel_{c} is {action}d\n"
1596
1597 return 0, msg, ''
1598
    @CLIReadCommand('telemetry status')
    def status(self) -> Tuple[int, str, str]:
        '''
        Show current configuration
        '''
        # dump every module option plus the last upload time as pretty JSON
        r = {}
        for opt in self.MODULE_OPTIONS:
            r[opt['name']] = getattr(self, opt['name'])
        # last_upload is a unix timestamp (or a falsy placeholder if there
        # has been no upload yet); render it human-readable when set
        r['last_upload'] = (time.ctime(self.last_upload)
                            if self.last_upload else self.last_upload)
        return 0, json.dumps(r, indent=4, sort_keys=True), ''
1611 @CLIReadCommand('telemetry diff')
1612 def diff(self) -> Tuple[int, str, str]:
1613 '''
1614 Show the diff between opted-in collection and available collection
1615 '''
1616 diff = []
1617 keys = ['nag']
1618
1619 for c in MODULE_COLLECTION:
1620 if not self.is_enabled_collection(c['name']):
1621 diff.append({key: val for key, val in c.items() if key not in keys})
1622
1623 r = None
1624 if diff == []:
1625 r = "Telemetry is up to date"
1626 else:
1627 r = json.dumps(diff, indent=4, sort_keys=True)
1628
1629 return 0, r, ''
1630
    @CLICommand('telemetry on')
    def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
        '''
        Enable telemetry reports from this cluster
        '''
        # require explicit acknowledgement of the data-sharing license
        if license != LICENSE:
            return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
        else:
            self.set_module_option('enabled', True)
            self.enabled = True
            # opting in enables every collection currently known to the module
            self.opt_in_all_collections()

            # for major releases upgrade nagging
            mon_map = self.get('mon_map')
            mon_min = mon_map.get("min_mon_release", 0)
            self.set_store('last_opted_in_ceph_version', str(mon_min))
            self.last_opted_in_ceph_version = mon_min

            msg = 'Telemetry is on.'
            # point the user at any channels still disabled ('ident' is
            # intentionally excluded from the suggestion)
            disabled_channels = ''
            active_channels = self.get_active_channels()
            for c in ALL_CHANNELS:
                if c not in active_channels and c != 'ident':
                    disabled_channels = f"{disabled_channels} {c}"

            if len(disabled_channels) > 0:
                msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
                      f"`ceph telemetry enable channel{disabled_channels}`"

            # wake up serve() to reset health warning
            self.event.set()

            return 0, msg, ''
    @CLICommand('telemetry off')
    def off(self) -> Tuple[int, str, str]:
        '''
        Disable telemetry reports from this cluster
        '''
        if not self.enabled:
            # telemetry is already off
            msg = 'Telemetry is currently not enabled, nothing to turn off. '\
                  'Please consider opting-in with `ceph telemetry on`.\n' \
                  'Preview sample reports with `ceph telemetry preview`.'
            return 0, msg, ''

        self.set_module_option('enabled', False)
        self.enabled = False
        # opting out clears the opted-in collection list, both persisted
        # and in memory
        self.set_store('collection', json.dumps([]))
        self.db_collection = []

        # we might need this info in the future, in case
        # of nagging when user is opted-out
        mon_map = self.get('mon_map')
        mon_min = mon_map.get("min_mon_release", 0)
        self.set_store('last_opted_out_ceph_version', str(mon_min))
        self.last_opted_out_ceph_version = mon_min

        msg = 'Telemetry is now disabled.'
        return 0, msg, ''
    @CLIReadCommand('telemetry enable channel all')
    def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Enable all channels
        '''
        # delegates to toggle_channel with the full channel list
        return self.toggle_channel('enable', channels)

    @CLIReadCommand('telemetry enable channel')
    def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Enable a list of channels
        '''
        return self.toggle_channel('enable', channels)

    @CLIReadCommand('telemetry disable channel all')
    def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Disable all channels
        '''
        # delegates to toggle_channel with the full channel list
        return self.toggle_channel('disable', channels)

    @CLIReadCommand('telemetry disable channel')
    def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Disable a list of channels
        '''
        return self.toggle_channel('disable', channels)
1721 @CLIReadCommand('telemetry channel ls')
1722 def channel_ls(self) -> Tuple[int, str, str]:
1723 '''
1724 List all channels
1725 '''
1726 table = PrettyTable(
1727 [
1728 'NAME', 'ENABLED', 'DEFAULT', 'DESC',
1729 ],
1730 border=False)
1731 table.align['NAME'] = 'l'
1732 table.align['ENABLED'] = 'l'
1733 table.align['DEFAULT'] = 'l'
1734 table.align['DESC'] = 'l'
1735 table.left_padding_width = 0
1736 table.right_padding_width = 4
1737
1738 for c in ALL_CHANNELS:
1739 enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
1740 for o in self.MODULE_OPTIONS:
1741 if o['name'] == f"channel_{c}":
1742 default = "ON" if o.get('default', None) else "OFF"
1743 desc = o.get('desc', None)
1744
1745 table.add_row((
1746 c,
1747 enabled,
1748 default,
1749 desc,
1750 ))
1751
1752 return 0, table.get_string(sortby="NAME"), ''
1753
    @CLIReadCommand('telemetry collection ls')
    def collection_ls(self) -> Tuple[int, str, str]:
        '''
        List all collections
        '''
        # prepend a notice if there are collections the user could opt in to
        col_delta = self.collection_delta()
        msg = ''
        if col_delta is not None and len(col_delta) > 0:
            msg = f"New collections are available:\n" \
                  f"{sorted([c.name for c in col_delta])}\n" \
                  f"Run `ceph telemetry on` to opt-in to these collections.\n"

        table = PrettyTable(
            [
                'NAME', 'STATUS', 'DESC',
            ],
            border=False)
        table.align['NAME'] = 'l'
        table.align['STATUS'] = 'l'
        table.align['DESC'] = 'l'
        table.left_padding_width = 0
        table.right_padding_width = 4

        for c in MODULE_COLLECTION:
            name = c['name']
            opted_in = self.is_enabled_collection(name)
            channel_enabled = getattr(self, f"channel_{c['channel']}")

            # a collection is reported only when it is opted-in AND its
            # channel is enabled; otherwise explain which condition failed
            status = ''
            if channel_enabled and opted_in:
                status = "REPORTING"
            else:
                why = ''
                delimiter = ''

                if not opted_in:
                    why += "NOT OPTED-IN"
                    delimiter = ', '
                if not channel_enabled:
                    why += f"{delimiter}CHANNEL {c['channel']} IS OFF"

                status = f"NOT REPORTING: {why}"

            desc = c['description']

            table.add_row((
                name,
                status,
                desc,
            ))

        if len(msg):
            # add a new line between message and table output
            msg = f"{msg} \n"

        return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
1811 @CLICommand('telemetry send')
1812 def do_send(self,
1813 endpoint: Optional[List[EndPoint]] = None,
1814 license: Optional[str] = None) -> Tuple[int, str, str]:
1815 '''
1816 Send a sample report
1817 '''
1818 if not self.is_opted_in() and license != LICENSE:
1819 self.log.debug(('A telemetry send attempt while opted-out. '
1820 'Asking for license agreement'))
1821 return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
1822To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
1823Please consider enabling the telemetry module with 'ceph telemetry on'.'''
1824 else:
1825 self.last_report = self.compile_report()
1826 return self.send(self.last_report, endpoint)
1827
1828 @CLIReadCommand('telemetry show')
1829 def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1830 '''
1831 Show a sample report of opted-in collections (except for 'device')
1832 '''
1833 if not self.enabled:
1834 # if telemetry is off, no report is being sent, hence nothing to show
1835 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1836 'Preview sample reports with `ceph telemetry preview`.'
1837 return 0, msg, ''
1838
1839 report = self.get_report_locked(channels=channels)
1840 self.format_perf_histogram(report)
1841 report = json.dumps(report, indent=4, sort_keys=True)
1842
1843 if self.channel_device:
1844 report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''
1845
1846 return 0, report, ''
1847
1848 @CLIReadCommand('telemetry preview')
1849 def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1850 '''
1851 Preview a sample report of the most recent collections available (except for 'device')
1852 '''
1853 report = {}
1854
1855 # We use a lock to prevent a scenario where the user wishes to preview
1856 # the report, and at the same time the module hits the interval of
1857 # sending a report with the opted-in collection, which has less data
1858 # than in the preview report.
1859 col_delta = self.collection_delta()
1860 with self.get_report_lock:
1861 if col_delta is not None and len(col_delta) == 0:
1862 # user is already opted-in to the most recent collection
1863 msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
1864 return 0, msg, ''
1865 else:
1866 # there are collections the user is not opted-in to
1867 next_collection = []
1868
1869 for c in MODULE_COLLECTION:
1870 next_collection.append(c['name'].name)
1871
1872 opted_in_collection = self.db_collection
1873 self.db_collection = next_collection
1874 report = self.get_report(channels=channels)
1875 self.db_collection = opted_in_collection
e306af50 1876
20effc67
TL
1877 self.format_perf_histogram(report)
1878 report = json.dumps(report, indent=4, sort_keys=True)
1879
1880 if self.channel_device:
1881 report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''
1882
1883 return 0, report, ''
1884
1885 @CLIReadCommand('telemetry show-device')
1886 def show_device(self) -> Tuple[int, str, str]:
1887 '''
1888 Show a sample device report
1889 '''
1890 if not self.enabled:
1891 # if telemetry is off, no report is being sent, hence nothing to show
1892 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1893 'Preview sample device reports with `ceph telemetry preview-device`.'
1894 return 0, msg, ''
1895
1896 if not self.channel_device:
1897 # if device channel is off, device report is not being sent, hence nothing to show
1898 msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
1899 'Preview sample device reports with `ceph telemetry preview-device`.'
1900 return 0, msg, ''
1901
1902 return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
1903
1904 @CLIReadCommand('telemetry preview-device')
1905 def preview_device(self) -> Tuple[int, str, str]:
1906 '''
1907 Preview a sample device report of the most recent device collection
1908 '''
1909 report = {}
1910
1911 device_col_delta = self.collection_delta(['device'])
1912 with self.get_report_lock:
1913 if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
1914 # user is already opted-in to the most recent device collection,
1915 # and device channel is on, thus `show-device` should be called
1916 msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
1917 return 0, msg, ''
1918
1919 # either the user is not opted-in at all, or there are collections
1920 # they are not opted-in to
1921 next_collection = []
1922
1923 for c in MODULE_COLLECTION:
1924 next_collection.append(c['name'].name)
1925
1926 opted_in_collection = self.db_collection
1927 self.db_collection = next_collection
1928 report = self.get_report('device')
1929 self.db_collection = opted_in_collection
1930
1931 report = json.dumps(report, indent=4, sort_keys=True)
1932 return 0, report, ''
1933
1934 @CLIReadCommand('telemetry show-all')
1935 def show_all(self) -> Tuple[int, str, str]:
1936 '''
1937 Show a sample report of all enabled channels (including 'device' channel)
1938 '''
1939 if not self.enabled:
1940 # if telemetry is off, no report is being sent, hence nothing to show
1941 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1942 'Preview sample reports with `ceph telemetry preview`.'
1943 return 0, msg, ''
1944
1945 if not self.channel_device:
1946 # device channel is off, no need to display its report
1e59de90
TL
1947 report = self.get_report_locked('default')
1948 else:
1949 # telemetry is on and device channel is enabled, show both
1950 report = self.get_report_locked('all')
20effc67 1951
1e59de90
TL
1952 self.format_perf_histogram(report)
1953 return 0, json.dumps(report, indent=4, sort_keys=True), ''
20effc67
TL
1954
1955 @CLIReadCommand('telemetry preview-all')
1956 def preview_all(self) -> Tuple[int, str, str]:
1957 '''
1958 Preview a sample report of the most recent collections available of all channels (including 'device')
1959 '''
1960 report = {}
1961
1962 col_delta = self.collection_delta()
1963 with self.get_report_lock:
1964 if col_delta is not None and len(col_delta) == 0:
1965 # user is already opted-in to the most recent collection
1966 msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
1967 return 0, msg, ''
1968
1969 # there are collections the user is not opted-in to
1970 next_collection = []
1971
1972 for c in MODULE_COLLECTION:
1973 next_collection.append(c['name'].name)
1974
1975 opted_in_collection = self.db_collection
1976 self.db_collection = next_collection
1977 report = self.get_report('all')
1978 self.db_collection = opted_in_collection
1979
1980 self.format_perf_histogram(report)
1981 report = json.dumps(report, indent=4, sort_keys=True)
1982
1983 return 0, report, ''
1984
1985 def get_report_locked(self,
1986 report_type: str = 'default',
1987 channels: Optional[List[str]] = None) -> Dict[str, Any]:
1988 '''
1989 A wrapper around get_report to allow for compiling a report of the most recent module collections
1990 '''
1991 with self.get_report_lock:
1992 return self.get_report(report_type, channels)
1993
1994 def get_report(self,
1995 report_type: str = 'default',
1996 channels: Optional[List[str]] = None) -> Dict[str, Any]:
e306af50
TL
1997 if report_type == 'default':
1998 return self.compile_report(channels=channels)
1999 elif report_type == 'device':
2000 return self.gather_device_report()
2001 elif report_type == 'all':
2002 return {'report': self.compile_report(channels=channels),
2003 'device_report': self.gather_device_report()}
2004 return {}
2005
20effc67 2006 def self_test(self) -> None:
1e59de90
TL
2007 self.opt_in_all_collections()
2008 report = self.compile_report(channels=ALL_CHANNELS)
11fdf7f2
TL
2009 if len(report) == 0:
2010 raise RuntimeError('Report is empty')
2011
2012 if 'report_id' not in report:
2013 raise RuntimeError('report_id not found in report')
2014
20effc67 2015 def shutdown(self) -> None:
11fdf7f2
TL
2016 self.run = False
2017 self.event.set()
2018
20effc67 2019 def refresh_health_checks(self) -> None:
eafe8130 2020 health_checks = {}
20effc67
TL
2021 # TODO do we want to nag also in case the user is not opted-in?
2022 if self.enabled and self.should_nag():
eafe8130
TL
2023 health_checks['TELEMETRY_CHANGED'] = {
2024 'severity': 'warning',
2025 'summary': 'Telemetry requires re-opt-in',
2026 'detail': [
20effc67 2027 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
eafe8130
TL
2028 ]
2029 }
2030 self.set_health_checks(health_checks)
2031
20effc67 2032 def serve(self) -> None:
11fdf7f2 2033 self.load()
11fdf7f2
TL
2034 self.run = True
2035
2036 self.log.debug('Waiting for mgr to warm up')
20effc67 2037 time.sleep(10)
11fdf7f2
TL
2038
2039 while self.run:
eafe8130
TL
2040 self.event.clear()
2041
2042 self.refresh_health_checks()
2043
20effc67 2044 if not self.is_opted_in():
eafe8130
TL
2045 self.log.debug('Not sending report until user re-opts-in')
2046 self.event.wait(1800)
2047 continue
11fdf7f2 2048 if not self.enabled:
eafe8130 2049 self.log.debug('Not sending report until configured to do so')
11fdf7f2
TL
2050 self.event.wait(1800)
2051 continue
2052
2053 now = int(time.time())
20effc67
TL
2054 if not self.last_upload or \
2055 (now - self.last_upload) > self.interval * 3600:
11fdf7f2
TL
2056 self.log.info('Compiling and sending report to %s',
2057 self.url)
2058
2059 try:
2060 self.last_report = self.compile_report()
20effc67 2061 except Exception:
11fdf7f2
TL
2062 self.log.exception('Exception while compiling report:')
2063
eafe8130 2064 self.send(self.last_report)
11fdf7f2 2065 else:
eafe8130 2066 self.log.debug('Interval for sending new report has not expired')
11fdf7f2
TL
2067
2068 sleep = 3600
2069 self.log.debug('Sleeping for %d seconds', sleep)
2070 self.event.wait(sleep)
2071
11fdf7f2 2072 @staticmethod
20effc67 2073 def can_run() -> Tuple[bool, str]:
11fdf7f2 2074 return True, ''