]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/telemetry/module.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / telemetry / module.py
1 """
2 Telemetry module for ceph-mgr
3
4 Collect statistics from Ceph cluster and send this back to the Ceph project
5 when user has opted-in
6 """
7 import logging
8 import numbers
9 import enum
10 import errno
11 import hashlib
12 import json
13 import rbd
14 import requests
15 import uuid
16 import time
17 from datetime import datetime, timedelta
18 from prettytable import PrettyTable
19 from threading import Event, Lock
20 from collections import defaultdict
21 from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union
22
23 from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, ServiceInfoT
24
25
26 ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']
27
28 LICENSE = 'sharing-1-0'
29 LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
30 LICENSE_URL = 'https://cdla.io/sharing-1-0/'
31
32 # Latest revision of the telemetry report. Bump this each time we make
33 # *any* change.
34 REVISION = 3
35
36 # History of revisions
37 # --------------------
38 #
39 # Version 1:
40 # Mimic and/or nautilus are lumped together here, since
41 # we didn't track revisions yet.
42 #
43 # Version 2:
44 # - added revision tracking, nagging, etc.
45 # - added config option changes
46 # - added channels
47 # - added explicit license acknowledgement to the opt-in process
48 #
49 # Version 3:
50 # - added device health metrics (i.e., SMART data, minus serial number)
51 # - remove crush_rule
52 # - added CephFS metadata (how many MDSs, fs features, how many data pools,
53 # how much metadata is cached, rfiles, rbytes, rsnapshots)
54 # - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
55 # - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
56 # - whether an OSD cluster network is in use
57 # - rbd pool and image count, and rbd mirror mode (pool-level)
58 # - rgw daemons, zones, zonegroups; which rgw frontends
59 # - crush map stats
60
class Collection(str, enum.Enum):
    """Identifiers for the individual data collections a cluster can opt into.

    The string values are what gets persisted in the mgr store and reported
    in 'collections_available' / 'collections_opted_in'.
    """
    basic_base = 'basic_base'
    device_base = 'device_base'
    crash_base = 'crash_base'
    ident_base = 'ident_base'
    perf_perf = 'perf_perf'
    basic_mds_metadata = 'basic_mds_metadata'
    basic_pool_usage = 'basic_pool_usage'
    basic_usage_by_class = 'basic_usage_by_class'
70
# Registry of all collections this module can gather.  Each entry ties a
# Collection to the opt-in channel it belongs to; 'nag' marks collections
# that should prompt the user to re-opt-in when they are introduced.
MODULE_COLLECTION : List[Dict] = [
    {
        "name": Collection.basic_base,
        "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.device_base,
        "description": "Information about device health metrics",
        "channel": "device",
        "nag": False
    },
    {
        "name": Collection.crash_base,
        "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
        "channel": "crash",
        "nag": False
    },
    {
        "name": Collection.ident_base,
        "description": "User-provided identifying information about the cluster",
        "channel": "ident",
        "nag": False
    },
    {
        "name": Collection.perf_perf,
        "description": "Information about performance counters of the cluster",
        "channel": "perf",
        "nag": True
    },
    {
        "name": Collection.basic_mds_metadata,
        "description": "MDS metadata",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_pool_usage,
        "description": "Default pool application and usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_usage_by_class,
        "description": "Default device class usage statistics",
        "channel": "basic",
        "nag": False
    }
]
121
class Module(MgrModule):
    """Telemetry mgr module: gathers cluster reports and uploads them
    to telemetry.ceph.com when the user has opted in."""

    # Daemon metadata fields that are histogrammed per daemon type
    # (see gather_osd_metadata / gather_mon_metadata / gather_mds_metadata).
    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]

    # Module options; each becomes an instance attribute via
    # config_update_module_option().
    MODULE_OPTIONS = [
        Option(name='url',
               type='str',
               default='https://telemetry.ceph.com/report'),
        Option(name='device_url',
               type='str',
               default='https://telemetry.ceph.com/device'),
        Option(name='enabled',
               type='bool',
               default=False),
        Option(name='last_opt_revision',
               type='int',
               default=1),
        Option(name='leaderboard',
               type='bool',
               default=False),
        Option(name='description',
               type='str',
               default=None),
        Option(name='contact',
               type='str',
               default=None),
        Option(name='organization',
               type='str',
               default=None),
        Option(name='proxy',
               type='str',
               default=None),
        # hours between reports; minimum of 8 enforced by the option schema
        Option(name='interval',
               type='int',
               default=24,
               min=8),
        Option(name='channel_basic',
               type='bool',
               default=True,
               desc='Share basic cluster information (size, version)'),
        Option(name='channel_ident',
               type='bool',
               default=False,
               desc='Share a user-provided description and/or contact email for the cluster'),
        Option(name='channel_crash',
               type='bool',
               default=True,
               desc='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
        Option(name='channel_device',
               type='bool',
               default=True,
               desc=('Share device health metrics '
                     '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
        Option(name='channel_perf',
               type='bool',
               default=False,
               desc='Share various performance metrics of a cluster'),
    ]
188
189 @property
190 def config_keys(self) -> Dict[str, OptionValue]:
191 return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
192
193 def __init__(self, *args: Any, **kwargs: Any) -> None:
194 super(Module, self).__init__(*args, **kwargs)
195 self.event = Event()
196 self.run = False
197 self.db_collection: Optional[List[str]] = None
198 self.last_opted_in_ceph_version: Optional[int] = None
199 self.last_opted_out_ceph_version: Optional[int] = None
200 self.last_upload: Optional[int] = None
201 self.last_report: Dict[str, Any] = dict()
202 self.report_id: Optional[str] = None
203 self.salt: Optional[str] = None
204 self.get_report_lock = Lock()
205 self.config_update_module_option()
206 # for mypy which does not run the code
207 if TYPE_CHECKING:
208 self.url = ''
209 self.device_url = ''
210 self.enabled = False
211 self.last_opt_revision = 0
212 self.leaderboard = ''
213 self.interval = 0
214 self.proxy = ''
215 self.channel_basic = True
216 self.channel_ident = False
217 self.channel_crash = True
218 self.channel_device = True
219 self.channel_perf = False
220 self.db_collection = ['basic_base', 'device_base']
221 self.last_opted_in_ceph_version = 17
222 self.last_opted_out_ceph_version = 0
223
224 def config_update_module_option(self) -> None:
225 for opt in self.MODULE_OPTIONS:
226 setattr(self,
227 opt['name'],
228 self.get_module_option(opt['name']))
229 self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
230
    def config_notify(self) -> None:
        """Called by the mgr when configuration changes: re-read options
        and wake the serve() loop so it can act on the new settings."""
        self.config_update_module_option()
        # wake up serve() thread
        self.event.set()
235
236 def load(self) -> None:
237 last_upload = self.get_store('last_upload', None)
238 if last_upload is None:
239 self.last_upload = None
240 else:
241 self.last_upload = int(last_upload)
242
243 report_id = self.get_store('report_id', None)
244 if report_id is None:
245 self.report_id = str(uuid.uuid4())
246 self.set_store('report_id', self.report_id)
247 else:
248 self.report_id = report_id
249
250 salt = self.get_store('salt', None)
251 if salt is None:
252 self.salt = str(uuid.uuid4())
253 self.set_store('salt', self.salt)
254 else:
255 self.salt = salt
256
257 self.init_collection()
258
259 last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None)
260 if last_opted_in_ceph_version is None:
261 self.last_opted_in_ceph_version = None
262 else:
263 self.last_opted_in_ceph_version = int(last_opted_in_ceph_version)
264
265 last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None)
266 if last_opted_out_ceph_version is None:
267 self.last_opted_out_ceph_version = None
268 else:
269 self.last_opted_out_ceph_version = int(last_opted_out_ceph_version)
270
271 def gather_osd_metadata(self,
272 osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]:
273 keys = ["osd_objectstore", "rotational"]
274 keys += self.metadata_keys
275
276 metadata: Dict[str, Dict[str, int]] = dict()
277 for key in keys:
278 metadata[key] = defaultdict(int)
279
280 for osd in osd_map['osds']:
281 res = self.get_metadata('osd', str(osd['osd']))
282 if res is None:
283 self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
284 continue
285 for k, v in res.items():
286 if k not in keys:
287 continue
288
289 metadata[k][v] += 1
290
291 return metadata
292
293 def gather_mon_metadata(self,
294 mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]:
295 keys = list()
296 keys += self.metadata_keys
297
298 metadata: Dict[str, Dict[str, int]] = dict()
299 for key in keys:
300 metadata[key] = defaultdict(int)
301
302 for mon in mon_map['mons']:
303 res = self.get_metadata('mon', mon['name'])
304 if res is None:
305 self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
306 continue
307 for k, v in res.items():
308 if k not in keys:
309 continue
310
311 metadata[k][v] += 1
312
313 return metadata
314
315 def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]:
316 metadata: Dict[str, Dict[str, int]] = dict()
317
318 res = self.get('mds_metadata') # metadata of *all* mds daemons
319 if res is None or not res:
320 self.log.debug('Could not get metadata for mds daemons')
321 return metadata
322
323 keys = list()
324 keys += self.metadata_keys
325
326 for key in keys:
327 metadata[key] = defaultdict(int)
328
329 for mds in res.values():
330 for k, v in mds.items():
331 if k not in keys:
332 continue
333
334 metadata[k][v] += 1
335
336 return metadata
337
338 def gather_crush_info(self) -> Dict[str, Union[int,
339 bool,
340 List[int],
341 Dict[str, int],
342 Dict[int, int]]]:
343 osdmap = self.get_osdmap()
344 crush_raw = osdmap.get_crush()
345 crush = crush_raw.dump()
346
347 BucketKeyT = TypeVar('BucketKeyT', int, str)
348
349 def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None:
350 if k in d:
351 d[k] += 1
352 else:
353 d[k] = 1
354
355 device_classes: Dict[str, int] = {}
356 for dev in crush['devices']:
357 inc(device_classes, dev.get('class', ''))
358
359 bucket_algs: Dict[str, int] = {}
360 bucket_types: Dict[str, int] = {}
361 bucket_sizes: Dict[int, int] = {}
362 for bucket in crush['buckets']:
363 if '~' in bucket['name']: # ignore shadow buckets
364 continue
365 inc(bucket_algs, bucket['alg'])
366 inc(bucket_types, bucket['type_id'])
367 inc(bucket_sizes, len(bucket['items']))
368
369 return {
370 'num_devices': len(crush['devices']),
371 'num_types': len(crush['types']),
372 'num_buckets': len(crush['buckets']),
373 'num_rules': len(crush['rules']),
374 'device_classes': list(device_classes.values()),
375 'tunables': crush['tunables'],
376 'compat_weight_set': '-1' in crush['choose_args'],
377 'num_weight_sets': len(crush['choose_args']),
378 'bucket_algs': bucket_algs,
379 'bucket_sizes': bucket_sizes,
380 'bucket_types': bucket_types,
381 }
382
383 def gather_configs(self) -> Dict[str, List[str]]:
384 # cluster config options
385 cluster = set()
386 r, outb, outs = self.mon_command({
387 'prefix': 'config dump',
388 'format': 'json'
389 })
390 if r != 0:
391 return {}
392 try:
393 dump = json.loads(outb)
394 except json.decoder.JSONDecodeError:
395 return {}
396 for opt in dump:
397 name = opt.get('name')
398 if name:
399 cluster.add(name)
400 # daemon-reported options (which may include ceph.conf)
401 active = set()
402 ls = self.get("modified_config_options")
403 for opt in ls.get('options', {}):
404 active.add(opt)
405 return {
406 'cluster_changed': sorted(list(cluster)),
407 'active_changed': sorted(list(active)),
408 }
409
    def get_heap_stats(self) -> Dict[str, dict]:
        """Collect tcmalloc heap stats from every OSD via the 'heap stats'
        admin command.

        Returns a mapping 'osd.<id>' -> {category -> value}.  OSDs whose
        command fails, that report no tcmalloc stats, or whose output does
        not parse into the expected number of values are skipped.
        """
        # Initialize result dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))

        # Get list of osd ids from the metadata
        osd_metadata = self.get('osd_metadata')

        # Grab output from the "osd.x heap stats" command
        for osd_id in osd_metadata:
            cmd_dict = {
                'prefix': 'heap',
                'heapcmd': 'stats',
                'id': str(osd_id),
            }
            r, outb, outs = self.osd_command(cmd_dict)
            if r != 0:
                self.log.debug("Invalid command dictionary.")
                continue
            else:
                if 'tcmalloc heap stats' in outs:
                    # Parse by extracting every integer from the textual report;
                    # position in `values` is matched to `categories` below.
                    values = [int(i) for i in outs.split() if i.isdigit()]
                    # `categories` must be ordered this way for the correct output to be parsed
                    categories = ['use_by_application',
                                  'page_heap_freelist',
                                  'central_cache_freelist',
                                  'transfer_cache_freelist',
                                  'thread_cache_freelists',
                                  'malloc_metadata',
                                  'actual_memory_used',
                                  'released_to_os',
                                  'virtual_address_space_used',
                                  'spans_in_use',
                                  'thread_heaps_in_use',
                                  'tcmalloc_page_size']
                    if len(values) != len(categories):
                        self.log.debug('Received unexpected output from osd.{}; number of values should match the number of expected categories:\n' \
                                       'values: len={} {} ~ categories: len={} {} ~ outs: {}'.format(osd_id, len(values), values, len(categories), categories, outs))
                        continue
                    osd = 'osd.' + str(osd_id)
                    result[osd] = dict(zip(categories, values))
                else:
                    self.log.debug('No heap stats available on osd.{}: {}'.format(osd_id, outs))
                    continue

        return result
455
456 def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
457 # Initialize result dict
458 result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))
459
460 # Get list of osd ids from the metadata
461 osd_metadata = self.get('osd_metadata')
462
463 # Grab output from the "osd.x dump_mempools" command
464 for osd_id in osd_metadata:
465 cmd_dict = {
466 'prefix': 'dump_mempools',
467 'id': str(osd_id),
468 'format': 'json'
469 }
470 r, outb, outs = self.osd_command(cmd_dict)
471 if r != 0:
472 self.log.debug("Invalid command dictionary.")
473 continue
474 else:
475 try:
476 # This is where the mempool will land.
477 dump = json.loads(outb)
478 if mode == 'separated':
479 result["osd." + str(osd_id)] = dump['mempool']['by_pool']
480 elif mode == 'aggregated':
481 for mem_type in dump['mempool']['by_pool']:
482 result[mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
483 result[mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
484 else:
485 self.log.debug("Incorrect mode specified in get_mempool")
486 except (json.decoder.JSONDecodeError, KeyError) as e:
487 self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
488 continue
489
490 return result
491
    def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
        """Collect 2D performance histograms from every OSD via
        'perf histogram dump'.

        Histograms are grouped by their axis configuration (str(axes) is used
        as the grouping key) so that only histograms with identical axes are
        combined.  In 'separated' mode per-OSD value grids are kept; in
        'aggregated' mode the grids are summed element-wise across OSDs.
        Returns the grouped histograms as a list (grouping keys are dropped).
        """
        # Initialize result dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
                                              lambda: defaultdict(
                                                  lambda: defaultdict(
                                                      lambda: defaultdict(
                                                          lambda: defaultdict(int))))))

        # Get list of osd ids from the metadata
        osd_metadata = self.get('osd_metadata')

        # Grab output from the "osd.x perf histogram dump" command
        for osd_id in osd_metadata:
            cmd_dict = {
                'prefix': 'perf histogram dump',
                'id': str(osd_id),
                'format': 'json'
            }
            r, outb, outs = self.osd_command(cmd_dict)
            # Check for invalid calls
            if r != 0:
                self.log.debug("Invalid command dictionary.")
                continue
            else:
                try:
                    # This is where the histograms will land if there are any.
                    dump = json.loads(outb)

                    for histogram in dump['osd']:
                        # Log axis information. There are two axes, each represented
                        # as a dictionary. Both dictionaries are contained inside a
                        # list called 'axes'.
                        axes = []
                        for axis in dump['osd'][histogram]['axes']:

                            # This is the dict that contains information for an individual
                            # axis. It will be appended to the 'axes' list at the end.
                            axis_dict: Dict[str, Any] = defaultdict()

                            # Collecting information for buckets, min, name, etc.
                            axis_dict['buckets'] = axis['buckets']
                            axis_dict['min'] = axis['min']
                            axis_dict['name'] = axis['name']
                            axis_dict['quant_size'] = axis['quant_size']
                            axis_dict['scale_type'] = axis['scale_type']

                            # Collecting ranges; placing them in lists to
                            # improve readability later on.
                            ranges = []
                            for _range in axis['ranges']:
                                _max, _min = None, None
                                if 'max' in _range:
                                    _max = _range['max']
                                if 'min' in _range:
                                    _min = _range['min']
                                ranges.append([_min, _max])
                            axis_dict['ranges'] = ranges

                            # Now that 'axis_dict' contains all the appropriate
                            # information for the current axis, append it to the 'axes' list.
                            # There will end up being two axes in the 'axes' list, since the
                            # histograms are 2D.
                            axes.append(axis_dict)

                        # Add the 'axes' list, containing both axes, to result.
                        # At this point, you will see that the name of the key is the string
                        # form of our axes list (str(axes)). This is there so that histograms
                        # with different axis configs will not be combined.
                        # These key names are later dropped when only the values are returned.
                        result[str(axes)][histogram]['axes'] = axes

                        # Collect current values and make sure they are in
                        # integer form.
                        values = []
                        for value_list in dump['osd'][histogram]['values']:
                            values.append([int(v) for v in value_list])

                        if mode == 'separated':
                            if 'osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['osds'] = []
                            result[str(axes)][histogram]['osds'].append({'osd_id': int(osd_id), 'values': values})

                        elif mode == 'aggregated':
                            # Aggregate values. If 'values' have already been initialized,
                            # we can safely add.
                            if 'values' in result[str(axes)][histogram]:
                                for i in range (0, len(values)):
                                    for j in range (0, len(values[i])):
                                        values[i][j] += result[str(axes)][histogram]['values'][i][j]

                            # Add the values to result.
                            result[str(axes)][histogram]['values'] = values

                            # Update num_combined_osds
                            if 'num_combined_osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['num_combined_osds'] = 1
                            else:
                                result[str(axes)][histogram]['num_combined_osds'] += 1
                        else:
                            self.log.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode))
                            return list()

                # Sometimes, json errors occur if you give it an empty string.
                # I am also putting in a catch for a KeyError since it could
                # happen where the code is assuming that a key exists in the
                # schema when it doesn't. In either case, we'll handle that
                # by continuing and collecting what we can from other osds.
                except (json.decoder.JSONDecodeError, KeyError) as e:
                    self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
                    continue

        return list(result.values())
604
    def get_io_rate(self) -> dict:
        """Return the 'io_rate' structure as provided by the mgr."""
        return self.get('io_rate')
607
608 def get_stats_per_pool(self) -> dict:
609 result = self.get('pg_dump')['pool_stats']
610
611 # collect application metadata from osd_map
612 osd_map = self.get('osd_map')
613 application_metadata = {pool['pool']: pool['application_metadata'] for pool in osd_map['pools']}
614
615 # add application to each pool from pg_dump
616 for pool in result:
617 pool['application'] = []
618 # Only include default applications
619 for application in application_metadata[pool['poolid']]:
620 if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
621 pool['application'].append(application)
622
623 return result
624
    def get_stats_per_pg(self) -> dict:
        """Return the raw per-PG statistics from pg_dump."""
        return self.get('pg_dump')['pg_stats']
627
628 def get_rocksdb_stats(self) -> Dict[str, str]:
629 # Initalizers
630 result: Dict[str, str] = defaultdict()
631 version = self.get_rocksdb_version()
632
633 # Update result
634 result['version'] = version
635
636 return result
637
638 def gather_crashinfo(self) -> List[Dict[str, str]]:
639 crashlist: List[Dict[str, str]] = list()
640 errno, crashids, err = self.remote('crash', 'ls')
641 if errno:
642 return crashlist
643 for crashid in crashids.split():
644 errno, crashinfo, err = self.remote('crash', 'do_info', crashid)
645 if errno:
646 continue
647 c = json.loads(crashinfo)
648
649 # redact hostname
650 del c['utsname_hostname']
651
652 # entity_name might have more than one '.', beware
653 (etype, eid) = c.get('entity_name', '').split('.', 1)
654 m = hashlib.sha1()
655 assert self.salt
656 m.update(self.salt.encode('utf-8'))
657 m.update(eid.encode('utf-8'))
658 m.update(self.salt.encode('utf-8'))
659 c['entity_name'] = etype + '.' + m.hexdigest()
660
661 # redact final line of python tracebacks, as the exception
662 # payload may contain identifying information
663 if 'mgr_module' in c and 'backtrace' in c:
664 # backtrace might be empty
665 if len(c['backtrace']) > 0:
666 c['backtrace'][-1] = '<redacted>'
667
668 crashlist.append(c)
669 return crashlist
670
    def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
        """Collect perf counters from all daemons.

        In 'separated' mode one entry is kept per daemon; in 'aggregated'
        mode numeric values are summed per daemon *type* (the first three
        characters of the daemon name, e.g. 'osd', 'mon').  Collection names
        are split at the first '.' into a two-level hierarchy.
        """
        # Extract perf counter data with get_all_perf_counters(), a method
        # from mgr/mgr_module.py. This method returns a nested dictionary that
        # looks a lot like perf schema, except with some additional fields.
        #
        # Example of output, a snapshot of a mon daemon:
        #   "mon.b": {
        #       "bluestore.kv_flush_lat": {
        #           "count": 2431,
        #           "description": "Average kv_thread flush latency",
        #           "nick": "fl_l",
        #           "priority": 8,
        #           "type": 5,
        #           "units": 1,
        #           "value": 88814109
        #       },
        #   },
        all_perf_counters = self.get_all_perf_counters()

        # Initialize 'result' dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
            lambda: defaultdict(lambda: defaultdict(int))))

        for daemon in all_perf_counters:

            # Calculate num combined daemon types if in aggregated mode
            if mode == 'aggregated':
                daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'
                if 'num_combined_daemons' not in result[daemon_type]:
                    result[daemon_type]['num_combined_daemons'] = 1
                else:
                    result[daemon_type]['num_combined_daemons'] += 1

            for collection in all_perf_counters[daemon]:
                # Split the collection to avoid redundancy in final report; i.e.:
                #   bluestore.kv_flush_lat, bluestore.kv_final_lat --> 
                #   bluestore: kv_flush_lat, kv_final_lat
                col_0, col_1 = collection.split('.')

                # Debug log for empty keys. This initially was a problem for prioritycache
                # perf counters, where the col_0 was empty for certain mon counters:
                #
                # "mon.a": {                  instead of    "mon.a": {
                #      "": {                                     "prioritycache": {
                #        "cache_bytes": {...},                          "cache_bytes": {...},
                #
                # This log is here to detect any future instances of a similar issue.
                if (daemon == "") or (col_0 == "") or (col_1 == ""):
                    self.log.debug("Instance of an empty key: {}{}".format(daemon, collection))

                if mode == 'separated':
                    # Add value to result
                    result[daemon][col_0][col_1]['value'] = \
                            all_perf_counters[daemon][collection]['value']

                    # Check that 'count' exists, as not all counters have a count field.
                    if 'count' in all_perf_counters[daemon][collection]:
                        result[daemon][col_0][col_1]['count'] = \
                                all_perf_counters[daemon][collection]['count']
                elif mode == 'aggregated':
                    # Not every rgw daemon has the same schema. Specifically, each rgw daemon
                    # has a uniquely-named collection that starts off identically (i.e.
                    # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
                    # This bit of code combines these unique counters all under one rgw instance.
                    # Without this check, the schema would remain separeted out in the final report.
                    if col_0[0:11] == "objecter-0x":
                        col_0 = "objecter-0x"

                    # Check that the value can be incremented. In some cases,
                    # the files are of type 'pair' (real-integer-pair, integer-integer pair).
                    # In those cases, the value is a dictionary, and not a number.
                    #   i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
                    if isinstance(all_perf_counters[daemon][collection]['value'], numbers.Number):
                        result[daemon_type][col_0][col_1]['value'] += \
                                all_perf_counters[daemon][collection]['value']

                    # Check that 'count' exists, as not all counters have a count field.
                    if 'count' in all_perf_counters[daemon][collection]:
                        result[daemon_type][col_0][col_1]['count'] += \
                                all_perf_counters[daemon][collection]['count']
                else:
                    self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
                    return {}

        return result
756
757 def get_active_channels(self) -> List[str]:
758 r = []
759 if self.channel_basic:
760 r.append('basic')
761 if self.channel_crash:
762 r.append('crash')
763 if self.channel_device:
764 r.append('device')
765 if self.channel_ident:
766 r.append('ident')
767 if self.channel_perf:
768 r.append('perf')
769 return r
770
    def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
        """Build the device channel payload: recent device health metrics with
        identifying data anonymized.

        Only samples newer than twice the reporting interval are included.
        Host names and device ids are replaced with random UUIDs that are
        persisted in the mgr store so they stay stable across reports, and
        the device serial number is scrubbed from the raw metric records.
        Returns {anon-host-id -> {anon-devid -> {timestamp -> record}}}.
        """
        try:
            time_format = self.remote('devicehealth', 'get_time_format')
        except Exception as e:
            self.log.debug('Unable to format time: {}'.format(e))
            return {}
        cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
        min_sample = cutoff.strftime(time_format)

        devices = self.get('devices')['devices']
        if not devices:
            self.log.debug('Unable to get device info from the mgr.')
            return {}

        # anon-host-id -> anon-devid -> { timestamp -> record }
        res: Dict[str, Dict[str, Dict[str, str]]] = {}
        for d in devices:
            devid = d['devid']
            try:
                # this is a map of stamp -> {device info}
                m = self.remote('devicehealth', 'get_recent_device_metrics',
                                devid, min_sample)
            except Exception as e:
                self.log.debug('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
                continue

            # anonymize host id
            try:
                host = d['location'][0]['host']
            except (KeyError, IndexError) as e:
                self.log.debug('Unable to get host from device with id "{}": {}'.format(devid, e))
                continue
            anon_host = self.get_store('host-id/%s' % host)
            if not anon_host:
                anon_host = str(uuid.uuid1())
                self.set_store('host-id/%s' % host, anon_host)
            serial = None
            for dev, rep in m.items():
                rep['host_id'] = anon_host
                if serial is None and 'serial_number' in rep:
                    serial = rep['serial_number']

            # anonymize device id
            anon_devid = self.get_store('devid-id/%s' % devid)
            if not anon_devid:
                # ideally devid is 'vendor_model_serial',
                # but can also be 'model_serial', 'serial'
                if '_' in devid:
                    # keep the vendor/model prefix, replace only the serial part
                    anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
                else:
                    anon_devid = str(uuid.uuid1())
                self.set_store('devid-id/%s' % devid, anon_devid)
            self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
                                                           host, anon_host))

            # anonymize the smartctl report itself
            if serial:
                m_str = json.dumps(m)
                m = json.loads(m_str.replace(serial, 'deleted'))

            if anon_host not in res:
                res[anon_host] = {}
            res[anon_host][anon_devid] = m
        return res
835
836 def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
837 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
838 if data:
839 return data[-1][1]
840 else:
841 return 0
842
843 def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
844 if not channels:
845 channels = self.get_active_channels()
846 report = {
847 'leaderboard': self.leaderboard,
848 'report_version': 1,
849 'report_timestamp': datetime.utcnow().isoformat(),
850 'report_id': self.report_id,
851 'channels': channels,
852 'channels_available': ALL_CHANNELS,
853 'license': LICENSE,
854 'collections_available': [c['name'].name for c in MODULE_COLLECTION],
855 'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])],
856 }
857
858 if 'ident' in channels:
859 for option in ['description', 'contact', 'organization']:
860 report[option] = getattr(self, option)
861
862 if 'basic' in channels:
863 mon_map = self.get('mon_map')
864 osd_map = self.get('osd_map')
865 service_map = self.get('service_map')
866 fs_map = self.get('fs_map')
867 df = self.get('df')
868 df_pools = {pool['id']: pool for pool in df['pools']}
869
870 report['created'] = mon_map['created']
871
872 # mons
873 v1_mons = 0
874 v2_mons = 0
875 ipv4_mons = 0
876 ipv6_mons = 0
877 for mon in mon_map['mons']:
878 for a in mon['public_addrs']['addrvec']:
879 if a['type'] == 'v2':
880 v2_mons += 1
881 elif a['type'] == 'v1':
882 v1_mons += 1
883 if a['addr'].startswith('['):
884 ipv6_mons += 1
885 else:
886 ipv4_mons += 1
887 report['mon'] = {
888 'count': len(mon_map['mons']),
889 'features': mon_map['features'],
890 'min_mon_release': mon_map['min_mon_release'],
891 'v1_addr_mons': v1_mons,
892 'v2_addr_mons': v2_mons,
893 'ipv4_addr_mons': ipv4_mons,
894 'ipv6_addr_mons': ipv6_mons,
895 }
896
897 report['config'] = self.gather_configs()
898
899 # pools
900
901 rbd_num_pools = 0
902 rbd_num_images_by_pool = []
903 rbd_mirroring_by_pool = []
904 num_pg = 0
905 report['pools'] = list()
906 for pool in osd_map['pools']:
907 num_pg += pool['pg_num']
908 ec_profile = {}
909 if pool['erasure_code_profile']:
910 orig = osd_map['erasure_code_profiles'].get(
911 pool['erasure_code_profile'], {})
912 ec_profile = {
913 k: orig[k] for k in orig.keys()
914 if k in ['k', 'm', 'plugin', 'technique',
915 'crush-failure-domain', 'l']
916 }
917 pool_data = {
918 'pool': pool['pool'],
919 'pg_num': pool['pg_num'],
920 'pgp_num': pool['pg_placement_num'],
921 'size': pool['size'],
922 'min_size': pool['min_size'],
923 'pg_autoscale_mode': pool['pg_autoscale_mode'],
924 'target_max_bytes': pool['target_max_bytes'],
925 'target_max_objects': pool['target_max_objects'],
926 'type': ['', 'replicated', '', 'erasure'][pool['type']],
927 'erasure_code_profile': ec_profile,
928 'cache_mode': pool['cache_mode'],
929 }
930
931 # basic_pool_usage collection
932 if self.is_enabled_collection(Collection.basic_pool_usage):
933 pool_data['application'] = []
934 for application in pool['application_metadata']:
935 # Only include default applications
936 if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
937 pool_data['application'].append(application)
938 pool_stats = df_pools[pool['pool']]['stats']
939 pool_data['stats'] = { # filter out kb_used
940 'avail_raw': pool_stats['avail_raw'],
941 'bytes_used': pool_stats['bytes_used'],
942 'compress_bytes_used': pool_stats['compress_bytes_used'],
943 'compress_under_bytes': pool_stats['compress_under_bytes'],
944 'data_bytes_used': pool_stats['data_bytes_used'],
945 'dirty': pool_stats['dirty'],
946 'max_avail': pool_stats['max_avail'],
947 'objects': pool_stats['objects'],
948 'omap_bytes_used': pool_stats['omap_bytes_used'],
949 'percent_used': pool_stats['percent_used'],
950 'quota_bytes': pool_stats['quota_bytes'],
951 'quota_objects': pool_stats['quota_objects'],
952 'rd': pool_stats['rd'],
953 'rd_bytes': pool_stats['rd_bytes'],
954 'stored': pool_stats['stored'],
955 'stored_data': pool_stats['stored_data'],
956 'stored_omap': pool_stats['stored_omap'],
957 'stored_raw': pool_stats['stored_raw'],
958 'wr': pool_stats['wr'],
959 'wr_bytes': pool_stats['wr_bytes']
960 }
961
962 cast(List[Dict[str, Any]], report['pools']).append(pool_data)
963 if 'rbd' in pool['application_metadata']:
964 rbd_num_pools += 1
965 ioctx = self.rados.open_ioctx(pool['pool_name'])
966 rbd_num_images_by_pool.append(
967 sum(1 for _ in rbd.RBD().list2(ioctx)))
968 rbd_mirroring_by_pool.append(
969 rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
970 report['rbd'] = {
971 'num_pools': rbd_num_pools,
972 'num_images_by_pool': rbd_num_images_by_pool,
973 'mirroring_by_pool': rbd_mirroring_by_pool}
974
975 # osds
976 cluster_network = False
977 for osd in osd_map['osds']:
978 if osd['up'] and not cluster_network:
979 front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
980 back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
981 if front_ip != back_ip:
982 cluster_network = True
983 report['osd'] = {
984 'count': len(osd_map['osds']),
985 'require_osd_release': osd_map['require_osd_release'],
986 'require_min_compat_client': osd_map['require_min_compat_client'],
987 'cluster_network': cluster_network,
988 }
989
990 # crush
991 report['crush'] = self.gather_crush_info()
992
993 # cephfs
994 report['fs'] = {
995 'count': len(fs_map['filesystems']),
996 'feature_flags': fs_map['feature_flags'],
997 'num_standby_mds': len(fs_map['standbys']),
998 'filesystems': [],
999 }
1000 num_mds = len(fs_map['standbys'])
1001 for fsm in fs_map['filesystems']:
1002 fs = fsm['mdsmap']
1003 num_sessions = 0
1004 cached_ino = 0
1005 cached_dn = 0
1006 cached_cap = 0
1007 subtrees = 0
1008 rfiles = 0
1009 rbytes = 0
1010 rsnaps = 0
1011 for gid, mds in fs['info'].items():
1012 num_sessions += self.get_latest('mds', mds['name'],
1013 'mds_sessions.session_count')
1014 cached_ino += self.get_latest('mds', mds['name'],
1015 'mds_mem.ino')
1016 cached_dn += self.get_latest('mds', mds['name'],
1017 'mds_mem.dn')
1018 cached_cap += self.get_latest('mds', mds['name'],
1019 'mds_mem.cap')
1020 subtrees += self.get_latest('mds', mds['name'],
1021 'mds.subtrees')
1022 if mds['rank'] == 0:
1023 rfiles = self.get_latest('mds', mds['name'],
1024 'mds.root_rfiles')
1025 rbytes = self.get_latest('mds', mds['name'],
1026 'mds.root_rbytes')
1027 rsnaps = self.get_latest('mds', mds['name'],
1028 'mds.root_rsnaps')
1029 report['fs']['filesystems'].append({ # type: ignore
1030 'max_mds': fs['max_mds'],
1031 'ever_allowed_features': fs['ever_allowed_features'],
1032 'explicitly_allowed_features': fs['explicitly_allowed_features'],
1033 'num_in': len(fs['in']),
1034 'num_up': len(fs['up']),
1035 'num_standby_replay': len(
1036 [mds for gid, mds in fs['info'].items()
1037 if mds['state'] == 'up:standby-replay']),
1038 'num_mds': len(fs['info']),
1039 'num_sessions': num_sessions,
1040 'cached_inos': cached_ino,
1041 'cached_dns': cached_dn,
1042 'cached_caps': cached_cap,
1043 'cached_subtrees': subtrees,
1044 'balancer_enabled': len(fs['balancer']) > 0,
1045 'num_data_pools': len(fs['data_pools']),
1046 'standby_count_wanted': fs['standby_count_wanted'],
1047 'approx_ctime': fs['created'][0:7],
1048 'files': rfiles,
1049 'bytes': rbytes,
1050 'snaps': rsnaps,
1051 })
1052 num_mds += len(fs['info'])
1053 report['fs']['total_num_mds'] = num_mds # type: ignore
1054
1055 # daemons
1056 report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
1057 mon=self.gather_mon_metadata(mon_map))
1058
1059 if self.is_enabled_collection(Collection.basic_mds_metadata):
1060 report['metadata']['mds'] = self.gather_mds_metadata() # type: ignore
1061
1062 # host counts
1063 servers = self.list_servers()
1064 self.log.debug('servers %s' % servers)
1065 hosts = {
1066 'num': len([h for h in servers if h['hostname']]),
1067 }
1068 for t in ['mon', 'mds', 'osd', 'mgr']:
1069 nr_services = sum(1 for host in servers if
1070 any(service for service in cast(List[ServiceInfoT],
1071 host['services'])
1072 if service['type'] == t))
1073 hosts['num_with_' + t] = nr_services
1074 report['hosts'] = hosts
1075
1076 report['usage'] = {
1077 'pools': len(df['pools']),
1078 'pg_num': num_pg,
1079 'total_used_bytes': df['stats']['total_used_bytes'],
1080 'total_bytes': df['stats']['total_bytes'],
1081 'total_avail_bytes': df['stats']['total_avail_bytes']
1082 }
1083 # basic_usage_by_class collection
1084 if self.is_enabled_collection(Collection.basic_usage_by_class):
1085 report['usage']['stats_by_class'] = {} # type: ignore
1086 for device_class in df['stats_by_class']:
1087 if device_class in ['hdd', 'ssd', 'nvme']:
1088 report['usage']['stats_by_class'][device_class] = df['stats_by_class'][device_class] # type: ignore
1089
1090 services: DefaultDict[str, int] = defaultdict(int)
1091 for key, value in service_map['services'].items():
1092 services[key] += 1
1093 if key == 'rgw':
1094 rgw = {}
1095 zones = set()
1096 zonegroups = set()
1097 frontends = set()
1098 count = 0
1099 d = value.get('daemons', dict())
1100 for k, v in d.items():
1101 if k == 'summary' and v:
1102 rgw[k] = v
1103 elif isinstance(v, dict) and 'metadata' in v:
1104 count += 1
1105 zones.add(v['metadata']['zone_id'])
1106 zonegroups.add(v['metadata']['zonegroup_id'])
1107 frontends.add(v['metadata']['frontend_type#0'])
1108
1109 # we could actually iterate over all the keys of
1110 # the dict and check for how many frontends there
1111 # are, but it is unlikely that one would be running
1112 # more than 2 supported ones
1113 f2 = v['metadata'].get('frontend_type#1', None)
1114 if f2:
1115 frontends.add(f2)
1116
1117 rgw['count'] = count
1118 rgw['zones'] = len(zones)
1119 rgw['zonegroups'] = len(zonegroups)
1120 rgw['frontends'] = list(frontends) # sets aren't json-serializable
1121 report['rgw'] = rgw
1122 report['services'] = services
1123
1124 try:
1125 report['balancer'] = self.remote('balancer', 'gather_telemetry')
1126 except ImportError:
1127 report['balancer'] = {
1128 'active': False
1129 }
1130
1131 if 'crash' in channels:
1132 report['crashes'] = self.gather_crashinfo()
1133
1134 if 'perf' in channels:
1135 if self.is_enabled_collection(Collection.perf_perf):
1136 report['perf_counters'] = self.gather_perf_counters('separated')
1137 report['stats_per_pool'] = self.get_stats_per_pool()
1138 report['stats_per_pg'] = self.get_stats_per_pg()
1139 report['io_rate'] = self.get_io_rate()
1140 report['osd_perf_histograms'] = self.get_osd_histograms('separated')
1141 report['mempool'] = self.get_mempool('separated')
1142 report['heap_stats'] = self.get_heap_stats()
1143 report['rocksdb_stats'] = self.get_rocksdb_stats()
1144
1145 # NOTE: We do not include the 'device' channel in this report; it is
1146 # sent to a different endpoint.
1147
1148 return report
1149
1150 def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
1151 self.log.info('Sending %s to: %s' % (what, url))
1152 proxies = dict()
1153 if self.proxy:
1154 self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
1155 proxies['http'] = self.proxy
1156 proxies['https'] = self.proxy
1157 try:
1158 resp = requests.put(url=url, json=report, proxies=proxies)
1159 resp.raise_for_status()
1160 except Exception as e:
1161 fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
1162 self.log.error(fail_reason)
1163 return fail_reason
1164 return None
1165
    class EndPoint(enum.Enum):
        # Upload destinations: the main telemetry endpoint ('ceph') and the
        # separate endpoint used for device (SMART) reports ('device').
        ceph = 'ceph'
        device = 'device'
1169
1170 def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
1171 '''
1172 Find collections that are available in the module, but are not in the db
1173 '''
1174 if self.db_collection is None:
1175 return None
1176
1177 if not channels:
1178 channels = ALL_CHANNELS
1179 else:
1180 for ch in channels:
1181 if ch not in ALL_CHANNELS:
1182 self.log.debug(f"invalid channel name: {ch}")
1183 return None
1184
1185 new_collection : List[Collection] = []
1186
1187 for c in MODULE_COLLECTION:
1188 if c['name'].name not in self.db_collection:
1189 if c['channel'] in channels:
1190 new_collection.append(c['name'])
1191
1192 return new_collection
1193
1194 def is_major_upgrade(self) -> bool:
1195 '''
1196 Returns True only if the user last opted-in to an older major
1197 '''
1198 if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0:
1199 # we do not know what Ceph version was when the user last opted-in,
1200 # thus we do not wish to nag in case of a major upgrade
1201 return False
1202
1203 mon_map = self.get('mon_map')
1204 mon_min = mon_map.get("min_mon_release", 0)
1205
1206 if mon_min - self.last_opted_in_ceph_version > 0:
1207 self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
1208 return True
1209
1210 return False
1211
1212 def is_opted_in(self) -> bool:
1213 # If len is 0 it means that the user is either opted-out (never
1214 # opted-in, or invoked `telemetry off`), or they upgraded from a
1215 # telemetry revision 1 or 2, which required to re-opt in to revision 3,
1216 # regardless, hence is considered as opted-out
1217 if self.db_collection is None:
1218 return False
1219 return len(self.db_collection) > 0
1220
1221 def should_nag(self) -> bool:
1222 # Find delta between opted-in collections and module collections;
1223 # nag only if module has a collection which is not in db, and nag == True.
1224
1225 # We currently do not nag if the user is opted-out (or never opted-in).
1226 # If we wish to do this in the future, we need to have a tri-mode state
1227 # (opted in, opted out, no action yet), and it needs to be guarded by a
1228 # config option (so that nagging can be turned off via config).
1229 # We also need to add a last_opted_out_ceph_version variable, for the
1230 # major upgrade check.
1231
1232 # check if there are collections the user is not opt-in to
1233 # that we should nag about
1234 if self.db_collection is not None:
1235 for c in MODULE_COLLECTION:
1236 if c['name'].name not in self.db_collection:
1237 if c['nag'] == True:
1238 self.log.debug(f"The collection: {c['name']} is not reported")
1239 return True
1240
1241 # user might be opted-in to the most recent collection, or there is no
1242 # new collection which requires nagging about; thus nag in case it's a
1243 # major upgrade and there are new collections
1244 # (which their own nag == False):
1245 new_collections = False
1246 col_delta = self.collection_delta()
1247 if col_delta is not None and len(col_delta) > 0:
1248 new_collections = True
1249
1250 return self.is_major_upgrade() and new_collections
1251
    def init_collection(self) -> None:
        """
        Initialize self.db_collection from the persisted 'collection' key.

        On the first run after an upgrade the key is absent; in that case seed
        it based on whether the user was opted-in before the upgrade and at
        which telemetry revision.
        """
        # We fetch from db the collections the user had already opted-in to.
        # During the transition the results will be empty, but the user might
        # be opted-in to an older version (e.g. revision = 3)

        collection = self.get_store('collection')

        if collection is not None:
            self.db_collection = json.loads(collection)

        if self.db_collection is None:
            # happens once on upgrade
            if not self.enabled:
                # user is not opted-in
                self.set_store('collection', json.dumps([]))
                self.log.debug("user is not opted-in")
            else:
                # user is opted-in, verify the revision:
                if self.last_opt_revision == REVISION:
                    # current revision: carry the opt-in over by seeding the
                    # base collections of every channel
                    self.log.debug(f"telemetry revision is {REVISION}")
                    base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
                    self.set_store('collection', json.dumps(base_collection))
                else:
                    # user is opted-in to an older version, meaning they need
                    # to re-opt in regardless
                    self.set_store('collection', json.dumps([]))
                    self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")

            # reload collection after setting
            collection = self.get_store('collection')
            if collection is not None:
                self.db_collection = json.loads(collection)
            else:
                raise RuntimeError('collection is None after initial setting')
        else:
            # user has already upgraded
            self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
1289
1290 def is_enabled_collection(self, collection: Collection) -> bool:
1291 if self.db_collection is None:
1292 return False
1293 return collection.name in self.db_collection
1294
1295 def opt_in_all_collections(self) -> None:
1296 """
1297 Opt-in to all collections; Update db with the currently available collections in the module
1298 """
1299 if self.db_collection is None:
1300 raise RuntimeError('db_collection is None after initial setting')
1301
1302 for c in MODULE_COLLECTION:
1303 if c['name'].name not in self.db_collection:
1304 self.db_collection.append(c['name'])
1305
1306 self.set_store('collection', json.dumps(self.db_collection))
1307
    def send(self,
             report: Dict[str, Dict[str, str]],
             endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
        """
        Upload the report to the requested endpoints (default: both the main
        ceph endpoint and the device endpoint).

        Returns a (retcode, '', message) tuple: retcode is 1 if any upload
        failed, 0 otherwise; the message aggregates per-endpoint results.
        """
        if not endpoint:
            endpoint = [self.EndPoint.ceph, self.EndPoint.device]
        failed = []
        success = []
        self.log.debug('Send endpoints %s' % endpoint)
        for e in endpoint:
            if e == self.EndPoint.ceph:
                fail_reason = self._try_post('ceph report', self.url, report)
                if fail_reason:
                    failed.append(fail_reason)
                else:
                    # record the upload time so serve() can honor the interval
                    now = int(time.time())
                    self.last_upload = now
                    self.set_store('last_upload', str(now))
                    success.append('Ceph report sent to {0}'.format(self.url))
                    self.log.info('Sent report to {0}'.format(self.url))
            elif e == self.EndPoint.device:
                if 'device' in self.get_active_channels():
                    devices = self.gather_device_report()
                    if devices:
                        num_devs = 0
                        num_hosts = 0
                        # device reports are posted one host at a time
                        for host, ls in devices.items():
                            self.log.debug('host %s devices %s' % (host, ls))
                            if not len(ls):
                                continue
                            fail_reason = self._try_post('devices', self.device_url,
                                                         ls)
                            if fail_reason:
                                failed.append(fail_reason)
                            else:
                                num_devs += len(ls)
                                num_hosts += 1
                        if num_devs:
                            success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
                                num_devs, num_hosts, len(devices)))
                    else:
                        fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
                        failed.append(fail_reason)
                        self.log.error(fail_reason)
        if failed:
            return 1, '', '\n'.join(success + failed)
        return 0, '', '\n'.join(success)
1354
1355 def format_perf_histogram(self, report: Dict[str, Any]) -> None:
1356 # Formatting the perf histograms so they are human-readable. This will change the
1357 # ranges and values, which are currently in list form, into strings so that
1358 # they are displayed horizontally instead of vertically.
1359 try:
1360 # Formatting ranges and values in osd_perf_histograms
1361 mode = 'osd_perf_histograms'
1362 for config in report[mode]:
1363 for histogram in config:
1364 # Adjust ranges by converting lists into strings
1365 for axis in config[histogram]['axes']:
1366 for i in range(0, len(axis['ranges'])):
1367 axis['ranges'][i] = str(axis['ranges'][i])
1368
1369 for osd in config[histogram]['osds']:
1370 for i in range(0, len(osd['values'])):
1371 osd['values'][i] = str(osd['values'][i])
1372 except KeyError:
1373 # If the perf channel is not enabled, there should be a KeyError since
1374 # 'osd_perf_histograms' would not be present in the report. In that case,
1375 # the show function should pass as usual without trying to format the
1376 # histograms.
1377 pass
1378
1379 def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1380 '''
1381 Enable or disable a list of channels
1382 '''
1383 if not self.enabled:
1384 # telemetry should be on for channels to be toggled
1385 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1386 'Preview sample reports with `ceph telemetry preview`.'
1387 return 0, msg, ''
1388
1389 if channels is None:
1390 msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
1391 return 0, msg, ''
1392
1393 state = action == 'enable'
1394 msg = ''
1395 for c in channels:
1396 if c not in ALL_CHANNELS:
1397 msg = f"{msg}{c} is not a valid channel name. "\
1398 f"Available channels: {ALL_CHANNELS}.\n"
1399 else:
1400 self.set_module_option(f"channel_{c}", state)
1401 setattr(self,
1402 f"channel_{c}",
1403 state)
1404 msg = f"{msg}channel_{c} is {action}d\n"
1405
1406 return 0, msg, ''
1407
1408 @CLIReadCommand('telemetry status')
1409 def status(self) -> Tuple[int, str, str]:
1410 '''
1411 Show current configuration
1412 '''
1413 r = {}
1414 for opt in self.MODULE_OPTIONS:
1415 r[opt['name']] = getattr(self, opt['name'])
1416 r['last_upload'] = (time.ctime(self.last_upload)
1417 if self.last_upload else self.last_upload)
1418 return 0, json.dumps(r, indent=4, sort_keys=True), ''
1419
1420 @CLIReadCommand('telemetry diff')
1421 def diff(self) -> Tuple[int, str, str]:
1422 '''
1423 Show the diff between opted-in collection and available collection
1424 '''
1425 diff = []
1426 keys = ['nag']
1427
1428 for c in MODULE_COLLECTION:
1429 if not self.is_enabled_collection(c['name']):
1430 diff.append({key: val for key, val in c.items() if key not in keys})
1431
1432 r = None
1433 if diff == []:
1434 r = "Telemetry is up to date"
1435 else:
1436 r = json.dumps(diff, indent=4, sort_keys=True)
1437
1438 return 0, r, ''
1439
1440 @CLICommand('telemetry on')
1441 def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
1442 '''
1443 Enable telemetry reports from this cluster
1444 '''
1445 if license != LICENSE:
1446 return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
1447 To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
1448 else:
1449 self.set_module_option('enabled', True)
1450 self.enabled = True
1451 self.opt_in_all_collections()
1452
1453 # for major releases upgrade nagging
1454 mon_map = self.get('mon_map')
1455 mon_min = mon_map.get("min_mon_release", 0)
1456 self.set_store('last_opted_in_ceph_version', str(mon_min))
1457 self.last_opted_in_ceph_version = mon_min
1458
1459 msg = 'Telemetry is on.'
1460 disabled_channels = ''
1461 active_channels = self.get_active_channels()
1462 for c in ALL_CHANNELS:
1463 if c not in active_channels and c != 'ident':
1464 disabled_channels = f"{disabled_channels} {c}"
1465
1466 if len(disabled_channels) > 0:
1467 msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
1468 f"`ceph telemetry enable channel{disabled_channels}`"
1469
1470 return 0, msg, ''
1471
1472 @CLICommand('telemetry off')
1473 def off(self) -> Tuple[int, str, str]:
1474 '''
1475 Disable telemetry reports from this cluster
1476 '''
1477 if not self.enabled:
1478 # telemetry is already off
1479 msg = 'Telemetry is currently not enabled, nothing to turn off. '\
1480 'Please consider opting-in with `ceph telemetry on`.\n' \
1481 'Preview sample reports with `ceph telemetry preview`.'
1482 return 0, msg, ''
1483
1484 self.set_module_option('enabled', False)
1485 self.enabled = False
1486 self.set_store('collection', json.dumps([]))
1487 self.db_collection = []
1488
1489 # we might need this info in the future, in case
1490 # of nagging when user is opted-out
1491 mon_map = self.get('mon_map')
1492 mon_min = mon_map.get("min_mon_release", 0)
1493 self.set_store('last_opted_out_ceph_version', str(mon_min))
1494 self.last_opted_out_ceph_version = mon_min
1495
1496 msg = 'Telemetry is now disabled.'
1497 return 0, msg, ''
1498
    @CLIReadCommand('telemetry enable channel all')
    def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Enable all channels
        '''
        # NOTE: the mutable ALL_CHANNELS default is safe here because
        # toggle_channel only reads the list, never mutates it.
        return self.toggle_channel('enable', channels)
1505
    @CLIReadCommand('telemetry enable channel')
    def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Enable a list of channels
        '''
        # toggle_channel prompts for a channel name when channels is None
        return self.toggle_channel('enable', channels)
1512
    @CLIReadCommand('telemetry disable channel all')
    def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Disable all channels
        '''
        # NOTE: the mutable ALL_CHANNELS default is safe here because
        # toggle_channel only reads the list, never mutates it.
        return self.toggle_channel('disable', channels)
1519
    @CLIReadCommand('telemetry disable channel')
    def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Disable a list of channels
        '''
        # toggle_channel prompts for a channel name when channels is None
        return self.toggle_channel('disable', channels)
1526
1527 @CLIReadCommand('telemetry channel ls')
1528 def channel_ls(self) -> Tuple[int, str, str]:
1529 '''
1530 List all channels
1531 '''
1532 table = PrettyTable(
1533 [
1534 'NAME', 'ENABLED', 'DEFAULT', 'DESC',
1535 ],
1536 border=False)
1537 table.align['NAME'] = 'l'
1538 table.align['ENABLED'] = 'l'
1539 table.align['DEFAULT'] = 'l'
1540 table.align['DESC'] = 'l'
1541 table.left_padding_width = 0
1542 table.right_padding_width = 4
1543
1544 for c in ALL_CHANNELS:
1545 enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
1546 for o in self.MODULE_OPTIONS:
1547 if o['name'] == f"channel_{c}":
1548 default = "ON" if o.get('default', None) else "OFF"
1549 desc = o.get('desc', None)
1550
1551 table.add_row((
1552 c,
1553 enabled,
1554 default,
1555 desc,
1556 ))
1557
1558 return 0, table.get_string(sortby="NAME"), ''
1559
1560 @CLIReadCommand('telemetry collection ls')
1561 def collection_ls(self) -> Tuple[int, str, str]:
1562 '''
1563 List all collections
1564 '''
1565 col_delta = self.collection_delta()
1566 msg = ''
1567 if col_delta is not None and len(col_delta) > 0:
1568 msg = f"New collections are available:\n" \
1569 f"{sorted([c.name for c in col_delta])}\n" \
1570 f"Run `ceph telemetry on` to opt-in to these collections.\n"
1571
1572 table = PrettyTable(
1573 [
1574 'NAME', 'STATUS', 'DESC',
1575 ],
1576 border=False)
1577 table.align['NAME'] = 'l'
1578 table.align['STATUS'] = 'l'
1579 table.align['DESC'] = 'l'
1580 table.left_padding_width = 0
1581 table.right_padding_width = 4
1582
1583 for c in MODULE_COLLECTION:
1584 name = c['name']
1585 opted_in = self.is_enabled_collection(name)
1586 channel_enabled = getattr(self, f"channel_{c['channel']}")
1587
1588 status = ''
1589 if channel_enabled and opted_in:
1590 status = "REPORTING"
1591 else:
1592 why = ''
1593 delimiter = ''
1594
1595 if not opted_in:
1596 why += "NOT OPTED-IN"
1597 delimiter = ', '
1598 if not channel_enabled:
1599 why += f"{delimiter}CHANNEL {c['channel']} IS OFF"
1600
1601 status = f"NOT REPORTING: {why}"
1602
1603 desc = c['description']
1604
1605 table.add_row((
1606 name,
1607 status,
1608 desc,
1609 ))
1610
1611 if len(msg):
1612 # add a new line between message and table output
1613 msg = f"{msg} \n"
1614
1615 return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
1616
1617 @CLICommand('telemetry send')
1618 def do_send(self,
1619 endpoint: Optional[List[EndPoint]] = None,
1620 license: Optional[str] = None) -> Tuple[int, str, str]:
1621 '''
1622 Send a sample report
1623 '''
1624 if not self.is_opted_in() and license != LICENSE:
1625 self.log.debug(('A telemetry send attempt while opted-out. '
1626 'Asking for license agreement'))
1627 return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
1628 To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
1629 Please consider enabling the telemetry module with 'ceph telemetry on'.'''
1630 else:
1631 self.last_report = self.compile_report()
1632 return self.send(self.last_report, endpoint)
1633
1634 @CLIReadCommand('telemetry show')
1635 def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
1636 '''
1637 Show a sample report of opted-in collections (except for 'device')
1638 '''
1639 if not self.enabled:
1640 # if telemetry is off, no report is being sent, hence nothing to show
1641 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1642 'Preview sample reports with `ceph telemetry preview`.'
1643 return 0, msg, ''
1644
1645 report = self.get_report_locked(channels=channels)
1646 self.format_perf_histogram(report)
1647 report = json.dumps(report, indent=4, sort_keys=True)
1648
1649 if self.channel_device:
1650 report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''
1651
1652 return 0, report, ''
1653
    @CLIReadCommand('telemetry preview')
    def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Preview a sample report of the most recent collections available (except for 'device')
        '''
        report = {}

        # We use a lock to prevent a scenario where the user wishes to preview
        # the report, and at the same time the module hits the interval of
        # sending a report with the opted-in collection, which has less data
        # than in the preview report.
        col_delta = self.collection_delta()
        with self.get_report_lock:
            if col_delta is not None and len(col_delta) == 0:
                # user is already opted-in to the most recent collection
                msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
                return 0, msg, ''
            else:
                # there are collections the user is not opted-in to
                next_collection = []

                for c in MODULE_COLLECTION:
                    next_collection.append(c['name'].name)

                opted_in_collection = self.db_collection
                # temporarily pretend to be opted-in to every collection so
                # the generated report previews the full, most recent data,
                # then restore the user's real opt-in state
                self.db_collection = next_collection
                report = self.get_report(channels=channels)
                self.db_collection = opted_in_collection

                self.format_perf_histogram(report)
                report = json.dumps(report, indent=4, sort_keys=True)

                if self.channel_device:
                    report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''

        return 0, report, ''
1690
1691 @CLIReadCommand('telemetry show-device')
1692 def show_device(self) -> Tuple[int, str, str]:
1693 '''
1694 Show a sample device report
1695 '''
1696 if not self.enabled:
1697 # if telemetry is off, no report is being sent, hence nothing to show
1698 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1699 'Preview sample device reports with `ceph telemetry preview-device`.'
1700 return 0, msg, ''
1701
1702 if not self.channel_device:
1703 # if device channel is off, device report is not being sent, hence nothing to show
1704 msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
1705 'Preview sample device reports with `ceph telemetry preview-device`.'
1706 return 0, msg, ''
1707
1708 return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
1709
    @CLIReadCommand('telemetry preview-device')
    def preview_device(self) -> Tuple[int, str, str]:
        '''
        Preview a sample device report of the most recent device collection
        '''
        report = {}

        # lock to avoid racing an interval send (see preview())
        device_col_delta = self.collection_delta(['device'])
        with self.get_report_lock:
            if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
                # user is already opted-in to the most recent device collection,
                # and device channel is on, thus `show-device` should be called
                msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
                return 0, msg, ''

            # either the user is not opted-in at all, or there are collections
            # they are not opted-in to
            next_collection = []

            for c in MODULE_COLLECTION:
                next_collection.append(c['name'].name)

            opted_in_collection = self.db_collection
            # temporarily opt-in to everything for the preview,
            # then restore the user's real opt-in state
            self.db_collection = next_collection
            report = self.get_report('device')
            self.db_collection = opted_in_collection

        report = json.dumps(report, indent=4, sort_keys=True)
        return 0, report, ''
1739
1740 @CLIReadCommand('telemetry show-all')
1741 def show_all(self) -> Tuple[int, str, str]:
1742 '''
1743 Show a sample report of all enabled channels (including 'device' channel)
1744 '''
1745 if not self.enabled:
1746 # if telemetry is off, no report is being sent, hence nothing to show
1747 msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
1748 'Preview sample reports with `ceph telemetry preview`.'
1749 return 0, msg, ''
1750
1751 if not self.channel_device:
1752 # device channel is off, no need to display its report
1753 return 0, json.dumps(self.get_report_locked('default'), indent=4, sort_keys=True), ''
1754
1755 # telemetry is on and device channel is enabled, show both
1756 return 0, json.dumps(self.get_report_locked('all'), indent=4, sort_keys=True), ''
1757
    @CLIReadCommand('telemetry preview-all')
    def preview_all(self) -> Tuple[int, str, str]:
        '''
        Preview a sample report of the most recent collections available of all channels (including 'device')
        '''
        report = {}

        # lock to avoid racing an interval send (see preview())
        col_delta = self.collection_delta()
        with self.get_report_lock:
            if col_delta is not None and len(col_delta) == 0:
                # user is already opted-in to the most recent collection
                msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
                return 0, msg, ''

            # there are collections the user is not opted-in to
            next_collection = []

            for c in MODULE_COLLECTION:
                next_collection.append(c['name'].name)

            opted_in_collection = self.db_collection
            # temporarily opt-in to everything for the preview,
            # then restore the user's real opt-in state
            self.db_collection = next_collection
            report = self.get_report('all')
            self.db_collection = opted_in_collection

        self.format_perf_histogram(report)
        report = json.dumps(report, indent=4, sort_keys=True)

        return 0, report, ''
1787
    def get_report_locked(self,
                          report_type: str = 'default',
                          channels: Optional[List[str]] = None) -> Dict[str, Any]:
        '''
        A wrapper around get_report to allow for compiling a report of the most recent module collections
        '''
        # serializes report generation against preview commands and the
        # periodic interval send
        with self.get_report_lock:
            return self.get_report(report_type, channels)
1796
1797 def get_report(self,
1798 report_type: str = 'default',
1799 channels: Optional[List[str]] = None) -> Dict[str, Any]:
1800 if report_type == 'default':
1801 return self.compile_report(channels=channels)
1802 elif report_type == 'device':
1803 return self.gather_device_report()
1804 elif report_type == 'all':
1805 return {'report': self.compile_report(channels=channels),
1806 'device_report': self.gather_device_report()}
1807 return {}
1808
1809 def self_test(self) -> None:
1810 report = self.compile_report()
1811 if len(report) == 0:
1812 raise RuntimeError('Report is empty')
1813
1814 if 'report_id' not in report:
1815 raise RuntimeError('report_id not found in report')
1816
    def shutdown(self) -> None:
        # stop the serve() loop and wake it immediately from its wait
        self.run = False
        self.event.set()
1820
1821 def refresh_health_checks(self) -> None:
1822 health_checks = {}
1823 # TODO do we want to nag also in case the user is not opted-in?
1824 if self.enabled and self.should_nag():
1825 health_checks['TELEMETRY_CHANGED'] = {
1826 'severity': 'warning',
1827 'summary': 'Telemetry requires re-opt-in',
1828 'detail': [
1829 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
1830 ]
1831 }
1832 self.set_health_checks(health_checks)
1833
1834 def serve(self) -> None:
1835 self.load()
1836 self.run = True
1837
1838 self.log.debug('Waiting for mgr to warm up')
1839 time.sleep(10)
1840
1841 while self.run:
1842 self.event.clear()
1843
1844 self.refresh_health_checks()
1845
1846 if not self.is_opted_in():
1847 self.log.debug('Not sending report until user re-opts-in')
1848 self.event.wait(1800)
1849 continue
1850 if not self.enabled:
1851 self.log.debug('Not sending report until configured to do so')
1852 self.event.wait(1800)
1853 continue
1854
1855 now = int(time.time())
1856 if not self.last_upload or \
1857 (now - self.last_upload) > self.interval * 3600:
1858 self.log.info('Compiling and sending report to %s',
1859 self.url)
1860
1861 try:
1862 self.last_report = self.compile_report()
1863 except Exception:
1864 self.log.exception('Exception while compiling report:')
1865
1866 self.send(self.last_report)
1867 else:
1868 self.log.debug('Interval for sending new report has not expired')
1869
1870 sleep = 3600
1871 self.log.debug('Sleeping for %d seconds', sleep)
1872 self.event.wait(sleep)
1873
    @staticmethod
    def can_run() -> Tuple[bool, str]:
        # telemetry has no special runtime requirements
        return True, ''