# ceph/src/pybind/mgr/telemetry/module.py (quincy beta 17.1.0)
"""
Telemetry module for ceph-mgr

Collect statistics from the Ceph cluster and send them back to the Ceph
project when the user has opted in.
"""
20effc67
TL
7import logging
8import numbers
9import enum
11fdf7f2 10import errno
eafe8130 11import hashlib
11fdf7f2 12import json
eafe8130 13import rbd
11fdf7f2
TL
14import requests
15import uuid
16import time
eafe8130 17from datetime import datetime, timedelta
20effc67
TL
18from prettytable import PrettyTable
19from threading import Event, Lock
11fdf7f2 20from collections import defaultdict
20effc67 21from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union
11fdf7f2 22
20effc67 23from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, ServiceInfoT
11fdf7f2
TL
24
25
# All channels the telemetry module can report on; each one can be
# enabled/disabled independently via the channel_* module options below.
ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']

# License that users must acknowledge when opting in to data sharing.
LICENSE = 'sharing-1-0'
LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL = 'https://cdla.io/sharing-1-0/'

# Latest revision of the telemetry report.  Bump this each time we make
# *any* change.
REVISION = 3

# History of revisions
# --------------------
#
# Version 1:
#   Mimic and/or nautilus are lumped together here, since
#   we didn't track revisions yet.
#
# Version 2:
#   - added revision tracking, nagging, etc.
#   - added config option changes
#   - added channels
#   - added explicit license acknowledgement to the opt-in process
#
# Version 3:
#   - added device health metrics (i.e., SMART data, minus serial number)
#   - remove crush_rule
#   - added CephFS metadata (how many MDSs, fs features, how many data pools,
#     how much metadata is cached, rfiles, rbytes, rsnapshots)
#   - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
#   - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
#   - whether an OSD cluster network is in use
#   - rbd pool and image count, and rbd mirror mode (pool-level)
#   - rgw daemons, zones, zonegroups; which rgw frontends
#   - crush map stats
20effc67
TL
class Collection(str, enum.Enum):
    """Named telemetry data collections.

    Each member maps to an entry in MODULE_COLLECTION, which associates the
    collection with a reporting channel.  Inheriting from str makes members
    directly usable as their string value (e.g. when persisted or compared).
    """
    basic_base = 'basic_base'
    device_base = 'device_base'
    crash_base = 'crash_base'
    ident_base = 'ident_base'
    perf_perf = 'perf_perf'
    basic_mds_metadata = 'basic_mds_metadata'
    basic_pool_usage = 'basic_pool_usage'
    basic_usage_by_class = 'basic_usage_by_class'
70
# Registry of all collections: their human-readable description, owning
# channel, and whether users should be nagged to opt in to them.
MODULE_COLLECTION : List[Dict] = [
    {
        "name": Collection.basic_base,
        "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.device_base,
        "description": "Information about device health metrics",
        "channel": "device",
        "nag": False
    },
    {
        "name": Collection.crash_base,
        "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
        "channel": "crash",
        "nag": False
    },
    {
        "name": Collection.ident_base,
        "description": "User-provided identifying information about the cluster",
        "channel": "ident",
        "nag": False
    },
    {
        "name": Collection.perf_perf,
        "description": "Information about performance counters of the cluster",
        "channel": "perf",
        "nag": True
    },
    {
        "name": Collection.basic_mds_metadata,
        "description": "MDS metadata",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_pool_usage,
        "description": "Default pool application and usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_usage_by_class,
        "description": "Default device class usage statistics",
        "channel": "basic",
        "nag": False
    }
]
11fdf7f2 121
20effc67 122class Module(MgrModule):
    # Daemon metadata fields that are aggregated into per-value counts by
    # gather_osd_metadata / gather_mon_metadata / gather_mds_metadata.
    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]
133
    # Module configuration options; each becomes an instance attribute via
    # config_update_module_option().
    MODULE_OPTIONS = [
        Option(name='url',
               type='str',
               default='https://telemetry.ceph.com/report'),
        Option(name='device_url',
               type='str',
               default='https://telemetry.ceph.com/device'),
        Option(name='enabled',
               type='bool',
               default=False),
        Option(name='last_opt_revision',
               type='int',
               default=1),
        Option(name='leaderboard',
               type='bool',
               default=False),
        Option(name='description',
               type='str',
               default=None),
        Option(name='contact',
               type='str',
               default=None),
        Option(name='organization',
               type='str',
               default=None),
        Option(name='proxy',
               type='str',
               default=None),
        # reporting interval in hours; never more often than every 8 hours
        Option(name='interval',
               type='int',
               default=24,
               min=8),
        Option(name='channel_basic',
               type='bool',
               default=True,
               desc='Share basic cluster information (size, version)'),
        Option(name='channel_ident',
               type='bool',
               default=False,
               desc='Share a user-provided description and/or contact email for the cluster'),
        Option(name='channel_crash',
               type='bool',
               default=True,
               desc='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
        Option(name='channel_device',
               type='bool',
               default=True,
               desc=('Share device health metrics '
                     '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
        Option(name='channel_perf',
               type='bool',
               default=False,
               desc='Share various performance metrics of a cluster'),
    ]
188
189 @property
20effc67 190 def config_keys(self) -> Dict[str, OptionValue]:
11fdf7f2
TL
191 return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
192
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(Module, self).__init__(*args, **kwargs)
        # event used to wake up the serve() thread (see config_notify)
        self.event = Event()
        self.run = False
        # enabled collections; None until load()/init_collection() populate it
        self.db_collection: Optional[List[str]] = None
        self.last_opted_in_ceph_version: Optional[int] = None
        self.last_opted_out_ceph_version: Optional[int] = None
        self.last_upload: Optional[int] = None
        self.last_report: Dict[str, Any] = dict()
        self.report_id: Optional[str] = None
        # salt is mixed into hashes to anonymize entity names (see
        # gather_crashinfo); generated on first load()
        self.salt: Optional[str] = None
        self.get_report_lock = Lock()
        # populate instance attributes from the current option values
        self.config_update_module_option()
        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.url = ''
            self.device_url = ''
            self.enabled = False
            self.last_opt_revision = 0
            self.leaderboard = ''
            self.interval = 0
            self.proxy = ''
            self.channel_basic = True
            self.channel_ident = False
            self.channel_crash = True
            self.channel_device = True
            self.channel_perf = False
            self.db_collection = ['basic_base', 'device_base']
            self.last_opted_in_ceph_version = 17
            self.last_opted_out_ceph_version = 0
223
224 def config_update_module_option(self) -> None:
11fdf7f2
TL
225 for opt in self.MODULE_OPTIONS:
226 setattr(self,
227 opt['name'],
228 self.get_module_option(opt['name']))
229 self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
20effc67
TL
230
    def config_notify(self) -> None:
        """Called by the mgr when module options change; re-read them."""
        self.config_update_module_option()
        # wake up serve() thread
        self.event.set()
11fdf7f2 235
20effc67
TL
236 def load(self) -> None:
237 last_upload = self.get_store('last_upload', None)
238 if last_upload is None:
239 self.last_upload = None
240 else:
241 self.last_upload = int(last_upload)
11fdf7f2 242
20effc67
TL
243 report_id = self.get_store('report_id', None)
244 if report_id is None:
11fdf7f2
TL
245 self.report_id = str(uuid.uuid4())
246 self.set_store('report_id', self.report_id)
20effc67
TL
247 else:
248 self.report_id = report_id
11fdf7f2 249
20effc67
TL
250 salt = self.get_store('salt', None)
251 if salt is None:
eafe8130
TL
252 self.salt = str(uuid.uuid4())
253 self.set_store('salt', self.salt)
20effc67
TL
254 else:
255 self.salt = salt
256
257 self.init_collection()
258
259 last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None)
260 if last_opted_in_ceph_version is None:
261 self.last_opted_in_ceph_version = None
262 else:
263 self.last_opted_in_ceph_version = int(last_opted_in_ceph_version)
264
265 last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None)
266 if last_opted_out_ceph_version is None:
267 self.last_opted_out_ceph_version = None
268 else:
269 self.last_opted_out_ceph_version = int(last_opted_out_ceph_version)
eafe8130 270
20effc67
TL
271 def gather_osd_metadata(self,
272 osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]:
11fdf7f2
TL
273 keys = ["osd_objectstore", "rotational"]
274 keys += self.metadata_keys
275
20effc67 276 metadata: Dict[str, Dict[str, int]] = dict()
11fdf7f2
TL
277 for key in keys:
278 metadata[key] = defaultdict(int)
279
280 for osd in osd_map['osds']:
20effc67 281 res = self.get_metadata('osd', str(osd['osd']))
92f5a8d4
TL
282 if res is None:
283 self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
284 continue
20effc67 285 for k, v in res.items():
11fdf7f2
TL
286 if k not in keys:
287 continue
288
289 metadata[k][v] += 1
290
291 return metadata
292
20effc67
TL
293 def gather_mon_metadata(self,
294 mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]:
11fdf7f2
TL
295 keys = list()
296 keys += self.metadata_keys
297
20effc67 298 metadata: Dict[str, Dict[str, int]] = dict()
11fdf7f2
TL
299 for key in keys:
300 metadata[key] = defaultdict(int)
301
302 for mon in mon_map['mons']:
20effc67 303 res = self.get_metadata('mon', mon['name'])
92f5a8d4
TL
304 if res is None:
305 self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
306 continue
20effc67
TL
307 for k, v in res.items():
308 if k not in keys:
309 continue
310
311 metadata[k][v] += 1
312
313 return metadata
314
315 def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]:
316 metadata: Dict[str, Dict[str, int]] = dict()
317
318 res = self.get('mds_metadata') # metadata of *all* mds daemons
319 if res is None or not res:
320 self.log.debug('Could not get metadata for mds daemons')
321 return metadata
322
323 keys = list()
324 keys += self.metadata_keys
325
326 for key in keys:
327 metadata[key] = defaultdict(int)
328
329 for mds in res.values():
330 for k, v in mds.items():
11fdf7f2
TL
331 if k not in keys:
332 continue
333
334 metadata[k][v] += 1
335
336 return metadata
337
20effc67
TL
    def gather_crush_info(self) -> Dict[str, Union[int,
                                                   bool,
                                                   List[int],
                                                   Dict[str, int],
                                                   Dict[int, int]]]:
        """Summarize the CRUSH map: counts of devices/types/buckets/rules,
        tunables, and histograms of bucket algorithms, types and sizes."""
        osdmap = self.get_osdmap()
        crush_raw = osdmap.get_crush()
        crush = crush_raw.dump()

        BucketKeyT = TypeVar('BucketKeyT', int, str)

        def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None:
            # histogram increment helper
            if k in d:
                d[k] += 1
            else:
                d[k] = 1

        # count devices per CRUSH device class ('' for unclassified)
        device_classes: Dict[str, int] = {}
        for dev in crush['devices']:
            inc(device_classes, dev.get('class', ''))

        bucket_algs: Dict[str, int] = {}
        bucket_types: Dict[str, int] = {}
        bucket_sizes: Dict[int, int] = {}
        for bucket in crush['buckets']:
            if '~' in bucket['name']:  # ignore shadow buckets
                continue
            inc(bucket_algs, bucket['alg'])
            inc(bucket_types, bucket['type_id'])
            inc(bucket_sizes, len(bucket['items']))

        return {
            'num_devices': len(crush['devices']),
            'num_types': len(crush['types']),
            'num_buckets': len(crush['buckets']),
            'num_rules': len(crush['rules']),
            # only the counts are reported, not the class names
            'device_classes': list(device_classes.values()),
            'tunables': crush['tunables'],
            'compat_weight_set': '-1' in crush['choose_args'],
            'num_weight_sets': len(crush['choose_args']),
            'bucket_algs': bucket_algs,
            'bucket_sizes': bucket_sizes,
            'bucket_types': bucket_types,
        }
382
20effc67 383 def gather_configs(self) -> Dict[str, List[str]]:
eafe8130
TL
384 # cluster config options
385 cluster = set()
386 r, outb, outs = self.mon_command({
387 'prefix': 'config dump',
388 'format': 'json'
20effc67 389 })
eafe8130
TL
390 if r != 0:
391 return {}
392 try:
393 dump = json.loads(outb)
394 except json.decoder.JSONDecodeError:
395 return {}
396 for opt in dump:
397 name = opt.get('name')
398 if name:
399 cluster.add(name)
400 # daemon-reported options (which may include ceph.conf)
401 active = set()
20effc67 402 ls = self.get("modified_config_options")
eafe8130
TL
403 for opt in ls.get('options', {}):
404 active.add(opt)
405 return {
406 'cluster_changed': sorted(list(cluster)),
407 'active_changed': sorted(list(active)),
408 }
409
20effc67
TL
    def get_heap_stats(self) -> Dict[str, dict]:
        """Collect tcmalloc heap stats from every OSD.

        Runs `heap stats` via osd_command and parses the numeric values out
        of the textual output.  Returns {'osd.N' -> {category -> value}};
        OSDs whose output cannot be parsed are skipped.
        """
        # Initialize result dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))

        # Get list of osd ids from the metadata
        osd_metadata = self.get('osd_metadata')

        # Grab output from the "osd.x heap stats" command
        for osd_id in osd_metadata:
            cmd_dict = {
                'prefix': 'heap',
                'heapcmd': 'stats',
                'id': str(osd_id),
            }
            r, outb, outs = self.osd_command(cmd_dict)
            if r != 0:
                self.log.debug("Invalid command dictionary.")
                continue
            else:
                if 'tcmalloc heap stats' in outs:
                    # pull all integers out of the textual report, in order
                    values = [int(i) for i in outs.split() if i.isdigit()]
                    # `categories` must be ordered this way for the correct output to be parsed
                    categories = ['use_by_application',
                                  'page_heap_freelist',
                                  'central_cache_freelist',
                                  'transfer_cache_freelist',
                                  'thread_cache_freelists',
                                  'malloc_metadata',
                                  'actual_memory_used',
                                  'released_to_os',
                                  'virtual_address_space_used',
                                  'spans_in_use',
                                  'thread_heaps_in_use',
                                  'tcmalloc_page_size']
                    if len(values) != len(categories):
                        self.log.debug('Received unexpected output from osd.{}; number of values should match the number of expected categories:\n' \
                                       'values: len={} {} ~ categories: len={} {} ~ outs: {}'.format(osd_id, len(values), values, len(categories), categories, outs))
                        continue
                    osd = 'osd.' + str(osd_id)
                    result[osd] = dict(zip(categories, values))
                else:
                    self.log.debug('No heap stats available on osd.{}: {}'.format(osd_id, outs))
                    continue

        return result
455
    def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
        """Collect mempool usage from every OSD via `dump_mempools`.

        mode='separated' returns {'osd.N' -> per-pool dump}; mode='aggregated'
        sums bytes/items per mempool type across all OSDs.  OSDs whose output
        cannot be parsed are skipped.
        """
        # Initialize result dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))

        # Get list of osd ids from the metadata
        osd_metadata = self.get('osd_metadata')

        # Grab output from the "osd.x dump_mempools" command
        for osd_id in osd_metadata:
            cmd_dict = {
                'prefix': 'dump_mempools',
                'id': str(osd_id),
                'format': 'json'
            }
            r, outb, outs = self.osd_command(cmd_dict)
            if r != 0:
                self.log.debug("Invalid command dictionary.")
                continue
            else:
                try:
                    # This is where the mempool will land.
                    dump = json.loads(outb)
                    if mode == 'separated':
                        result["osd." + str(osd_id)] = dump['mempool']['by_pool']
                    elif mode == 'aggregated':
                        for mem_type in dump['mempool']['by_pool']:
                            result[mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
                            result[mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
                    else:
                        self.log.debug("Incorrect mode specified in get_mempool")
                except (json.decoder.JSONDecodeError, KeyError) as e:
                    # empty output or unexpected schema from this osd
                    self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
                    continue

        return result
491
    def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
        """Collect 2D perf histograms from every OSD.

        mode='separated' keeps per-OSD value grids; mode='aggregated' sums
        value grids element-wise across OSDs that share an identical axis
        configuration.  Histograms are grouped by str(axes) so differently
        configured histograms are never combined; those grouping keys are
        dropped from the returned list.
        """
        # Initialize result dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
                                              lambda: defaultdict(
                                              lambda: defaultdict(
                                              lambda: defaultdict(
                                              lambda: defaultdict(int))))))

        # Get list of osd ids from the metadata
        osd_metadata = self.get('osd_metadata')

        # Grab output from the "osd.x perf histogram dump" command
        for osd_id in osd_metadata:
            cmd_dict = {
                'prefix': 'perf histogram dump',
                'id': str(osd_id),
                'format': 'json'
            }
            r, outb, outs = self.osd_command(cmd_dict)
            # Check for invalid calls
            if r != 0:
                self.log.debug("Invalid command dictionary.")
                continue
            else:
                try:
                    # This is where the histograms will land if there are any.
                    dump = json.loads(outb)

                    for histogram in dump['osd']:
                        # Log axis information. There are two axes, each represented
                        # as a dictionary. Both dictionaries are contained inside a
                        # list called 'axes'.
                        axes = []
                        for axis in dump['osd'][histogram]['axes']:

                            # This is the dict that contains information for an individual
                            # axis. It will be appended to the 'axes' list at the end.
                            axis_dict: Dict[str, Any] = defaultdict()

                            # Collecting information for buckets, min, name, etc.
                            axis_dict['buckets'] = axis['buckets']
                            axis_dict['min'] = axis['min']
                            axis_dict['name'] = axis['name']
                            axis_dict['quant_size'] = axis['quant_size']
                            axis_dict['scale_type'] = axis['scale_type']

                            # Collecting ranges; placing them in lists to
                            # improve readability later on.
                            ranges = []
                            for _range in axis['ranges']:
                                _max, _min = None, None
                                if 'max' in _range:
                                    _max = _range['max']
                                if 'min' in _range:
                                    _min = _range['min']
                                ranges.append([_min, _max])
                            axis_dict['ranges'] = ranges

                            # Now that 'axis_dict' contains all the appropriate
                            # information for the current axis, append it to the 'axes' list.
                            # There will end up being two axes in the 'axes' list, since the
                            # histograms are 2D.
                            axes.append(axis_dict)

                        # Add the 'axes' list, containing both axes, to result.
                        # At this point, you will see that the name of the key is the string
                        # form of our axes list (str(axes)). This is there so that histograms
                        # with different axis configs will not be combined.
                        # These key names are later dropped when only the values are returned.
                        result[str(axes)][histogram]['axes'] = axes

                        # Collect current values and make sure they are in
                        # integer form.
                        values = []
                        for value_list in dump['osd'][histogram]['values']:
                            values.append([int(v) for v in value_list])

                        if mode == 'separated':
                            if 'osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['osds'] = []
                            result[str(axes)][histogram]['osds'].append({'osd_id': int(osd_id), 'values': values})

                        elif mode == 'aggregated':
                            # Aggregate values. If 'values' have already been initialized,
                            # we can safely add.
                            if 'values' in result[str(axes)][histogram]:
                                for i in range (0, len(values)):
                                    for j in range (0, len(values[i])):
                                        values[i][j] += result[str(axes)][histogram]['values'][i][j]

                            # Add the values to result.
                            result[str(axes)][histogram]['values'] = values

                            # Update num_combined_osds
                            if 'num_combined_osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['num_combined_osds'] = 1
                            else:
                                result[str(axes)][histogram]['num_combined_osds'] += 1
                        else:
                            self.log.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode))
                            return list()

                # Sometimes, json errors occur if you give it an empty string.
                # I am also putting in a catch for a KeyError since it could
                # happen where the code is assuming that a key exists in the
                # schema when it doesn't. In either case, we'll handle that
                # by continuing and collecting what we can from other osds.
                except (json.decoder.JSONDecodeError, KeyError) as e:
                    self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
                    continue

        return list(result.values())
604
605 def get_io_rate(self) -> dict:
606 return self.get('io_rate')
607
608 def get_stats_per_pool(self) -> dict:
609 result = self.get('pg_dump')['pool_stats']
610
611 # collect application metadata from osd_map
612 osd_map = self.get('osd_map')
613 application_metadata = {pool['pool']: pool['application_metadata'] for pool in osd_map['pools']}
614
615 # add application to each pool from pg_dump
616 for pool in result:
617 pool['application'] = []
618 # Only include default applications
619 for application in application_metadata[pool['poolid']]:
620 if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
621 pool['application'].append(application)
622
623 return result
624
625 def get_stats_per_pg(self) -> dict:
626 return self.get('pg_dump')['pg_stats']
627
628 def get_rocksdb_stats(self) -> Dict[str, str]:
629 # Initalizers
630 result: Dict[str, str] = defaultdict()
631 version = self.get_rocksdb_version()
632
633 # Update result
634 result['version'] = version
635
636 return result
637
638 def gather_crashinfo(self) -> List[Dict[str, str]]:
639 crashlist: List[Dict[str, str]] = list()
eafe8130 640 errno, crashids, err = self.remote('crash', 'ls')
11fdf7f2 641 if errno:
20effc67 642 return crashlist
11fdf7f2 643 for crashid in crashids.split():
20effc67 644 errno, crashinfo, err = self.remote('crash', 'do_info', crashid)
11fdf7f2
TL
645 if errno:
646 continue
647 c = json.loads(crashinfo)
20effc67
TL
648
649 # redact hostname
11fdf7f2 650 del c['utsname_hostname']
20effc67 651
92f5a8d4
TL
652 # entity_name might have more than one '.', beware
653 (etype, eid) = c.get('entity_name', '').split('.', 1)
eafe8130 654 m = hashlib.sha1()
20effc67 655 assert self.salt
eafe8130
TL
656 m.update(self.salt.encode('utf-8'))
657 m.update(eid.encode('utf-8'))
658 m.update(self.salt.encode('utf-8'))
659 c['entity_name'] = etype + '.' + m.hexdigest()
20effc67
TL
660
661 # redact final line of python tracebacks, as the exception
662 # payload may contain identifying information
663 if 'mgr_module' in c and 'backtrace' in c:
664 # backtrace might be empty
665 if len(c['backtrace']) > 0:
666 c['backtrace'][-1] = '<redacted>'
667
11fdf7f2
TL
668 crashlist.append(c)
669 return crashlist
670
20effc67
TL
671 def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
672 # Extract perf counter data with get_all_perf_counters(), a method
673 # from mgr/mgr_module.py. This method returns a nested dictionary that
674 # looks a lot like perf schema, except with some additional fields.
675 #
676 # Example of output, a snapshot of a mon daemon:
677 # "mon.b": {
678 # "bluestore.kv_flush_lat": {
679 # "count": 2431,
680 # "description": "Average kv_thread flush latency",
681 # "nick": "fl_l",
682 # "priority": 8,
683 # "type": 5,
684 # "units": 1,
685 # "value": 88814109
686 # },
687 # },
688 all_perf_counters = self.get_all_perf_counters()
689
690 # Initialize 'result' dict
691 result: Dict[str, dict] = defaultdict(lambda: defaultdict(
692 lambda: defaultdict(lambda: defaultdict(int))))
693
694 for daemon in all_perf_counters:
695
696 # Calculate num combined daemon types if in aggregated mode
697 if mode == 'aggregated':
698 daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'
699 if 'num_combined_daemons' not in result[daemon_type]:
700 result[daemon_type]['num_combined_daemons'] = 1
701 else:
702 result[daemon_type]['num_combined_daemons'] += 1
703
704 for collection in all_perf_counters[daemon]:
705 # Split the collection to avoid redundancy in final report; i.e.:
706 # bluestore.kv_flush_lat, bluestore.kv_final_lat -->
707 # bluestore: kv_flush_lat, kv_final_lat
708 col_0, col_1 = collection.split('.')
709
710 # Debug log for empty keys. This initially was a problem for prioritycache
711 # perf counters, where the col_0 was empty for certain mon counters:
712 #
713 # "mon.a": { instead of "mon.a": {
714 # "": { "prioritycache": {
715 # "cache_bytes": {...}, "cache_bytes": {...},
716 #
717 # This log is here to detect any future instances of a similar issue.
718 if (daemon == "") or (col_0 == "") or (col_1 == ""):
719 self.log.debug("Instance of an empty key: {}{}".format(daemon, collection))
720
721 if mode == 'separated':
722 # Add value to result
723 result[daemon][col_0][col_1]['value'] = \
724 all_perf_counters[daemon][collection]['value']
725
726 # Check that 'count' exists, as not all counters have a count field.
727 if 'count' in all_perf_counters[daemon][collection]:
728 result[daemon][col_0][col_1]['count'] = \
729 all_perf_counters[daemon][collection]['count']
730 elif mode == 'aggregated':
731 # Not every rgw daemon has the same schema. Specifically, each rgw daemon
732 # has a uniquely-named collection that starts off identically (i.e.
733 # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
734 # This bit of code combines these unique counters all under one rgw instance.
735 # Without this check, the schema would remain separeted out in the final report.
736 if col_0[0:11] == "objecter-0x":
737 col_0 = "objecter-0x"
738
739 # Check that the value can be incremented. In some cases,
740 # the files are of type 'pair' (real-integer-pair, integer-integer pair).
741 # In those cases, the value is a dictionary, and not a number.
742 # i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
743 if isinstance(all_perf_counters[daemon][collection]['value'], numbers.Number):
744 result[daemon_type][col_0][col_1]['value'] += \
745 all_perf_counters[daemon][collection]['value']
746
747 # Check that 'count' exists, as not all counters have a count field.
748 if 'count' in all_perf_counters[daemon][collection]:
749 result[daemon_type][col_0][col_1]['count'] += \
750 all_perf_counters[daemon][collection]['count']
751 else:
752 self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
753 return {}
754
755 return result
756
757 def get_active_channels(self) -> List[str]:
eafe8130
TL
758 r = []
759 if self.channel_basic:
760 r.append('basic')
761 if self.channel_crash:
762 r.append('crash')
763 if self.channel_device:
764 r.append('device')
f67539c2
TL
765 if self.channel_ident:
766 r.append('ident')
20effc67
TL
767 if self.channel_perf:
768 r.append('perf')
eafe8130
TL
769 return r
770
    def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
        """Gather recent device health (SMART) metrics, anonymized.

        Returns {anon-host-id -> {anon-devid -> {timestamp -> record}}}.
        Host names and device ids are replaced with persistent anonymous
        UUIDs (stored in the module KV store), and serial numbers are
        scrubbed from the raw reports.  Returns {} if the devicehealth
        module or device list is unavailable.
        """
        try:
            time_format = self.remote('devicehealth', 'get_time_format')
        except Exception as e:
            self.log.debug('Unable to format time: {}'.format(e))
            return {}
        # only consider samples from the last two reporting intervals
        cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
        min_sample = cutoff.strftime(time_format)

        devices = self.get('devices')['devices']
        if not devices:
            self.log.debug('Unable to get device info from the mgr.')
            return {}

        # anon-host-id -> anon-devid -> { timestamp -> record }
        res: Dict[str, Dict[str, Dict[str, str]]] = {}
        for d in devices:
            devid = d['devid']
            try:
                # this is a map of stamp -> {device info}
                m = self.remote('devicehealth', 'get_recent_device_metrics',
                                devid, min_sample)
            except Exception as e:
                self.log.debug('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
                continue

            # anonymize host id
            try:
                host = d['location'][0]['host']
            except (KeyError, IndexError) as e:
                self.log.debug('Unable to get host from device with id "{}": {}'.format(devid, e))
                continue
            # reuse the stored anonymous id for this host, or mint one
            anon_host = self.get_store('host-id/%s' % host)
            if not anon_host:
                anon_host = str(uuid.uuid1())
                self.set_store('host-id/%s' % host, anon_host)
            serial = None
            for dev, rep in m.items():
                rep['host_id'] = anon_host
                # remember the serial number so it can be scrubbed below
                if serial is None and 'serial_number' in rep:
                    serial = rep['serial_number']

            # anonymize device id
            anon_devid = self.get_store('devid-id/%s' % devid)
            if not anon_devid:
                # ideally devid is 'vendor_model_serial',
                # but can also be 'model_serial', 'serial'
                if '_' in devid:
                    anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
                else:
                    anon_devid = str(uuid.uuid1())
                self.set_store('devid-id/%s' % devid, anon_devid)
            self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
                                                           host, anon_host))

            # anonymize the smartctl report itself
            if serial:
                m_str = json.dumps(m)
                m = json.loads(m_str.replace(serial, 'deleted'))

            if anon_host not in res:
                res[anon_host] = {}
            res[anon_host][anon_devid] = m
        return res
835
20effc67 836 def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
eafe8130 837 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
eafe8130
TL
838 if data:
839 return data[-1][1]
840 else:
841 return 0
842
20effc67 843 def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
eafe8130
TL
844 if not channels:
845 channels = self.get_active_channels()
11fdf7f2 846 report = {
522d829b 847 'leaderboard': self.leaderboard,
11fdf7f2 848 'report_version': 1,
eafe8130
TL
849 'report_timestamp': datetime.utcnow().isoformat(),
850 'report_id': self.report_id,
851 'channels': channels,
852 'channels_available': ALL_CHANNELS,
853 'license': LICENSE,
20effc67
TL
854 'collections_available': [c['name'].name for c in MODULE_COLLECTION],
855 'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])],
11fdf7f2
TL
856 }
857
eafe8130 858 if 'ident' in channels:
eafe8130
TL
859 for option in ['description', 'contact', 'organization']:
860 report[option] = getattr(self, option)
861
862 if 'basic' in channels:
863 mon_map = self.get('mon_map')
864 osd_map = self.get('osd_map')
865 service_map = self.get('service_map')
866 fs_map = self.get('fs_map')
867 df = self.get('df')
20effc67 868 df_pools = {pool['id']: pool for pool in df['pools']}
eafe8130 869
9f95a23c 870 report['created'] = mon_map['created']
eafe8130
TL
871
872 # mons
873 v1_mons = 0
874 v2_mons = 0
875 ipv4_mons = 0
876 ipv6_mons = 0
877 for mon in mon_map['mons']:
878 for a in mon['public_addrs']['addrvec']:
879 if a['type'] == 'v2':
880 v2_mons += 1
881 elif a['type'] == 'v1':
882 v1_mons += 1
883 if a['addr'].startswith('['):
884 ipv6_mons += 1
885 else:
886 ipv4_mons += 1
887 report['mon'] = {
888 'count': len(mon_map['mons']),
889 'features': mon_map['features'],
890 'min_mon_release': mon_map['min_mon_release'],
891 'v1_addr_mons': v1_mons,
892 'v2_addr_mons': v2_mons,
893 'ipv4_addr_mons': ipv4_mons,
894 'ipv6_addr_mons': ipv6_mons,
895 }
896
897 report['config'] = self.gather_configs()
898
899 # pools
20effc67
TL
900
901 rbd_num_pools = 0
902 rbd_num_images_by_pool = []
903 rbd_mirroring_by_pool = []
eafe8130
TL
904 num_pg = 0
905 report['pools'] = list()
906 for pool in osd_map['pools']:
907 num_pg += pool['pg_num']
908 ec_profile = {}
909 if pool['erasure_code_profile']:
910 orig = osd_map['erasure_code_profiles'].get(
911 pool['erasure_code_profile'], {})
912 ec_profile = {
913 k: orig[k] for k in orig.keys()
914 if k in ['k', 'm', 'plugin', 'technique',
915 'crush-failure-domain', 'l']
916 }
20effc67 917 pool_data = {
eafe8130 918 'pool': pool['pool'],
eafe8130
TL
919 'pg_num': pool['pg_num'],
920 'pgp_num': pool['pg_placement_num'],
921 'size': pool['size'],
922 'min_size': pool['min_size'],
923 'pg_autoscale_mode': pool['pg_autoscale_mode'],
924 'target_max_bytes': pool['target_max_bytes'],
925 'target_max_objects': pool['target_max_objects'],
926 'type': ['', 'replicated', '', 'erasure'][pool['type']],
927 'erasure_code_profile': ec_profile,
928 'cache_mode': pool['cache_mode'],
929 }
20effc67
TL
930
931 # basic_pool_usage collection
932 if self.is_enabled_collection(Collection.basic_pool_usage):
933 pool_data['application'] = []
934 for application in pool['application_metadata']:
935 # Only include default applications
936 if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
937 pool_data['application'].append(application)
938 pool_stats = df_pools[pool['pool']]['stats']
939 pool_data['stats'] = { # filter out kb_used
940 'avail_raw': pool_stats['avail_raw'],
941 'bytes_used': pool_stats['bytes_used'],
942 'compress_bytes_used': pool_stats['compress_bytes_used'],
943 'compress_under_bytes': pool_stats['compress_under_bytes'],
944 'data_bytes_used': pool_stats['data_bytes_used'],
945 'dirty': pool_stats['dirty'],
946 'max_avail': pool_stats['max_avail'],
947 'objects': pool_stats['objects'],
948 'omap_bytes_used': pool_stats['omap_bytes_used'],
949 'percent_used': pool_stats['percent_used'],
950 'quota_bytes': pool_stats['quota_bytes'],
951 'quota_objects': pool_stats['quota_objects'],
952 'rd': pool_stats['rd'],
953 'rd_bytes': pool_stats['rd_bytes'],
954 'stored': pool_stats['stored'],
955 'stored_data': pool_stats['stored_data'],
956 'stored_omap': pool_stats['stored_omap'],
957 'stored_raw': pool_stats['stored_raw'],
958 'wr': pool_stats['wr'],
959 'wr_bytes': pool_stats['wr_bytes']
960 }
961
962 cast(List[Dict[str, Any]], report['pools']).append(pool_data)
eafe8130 963 if 'rbd' in pool['application_metadata']:
20effc67 964 rbd_num_pools += 1
eafe8130 965 ioctx = self.rados.open_ioctx(pool['pool_name'])
20effc67 966 rbd_num_images_by_pool.append(
eafe8130 967 sum(1 for _ in rbd.RBD().list2(ioctx)))
20effc67 968 rbd_mirroring_by_pool.append(
eafe8130 969 rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
20effc67
TL
970 report['rbd'] = {
971 'num_pools': rbd_num_pools,
972 'num_images_by_pool': rbd_num_images_by_pool,
973 'mirroring_by_pool': rbd_mirroring_by_pool}
eafe8130
TL
974
975 # osds
976 cluster_network = False
977 for osd in osd_map['osds']:
978 if osd['up'] and not cluster_network:
979 front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
92f5a8d4 980 back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
eafe8130
TL
981 if front_ip != back_ip:
982 cluster_network = True
983 report['osd'] = {
984 'count': len(osd_map['osds']),
985 'require_osd_release': osd_map['require_osd_release'],
986 'require_min_compat_client': osd_map['require_min_compat_client'],
987 'cluster_network': cluster_network,
988 }
989
990 # crush
991 report['crush'] = self.gather_crush_info()
992
993 # cephfs
994 report['fs'] = {
995 'count': len(fs_map['filesystems']),
996 'feature_flags': fs_map['feature_flags'],
997 'num_standby_mds': len(fs_map['standbys']),
998 'filesystems': [],
999 }
1000 num_mds = len(fs_map['standbys'])
1001 for fsm in fs_map['filesystems']:
1002 fs = fsm['mdsmap']
1003 num_sessions = 0
1004 cached_ino = 0
1005 cached_dn = 0
1006 cached_cap = 0
1007 subtrees = 0
1008 rfiles = 0
1009 rbytes = 0
1010 rsnaps = 0
1011 for gid, mds in fs['info'].items():
1012 num_sessions += self.get_latest('mds', mds['name'],
1013 'mds_sessions.session_count')
1014 cached_ino += self.get_latest('mds', mds['name'],
1015 'mds_mem.ino')
1016 cached_dn += self.get_latest('mds', mds['name'],
1017 'mds_mem.dn')
1018 cached_cap += self.get_latest('mds', mds['name'],
1019 'mds_mem.cap')
1020 subtrees += self.get_latest('mds', mds['name'],
1021 'mds.subtrees')
1022 if mds['rank'] == 0:
1023 rfiles = self.get_latest('mds', mds['name'],
1024 'mds.root_rfiles')
1025 rbytes = self.get_latest('mds', mds['name'],
1026 'mds.root_rbytes')
1027 rsnaps = self.get_latest('mds', mds['name'],
1028 'mds.root_rsnaps')
20effc67 1029 report['fs']['filesystems'].append({ # type: ignore
eafe8130
TL
1030 'max_mds': fs['max_mds'],
1031 'ever_allowed_features': fs['ever_allowed_features'],
1032 'explicitly_allowed_features': fs['explicitly_allowed_features'],
1033 'num_in': len(fs['in']),
1034 'num_up': len(fs['up']),
1035 'num_standby_replay': len(
1036 [mds for gid, mds in fs['info'].items()
1037 if mds['state'] == 'up:standby-replay']),
1038 'num_mds': len(fs['info']),
1039 'num_sessions': num_sessions,
1040 'cached_inos': cached_ino,
1041 'cached_dns': cached_dn,
1042 'cached_caps': cached_cap,
1043 'cached_subtrees': subtrees,
1044 'balancer_enabled': len(fs['balancer']) > 0,
1045 'num_data_pools': len(fs['data_pools']),
1046 'standby_count_wanted': fs['standby_count_wanted'],
1047 'approx_ctime': fs['created'][0:7],
1048 'files': rfiles,
1049 'bytes': rbytes,
1050 'snaps': rsnaps,
1051 })
1052 num_mds += len(fs['info'])
20effc67 1053 report['fs']['total_num_mds'] = num_mds # type: ignore
eafe8130
TL
1054
1055 # daemons
20effc67
TL
1056 report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
1057 mon=self.gather_mon_metadata(mon_map))
1058
1059 if self.is_enabled_collection(Collection.basic_mds_metadata):
1060 report['metadata']['mds'] = self.gather_mds_metadata() # type: ignore
eafe8130
TL
1061
1062 # host counts
1063 servers = self.list_servers()
1064 self.log.debug('servers %s' % servers)
20effc67 1065 hosts = {
eafe8130
TL
1066 'num': len([h for h in servers if h['hostname']]),
1067 }
1068 for t in ['mon', 'mds', 'osd', 'mgr']:
20effc67
TL
1069 nr_services = sum(1 for host in servers if
1070 any(service for service in cast(List[ServiceInfoT],
1071 host['services'])
1072 if service['type'] == t))
1073 hosts['num_with_' + t] = nr_services
1074 report['hosts'] = hosts
eafe8130
TL
1075
1076 report['usage'] = {
1077 'pools': len(df['pools']),
92f5a8d4 1078 'pg_num': num_pg,
eafe8130
TL
1079 'total_used_bytes': df['stats']['total_used_bytes'],
1080 'total_bytes': df['stats']['total_bytes'],
1081 'total_avail_bytes': df['stats']['total_avail_bytes']
1082 }
20effc67
TL
1083 # basic_usage_by_class collection
1084 if self.is_enabled_collection(Collection.basic_usage_by_class):
1085 report['usage']['stats_by_class'] = {} # type: ignore
1086 for device_class in df['stats_by_class']:
1087 if device_class in ['hdd', 'ssd', 'nvme']:
1088 report['usage']['stats_by_class'][device_class] = df['stats_by_class'][device_class] # type: ignore
1089
1090 services: DefaultDict[str, int] = defaultdict(int)
eafe8130 1091 for key, value in service_map['services'].items():
20effc67 1092 services[key] += 1
eafe8130 1093 if key == 'rgw':
20effc67 1094 rgw = {}
eafe8130 1095 zones = set()
eafe8130
TL
1096 zonegroups = set()
1097 frontends = set()
20effc67 1098 count = 0
eafe8130 1099 d = value.get('daemons', dict())
20effc67 1100 for k, v in d.items():
eafe8130 1101 if k == 'summary' and v:
20effc67 1102 rgw[k] = v
eafe8130 1103 elif isinstance(v, dict) and 'metadata' in v:
20effc67 1104 count += 1
eafe8130
TL
1105 zones.add(v['metadata']['zone_id'])
1106 zonegroups.add(v['metadata']['zonegroup_id'])
1107 frontends.add(v['metadata']['frontend_type#0'])
1108
1109 # we could actually iterate over all the keys of
1110 # the dict and check for how many frontends there
1111 # are, but it is unlikely that one would be running
1112 # more than 2 supported ones
1113 f2 = v['metadata'].get('frontend_type#1', None)
1114 if f2:
1115 frontends.add(f2)
1116
20effc67
TL
1117 rgw['count'] = count
1118 rgw['zones'] = len(zones)
1119 rgw['zonegroups'] = len(zonegroups)
1120 rgw['frontends'] = list(frontends) # sets aren't json-serializable
1121 report['rgw'] = rgw
1122 report['services'] = services
eafe8130
TL
1123
1124 try:
1125 report['balancer'] = self.remote('balancer', 'gather_telemetry')
1126 except ImportError:
1127 report['balancer'] = {
1128 'active': False
11fdf7f2 1129 }
11fdf7f2 1130
eafe8130
TL
1131 if 'crash' in channels:
1132 report['crashes'] = self.gather_crashinfo()
11fdf7f2 1133
20effc67
TL
1134 if 'perf' in channels:
1135 if self.is_enabled_collection(Collection.perf_perf):
1136 report['perf_counters'] = self.gather_perf_counters('separated')
1137 report['stats_per_pool'] = self.get_stats_per_pool()
1138 report['stats_per_pg'] = self.get_stats_per_pg()
1139 report['io_rate'] = self.get_io_rate()
1140 report['osd_perf_histograms'] = self.get_osd_histograms('separated')
1141 report['mempool'] = self.get_mempool('separated')
1142 report['heap_stats'] = self.get_heap_stats()
1143 report['rocksdb_stats'] = self.get_rocksdb_stats()
1144
eafe8130
TL
1145 # NOTE: We do not include the 'device' channel in this report; it is
1146 # sent to a different endpoint.
11fdf7f2
TL
1147
1148 return report
1149
20effc67 1150 def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
9f95a23c
TL
1151 self.log.info('Sending %s to: %s' % (what, url))
1152 proxies = dict()
1153 if self.proxy:
1154 self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
1155 proxies['http'] = self.proxy
1156 proxies['https'] = self.proxy
1157 try:
1158 resp = requests.put(url=url, json=report, proxies=proxies)
1159 resp.raise_for_status()
1160 except Exception as e:
1161 fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
1162 self.log.error(fail_reason)
1163 return fail_reason
1164 return None
1165
20effc67
TL
    class EndPoint(enum.Enum):
        # Distinct upload destinations: the main telemetry endpoint ('ceph')
        # and the separate device-metrics (SMART data) endpoint ('device').
        ceph = 'ceph'
        device = 'device'
1169
1170 def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
1171 '''
1172 Find collections that are available in the module, but are not in the db
1173 '''
1174 if self.db_collection is None:
1175 return None
1176
1177 if not channels:
1178 channels = ALL_CHANNELS
1179 else:
1180 for ch in channels:
1181 if ch not in ALL_CHANNELS:
1182 self.log.debug(f"invalid channel name: {ch}")
1183 return None
1184
1185 new_collection : List[Collection] = []
1186
1187 for c in MODULE_COLLECTION:
1188 if c['name'].name not in self.db_collection:
1189 if c['channel'] in channels:
1190 new_collection.append(c['name'])
1191
1192 return new_collection
1193
1194 def is_major_upgrade(self) -> bool:
1195 '''
1196 Returns True only if the user last opted-in to an older major
1197 '''
1198 if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0:
1199 # we do not know what Ceph version was when the user last opted-in,
1200 # thus we do not wish to nag in case of a major upgrade
1201 return False
1202
1203 mon_map = self.get('mon_map')
1204 mon_min = mon_map.get("min_mon_release", 0)
1205
1206 if mon_min - self.last_opted_in_ceph_version > 0:
1207 self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
1208 return True
1209
1210 return False
1211
1212 def is_opted_in(self) -> bool:
1213 # If len is 0 it means that the user is either opted-out (never
1214 # opted-in, or invoked `telemetry off`), or they upgraded from a
1215 # telemetry revision 1 or 2, which required to re-opt in to revision 3,
1216 # regardless, hence is considered as opted-out
1217 if self.db_collection is None:
1218 return False
1219 return len(self.db_collection) > 0
1220
1221 def should_nag(self) -> bool:
1222 # Find delta between opted-in collections and module collections;
1223 # nag only if module has a collection which is not in db, and nag == True.
1224
1225 # We currently do not nag if the user is opted-out (or never opted-in).
1226 # If we wish to do this in the future, we need to have a tri-mode state
1227 # (opted in, opted out, no action yet), and it needs to be guarded by a
1228 # config option (so that nagging can be turned off via config).
1229 # We also need to add a last_opted_out_ceph_version variable, for the
1230 # major upgrade check.
1231
1232 # check if there are collections the user is not opt-in to
1233 # that we should nag about
1234 if self.db_collection is not None:
1235 for c in MODULE_COLLECTION:
1236 if c['name'].name not in self.db_collection:
1237 if c['nag'] == True:
1238 self.log.debug(f"The collection: {c['name']} is not reported")
1239 return True
1240
1241 # user might be opted-in to the most recent collection, or there is no
1242 # new collection which requires nagging about; thus nag in case it's a
1243 # major upgrade and there are new collections
1244 # (which their own nag == False):
1245 new_collections = False
1246 col_delta = self.collection_delta()
1247 if col_delta is not None and len(col_delta) > 0:
1248 new_collections = True
1249
1250 return self.is_major_upgrade() and new_collections
1251
    def init_collection(self) -> None:
        """
        Initialize ``self.db_collection`` from the persisted 'collection' key.

        Handles the one-time upgrade transition: when no collection list has
        been stored yet, derive one from the legacy opt-in state
        (``self.enabled`` + ``self.last_opt_revision``) and persist it.
        """
        # We fetch from db the collections the user had already opted-in to.
        # During the transition the results will be empty, but the user might
        # be opted-in to an older version (e.g. revision = 3)

        collection = self.get_store('collection')

        if collection is not None:
            self.db_collection = json.loads(collection)

        if self.db_collection is None:
            # happens once on upgrade: no 'collection' key stored yet
            if not self.enabled:
                # user is not opted-in; persist an explicit empty list
                self.set_store('collection', json.dumps([]))
                self.log.debug("user is not opted-in")
            else:
                # user is opted-in, verify the revision:
                if self.last_opt_revision == REVISION:
                    # current revision: grant the base collections of each channel
                    self.log.debug(f"telemetry revision is {REVISION}")
                    base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
                    self.set_store('collection', json.dumps(base_collection))
                else:
                    # user is opted-in to an older version, meaning they need
                    # to re-opt in regardless
                    self.set_store('collection', json.dumps([]))
                    self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")

            # reload collection after setting, so the in-memory view always
            # mirrors what was just persisted
            collection = self.get_store('collection')
            if collection is not None:
                self.db_collection = json.loads(collection)
            else:
                raise RuntimeError('collection is None after initial setting')
        else:
            # user has already upgraded
            self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
1289
1290 def is_enabled_collection(self, collection: Collection) -> bool:
1291 if self.db_collection is None:
1292 return False
1293 return collection.name in self.db_collection
1294
1295 def opt_in_all_collections(self) -> None:
1296 """
1297 Opt-in to all collections; Update db with the currently available collections in the module
1298 """
1299 if self.db_collection is None:
1300 raise RuntimeError('db_collection is None after initial setting')
1301
1302 for c in MODULE_COLLECTION:
1303 if c['name'].name not in self.db_collection:
1304 self.db_collection.append(c['name'])
1305
1306 self.set_store('collection', json.dumps(self.db_collection))
1307
    def send(self,
             report: Dict[str, Dict[str, str]],
             endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
        """
        Upload the report to the requested endpoints (both by default).

        The ceph endpoint gets the full report; the device endpoint gets one
        request per host with that host's device metrics. Updates
        ``last_upload`` only on a successful ceph-endpoint upload.

        :return: (retval, stdout, stderr)-style tuple; retval 1 if any
                 upload failed, 0 otherwise.
        """
        if not endpoint:
            endpoint = [self.EndPoint.ceph, self.EndPoint.device]
        failed = []
        success = []
        self.log.debug('Send endpoints %s' % endpoint)
        for e in endpoint:
            if e == self.EndPoint.ceph:
                fail_reason = self._try_post('ceph report', self.url, report)
                if fail_reason:
                    failed.append(fail_reason)
                else:
                    # record upload time so serve() can honor the interval
                    now = int(time.time())
                    self.last_upload = now
                    self.set_store('last_upload', str(now))
                    success.append('Ceph report sent to {0}'.format(self.url))
                    self.log.info('Sent report to {0}'.format(self.url))
            elif e == self.EndPoint.device:
                if 'device' in self.get_active_channels():
                    devices = self.gather_device_report()
                    if devices:
                        num_devs = 0
                        num_hosts = 0
                        # one POST per host; hosts with no devices are skipped
                        for host, ls in devices.items():
                            self.log.debug('host %s devices %s' % (host, ls))
                            if not len(ls):
                                continue
                            fail_reason = self._try_post('devices', self.device_url,
                                                         ls)
                            if fail_reason:
                                failed.append(fail_reason)
                            else:
                                num_devs += len(ls)
                                num_hosts += 1
                        if num_devs:
                            success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
                                num_devs, num_hosts, len(devices)))
                    else:
                        fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
                        failed.append(fail_reason)
                        self.log.error(fail_reason)
        if failed:
            return 1, '', '\n'.join(success + failed)
        return 0, '', '\n'.join(success)
11fdf7f2 1354
20effc67
TL
1355 def format_perf_histogram(self, report: Dict[str, Any]) -> None:
1356 # Formatting the perf histograms so they are human-readable. This will change the
1357 # ranges and values, which are currently in list form, into strings so that
1358 # they are displayed horizontally instead of vertically.
1359 try:
1360 # Formatting ranges and values in osd_perf_histograms
1361 mode = 'osd_perf_histograms'
1362 for config in report[mode]:
1363 for histogram in config:
1364 # Adjust ranges by converting lists into strings
1365 for axis in config[histogram]['axes']:
1366 for i in range(0, len(axis['ranges'])):
1367 axis['ranges'][i] = str(axis['ranges'][i])
1368
1369 for osd in config[histogram]['osds']:
1370 for i in range(0, len(osd['values'])):
1371 osd['values'][i] = str(osd['values'][i])
1372 except KeyError:
1373 # If the perf channel is not enabled, there should be a KeyError since
1374 # 'osd_perf_histograms' would not be present in the report. In that case,
1375 # the show function should pass as usual without trying to format the
1376 # histograms.
1377 pass
1378
1379 def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1380 '''
1381 Enable or disable a list of channels
1382 '''
1383 if not self.enabled:
1384 # telemetry should be on for channels to be toggled
1385 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1386 'Preview sample reports with `ceph telemetry preview`.'
1387 return 0, msg, ''
1388
1389 if channels is None:
1390 msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
1391 return 0, msg, ''
1392
1393 state = action == 'enable'
1394 msg = ''
1395 for c in channels:
1396 if c not in ALL_CHANNELS:
1397 msg = f"{msg}{c} is not a valid channel name. "\
1398 f"Available channels: {ALL_CHANNELS}.\n"
1399 else:
1400 self.set_module_option(f"channel_{c}", state)
1401 setattr(self,
1402 f"channel_{c}",
1403 state)
1404 msg = f"{msg}channel_{c} is {action}d\n"
1405
1406 return 0, msg, ''
1407
1408 @CLIReadCommand('telemetry status')
1409 def status(self) -> Tuple[int, str, str]:
1410 '''
1411 Show current configuration
1412 '''
1413 r = {}
1414 for opt in self.MODULE_OPTIONS:
1415 r[opt['name']] = getattr(self, opt['name'])
1416 r['last_upload'] = (time.ctime(self.last_upload)
1417 if self.last_upload else self.last_upload)
1418 return 0, json.dumps(r, indent=4, sort_keys=True), ''
1419
1420 @CLIReadCommand('telemetry diff')
1421 def diff(self) -> Tuple[int, str, str]:
1422 '''
1423 Show the diff between opted-in collection and available collection
1424 '''
1425 diff = []
1426 keys = ['nag']
1427
1428 for c in MODULE_COLLECTION:
1429 if not self.is_enabled_collection(c['name']):
1430 diff.append({key: val for key, val in c.items() if key not in keys})
1431
1432 r = None
1433 if diff == []:
1434 r = "Telemetry is up to date"
1435 else:
1436 r = json.dumps(diff, indent=4, sort_keys=True)
1437
1438 return 0, r, ''
1439
1440 @CLICommand('telemetry on')
1441 def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
1442 '''
1443 Enable telemetry reports from this cluster
1444 '''
1445 if license != LICENSE:
1446 return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
1447To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
11fdf7f2 1448 else:
20effc67
TL
1449 self.set_module_option('enabled', True)
1450 self.enabled = True
1451 self.opt_in_all_collections()
11fdf7f2 1452
20effc67
TL
1453 # for major releases upgrade nagging
1454 mon_map = self.get('mon_map')
1455 mon_min = mon_map.get("min_mon_release", 0)
1456 self.set_store('last_opted_in_ceph_version', str(mon_min))
1457 self.last_opted_in_ceph_version = mon_min
1458
1459 msg = 'Telemetry is on.'
1460 disabled_channels = ''
1461 active_channels = self.get_active_channels()
1462 for c in ALL_CHANNELS:
1463 if c not in active_channels and c != 'ident':
1464 disabled_channels = f"{disabled_channels} {c}"
1465
1466 if len(disabled_channels) > 0:
1467 msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
1468 f"`ceph telemetry enable channel{disabled_channels}`"
1469
1470 return 0, msg, ''
1471
1472 @CLICommand('telemetry off')
1473 def off(self) -> Tuple[int, str, str]:
1474 '''
1475 Disable telemetry reports from this cluster
1476 '''
1477 if not self.enabled:
1478 # telemetry is already off
1479 msg = 'Telemetry is currently not enabled, nothing to turn off. '\
1480 'Please consider opting-in with `ceph telemetry on`.\n' \
1481 'Preview sample reports with `ceph telemetry preview`.'
1482 return 0, msg, ''
e306af50 1483
e306af50 1484 self.set_module_option('enabled', False)
20effc67
TL
1485 self.enabled = False
1486 self.set_store('collection', json.dumps([]))
1487 self.db_collection = []
1488
1489 # we might need this info in the future, in case
1490 # of nagging when user is opted-out
1491 mon_map = self.get('mon_map')
1492 mon_min = mon_map.get("min_mon_release", 0)
1493 self.set_store('last_opted_out_ceph_version', str(mon_min))
1494 self.last_opted_out_ceph_version = mon_min
1495
1496 msg = 'Telemetry is now disabled.'
1497 return 0, msg, ''
1498
    @CLIReadCommand('telemetry enable channel all')
    def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Enable all channels
        '''
        # delegate to the common channel toggler
        return self.toggle_channel('enable', channels)
1505
    @CLIReadCommand('telemetry enable channel')
    def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Enable a list of channels
        '''
        # delegate to the common channel toggler
        return self.toggle_channel('enable', channels)
1512
    @CLIReadCommand('telemetry disable channel all')
    def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Disable all channels
        '''
        # delegate to the common channel toggler
        return self.toggle_channel('disable', channels)
1519
    @CLIReadCommand('telemetry disable channel')
    def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Disable a list of channels
        '''
        # delegate to the common channel toggler
        return self.toggle_channel('disable', channels)
1526
1527 @CLIReadCommand('telemetry channel ls')
1528 def channel_ls(self) -> Tuple[int, str, str]:
1529 '''
1530 List all channels
1531 '''
1532 table = PrettyTable(
1533 [
1534 'NAME', 'ENABLED', 'DEFAULT', 'DESC',
1535 ],
1536 border=False)
1537 table.align['NAME'] = 'l'
1538 table.align['ENABLED'] = 'l'
1539 table.align['DEFAULT'] = 'l'
1540 table.align['DESC'] = 'l'
1541 table.left_padding_width = 0
1542 table.right_padding_width = 4
1543
1544 for c in ALL_CHANNELS:
1545 enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
1546 for o in self.MODULE_OPTIONS:
1547 if o['name'] == f"channel_{c}":
1548 default = "ON" if o.get('default', None) else "OFF"
1549 desc = o.get('desc', None)
1550
1551 table.add_row((
1552 c,
1553 enabled,
1554 default,
1555 desc,
1556 ))
1557
1558 return 0, table.get_string(sortby="NAME"), ''
1559
1560 @CLIReadCommand('telemetry collection ls')
1561 def collection_ls(self) -> Tuple[int, str, str]:
1562 '''
1563 List all collections
1564 '''
1565 col_delta = self.collection_delta()
1566 msg = ''
1567 if col_delta is not None and len(col_delta) > 0:
1568 msg = f"New collections are available:\n" \
1569 f"{sorted([c.name for c in col_delta])}\n" \
1570 f"Run `ceph telemetry on` to opt-in to these collections.\n"
1571
1572 table = PrettyTable(
1573 [
1574 'NAME', 'STATUS', 'DESC',
1575 ],
1576 border=False)
1577 table.align['NAME'] = 'l'
1578 table.align['STATUS'] = 'l'
1579 table.align['DESC'] = 'l'
1580 table.left_padding_width = 0
1581 table.right_padding_width = 4
1582
1583 for c in MODULE_COLLECTION:
1584 name = c['name']
1585 opted_in = self.is_enabled_collection(name)
1586 channel_enabled = getattr(self, f"channel_{c['channel']}")
1587
1588 status = ''
1589 if channel_enabled and opted_in:
1590 status = "REPORTING"
1591 else:
1592 why = ''
1593 delimiter = ''
1594
1595 if not opted_in:
1596 why += "NOT OPTED-IN"
1597 delimiter = ', '
1598 if not channel_enabled:
1599 why += f"{delimiter}CHANNEL {c['channel']} IS OFF"
1600
1601 status = f"NOT REPORTING: {why}"
1602
1603 desc = c['description']
1604
1605 table.add_row((
1606 name,
1607 status,
1608 desc,
1609 ))
1610
1611 if len(msg):
1612 # add a new line between message and table output
1613 msg = f"{msg} \n"
1614
1615 return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
1616
1617 @CLICommand('telemetry send')
1618 def do_send(self,
1619 endpoint: Optional[List[EndPoint]] = None,
1620 license: Optional[str] = None) -> Tuple[int, str, str]:
1621 '''
1622 Send a sample report
1623 '''
1624 if not self.is_opted_in() and license != LICENSE:
1625 self.log.debug(('A telemetry send attempt while opted-out. '
1626 'Asking for license agreement'))
1627 return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
1628To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
1629Please consider enabling the telemetry module with 'ceph telemetry on'.'''
1630 else:
1631 self.last_report = self.compile_report()
1632 return self.send(self.last_report, endpoint)
1633
1634 @CLIReadCommand('telemetry show')
1635 def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1636 '''
1637 Show a sample report of opted-in collections (except for 'device')
1638 '''
1639 if not self.enabled:
1640 # if telemetry is off, no report is being sent, hence nothing to show
1641 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1642 'Preview sample reports with `ceph telemetry preview`.'
1643 return 0, msg, ''
1644
1645 report = self.get_report_locked(channels=channels)
1646 self.format_perf_histogram(report)
1647 report = json.dumps(report, indent=4, sort_keys=True)
1648
1649 if self.channel_device:
1650 report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''
1651
1652 return 0, report, ''
1653
1654 @CLIReadCommand('telemetry preview')
1655 def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1656 '''
1657 Preview a sample report of the most recent collections available (except for 'device')
1658 '''
1659 report = {}
1660
1661 # We use a lock to prevent a scenario where the user wishes to preview
1662 # the report, and at the same time the module hits the interval of
1663 # sending a report with the opted-in collection, which has less data
1664 # than in the preview report.
1665 col_delta = self.collection_delta()
1666 with self.get_report_lock:
1667 if col_delta is not None and len(col_delta) == 0:
1668 # user is already opted-in to the most recent collection
1669 msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
1670 return 0, msg, ''
1671 else:
1672 # there are collections the user is not opted-in to
1673 next_collection = []
1674
1675 for c in MODULE_COLLECTION:
1676 next_collection.append(c['name'].name)
1677
1678 opted_in_collection = self.db_collection
1679 self.db_collection = next_collection
1680 report = self.get_report(channels=channels)
1681 self.db_collection = opted_in_collection
e306af50 1682
20effc67
TL
1683 self.format_perf_histogram(report)
1684 report = json.dumps(report, indent=4, sort_keys=True)
1685
1686 if self.channel_device:
1687 report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''
1688
1689 return 0, report, ''
1690
1691 @CLIReadCommand('telemetry show-device')
1692 def show_device(self) -> Tuple[int, str, str]:
1693 '''
1694 Show a sample device report
1695 '''
1696 if not self.enabled:
1697 # if telemetry is off, no report is being sent, hence nothing to show
1698 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1699 'Preview sample device reports with `ceph telemetry preview-device`.'
1700 return 0, msg, ''
1701
1702 if not self.channel_device:
1703 # if device channel is off, device report is not being sent, hence nothing to show
1704 msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
1705 'Preview sample device reports with `ceph telemetry preview-device`.'
1706 return 0, msg, ''
1707
1708 return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
1709
1710 @CLIReadCommand('telemetry preview-device')
1711 def preview_device(self) -> Tuple[int, str, str]:
1712 '''
1713 Preview a sample device report of the most recent device collection
1714 '''
1715 report = {}
1716
1717 device_col_delta = self.collection_delta(['device'])
1718 with self.get_report_lock:
1719 if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
1720 # user is already opted-in to the most recent device collection,
1721 # and device channel is on, thus `show-device` should be called
1722 msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
1723 return 0, msg, ''
1724
1725 # either the user is not opted-in at all, or there are collections
1726 # they are not opted-in to
1727 next_collection = []
1728
1729 for c in MODULE_COLLECTION:
1730 next_collection.append(c['name'].name)
1731
1732 opted_in_collection = self.db_collection
1733 self.db_collection = next_collection
1734 report = self.get_report('device')
1735 self.db_collection = opted_in_collection
1736
1737 report = json.dumps(report, indent=4, sort_keys=True)
1738 return 0, report, ''
1739
1740 @CLIReadCommand('telemetry show-all')
1741 def show_all(self) -> Tuple[int, str, str]:
1742 '''
1743 Show a sample report of all enabled channels (including 'device' channel)
1744 '''
1745 if not self.enabled:
1746 # if telemetry is off, no report is being sent, hence nothing to show
1747 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1748 'Preview sample reports with `ceph telemetry preview`.'
1749 return 0, msg, ''
1750
1751 if not self.channel_device:
1752 # device channel is off, no need to display its report
1753 return 0, json.dumps(self.get_report_locked('default'), indent=4, sort_keys=True), ''
1754
1755 # telemetry is on and device channel is enabled, show both
1756 return 0, json.dumps(self.get_report_locked('all'), indent=4, sort_keys=True), ''
1757
1758 @CLIReadCommand('telemetry preview-all')
1759 def preview_all(self) -> Tuple[int, str, str]:
1760 '''
1761 Preview a sample report of the most recent collections available of all channels (including 'device')
1762 '''
1763 report = {}
1764
1765 col_delta = self.collection_delta()
1766 with self.get_report_lock:
1767 if col_delta is not None and len(col_delta) == 0:
1768 # user is already opted-in to the most recent collection
1769 msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
1770 return 0, msg, ''
1771
1772 # there are collections the user is not opted-in to
1773 next_collection = []
1774
1775 for c in MODULE_COLLECTION:
1776 next_collection.append(c['name'].name)
1777
1778 opted_in_collection = self.db_collection
1779 self.db_collection = next_collection
1780 report = self.get_report('all')
1781 self.db_collection = opted_in_collection
1782
1783 self.format_perf_histogram(report)
1784 report = json.dumps(report, indent=4, sort_keys=True)
1785
1786 return 0, report, ''
1787
    def get_report_locked(self,
                          report_type: str = 'default',
                          channels: Optional[List[str]] = None) -> Dict[str, Any]:
        '''
        A wrapper around get_report to allow for compiling a report of the most recent module collections
        '''
        # serialize with the interval-based sender and the preview commands,
        # which temporarily swap self.db_collection under this same lock
        with self.get_report_lock:
            return self.get_report(report_type, channels)
1796
1797 def get_report(self,
1798 report_type: str = 'default',
1799 channels: Optional[List[str]] = None) -> Dict[str, Any]:
e306af50
TL
1800 if report_type == 'default':
1801 return self.compile_report(channels=channels)
1802 elif report_type == 'device':
1803 return self.gather_device_report()
1804 elif report_type == 'all':
1805 return {'report': self.compile_report(channels=channels),
1806 'device_report': self.gather_device_report()}
1807 return {}
1808
20effc67 1809 def self_test(self) -> None:
11fdf7f2
TL
1810 report = self.compile_report()
1811 if len(report) == 0:
1812 raise RuntimeError('Report is empty')
1813
1814 if 'report_id' not in report:
1815 raise RuntimeError('report_id not found in report')
1816
20effc67 1817 def shutdown(self) -> None:
11fdf7f2
TL
1818 self.run = False
1819 self.event.set()
1820
20effc67 1821 def refresh_health_checks(self) -> None:
eafe8130 1822 health_checks = {}
20effc67
TL
1823 # TODO do we want to nag also in case the user is not opted-in?
1824 if self.enabled and self.should_nag():
eafe8130
TL
1825 health_checks['TELEMETRY_CHANGED'] = {
1826 'severity': 'warning',
1827 'summary': 'Telemetry requires re-opt-in',
1828 'detail': [
20effc67 1829 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
eafe8130
TL
1830 ]
1831 }
1832 self.set_health_checks(health_checks)
1833
20effc67 1834 def serve(self) -> None:
11fdf7f2 1835 self.load()
11fdf7f2
TL
1836 self.run = True
1837
1838 self.log.debug('Waiting for mgr to warm up')
20effc67 1839 time.sleep(10)
11fdf7f2
TL
1840
1841 while self.run:
eafe8130
TL
1842 self.event.clear()
1843
1844 self.refresh_health_checks()
1845
20effc67 1846 if not self.is_opted_in():
eafe8130
TL
1847 self.log.debug('Not sending report until user re-opts-in')
1848 self.event.wait(1800)
1849 continue
11fdf7f2 1850 if not self.enabled:
eafe8130 1851 self.log.debug('Not sending report until configured to do so')
11fdf7f2
TL
1852 self.event.wait(1800)
1853 continue
1854
1855 now = int(time.time())
20effc67
TL
1856 if not self.last_upload or \
1857 (now - self.last_upload) > self.interval * 3600:
11fdf7f2
TL
1858 self.log.info('Compiling and sending report to %s',
1859 self.url)
1860
1861 try:
1862 self.last_report = self.compile_report()
20effc67 1863 except Exception:
11fdf7f2
TL
1864 self.log.exception('Exception while compiling report:')
1865
eafe8130 1866 self.send(self.last_report)
11fdf7f2 1867 else:
eafe8130 1868 self.log.debug('Interval for sending new report has not expired')
11fdf7f2
TL
1869
1870 sleep = 3600
1871 self.log.debug('Sleeping for %d seconds', sleep)
1872 self.event.wait(sleep)
1873
11fdf7f2 1874 @staticmethod
20effc67 1875 def can_run() -> Tuple[bool, str]:
11fdf7f2 1876 return True, ''