1 """
2 Telemetry module for ceph-mgr
3
4 Collect statistics from the Ceph cluster and send them back to the Ceph project
5 when the user has opted in
6 """
7 import logging
8 import numbers
9 import enum
10 import errno
11 import hashlib
12 import json
13 import rbd
14 import requests
15 import uuid
16 import time
17 from datetime import datetime, timedelta
18 from prettytable import PrettyTable
19 from threading import Event, Lock
20 from collections import defaultdict
21 from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union
22
23 from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, ServiceInfoT
24
25
26 ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']
27
28 LICENSE = 'sharing-1-0'
29 LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
30 LICENSE_URL = 'https://cdla.io/sharing-1-0/'
31 NO_SALT_CNT = 0
32
33 # Latest revision of the telemetry report. Bump this each time we make
34 # *any* change.
35 REVISION = 3
36
37 # History of revisions
38 # --------------------
39 #
40 # Version 1:
41 # Mimic and/or nautilus are lumped together here, since
42 # we didn't track revisions yet.
43 #
44 # Version 2:
45 # - added revision tracking, nagging, etc.
46 # - added config option changes
47 # - added channels
48 # - added explicit license acknowledgement to the opt-in process
49 #
50 # Version 3:
51 # - added device health metrics (i.e., SMART data, minus serial number)
52 # - remove crush_rule
53 # - added CephFS metadata (how many MDSs, fs features, how many data pools,
54 # how much metadata is cached, rfiles, rbytes, rsnapshots)
55 # - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
56 # - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
57 # - whether an OSD cluster network is in use
58 # - rbd pool and image count, and rbd mirror mode (pool-level)
59 # - rgw daemons, zones, zonegroups; which rgw frontends
60 # - crush map stats
61
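# A collection is the granular opt-in unit of the telemetry report. Each
# entry in MODULE_COLLECTION below ties a collection to its channel, a
# human-readable description, and a 'nag' flag that controls whether the
# module should prompt users to re-opt-in once that collection is added.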
62 class Collection(str, enum.Enum):
63 basic_base = 'basic_base'
64 device_base = 'device_base'
65 crash_base = 'crash_base'
66 ident_base = 'ident_base'
67 perf_perf = 'perf_perf'
68 basic_mds_metadata = 'basic_mds_metadata'
69 basic_pool_usage = 'basic_pool_usage'
70 basic_usage_by_class = 'basic_usage_by_class'
71
72 MODULE_COLLECTION : List[Dict] = [
73 {
74 "name": Collection.basic_base,
75 "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
76 "channel": "basic",
77 "nag": False
78 },
79 {
80 "name": Collection.device_base,
81 "description": "Information about device health metrics",
82 "channel": "device",
83 "nag": False
84 },
85 {
86 "name": Collection.crash_base,
87 "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
88 "channel": "crash",
89 "nag": False
90 },
91 {
92 "name": Collection.ident_base,
93 "description": "User-provided identifying information about the cluster",
94 "channel": "ident",
95 "nag": False
96 },
97 {
98 "name": Collection.perf_perf,
99 "description": "Information about performance counters of the cluster",
100 "channel": "perf",
101 "nag": True
102 },
103 {
104 "name": Collection.basic_mds_metadata,
105 "description": "MDS metadata",
106 "channel": "basic",
107 "nag": False
108 },
109 {
110 "name": Collection.basic_pool_usage,
111 "description": "Default pool application and usage statistics",
112 "channel": "basic",
113 "nag": False
114 },
115 {
116 "name": Collection.basic_usage_by_class,
117 "description": "Default device class usage statistics",
118 "channel": "basic",
119 "nag": False
120 }
121 ]
122
123 class Module(MgrModule):
124 metadata_keys = [
125 "arch",
126 "ceph_version",
127 "os",
128 "cpu",
129 "kernel_description",
130 "kernel_version",
131 "distro_description",
132 "distro"
133 ]
134
135 MODULE_OPTIONS = [
136 Option(name='url',
137 type='str',
138 default='https://telemetry.ceph.com/report'),
139 Option(name='device_url',
140 type='str',
141 default='https://telemetry.ceph.com/device'),
142 Option(name='enabled',
143 type='bool',
144 default=False),
145 Option(name='last_opt_revision',
146 type='int',
147 default=1),
148 Option(name='leaderboard',
149 type='bool',
150 default=False),
151 Option(name='description',
152 type='str',
153 default=None),
154 Option(name='contact',
155 type='str',
156 default=None),
157 Option(name='organization',
158 type='str',
159 default=None),
160 Option(name='proxy',
161 type='str',
162 default=None),
163 Option(name='interval',
164 type='int',
165 default=24,
166 min=8),
167 Option(name='channel_basic',
168 type='bool',
169 default=True,
170 desc='Share basic cluster information (size, version)'),
171 Option(name='channel_ident',
172 type='bool',
173 default=False,
174 desc='Share a user-provided description and/or contact email for the cluster'),
175 Option(name='channel_crash',
176 type='bool',
177 default=True,
178                desc='Share metadata about Ceph daemon crashes (version, stack traces, etc.)'),
179 Option(name='channel_device',
180 type='bool',
181 default=True,
182 desc=('Share device health metrics '
183 '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
184 Option(name='channel_perf',
185 type='bool',
186 default=False,
187 desc='Share various performance metrics of a cluster'),
188 ]
189
190 @property
191 def config_keys(self) -> Dict[str, OptionValue]:
192 return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
193
194 def __init__(self, *args: Any, **kwargs: Any) -> None:
195 super(Module, self).__init__(*args, **kwargs)
196 self.event = Event()
197 self.run = False
198 self.db_collection: Optional[List[str]] = None
199 self.last_opted_in_ceph_version: Optional[int] = None
200 self.last_opted_out_ceph_version: Optional[int] = None
201 self.last_upload: Optional[int] = None
202 self.last_report: Dict[str, Any] = dict()
203 self.report_id: Optional[str] = None
204 self.salt: Optional[str] = None
205 self.get_report_lock = Lock()
206 self.config_update_module_option()
207 # for mypy which does not run the code
208 if TYPE_CHECKING:
209 self.url = ''
210 self.device_url = ''
211 self.enabled = False
212 self.last_opt_revision = 0
213             self.leaderboard = False
214 self.interval = 0
215 self.proxy = ''
216 self.channel_basic = True
217 self.channel_ident = False
218 self.channel_crash = True
219 self.channel_device = True
220 self.channel_perf = False
221 self.db_collection = ['basic_base', 'device_base']
222 self.last_opted_in_ceph_version = 17
223 self.last_opted_out_ceph_version = 0
224
225 def config_update_module_option(self) -> None:
226 for opt in self.MODULE_OPTIONS:
227 setattr(self,
228 opt['name'],
229 self.get_module_option(opt['name']))
230 self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
231
232 def config_notify(self) -> None:
233 self.config_update_module_option()
234 # wake up serve() thread
235 self.event.set()
236
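# Restore persisted telemetry state from the mgr key/value store:
# last upload time, report id, salt, opted-in collections, and the
# Ceph versions at the last opt-in/opt-out.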
237 def load(self) -> None:
238 last_upload = self.get_store('last_upload', None)
239 if last_upload is None:
240 self.last_upload = None
241 else:
242 self.last_upload = int(last_upload)
243
244 report_id = self.get_store('report_id', None)
245 if report_id is None:
246 self.report_id = str(uuid.uuid4())
247 self.set_store('report_id', self.report_id)
248 else:
249 self.report_id = report_id
250
251 salt = self.get_store('salt', None)
252 if salt is None:
253 self.salt = str(uuid.uuid4())
254 self.set_store('salt', self.salt)
255 else:
256 self.salt = salt
257
258 self.init_collection()
259
260 last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None)
261 if last_opted_in_ceph_version is None:
262 self.last_opted_in_ceph_version = None
263 else:
264 self.last_opted_in_ceph_version = int(last_opted_in_ceph_version)
265
266 last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None)
267 if last_opted_out_ceph_version is None:
268 self.last_opted_out_ceph_version = None
269 else:
270 self.last_opted_out_ceph_version = int(last_opted_out_ceph_version)
271
272 def gather_osd_metadata(self,
273 osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]:
274 keys = ["osd_objectstore", "rotational"]
275 keys += self.metadata_keys
276
277 metadata: Dict[str, Dict[str, int]] = dict()
278 for key in keys:
279 metadata[key] = defaultdict(int)
280
281 for osd in osd_map['osds']:
282 res = self.get_metadata('osd', str(osd['osd']))
283 if res is None:
284 self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
285 continue
286 for k, v in res.items():
287 if k not in keys:
288 continue
289
290 metadata[k][v] += 1
291
292 return metadata
293
294 def gather_mon_metadata(self,
295 mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]:
296 keys = list()
297 keys += self.metadata_keys
298
299 metadata: Dict[str, Dict[str, int]] = dict()
300 for key in keys:
301 metadata[key] = defaultdict(int)
302
303 for mon in mon_map['mons']:
304 res = self.get_metadata('mon', mon['name'])
305 if res is None:
306 self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
307 continue
308 for k, v in res.items():
309 if k not in keys:
310 continue
311
312 metadata[k][v] += 1
313
314 return metadata
315
316 def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]:
317 metadata: Dict[str, Dict[str, int]] = dict()
318
319 res = self.get('mds_metadata') # metadata of *all* mds daemons
320 if res is None or not res:
321 self.log.debug('Could not get metadata for mds daemons')
322 return metadata
323
324 keys = list()
325 keys += self.metadata_keys
326
327 for key in keys:
328 metadata[key] = defaultdict(int)
329
330 for mds in res.values():
331 for k, v in mds.items():
332 if k not in keys:
333 continue
334
335 metadata[k][v] += 1
336
337 return metadata
338
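# Summarize the CRUSH map: device, type, bucket and rule counts, device
# classes, bucket algorithms/types/sizes, and tunables. Shadow buckets
# (names containing '~') are skipped.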
339 def gather_crush_info(self) -> Dict[str, Union[int,
340 bool,
341 List[int],
342 Dict[str, int],
343 Dict[int, int]]]:
344 osdmap = self.get_osdmap()
345 crush_raw = osdmap.get_crush()
346 crush = crush_raw.dump()
347
348 BucketKeyT = TypeVar('BucketKeyT', int, str)
349
350 def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None:
351 if k in d:
352 d[k] += 1
353 else:
354 d[k] = 1
355
356 device_classes: Dict[str, int] = {}
357 for dev in crush['devices']:
358 inc(device_classes, dev.get('class', ''))
359
360 bucket_algs: Dict[str, int] = {}
361 bucket_types: Dict[str, int] = {}
362 bucket_sizes: Dict[int, int] = {}
363 for bucket in crush['buckets']:
364 if '~' in bucket['name']: # ignore shadow buckets
365 continue
366 inc(bucket_algs, bucket['alg'])
367 inc(bucket_types, bucket['type_id'])
368 inc(bucket_sizes, len(bucket['items']))
369
370 return {
371 'num_devices': len(crush['devices']),
372 'num_types': len(crush['types']),
373 'num_buckets': len(crush['buckets']),
374 'num_rules': len(crush['rules']),
375 'device_classes': list(device_classes.values()),
376 'tunables': crush['tunables'],
377 'compat_weight_set': '-1' in crush['choose_args'],
378 'num_weight_sets': len(crush['choose_args']),
379 'bucket_algs': bucket_algs,
380 'bucket_sizes': bucket_sizes,
381 'bucket_types': bucket_types,
382 }
383
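# Collect the names of config options that were changed from their
# defaults, both cluster-wide ('config dump') and as reported by daemons;
# only option names are included, never their values.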
384 def gather_configs(self) -> Dict[str, List[str]]:
385 # cluster config options
386 cluster = set()
387 r, outb, outs = self.mon_command({
388 'prefix': 'config dump',
389 'format': 'json'
390 })
391 if r != 0:
392 return {}
393 try:
394 dump = json.loads(outb)
395 except json.decoder.JSONDecodeError:
396 return {}
397 for opt in dump:
398 name = opt.get('name')
399 if name:
400 cluster.add(name)
401 # daemon-reported options (which may include ceph.conf)
402 active = set()
403 ls = self.get("modified_config_options")
404 for opt in ls.get('options', {}):
405 active.add(opt)
406 return {
407 'cluster_changed': sorted(list(cluster)),
408 'active_changed': sorted(list(active)),
409 }
410
411     def anonymize_entity_name(self, entity_name: str) -> str:
412 if '.' not in entity_name:
413 self.log.debug(f"Cannot split entity name ({entity_name}), no '.' is found")
414 return entity_name
415
416 (etype, eid) = entity_name.split('.', 1)
417 m = hashlib.sha1()
418 salt = ''
419 if self.salt is not None:
420 salt = self.salt
421 # avoid asserting that salt exists
422 if not self.salt:
423             # do not set self.salt to a temp value
                global NO_SALT_CNT
424             salt = f"no_salt_found_{NO_SALT_CNT}"
425             NO_SALT_CNT += 1
426 self.log.debug(f"No salt found, created a temp one: {salt}")
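        # hash the daemon id between two copies of the salt so the anonymized
        # name is stable for this cluster but not trivially reversible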
427 m.update(salt.encode('utf-8'))
428 m.update(eid.encode('utf-8'))
429 m.update(salt.encode('utf-8'))
430
431 return etype + '.' + m.hexdigest()
432
433 def get_heap_stats(self) -> Dict[str, dict]:
434 # Initialize result dict
435 result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))
436
437 # Get list of osd ids from the metadata
438 osd_metadata = self.get('osd_metadata')
439
440 # Grab output from the "osd.x heap stats" command
441 for osd_id in osd_metadata:
442 cmd_dict = {
443 'prefix': 'heap',
444 'heapcmd': 'stats',
445 'id': str(osd_id),
446 }
447 r, outb, outs = self.osd_command(cmd_dict)
448 if r != 0:
449 self.log.debug("Invalid command dictionary.")
450 continue
451 else:
452 if 'tcmalloc heap stats' in outs:
453 values = [int(i) for i in outs.split() if i.isdigit()]
454 # `categories` must be ordered this way for the correct output to be parsed
455 categories = ['use_by_application',
456 'page_heap_freelist',
457 'central_cache_freelist',
458 'transfer_cache_freelist',
459 'thread_cache_freelists',
460 'malloc_metadata',
461 'actual_memory_used',
462 'released_to_os',
463 'virtual_address_space_used',
464 'spans_in_use',
465 'thread_heaps_in_use',
466 'tcmalloc_page_size']
467 if len(values) != len(categories):
468 self.log.debug('Received unexpected output from osd.{}; number of values should match the number of expected categories:\n' \
469 'values: len={} {} ~ categories: len={} {} ~ outs: {}'.format(osd_id, len(values), values, len(categories), categories, outs))
470 continue
471 osd = 'osd.' + str(osd_id)
472 result[osd] = dict(zip(categories, values))
473 else:
474 self.log.debug('No heap stats available on osd.{}: {}'.format(osd_id, outs))
475 continue
476
477 return result
478
479 def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
480 # Initialize result dict
481 result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))
482
483 # Get list of osd ids from the metadata
484 osd_metadata = self.get('osd_metadata')
485
486 # Grab output from the "osd.x dump_mempools" command
487 for osd_id in osd_metadata:
488 cmd_dict = {
489 'prefix': 'dump_mempools',
490 'id': str(osd_id),
491 'format': 'json'
492 }
493 r, outb, outs = self.osd_command(cmd_dict)
494 if r != 0:
495 self.log.debug("Invalid command dictionary.")
496 continue
497 else:
498 try:
499 # This is where the mempool will land.
500 dump = json.loads(outb)
501 if mode == 'separated':
502 result["osd." + str(osd_id)] = dump['mempool']['by_pool']
503 elif mode == 'aggregated':
504 for mem_type in dump['mempool']['by_pool']:
505 result[mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
506 result[mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
507 else:
508 self.log.debug("Incorrect mode specified in get_mempool")
509 except (json.decoder.JSONDecodeError, KeyError) as e:
510 self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
511 continue
512
513 return result
514
515 def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
516 # Initialize result dict
517 result: Dict[str, dict] = defaultdict(lambda: defaultdict(
518 lambda: defaultdict(
519 lambda: defaultdict(
520 lambda: defaultdict(
521 lambda: defaultdict(int))))))
522
523 # Get list of osd ids from the metadata
524 osd_metadata = self.get('osd_metadata')
525
526 # Grab output from the "osd.x perf histogram dump" command
527 for osd_id in osd_metadata:
528 cmd_dict = {
529 'prefix': 'perf histogram dump',
530 'id': str(osd_id),
531 'format': 'json'
532 }
533 r, outb, outs = self.osd_command(cmd_dict)
534 # Check for invalid calls
535 if r != 0:
536 self.log.debug("Invalid command dictionary.")
537 continue
538 else:
539 try:
540 # This is where the histograms will land if there are any.
541 dump = json.loads(outb)
542
543 for histogram in dump['osd']:
544 # Log axis information. There are two axes, each represented
545 # as a dictionary. Both dictionaries are contained inside a
546 # list called 'axes'.
547 axes = []
548 for axis in dump['osd'][histogram]['axes']:
549
550 # This is the dict that contains information for an individual
551 # axis. It will be appended to the 'axes' list at the end.
552 axis_dict: Dict[str, Any] = defaultdict()
553
554 # Collecting information for buckets, min, name, etc.
555 axis_dict['buckets'] = axis['buckets']
556 axis_dict['min'] = axis['min']
557 axis_dict['name'] = axis['name']
558 axis_dict['quant_size'] = axis['quant_size']
559 axis_dict['scale_type'] = axis['scale_type']
560
561 # Collecting ranges; placing them in lists to
562 # improve readability later on.
563 ranges = []
564 for _range in axis['ranges']:
565 _max, _min = None, None
566 if 'max' in _range:
567 _max = _range['max']
568 if 'min' in _range:
569 _min = _range['min']
570 ranges.append([_min, _max])
571 axis_dict['ranges'] = ranges
572
573 # Now that 'axis_dict' contains all the appropriate
574 # information for the current axis, append it to the 'axes' list.
575 # There will end up being two axes in the 'axes' list, since the
576 # histograms are 2D.
577 axes.append(axis_dict)
578
579 # Add the 'axes' list, containing both axes, to result.
580 # At this point, you will see that the name of the key is the string
581 # form of our axes list (str(axes)). This is there so that histograms
582 # with different axis configs will not be combined.
583 # These key names are later dropped when only the values are returned.
584 result[str(axes)][histogram]['axes'] = axes
585
586 # Collect current values and make sure they are in
587 # integer form.
588 values = []
589 for value_list in dump['osd'][histogram]['values']:
590 values.append([int(v) for v in value_list])
591
592 if mode == 'separated':
593 if 'osds' not in result[str(axes)][histogram]:
594 result[str(axes)][histogram]['osds'] = []
595 result[str(axes)][histogram]['osds'].append({'osd_id': int(osd_id), 'values': values})
596
597 elif mode == 'aggregated':
598 # Aggregate values. If 'values' have already been initialized,
599 # we can safely add.
600 if 'values' in result[str(axes)][histogram]:
601                             for i in range(0, len(values)):
602                                 for j in range(0, len(values[i])):
603 values[i][j] += result[str(axes)][histogram]['values'][i][j]
604
605 # Add the values to result.
606 result[str(axes)][histogram]['values'] = values
607
608 # Update num_combined_osds
609 if 'num_combined_osds' not in result[str(axes)][histogram]:
610 result[str(axes)][histogram]['num_combined_osds'] = 1
611 else:
612 result[str(axes)][histogram]['num_combined_osds'] += 1
613 else:
614 self.log.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode))
615 return list()
616
617 # Sometimes, json errors occur if you give it an empty string.
618 # I am also putting in a catch for a KeyError since it could
619 # happen where the code is assuming that a key exists in the
620 # schema when it doesn't. In either case, we'll handle that
621 # by continuing and collecting what we can from other osds.
622 except (json.decoder.JSONDecodeError, KeyError) as e:
623 self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
624 continue
625
626 return list(result.values())
627
628 def get_io_rate(self) -> dict:
629 return self.get('io_rate')
630
631 def get_stats_per_pool(self) -> dict:
632 result = self.get('pg_dump')['pool_stats']
633
634 # collect application metadata from osd_map
635 osd_map = self.get('osd_map')
636 application_metadata = {pool['pool']: pool['application_metadata'] for pool in osd_map['pools']}
637
638 # add application to each pool from pg_dump
639 for pool in result:
640 pool['application'] = []
641 # Only include default applications
642 for application in application_metadata[pool['poolid']]:
643 if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
644 pool['application'].append(application)
645
646 return result
647
648 def get_stats_per_pg(self) -> dict:
649 return self.get('pg_dump')['pg_stats']
650
651 def get_rocksdb_stats(self) -> Dict[str, str]:
652         # Initializers
653 result: Dict[str, str] = defaultdict()
654 version = self.get_rocksdb_version()
655
656 # Update result
657 result['version'] = version
658
659 return result
660
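# Collect recent crash reports from the crash module, with the hostname
# removed, the entity name anonymized with the cluster salt, and the last
# line of python backtraces redacted.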
661 def gather_crashinfo(self) -> List[Dict[str, str]]:
662 crashlist: List[Dict[str, str]] = list()
663 errno, crashids, err = self.remote('crash', 'ls')
664 if errno:
665 return crashlist
666 for crashid in crashids.split():
667 errno, crashinfo, err = self.remote('crash', 'do_info', crashid)
668 if errno:
669 continue
670 c = json.loads(crashinfo)
671
672 # redact hostname
673 del c['utsname_hostname']
674
675 # entity_name might have more than one '.', beware
676 (etype, eid) = c.get('entity_name', '').split('.', 1)
677 m = hashlib.sha1()
678 assert self.salt
679 m.update(self.salt.encode('utf-8'))
680 m.update(eid.encode('utf-8'))
681 m.update(self.salt.encode('utf-8'))
682 c['entity_name'] = etype + '.' + m.hexdigest()
683
684 # redact final line of python tracebacks, as the exception
685 # payload may contain identifying information
686 if 'mgr_module' in c and 'backtrace' in c:
687 # backtrace might be empty
688 if len(c['backtrace']) > 0:
689 c['backtrace'][-1] = '<redacted>'
690
691 crashlist.append(c)
692 return crashlist
693
694 def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
695 # Extract perf counter data with get_all_perf_counters(), a method
696 # from mgr/mgr_module.py. This method returns a nested dictionary that
697 # looks a lot like perf schema, except with some additional fields.
698 #
699 # Example of output, a snapshot of a mon daemon:
700 # "mon.b": {
701 # "bluestore.kv_flush_lat": {
702 # "count": 2431,
703 # "description": "Average kv_thread flush latency",
704 # "nick": "fl_l",
705 # "priority": 8,
706 # "type": 5,
707 # "units": 1,
708 # "value": 88814109
709 # },
710 # },
711 all_perf_counters = self.get_all_perf_counters()
712
713 # Initialize 'result' dict
714 result: Dict[str, dict] = defaultdict(lambda: defaultdict(
715 lambda: defaultdict(lambda: defaultdict(int))))
716
717 # 'separated' mode
718 anonymized_daemon_dict = {}
719
720 for daemon, all_perf_counters_by_daemon in all_perf_counters.items():
721 daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'
722
723 if mode == 'separated':
724 # anonymize individual daemon names except osds
725 if (daemon_type != 'osd'):
726 anonymized_daemon = self.anonymize_entity_name(daemon)
727 anonymized_daemon_dict[anonymized_daemon] = daemon
728 daemon = anonymized_daemon
729
730 # Calculate num combined daemon types if in aggregated mode
731 if mode == 'aggregated':
732 if 'num_combined_daemons' not in result[daemon_type]:
733 result[daemon_type]['num_combined_daemons'] = 1
734 else:
735 result[daemon_type]['num_combined_daemons'] += 1
736
737 for collection in all_perf_counters_by_daemon:
738 # Split the collection to avoid redundancy in final report; i.e.:
739 # bluestore.kv_flush_lat, bluestore.kv_final_lat -->
740 # bluestore: kv_flush_lat, kv_final_lat
741 col_0, col_1 = collection.split('.')
742
743 # Debug log for empty keys. This initially was a problem for prioritycache
744 # perf counters, where the col_0 was empty for certain mon counters:
745 #
746 # "mon.a": { instead of "mon.a": {
747 # "": { "prioritycache": {
748 # "cache_bytes": {...}, "cache_bytes": {...},
749 #
750 # This log is here to detect any future instances of a similar issue.
751 if (daemon == "") or (col_0 == "") or (col_1 == ""):
752 self.log.debug("Instance of an empty key: {}{}".format(daemon, collection))
753
754 if mode == 'separated':
755 # Add value to result
756 result[daemon][col_0][col_1]['value'] = \
757 all_perf_counters_by_daemon[collection]['value']
758
759 # Check that 'count' exists, as not all counters have a count field.
760 if 'count' in all_perf_counters_by_daemon[collection]:
761 result[daemon][col_0][col_1]['count'] = \
762 all_perf_counters_by_daemon[collection]['count']
763 elif mode == 'aggregated':
764 # Not every rgw daemon has the same schema. Specifically, each rgw daemon
765 # has a uniquely-named collection that starts off identically (i.e.
766 # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
767 # This bit of code combines these unique counters all under one rgw instance.
768                     # Without this check, the schema would remain separated out in the final report.
769 if col_0[0:11] == "objecter-0x":
770 col_0 = "objecter-0x"
771
772 # Check that the value can be incremented. In some cases,
773 # the files are of type 'pair' (real-integer-pair, integer-integer pair).
774 # In those cases, the value is a dictionary, and not a number.
775 # i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
776 if isinstance(all_perf_counters_by_daemon[collection]['value'], numbers.Number):
777 result[daemon_type][col_0][col_1]['value'] += \
778 all_perf_counters_by_daemon[collection]['value']
779
780 # Check that 'count' exists, as not all counters have a count field.
781 if 'count' in all_perf_counters_by_daemon[collection]:
782 result[daemon_type][col_0][col_1]['count'] += \
783 all_perf_counters_by_daemon[collection]['count']
784 else:
785 self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
786 return {}
787
788 if mode == 'separated':
789 # for debugging purposes only, this data is never reported
790 self.log.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict))
791
792 return result
793
794 def get_active_channels(self) -> List[str]:
795 r = []
796 if self.channel_basic:
797 r.append('basic')
798 if self.channel_crash:
799 r.append('crash')
800 if self.channel_device:
801 r.append('device')
802 if self.channel_ident:
803 r.append('ident')
804 if self.channel_perf:
805 r.append('perf')
806 return r
807
808 def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
809 try:
810 time_format = self.remote('devicehealth', 'get_time_format')
811 except Exception as e:
812 self.log.debug('Unable to format time: {}'.format(e))
813 return {}
814 cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
815 min_sample = cutoff.strftime(time_format)
816
817 devices = self.get('devices')['devices']
818 if not devices:
819 self.log.debug('Unable to get device info from the mgr.')
820 return {}
821
822 # anon-host-id -> anon-devid -> { timestamp -> record }
823 res: Dict[str, Dict[str, Dict[str, str]]] = {}
824 for d in devices:
825 devid = d['devid']
826 try:
827 # this is a map of stamp -> {device info}
828 m = self.remote('devicehealth', 'get_recent_device_metrics',
829 devid, min_sample)
830 except Exception as e:
831 self.log.debug('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
832 continue
833
834 # anonymize host id
835 try:
836 host = d['location'][0]['host']
837 except (KeyError, IndexError) as e:
838 self.log.debug('Unable to get host from device with id "{}": {}'.format(devid, e))
839 continue
840 anon_host = self.get_store('host-id/%s' % host)
841 if not anon_host:
842 anon_host = str(uuid.uuid1())
843 self.set_store('host-id/%s' % host, anon_host)
844 serial = None
845 for dev, rep in m.items():
846 rep['host_id'] = anon_host
847 if serial is None and 'serial_number' in rep:
848 serial = rep['serial_number']
849
850 # anonymize device id
851 anon_devid = self.get_store('devid-id/%s' % devid)
852 if not anon_devid:
853 # ideally devid is 'vendor_model_serial',
854 # but can also be 'model_serial', 'serial'
855 if '_' in devid:
856 anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
857 else:
858 anon_devid = str(uuid.uuid1())
859 self.set_store('devid-id/%s' % devid, anon_devid)
860 self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
861 host, anon_host))
862
863 # anonymize the smartctl report itself
864 if serial:
865 m_str = json.dumps(m)
866 m = json.loads(m_str.replace(serial, 'deleted'))
867
868 if anon_host not in res:
869 res[anon_host] = {}
870 res[anon_host][anon_devid] = m
871 return res
872
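# Return the most recent sample of a daemon's perf counter, or 0 if no
# samples are available.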
873 def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
874 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
875 if data:
876 return data[-1][1]
877 else:
878 return 0
879
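# Assemble the telemetry report for the given channels (defaults to the
# currently active channels). Note that the 'device' channel is compiled
# separately by gather_device_report() and sent to a different endpoint.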
880 def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
881 if not channels:
882 channels = self.get_active_channels()
883 report = {
884 'leaderboard': self.leaderboard,
885 'report_version': 1,
886 'report_timestamp': datetime.utcnow().isoformat(),
887 'report_id': self.report_id,
888 'channels': channels,
889 'channels_available': ALL_CHANNELS,
890 'license': LICENSE,
891 'collections_available': [c['name'].name for c in MODULE_COLLECTION],
892 'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])],
893 }
894
895 if 'ident' in channels:
896 for option in ['description', 'contact', 'organization']:
897 report[option] = getattr(self, option)
898
899 if 'basic' in channels:
900 mon_map = self.get('mon_map')
901 osd_map = self.get('osd_map')
902 service_map = self.get('service_map')
903 fs_map = self.get('fs_map')
904 df = self.get('df')
905 df_pools = {pool['id']: pool for pool in df['pools']}
906
907 report['created'] = mon_map['created']
908
909 # mons
910 v1_mons = 0
911 v2_mons = 0
912 ipv4_mons = 0
913 ipv6_mons = 0
914 for mon in mon_map['mons']:
915 for a in mon['public_addrs']['addrvec']:
916 if a['type'] == 'v2':
917 v2_mons += 1
918 elif a['type'] == 'v1':
919 v1_mons += 1
920 if a['addr'].startswith('['):
921 ipv6_mons += 1
922 else:
923 ipv4_mons += 1
924 report['mon'] = {
925 'count': len(mon_map['mons']),
926 'features': mon_map['features'],
927 'min_mon_release': mon_map['min_mon_release'],
928 'v1_addr_mons': v1_mons,
929 'v2_addr_mons': v2_mons,
930 'ipv4_addr_mons': ipv4_mons,
931 'ipv6_addr_mons': ipv6_mons,
932 }
933
934 report['config'] = self.gather_configs()
935
936 # pools
937
938 rbd_num_pools = 0
939 rbd_num_images_by_pool = []
940 rbd_mirroring_by_pool = []
941 num_pg = 0
942 report['pools'] = list()
943 for pool in osd_map['pools']:
944 num_pg += pool['pg_num']
945 ec_profile = {}
946 if pool['erasure_code_profile']:
947 orig = osd_map['erasure_code_profiles'].get(
948 pool['erasure_code_profile'], {})
949 ec_profile = {
950 k: orig[k] for k in orig.keys()
951 if k in ['k', 'm', 'plugin', 'technique',
952 'crush-failure-domain', 'l']
953 }
954 pool_data = {
955 'pool': pool['pool'],
956 'pg_num': pool['pg_num'],
957 'pgp_num': pool['pg_placement_num'],
958 'size': pool['size'],
959 'min_size': pool['min_size'],
960 'pg_autoscale_mode': pool['pg_autoscale_mode'],
961 'target_max_bytes': pool['target_max_bytes'],
962 'target_max_objects': pool['target_max_objects'],
963 'type': ['', 'replicated', '', 'erasure'][pool['type']],
964 'erasure_code_profile': ec_profile,
965 'cache_mode': pool['cache_mode'],
966 }
967
968 # basic_pool_usage collection
969 if self.is_enabled_collection(Collection.basic_pool_usage):
970 pool_data['application'] = []
971 for application in pool['application_metadata']:
972 # Only include default applications
973 if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
974 pool_data['application'].append(application)
975 pool_stats = df_pools[pool['pool']]['stats']
976 pool_data['stats'] = { # filter out kb_used
977 'avail_raw': pool_stats['avail_raw'],
978 'bytes_used': pool_stats['bytes_used'],
979 'compress_bytes_used': pool_stats['compress_bytes_used'],
980 'compress_under_bytes': pool_stats['compress_under_bytes'],
981 'data_bytes_used': pool_stats['data_bytes_used'],
982 'dirty': pool_stats['dirty'],
983 'max_avail': pool_stats['max_avail'],
984 'objects': pool_stats['objects'],
985 'omap_bytes_used': pool_stats['omap_bytes_used'],
986 'percent_used': pool_stats['percent_used'],
987 'quota_bytes': pool_stats['quota_bytes'],
988 'quota_objects': pool_stats['quota_objects'],
989 'rd': pool_stats['rd'],
990 'rd_bytes': pool_stats['rd_bytes'],
991 'stored': pool_stats['stored'],
992 'stored_data': pool_stats['stored_data'],
993 'stored_omap': pool_stats['stored_omap'],
994 'stored_raw': pool_stats['stored_raw'],
995 'wr': pool_stats['wr'],
996 'wr_bytes': pool_stats['wr_bytes']
997 }
998
999 cast(List[Dict[str, Any]], report['pools']).append(pool_data)
1000 if 'rbd' in pool['application_metadata']:
1001 rbd_num_pools += 1
1002 ioctx = self.rados.open_ioctx(pool['pool_name'])
1003 rbd_num_images_by_pool.append(
1004 sum(1 for _ in rbd.RBD().list2(ioctx)))
1005 rbd_mirroring_by_pool.append(
1006 rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
1007 report['rbd'] = {
1008 'num_pools': rbd_num_pools,
1009 'num_images_by_pool': rbd_num_images_by_pool,
1010 'mirroring_by_pool': rbd_mirroring_by_pool}
1011
1012 # osds
1013 cluster_network = False
1014 for osd in osd_map['osds']:
1015 if osd['up'] and not cluster_network:
1016 front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
1017 back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
1018 if front_ip != back_ip:
1019 cluster_network = True
1020 report['osd'] = {
1021 'count': len(osd_map['osds']),
1022 'require_osd_release': osd_map['require_osd_release'],
1023 'require_min_compat_client': osd_map['require_min_compat_client'],
1024 'cluster_network': cluster_network,
1025 }
1026
1027 # crush
1028 report['crush'] = self.gather_crush_info()
1029
1030 # cephfs
1031 report['fs'] = {
1032 'count': len(fs_map['filesystems']),
1033 'feature_flags': fs_map['feature_flags'],
1034 'num_standby_mds': len(fs_map['standbys']),
1035 'filesystems': [],
1036 }
1037 num_mds = len(fs_map['standbys'])
1038 for fsm in fs_map['filesystems']:
1039 fs = fsm['mdsmap']
1040 num_sessions = 0
1041 cached_ino = 0
1042 cached_dn = 0
1043 cached_cap = 0
1044 subtrees = 0
1045 rfiles = 0
1046 rbytes = 0
1047 rsnaps = 0
1048 for gid, mds in fs['info'].items():
1049 num_sessions += self.get_latest('mds', mds['name'],
1050 'mds_sessions.session_count')
1051 cached_ino += self.get_latest('mds', mds['name'],
1052 'mds_mem.ino')
1053 cached_dn += self.get_latest('mds', mds['name'],
1054 'mds_mem.dn')
1055 cached_cap += self.get_latest('mds', mds['name'],
1056 'mds_mem.cap')
1057 subtrees += self.get_latest('mds', mds['name'],
1058 'mds.subtrees')
1059 if mds['rank'] == 0:
1060 rfiles = self.get_latest('mds', mds['name'],
1061 'mds.root_rfiles')
1062 rbytes = self.get_latest('mds', mds['name'],
1063 'mds.root_rbytes')
1064 rsnaps = self.get_latest('mds', mds['name'],
1065 'mds.root_rsnaps')
1066 report['fs']['filesystems'].append({ # type: ignore
1067 'max_mds': fs['max_mds'],
1068 'ever_allowed_features': fs['ever_allowed_features'],
1069 'explicitly_allowed_features': fs['explicitly_allowed_features'],
1070 'num_in': len(fs['in']),
1071 'num_up': len(fs['up']),
1072 'num_standby_replay': len(
1073 [mds for gid, mds in fs['info'].items()
1074 if mds['state'] == 'up:standby-replay']),
1075 'num_mds': len(fs['info']),
1076 'num_sessions': num_sessions,
1077 'cached_inos': cached_ino,
1078 'cached_dns': cached_dn,
1079 'cached_caps': cached_cap,
1080 'cached_subtrees': subtrees,
1081 'balancer_enabled': len(fs['balancer']) > 0,
1082 'num_data_pools': len(fs['data_pools']),
1083 'standby_count_wanted': fs['standby_count_wanted'],
1084 'approx_ctime': fs['created'][0:7],
1085 'files': rfiles,
1086 'bytes': rbytes,
1087 'snaps': rsnaps,
1088 })
1089 num_mds += len(fs['info'])
1090 report['fs']['total_num_mds'] = num_mds # type: ignore
1091
1092 # daemons
1093 report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
1094 mon=self.gather_mon_metadata(mon_map))
1095
1096 if self.is_enabled_collection(Collection.basic_mds_metadata):
1097 report['metadata']['mds'] = self.gather_mds_metadata() # type: ignore
1098
1099 # host counts
1100 servers = self.list_servers()
1101 self.log.debug('servers %s' % servers)
1102 hosts = {
1103 'num': len([h for h in servers if h['hostname']]),
1104 }
1105 for t in ['mon', 'mds', 'osd', 'mgr']:
1106 nr_services = sum(1 for host in servers if
1107 any(service for service in cast(List[ServiceInfoT],
1108 host['services'])
1109 if service['type'] == t))
1110 hosts['num_with_' + t] = nr_services
1111 report['hosts'] = hosts
1112
1113 report['usage'] = {
1114 'pools': len(df['pools']),
1115 'pg_num': num_pg,
1116 'total_used_bytes': df['stats']['total_used_bytes'],
1117 'total_bytes': df['stats']['total_bytes'],
1118 'total_avail_bytes': df['stats']['total_avail_bytes']
1119 }
1120 # basic_usage_by_class collection
1121 if self.is_enabled_collection(Collection.basic_usage_by_class):
1122 report['usage']['stats_by_class'] = {} # type: ignore
1123 for device_class in df['stats_by_class']:
1124 if device_class in ['hdd', 'ssd', 'nvme']:
1125 report['usage']['stats_by_class'][device_class] = df['stats_by_class'][device_class] # type: ignore
1126
1127 services: DefaultDict[str, int] = defaultdict(int)
1128 for key, value in service_map['services'].items():
1129 services[key] += 1
1130 if key == 'rgw':
1131 rgw = {}
1132 zones = set()
1133 zonegroups = set()
1134 frontends = set()
1135 count = 0
1136 d = value.get('daemons', dict())
1137 for k, v in d.items():
1138 if k == 'summary' and v:
1139 rgw[k] = v
1140 elif isinstance(v, dict) and 'metadata' in v:
1141 count += 1
1142 zones.add(v['metadata']['zone_id'])
1143 zonegroups.add(v['metadata']['zonegroup_id'])
1144 frontends.add(v['metadata']['frontend_type#0'])
1145
1146 # we could actually iterate over all the keys of
1147 # the dict and check for how many frontends there
1148 # are, but it is unlikely that one would be running
1149 # more than 2 supported ones
1150 f2 = v['metadata'].get('frontend_type#1', None)
1151 if f2:
1152 frontends.add(f2)
1153
1154 rgw['count'] = count
1155 rgw['zones'] = len(zones)
1156 rgw['zonegroups'] = len(zonegroups)
1157 rgw['frontends'] = list(frontends) # sets aren't json-serializable
1158 report['rgw'] = rgw
1159 report['services'] = services
1160
1161 try:
1162 report['balancer'] = self.remote('balancer', 'gather_telemetry')
1163 except ImportError:
1164 report['balancer'] = {
1165 'active': False
1166 }
1167
1168 if 'crash' in channels:
1169 report['crashes'] = self.gather_crashinfo()
1170
1171 if 'perf' in channels:
1172 if self.is_enabled_collection(Collection.perf_perf):
1173 report['perf_counters'] = self.gather_perf_counters('separated')
1174 report['stats_per_pool'] = self.get_stats_per_pool()
1175 report['stats_per_pg'] = self.get_stats_per_pg()
1176 report['io_rate'] = self.get_io_rate()
1177 report['osd_perf_histograms'] = self.get_osd_histograms('separated')
1178 report['mempool'] = self.get_mempool('separated')
1179 report['heap_stats'] = self.get_heap_stats()
1180 report['rocksdb_stats'] = self.get_rocksdb_stats()
1181
1182 # NOTE: We do not include the 'device' channel in this report; it is
1183 # sent to a different endpoint.
1184
1185 return report
1186
1187 def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
1188 self.log.info('Sending %s to: %s' % (what, url))
1189 proxies = dict()
1190 if self.proxy:
1191 self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
1192 proxies['http'] = self.proxy
1193 proxies['https'] = self.proxy
1194 try:
1195 resp = requests.put(url=url, json=report, proxies=proxies)
1196 resp.raise_for_status()
1197 except Exception as e:
1198 fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
1199 self.log.error(fail_reason)
1200 return fail_reason
1201 return None
1202
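# Telemetry reports are sent to two separate endpoints: the main 'ceph'
# report endpoint and the 'device' metrics endpoint.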
1203 class EndPoint(enum.Enum):
1204 ceph = 'ceph'
1205 device = 'device'
1206
1207 def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
1208 '''
1209 Find collections that are available in the module, but are not in the db
1210 '''
1211 if self.db_collection is None:
1212 return None
1213
1214 if not channels:
1215 channels = ALL_CHANNELS
1216 else:
1217 for ch in channels:
1218 if ch not in ALL_CHANNELS:
1219 self.log.debug(f"invalid channel name: {ch}")
1220 return None
1221
1222 new_collection : List[Collection] = []
1223
1224 for c in MODULE_COLLECTION:
1225 if c['name'].name not in self.db_collection:
1226 if c['channel'] in channels:
1227 new_collection.append(c['name'])
1228
1229 return new_collection
1230
1231 def is_major_upgrade(self) -> bool:
1232 '''
1233         Returns True only if the user last opted-in to an older major release
1234 '''
1235 if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0:
1236 # we do not know what Ceph version was when the user last opted-in,
1237 # thus we do not wish to nag in case of a major upgrade
1238 return False
1239
1240 mon_map = self.get('mon_map')
1241 mon_min = mon_map.get("min_mon_release", 0)
1242
1243 if mon_min - self.last_opted_in_ceph_version > 0:
1244 self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
1245 return True
1246
1247 return False
1248
1249 def is_opted_in(self) -> bool:
1250         # If len is 0 it means that the user is either opted-out (never
1251         # opted-in, or invoked `telemetry off`), or they upgraded from
1252         # telemetry revision 1 or 2, which requires re-opting in to revision 3;
1253         # either way they are considered opted-out
1254 if self.db_collection is None:
1255 return False
1256 return len(self.db_collection) > 0
1257
1258 def should_nag(self) -> bool:
1259 # Find delta between opted-in collections and module collections;
1260 # nag only if module has a collection which is not in db, and nag == True.
1261
1262 # We currently do not nag if the user is opted-out (or never opted-in).
1263 # If we wish to do this in the future, we need to have a tri-mode state
1264 # (opted in, opted out, no action yet), and it needs to be guarded by a
1265 # config option (so that nagging can be turned off via config).
1266 # We also need to add a last_opted_out_ceph_version variable, for the
1267 # major upgrade check.
1268
1269 # check if there are collections the user is not opt-in to
1270 # that we should nag about
1271 if self.db_collection is not None:
1272 for c in MODULE_COLLECTION:
1273 if c['name'].name not in self.db_collection:
1274 if c['nag'] == True:
1275 self.log.debug(f"The collection: {c['name']} is not reported")
1276 return True
1277
1278         # The user might be opted-in to the most recent collection, or there is no
1279         # new collection that requires nagging; in that case, nag only if this is a
1280         # major upgrade and there are new collections
1281         # (even if their own nag flag is False):
1282 new_collections = False
1283 col_delta = self.collection_delta()
1284 if col_delta is not None and len(col_delta) > 0:
1285 new_collections = True
1286
1287 return self.is_major_upgrade() and new_collections
1288
1289 def init_collection(self) -> None:
1290 # We fetch from db the collections the user had already opted-in to.
1291 # During the transition the results will be empty, but the user might
1292 # be opted-in to an older version (e.g. revision = 3)
1293
1294 collection = self.get_store('collection')
1295
1296 if collection is not None:
1297 self.db_collection = json.loads(collection)
1298
1299 if self.db_collection is None:
1300 # happens once on upgrade
1301 if not self.enabled:
1302 # user is not opted-in
1303 self.set_store('collection', json.dumps([]))
1304 self.log.debug("user is not opted-in")
1305 else:
1306 # user is opted-in, verify the revision:
1307 if self.last_opt_revision == REVISION:
1308 self.log.debug(f"telemetry revision is {REVISION}")
1309 base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
1310 self.set_store('collection', json.dumps(base_collection))
1311 else:
1312 # user is opted-in to an older version, meaning they need
1313 # to re-opt in regardless
1314 self.set_store('collection', json.dumps([]))
1315 self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")
1316
1317 # reload collection after setting
1318 collection = self.get_store('collection')
1319 if collection is not None:
1320 self.db_collection = json.loads(collection)
1321 else:
1322 raise RuntimeError('collection is None after initial setting')
1323 else:
1324 # user has already upgraded
1325 self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
1326
1327 def is_enabled_collection(self, collection: Collection) -> bool:
1328 if self.db_collection is None:
1329 return False
1330 return collection.name in self.db_collection
1331
1332 def opt_in_all_collections(self) -> None:
1333 """
1334 Opt-in to all collections; Update db with the currently available collections in the module
1335 """
1336 if self.db_collection is None:
1337 raise RuntimeError('db_collection is None after initial setting')
1338
1339 for c in MODULE_COLLECTION:
1340 if c['name'].name not in self.db_collection:
1341 self.db_collection.append(c['name'])
1342
1343 self.set_store('collection', json.dumps(self.db_collection))
1344
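# Post the report to the requested endpoints (both by default): the main
# telemetry endpoint receives the compiled report, while the device
# endpoint receives one batch of anonymized device metrics per host.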
1345 def send(self,
1346 report: Dict[str, Dict[str, str]],
1347 endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
1348 if not endpoint:
1349 endpoint = [self.EndPoint.ceph, self.EndPoint.device]
1350 failed = []
1351 success = []
1352 self.log.debug('Send endpoints %s' % endpoint)
1353 for e in endpoint:
1354 if e == self.EndPoint.ceph:
1355 fail_reason = self._try_post('ceph report', self.url, report)
1356 if fail_reason:
1357 failed.append(fail_reason)
1358 else:
1359 now = int(time.time())
1360 self.last_upload = now
1361 self.set_store('last_upload', str(now))
1362 success.append('Ceph report sent to {0}'.format(self.url))
1363 self.log.info('Sent report to {0}'.format(self.url))
1364 elif e == self.EndPoint.device:
1365 if 'device' in self.get_active_channels():
1366 devices = self.gather_device_report()
1367 if devices:
1368 num_devs = 0
1369 num_hosts = 0
1370 for host, ls in devices.items():
1371 self.log.debug('host %s devices %s' % (host, ls))
1372 if not len(ls):
1373 continue
1374 fail_reason = self._try_post('devices', self.device_url,
1375 ls)
1376 if fail_reason:
1377 failed.append(fail_reason)
1378 else:
1379 num_devs += len(ls)
1380 num_hosts += 1
1381 if num_devs:
1382 success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
1383 num_devs, num_hosts, len(devices)))
1384 else:
1385 fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
1386 failed.append(fail_reason)
1387 self.log.error(fail_reason)
1388 if failed:
1389 return 1, '', '\n'.join(success + failed)
1390 return 0, '', '\n'.join(success)
1391
1392 def format_perf_histogram(self, report: Dict[str, Any]) -> None:
1393 # Formatting the perf histograms so they are human-readable. This will change the
1394 # ranges and values, which are currently in list form, into strings so that
1395 # they are displayed horizontally instead of vertically.
1396 try:
1397 # Formatting ranges and values in osd_perf_histograms
1398 mode = 'osd_perf_histograms'
1399 for config in report[mode]:
1400 for histogram in config:
1401 # Adjust ranges by converting lists into strings
1402 for axis in config[histogram]['axes']:
1403 for i in range(0, len(axis['ranges'])):
1404 axis['ranges'][i] = str(axis['ranges'][i])
1405
1406 for osd in config[histogram]['osds']:
1407 for i in range(0, len(osd['values'])):
1408 osd['values'][i] = str(osd['values'][i])
1409 except KeyError:
1410 # If the perf channel is not enabled, there should be a KeyError since
1411 # 'osd_perf_histograms' would not be present in the report. In that case,
1412 # the show function should pass as usual without trying to format the
1413 # histograms.
1414 pass
1415
1416 def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1417 '''
1418 Enable or disable a list of channels
1419 '''
1420 if not self.enabled:
1421 # telemetry should be on for channels to be toggled
1422 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1423 'Preview sample reports with `ceph telemetry preview`.'
1424 return 0, msg, ''
1425
1426 if channels is None:
1427 msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
1428 return 0, msg, ''
1429
1430 state = action == 'enable'
1431 msg = ''
1432 for c in channels:
1433 if c not in ALL_CHANNELS:
1434 msg = f"{msg}{c} is not a valid channel name. "\
1435 f"Available channels: {ALL_CHANNELS}.\n"
1436 else:
1437 self.set_module_option(f"channel_{c}", state)
1438 setattr(self,
1439 f"channel_{c}",
1440 state)
1441 msg = f"{msg}channel_{c} is {action}d\n"
1442
1443 return 0, msg, ''
1444
1445 @CLIReadCommand('telemetry status')
1446 def status(self) -> Tuple[int, str, str]:
1447 '''
1448 Show current configuration
1449 '''
1450 r = {}
1451 for opt in self.MODULE_OPTIONS:
1452 r[opt['name']] = getattr(self, opt['name'])
1453 r['last_upload'] = (time.ctime(self.last_upload)
1454 if self.last_upload else self.last_upload)
1455 return 0, json.dumps(r, indent=4, sort_keys=True), ''
1456
1457 @CLIReadCommand('telemetry diff')
1458 def diff(self) -> Tuple[int, str, str]:
1459 '''
1460         Show the diff between opted-in collections and available collections
1461 '''
1462 diff = []
1463 keys = ['nag']
1464
1465 for c in MODULE_COLLECTION:
1466 if not self.is_enabled_collection(c['name']):
1467 diff.append({key: val for key, val in c.items() if key not in keys})
1468
1469 r = None
1470 if diff == []:
1471 r = "Telemetry is up to date"
1472 else:
1473 r = json.dumps(diff, indent=4, sort_keys=True)
1474
1475 return 0, r, ''
1476
1477 @CLICommand('telemetry on')
1478 def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
1479 '''
1480 Enable telemetry reports from this cluster
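        For example: `ceph telemetry on --license sharing-1-0`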
1481 '''
1482 if license != LICENSE:
1483 return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
1484 To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
1485 else:
1486 self.set_module_option('enabled', True)
1487 self.enabled = True
1488 self.opt_in_all_collections()
1489
1490 # for major releases upgrade nagging
1491 mon_map = self.get('mon_map')
1492 mon_min = mon_map.get("min_mon_release", 0)
1493 self.set_store('last_opted_in_ceph_version', str(mon_min))
1494 self.last_opted_in_ceph_version = mon_min
1495
1496 msg = 'Telemetry is on.'
1497 disabled_channels = ''
1498 active_channels = self.get_active_channels()
1499 for c in ALL_CHANNELS:
1500 if c not in active_channels and c != 'ident':
1501 disabled_channels = f"{disabled_channels} {c}"
1502
1503 if len(disabled_channels) > 0:
1504 msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
1505 f"`ceph telemetry enable channel{disabled_channels}`"
1506
1507 return 0, msg, ''
1508
1509 @CLICommand('telemetry off')
1510 def off(self) -> Tuple[int, str, str]:
1511 '''
1512 Disable telemetry reports from this cluster
1513 '''
1514 if not self.enabled:
1515 # telemetry is already off
1516 msg = 'Telemetry is currently not enabled, nothing to turn off. '\
1517 'Please consider opting-in with `ceph telemetry on`.\n' \
1518 'Preview sample reports with `ceph telemetry preview`.'
1519 return 0, msg, ''
1520
1521 self.set_module_option('enabled', False)
1522 self.enabled = False
1523 self.set_store('collection', json.dumps([]))
1524 self.db_collection = []
1525
1526 # we might need this info in the future, in case
1527 # of nagging when user is opted-out
1528 mon_map = self.get('mon_map')
1529 mon_min = mon_map.get("min_mon_release", 0)
1530 self.set_store('last_opted_out_ceph_version', str(mon_min))
1531 self.last_opted_out_ceph_version = mon_min
1532
1533 msg = 'Telemetry is now disabled.'
1534 return 0, msg, ''
1535
1536 @CLIReadCommand('telemetry enable channel all')
1537 def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
1538 '''
1539 Enable all channels
1540 '''
1541 return self.toggle_channel('enable', channels)
1542
1543 @CLIReadCommand('telemetry enable channel')
1544 def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1545 '''
1546 Enable a list of channels
1547 '''
1548 return self.toggle_channel('enable', channels)
1549
1550 @CLIReadCommand('telemetry disable channel all')
1551 def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
1552 '''
1553 Disable all channels
1554 '''
1555 return self.toggle_channel('disable', channels)
1556
1557 @CLIReadCommand('telemetry disable channel')
1558 def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1559 '''
1560 Disable a list of channels
1561 '''
1562 return self.toggle_channel('disable', channels)
1563
1564 @CLIReadCommand('telemetry channel ls')
1565 def channel_ls(self) -> Tuple[int, str, str]:
1566 '''
1567 List all channels
1568 '''
1569 table = PrettyTable(
1570 [
1571 'NAME', 'ENABLED', 'DEFAULT', 'DESC',
1572 ],
1573 border=False)
1574 table.align['NAME'] = 'l'
1575 table.align['ENABLED'] = 'l'
1576 table.align['DEFAULT'] = 'l'
1577 table.align['DESC'] = 'l'
1578 table.left_padding_width = 0
1579 table.right_padding_width = 4
1580
1581 for c in ALL_CHANNELS:
1582 enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
1583 for o in self.MODULE_OPTIONS:
1584 if o['name'] == f"channel_{c}":
1585 default = "ON" if o.get('default', None) else "OFF"
1586 desc = o.get('desc', None)
1587
1588 table.add_row((
1589 c,
1590 enabled,
1591 default,
1592 desc,
1593 ))
1594
1595 return 0, table.get_string(sortby="NAME"), ''
1596
1597 @CLIReadCommand('telemetry collection ls')
1598 def collection_ls(self) -> Tuple[int, str, str]:
1599 '''
1600 List all collections
1601 '''
1602 col_delta = self.collection_delta()
1603 msg = ''
1604 if col_delta is not None and len(col_delta) > 0:
1605 msg = f"New collections are available:\n" \
1606 f"{sorted([c.name for c in col_delta])}\n" \
1607 f"Run `ceph telemetry on` to opt-in to these collections.\n"
1608
1609 table = PrettyTable(
1610 [
1611 'NAME', 'STATUS', 'DESC',
1612 ],
1613 border=False)
1614 table.align['NAME'] = 'l'
1615 table.align['STATUS'] = 'l'
1616 table.align['DESC'] = 'l'
1617 table.left_padding_width = 0
1618 table.right_padding_width = 4
1619
1620 for c in MODULE_COLLECTION:
1621 name = c['name']
1622 opted_in = self.is_enabled_collection(name)
1623 channel_enabled = getattr(self, f"channel_{c['channel']}")
1624
1625 status = ''
1626 if channel_enabled and opted_in:
1627 status = "REPORTING"
1628 else:
1629 why = ''
1630 delimiter = ''
1631
1632 if not opted_in:
1633 why += "NOT OPTED-IN"
1634 delimiter = ', '
1635 if not channel_enabled:
1636 why += f"{delimiter}CHANNEL {c['channel']} IS OFF"
1637
1638 status = f"NOT REPORTING: {why}"
1639
1640 desc = c['description']
1641
1642 table.add_row((
1643 name,
1644 status,
1645 desc,
1646 ))
1647
1648 if len(msg):
1649 # add a new line between message and table output
1650 msg = f"{msg} \n"
1651
1652 return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
1653
1654 @CLICommand('telemetry send')
1655 def do_send(self,
1656 endpoint: Optional[List[EndPoint]] = None,
1657 license: Optional[str] = None) -> Tuple[int, str, str]:
1658 '''
1659 Send a sample report
1660 '''
1661 if not self.is_opted_in() and license != LICENSE:
1662 self.log.debug(('A telemetry send attempt while opted-out. '
1663 'Asking for license agreement'))
1664 return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
1665 To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
1666 Please consider enabling the telemetry module with 'ceph telemetry on'.'''
1667 else:
1668 self.last_report = self.compile_report()
1669 return self.send(self.last_report, endpoint)
1670
1671 @CLIReadCommand('telemetry show')
1672 def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1673 '''
1674 Show a sample report of opted-in collections (except for 'device')
1675 '''
1676 if not self.enabled:
1677 # if telemetry is off, no report is being sent, hence nothing to show
1678 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1679 'Preview sample reports with `ceph telemetry preview`.'
1680 return 0, msg, ''
1681
1682 report = self.get_report_locked(channels=channels)
1683 self.format_perf_histogram(report)
1684 report = json.dumps(report, indent=4, sort_keys=True)
1685
1686 if self.channel_device:
1687 report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''
1688
1689 return 0, report, ''
1690
1691 @CLIReadCommand('telemetry preview')
1692 def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1693 '''
1694 Preview a sample report of the most recent collections available (except for 'device')
1695 '''
1696 report = {}
1697
1698 # Take the report lock so that a preview cannot race with the periodic send:
1699 # if the module hit its send interval at the same moment, it would compile a
1700 # report of only the opted-in collections, which contains less data than the
1701 # preview report.
1702 col_delta = self.collection_delta()
1703 with self.get_report_lock:
1704 if col_delta is not None and len(col_delta) == 0:
1705 # user is already opted-in to the most recent collection
1706 msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
1707 return 0, msg, ''
1708 else:
1709 # there are collections the user is not opted-in to
1710 next_collection = []
1711
1712 for c in MODULE_COLLECTION:
1713 next_collection.append(c['name'].name)
1714
1715 opted_in_collection = self.db_collection
1716 self.db_collection = next_collection
1717 report = self.get_report(channels=channels)
1718 self.db_collection = opted_in_collection
1719
1720 self.format_perf_histogram(report)
1721 report = json.dumps(report, indent=4, sort_keys=True)
1722
1723 if self.channel_device:
1724 report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''
1725
1726 return 0, report, ''
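# The swap above (save self.db_collection, point it at every known collection,
# compile the report, restore it) could also be expressed as a context manager.
# A minimal sketch, purely illustrative and not part of this module (the helper
# name is hypothetical):
#
#   from contextlib import contextmanager
#
#   @contextmanager
#   def _all_collections(self):
#       saved = self.db_collection
#       self.db_collection = [c['name'].name for c in MODULE_COLLECTION]
#       try:
#           yield
#       finally:
#           self.db_collection = saved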
1727
1728 @CLIReadCommand('telemetry show-device')
1729 def show_device(self) -> Tuple[int, str, str]:
1730 '''
1731 Show a sample device report
1732 '''
1733 if not self.enabled:
1734 # if telemetry is off, no report is being sent, hence nothing to show
1735 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1736 'Preview sample device reports with `ceph telemetry preview-device`.'
1737 return 0, msg, ''
1738
1739 if not self.channel_device:
1740 # if device channel is off, device report is not being sent, hence nothing to show
1741 msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
1742 'Preview sample device reports with `ceph telemetry preview-device`.'
1743 return 0, msg, ''
1744
1745 return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
1746
1747 @CLIReadCommand('telemetry preview-device')
1748 def preview_device(self) -> Tuple[int, str, str]:
1749 '''
1750 Preview a sample device report of the most recent device collection
1751 '''
1752 report = {}
1753
1754 device_col_delta = self.collection_delta(['device'])
1755 with self.get_report_lock:
1756 if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
1757 # user is already opted-in to the most recent device collection,
1758 # and device channel is on, thus `show-device` should be called
1759 msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
1760 return 0, msg, ''
1761
1762 # either the user is not opted-in at all, or there are collections
1763 # they are not opted-in to
1764 next_collection = []
1765
1766 for c in MODULE_COLLECTION:
1767 next_collection.append(c['name'].name)
1768
1769 opted_in_collection = self.db_collection
1770 self.db_collection = next_collection
1771 report = self.get_report('device')
1772 self.db_collection = opted_in_collection
1773
1774 report = json.dumps(report, indent=4, sort_keys=True)
1775 return 0, report, ''
1776
1777 @CLIReadCommand('telemetry show-all')
1778 def show_all(self) -> Tuple[int, str, str]:
1779 '''
1780 Show a sample report of all enabled channels (including 'device' channel)
1781 '''
1782 if not self.enabled:
1783 # if telemetry is off, no report is being sent, hence nothing to show
1784 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1785 'Preview sample reports with `ceph telemetry preview`.'
1786 return 0, msg, ''
1787
1788 if not self.channel_device:
1789 # device channel is off, no need to display its report
1790 return 0, json.dumps(self.get_report_locked('default'), indent=4, sort_keys=True), ''
1791
1792 # telemetry is on and device channel is enabled, show both
1793 return 0, json.dumps(self.get_report_locked('all'), indent=4, sort_keys=True), ''
1794
1795 @CLIReadCommand('telemetry preview-all')
1796 def preview_all(self) -> Tuple[int, str, str]:
1797 '''
1798 Preview a sample report of the most recent collections available of all channels (including 'device')
1799 '''
1800 report = {}
1801
1802 col_delta = self.collection_delta()
1803 with self.get_report_lock:
1804 if col_delta is not None and len(col_delta) == 0:
1805 # user is already opted-in to the most recent collection
1806 msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
1807 return 0, msg, ''
1808
1809 # there are collections the user is not opted-in to
1810 next_collection = []
1811
1812 for c in MODULE_COLLECTION:
1813 next_collection.append(c['name'].name)
1814
1815 opted_in_collection = self.db_collection
1816 self.db_collection = next_collection
1817 report = self.get_report('all')
1818 self.db_collection = opted_in_collection
1819
1820 self.format_perf_histogram(report)
1821 report = json.dumps(report, indent=4, sort_keys=True)
1822
1823 return 0, report, ''
1824
1825 def get_report_locked(self,
1826 report_type: str = 'default',
1827 channels: Optional[List[str]] = None) -> Dict[str, Any]:
1828 '''
1829 A wrapper around get_report that holds the report lock, so report compilation does not race with previews of the most recent module collections
1830 '''
1831 with self.get_report_lock:
1832 return self.get_report(report_type, channels)
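# Note: the preview commands take self.get_report_lock directly because they also
# need to swap self.db_collection while holding it; the `telemetry show*` commands
# go through this wrapper instead.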
1833
1834 def get_report(self,
1835 report_type: str = 'default',
1836 channels: Optional[List[str]] = None) -> Dict[str, Any]:
1837 if report_type == 'default':
1838 return self.compile_report(channels=channels)
1839 elif report_type == 'device':
1840 return self.gather_device_report()
1841 elif report_type == 'all':
1842 return {'report': self.compile_report(channels=channels),
1843 'device_report': self.gather_device_report()}
1844 return {}
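# report_type maps to the CLI commands above: 'default' backs `telemetry show` and
# `telemetry preview`, 'device' backs `show-device`/`preview-device`, and 'all'
# backs `show-all`/`preview-all`, returning both reports under the 'report' and
# 'device_report' keys (`show-all` falls back to 'default' when the device channel
# is off). Unknown types return an empty dict.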
1845
1846 def self_test(self) -> None:
1847 report = self.compile_report()
1848 if len(report) == 0:
1849 raise RuntimeError('Report is empty')
1850
1851 if 'report_id' not in report:
1852 raise RuntimeError('report_id not found in report')
1853
1854 def shutdown(self) -> None:
1855 self.run = False
1856 self.event.set()
1857
1858 def refresh_health_checks(self) -> None:
1859 health_checks = {}
1860 # TODO: do we also want to nag when the user is not opted-in?
1861 if self.enabled and self.should_nag():
1862 health_checks['TELEMETRY_CHANGED'] = {
1863 'severity': 'warning',
1864 'summary': 'Telemetry requires re-opt-in',
1865 'detail': [
1866 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
1867 ]
1868 }
1869 self.set_health_checks(health_checks)
1870
1871 def serve(self) -> None:
1872 self.load()
1873 self.run = True
1874
1875 self.log.debug('Waiting for mgr to warm up')
1876 time.sleep(10)
1877
1878 while self.run:
1879 self.event.clear()
1880
1881 self.refresh_health_checks()
1882
1883 if not self.is_opted_in():
1884 self.log.debug('Not sending report until user re-opts-in')
1885 self.event.wait(1800)
1886 continue
1887 if not self.enabled:
1888 self.log.debug('Not sending report until configured to do so')
1889 self.event.wait(1800)
1890 continue
1891
1892 now = int(time.time())
1893 if not self.last_upload or \
1894 (now - self.last_upload) > self.interval * 3600:
1895 self.log.info('Compiling and sending report to %s',
1896 self.url)
1897
1898 try:
1899 self.last_report = self.compile_report()
1900 except Exception:
1901 self.log.exception('Exception while compiling report:')
1902
1903 self.send(self.last_report)
1904 else:
1905 self.log.debug('Interval for sending new report has not expired')
1906
1907 sleep = 3600
1908 self.log.debug('Sleeping for %d seconds', sleep)
1909 self.event.wait(sleep)
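# Scheduling sketch: the loop wakes up every hour (or when self.event is set) and
# re-sends only once self.interval hours have elapsed since the last upload; with a
# hypothetical interval of 24, the check is (now - last_upload) > 24 * 3600 seconds.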
1910
1911 @staticmethod
1912 def can_run() -> Tuple[bool, str]:
1913 return True, ''