]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | """ |
2 | Telemetry module for ceph-mgr | |
3 | ||
4 | Collect statistics from Ceph cluster and send this back to the Ceph project | |
5 | when user has opted-in | |
6 | """ | |
20effc67 TL |
7 | import logging |
8 | import numbers | |
9 | import enum | |
11fdf7f2 | 10 | import errno |
eafe8130 | 11 | import hashlib |
11fdf7f2 | 12 | import json |
eafe8130 | 13 | import rbd |
11fdf7f2 TL |
14 | import requests |
15 | import uuid | |
16 | import time | |
eafe8130 | 17 | from datetime import datetime, timedelta |
20effc67 TL |
18 | from prettytable import PrettyTable |
19 | from threading import Event, Lock | |
11fdf7f2 | 20 | from collections import defaultdict |
20effc67 | 21 | from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union |
11fdf7f2 | 22 | |
20effc67 | 23 | from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, ServiceInfoT |
11fdf7f2 TL |
24 | |
25 | ||
# Every channel a cluster can report on; used to validate channel names.
ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']

# License under which the reported data is shared with the Ceph project.
LICENSE = 'sharing-1-0'
LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL = 'https://cdla.io/sharing-1-0/'

# Latest revision of the telemetry report.  Bump this each time we make
# *any* change.
REVISION = 3

# History of revisions
# --------------------
#
# Version 1:
#   Mimic and/or nautilus are lumped together here, since
#   we didn't track revisions yet.
#
# Version 2:
#   - added revision tracking, nagging, etc.
#   - added config option changes
#   - added channels
#   - added explicit license acknowledgement to the opt-in process
#
# Version 3:
#   - added device health metrics (i.e., SMART data, minus serial number)
#   - remove crush_rule
#   - added CephFS metadata (how many MDSs, fs features, how many data pools,
#     how much metadata is cached, rfiles, rbytes, rsnapshots)
#   - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
#   - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
#   - whether an OSD cluster network is in use
#   - rbd pool and image count, and rbd mirror mode (pool-level)
#   - rgw daemons, zones, zonegroups; which rgw frontends
#   - crush map stats
20effc67 TL |
class Collection(str, enum.Enum):
    """Identifiers of the individual data collections a cluster can opt
    in to (see MODULE_COLLECTION below for channel/description/nag).

    Mixing in ``str`` makes each member compare and serialize as its
    plain string value.
    """
    basic_base = 'basic_base'
    device_base = 'device_base'
    crash_base = 'crash_base'
    ident_base = 'ident_base'
    perf_perf = 'perf_perf'
    basic_mds_metadata = 'basic_mds_metadata'
    basic_pool_usage = 'basic_pool_usage'
    basic_usage_by_class = 'basic_usage_by_class'
70 | ||
# One entry per Collection member: the channel the collection reports
# through, a human-readable description, and whether enabling it should
# "nag" (re-prompt) already-opted-in users.
MODULE_COLLECTION : List[Dict] = [
    {
        "name": Collection.basic_base,
        "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.device_base,
        "description": "Information about device health metrics",
        "channel": "device",
        "nag": False
    },
    {
        "name": Collection.crash_base,
        "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
        "channel": "crash",
        "nag": False
    },
    {
        "name": Collection.ident_base,
        "description": "User-provided identifying information about the cluster",
        "channel": "ident",
        "nag": False
    },
    {
        "name": Collection.perf_perf,
        "description": "Information about performance counters of the cluster",
        "channel": "perf",
        "nag": True
    },
    {
        "name": Collection.basic_mds_metadata,
        "description": "MDS metadata",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_pool_usage,
        "description": "Default pool application and usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_usage_by_class,
        "description": "Default device class usage statistics",
        "channel": "basic",
        "nag": False
    }
]
11fdf7f2 | 121 | |
class Module(MgrModule):
    """Telemetry mgr module: collects statistics from the Ceph cluster
    and sends them back to the Ceph project when the user has opted in.
    """

    # Daemon metadata fields histogrammed per daemon type in
    # gather_osd_metadata / gather_mon_metadata / gather_mds_metadata.
    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]

    MODULE_OPTIONS = [
        Option(name='url',
               type='str',
               default='https://telemetry.ceph.com/report'),
        Option(name='device_url',
               type='str',
               default='https://telemetry.ceph.com/device'),
        Option(name='enabled',
               type='bool',
               default=False),
        Option(name='last_opt_revision',
               type='int',
               default=1),
        Option(name='leaderboard',
               type='bool',
               default=False),
        Option(name='description',
               type='str',
               default=None),
        Option(name='contact',
               type='str',
               default=None),
        Option(name='organization',
               type='str',
               default=None),
        Option(name='proxy',
               type='str',
               default=None),
        Option(name='interval',
               type='int',
               default=24,
               min=8),
        Option(name='channel_basic',
               type='bool',
               default=True,
               desc='Share basic cluster information (size, version)'),
        Option(name='channel_ident',
               type='bool',
               default=False,
               desc='Share a user-provided description and/or contact email for the cluster'),
        Option(name='channel_crash',
               type='bool',
               default=True,
               desc='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
        Option(name='channel_device',
               type='bool',
               default=True,
               desc=('Share device health metrics '
                     '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
        Option(name='channel_perf',
               type='bool',
               default=False,
               desc='Share various performance metrics of a cluster'),
    ]
188 | ||
189 | @property | |
20effc67 | 190 | def config_keys(self) -> Dict[str, OptionValue]: |
11fdf7f2 TL |
191 | return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS) |
192 | ||
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(Module, self).__init__(*args, **kwargs)
        # event used to wake the serve() thread (set in config_notify)
        self.event = Event()
        self.run = False
        self.db_collection: Optional[List[str]] = None
        self.last_opted_in_ceph_version: Optional[int] = None
        self.last_opted_out_ceph_version: Optional[int] = None
        self.last_upload: Optional[int] = None
        self.last_report: Dict[str, Any] = dict()
        self.report_id: Optional[str] = None
        self.salt: Optional[str] = None
        # NOTE(review): presumably serializes report generation across
        # threads; acquisition sites are outside this view — confirm.
        self.get_report_lock = Lock()
        # populate the option-backed attributes (url, enabled, ...) now
        self.config_update_module_option()
        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.url = ''
            self.device_url = ''
            self.enabled = False
            self.last_opt_revision = 0
            self.leaderboard = ''
            self.interval = 0
            self.proxy = ''
            self.channel_basic = True
            self.channel_ident = False
            self.channel_crash = True
            self.channel_device = True
            self.channel_perf = False
            self.db_collection = ['basic_base', 'device_base']
            self.last_opted_in_ceph_version = 17
            self.last_opted_out_ceph_version = 0
223 | ||
224 | def config_update_module_option(self) -> None: | |
11fdf7f2 TL |
225 | for opt in self.MODULE_OPTIONS: |
226 | setattr(self, | |
227 | opt['name'], | |
228 | self.get_module_option(opt['name'])) | |
229 | self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name'])) | |
20effc67 TL |
230 | |
231 | def config_notify(self) -> None: | |
232 | self.config_update_module_option() | |
eafe8130 TL |
233 | # wake up serve() thread |
234 | self.event.set() | |
11fdf7f2 | 235 | |
20effc67 TL |
236 | def load(self) -> None: |
237 | last_upload = self.get_store('last_upload', None) | |
238 | if last_upload is None: | |
239 | self.last_upload = None | |
240 | else: | |
241 | self.last_upload = int(last_upload) | |
11fdf7f2 | 242 | |
20effc67 TL |
243 | report_id = self.get_store('report_id', None) |
244 | if report_id is None: | |
11fdf7f2 TL |
245 | self.report_id = str(uuid.uuid4()) |
246 | self.set_store('report_id', self.report_id) | |
20effc67 TL |
247 | else: |
248 | self.report_id = report_id | |
11fdf7f2 | 249 | |
20effc67 TL |
250 | salt = self.get_store('salt', None) |
251 | if salt is None: | |
eafe8130 TL |
252 | self.salt = str(uuid.uuid4()) |
253 | self.set_store('salt', self.salt) | |
20effc67 TL |
254 | else: |
255 | self.salt = salt | |
256 | ||
257 | self.init_collection() | |
258 | ||
259 | last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None) | |
260 | if last_opted_in_ceph_version is None: | |
261 | self.last_opted_in_ceph_version = None | |
262 | else: | |
263 | self.last_opted_in_ceph_version = int(last_opted_in_ceph_version) | |
264 | ||
265 | last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None) | |
266 | if last_opted_out_ceph_version is None: | |
267 | self.last_opted_out_ceph_version = None | |
268 | else: | |
269 | self.last_opted_out_ceph_version = int(last_opted_out_ceph_version) | |
eafe8130 | 270 | |
20effc67 TL |
271 | def gather_osd_metadata(self, |
272 | osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]: | |
11fdf7f2 TL |
273 | keys = ["osd_objectstore", "rotational"] |
274 | keys += self.metadata_keys | |
275 | ||
20effc67 | 276 | metadata: Dict[str, Dict[str, int]] = dict() |
11fdf7f2 TL |
277 | for key in keys: |
278 | metadata[key] = defaultdict(int) | |
279 | ||
280 | for osd in osd_map['osds']: | |
20effc67 | 281 | res = self.get_metadata('osd', str(osd['osd'])) |
92f5a8d4 TL |
282 | if res is None: |
283 | self.log.debug('Could not get metadata for osd.%s' % str(osd['osd'])) | |
284 | continue | |
20effc67 | 285 | for k, v in res.items(): |
11fdf7f2 TL |
286 | if k not in keys: |
287 | continue | |
288 | ||
289 | metadata[k][v] += 1 | |
290 | ||
291 | return metadata | |
292 | ||
20effc67 TL |
293 | def gather_mon_metadata(self, |
294 | mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]: | |
11fdf7f2 TL |
295 | keys = list() |
296 | keys += self.metadata_keys | |
297 | ||
20effc67 | 298 | metadata: Dict[str, Dict[str, int]] = dict() |
11fdf7f2 TL |
299 | for key in keys: |
300 | metadata[key] = defaultdict(int) | |
301 | ||
302 | for mon in mon_map['mons']: | |
20effc67 | 303 | res = self.get_metadata('mon', mon['name']) |
92f5a8d4 TL |
304 | if res is None: |
305 | self.log.debug('Could not get metadata for mon.%s' % (mon['name'])) | |
306 | continue | |
20effc67 TL |
307 | for k, v in res.items(): |
308 | if k not in keys: | |
309 | continue | |
310 | ||
311 | metadata[k][v] += 1 | |
312 | ||
313 | return metadata | |
314 | ||
315 | def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]: | |
316 | metadata: Dict[str, Dict[str, int]] = dict() | |
317 | ||
318 | res = self.get('mds_metadata') # metadata of *all* mds daemons | |
319 | if res is None or not res: | |
320 | self.log.debug('Could not get metadata for mds daemons') | |
321 | return metadata | |
322 | ||
323 | keys = list() | |
324 | keys += self.metadata_keys | |
325 | ||
326 | for key in keys: | |
327 | metadata[key] = defaultdict(int) | |
328 | ||
329 | for mds in res.values(): | |
330 | for k, v in mds.items(): | |
11fdf7f2 TL |
331 | if k not in keys: |
332 | continue | |
333 | ||
334 | metadata[k][v] += 1 | |
335 | ||
336 | return metadata | |
337 | ||
20effc67 TL |
338 | def gather_crush_info(self) -> Dict[str, Union[int, |
339 | bool, | |
340 | List[int], | |
341 | Dict[str, int], | |
342 | Dict[int, int]]]: | |
eafe8130 TL |
343 | osdmap = self.get_osdmap() |
344 | crush_raw = osdmap.get_crush() | |
345 | crush = crush_raw.dump() | |
346 | ||
20effc67 TL |
347 | BucketKeyT = TypeVar('BucketKeyT', int, str) |
348 | ||
349 | def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None: | |
eafe8130 TL |
350 | if k in d: |
351 | d[k] += 1 | |
352 | else: | |
353 | d[k] = 1 | |
354 | ||
20effc67 | 355 | device_classes: Dict[str, int] = {} |
eafe8130 TL |
356 | for dev in crush['devices']: |
357 | inc(device_classes, dev.get('class', '')) | |
358 | ||
20effc67 TL |
359 | bucket_algs: Dict[str, int] = {} |
360 | bucket_types: Dict[str, int] = {} | |
361 | bucket_sizes: Dict[int, int] = {} | |
eafe8130 TL |
362 | for bucket in crush['buckets']: |
363 | if '~' in bucket['name']: # ignore shadow buckets | |
364 | continue | |
365 | inc(bucket_algs, bucket['alg']) | |
366 | inc(bucket_types, bucket['type_id']) | |
367 | inc(bucket_sizes, len(bucket['items'])) | |
368 | ||
369 | return { | |
370 | 'num_devices': len(crush['devices']), | |
371 | 'num_types': len(crush['types']), | |
372 | 'num_buckets': len(crush['buckets']), | |
373 | 'num_rules': len(crush['rules']), | |
374 | 'device_classes': list(device_classes.values()), | |
375 | 'tunables': crush['tunables'], | |
376 | 'compat_weight_set': '-1' in crush['choose_args'], | |
377 | 'num_weight_sets': len(crush['choose_args']), | |
378 | 'bucket_algs': bucket_algs, | |
379 | 'bucket_sizes': bucket_sizes, | |
380 | 'bucket_types': bucket_types, | |
381 | } | |
382 | ||
20effc67 | 383 | def gather_configs(self) -> Dict[str, List[str]]: |
eafe8130 TL |
384 | # cluster config options |
385 | cluster = set() | |
386 | r, outb, outs = self.mon_command({ | |
387 | 'prefix': 'config dump', | |
388 | 'format': 'json' | |
20effc67 | 389 | }) |
eafe8130 TL |
390 | if r != 0: |
391 | return {} | |
392 | try: | |
393 | dump = json.loads(outb) | |
394 | except json.decoder.JSONDecodeError: | |
395 | return {} | |
396 | for opt in dump: | |
397 | name = opt.get('name') | |
398 | if name: | |
399 | cluster.add(name) | |
400 | # daemon-reported options (which may include ceph.conf) | |
401 | active = set() | |
20effc67 | 402 | ls = self.get("modified_config_options") |
eafe8130 TL |
403 | for opt in ls.get('options', {}): |
404 | active.add(opt) | |
405 | return { | |
406 | 'cluster_changed': sorted(list(cluster)), | |
407 | 'active_changed': sorted(list(active)), | |
408 | } | |
409 | ||
20effc67 TL |
    def get_heap_stats(self) -> Dict[str, dict]:
        """Collect tcmalloc heap statistics from every OSD.

        Sends ``heap stats`` to each OSD listed in osd_metadata and
        parses the human-readable tcmalloc output into
        {'osd.N' -> {category -> value}}.  OSDs that fail the command or
        produce unexpected output are skipped (logged at debug level).
        """
        # Initialize result dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))

        # Get list of osd ids from the metadata
        osd_metadata = self.get('osd_metadata')

        # Grab output from the "osd.x heap stats" command
        for osd_id in osd_metadata:
            cmd_dict = {
                'prefix': 'heap',
                'heapcmd': 'stats',
                'id': str(osd_id),
            }
            r, outb, outs = self.osd_command(cmd_dict)
            if r != 0:
                self.log.debug("Invalid command dictionary.")
                continue
            else:
                if 'tcmalloc heap stats' in outs:
                    # the stat values are the integers embedded in the text,
                    # in the same order tcmalloc prints its categories
                    values = [int(i) for i in outs.split() if i.isdigit()]
                    # `categories` must be ordered this way for the correct output to be parsed
                    categories = ['use_by_application',
                                  'page_heap_freelist',
                                  'central_cache_freelist',
                                  'transfer_cache_freelist',
                                  'thread_cache_freelists',
                                  'malloc_metadata',
                                  'actual_memory_used',
                                  'released_to_os',
                                  'virtual_address_space_used',
                                  'spans_in_use',
                                  'thread_heaps_in_use',
                                  'tcmalloc_page_size']
                    if len(values) != len(categories):
                        self.log.debug('Received unexpected output from osd.{}; number of values should match the number of expected categories:\n' \
                                       'values: len={} {} ~ categories: len={} {} ~ outs: {}'.format(osd_id, len(values), values, len(categories), categories, outs))
                        continue
                    osd = 'osd.' + str(osd_id)
                    result[osd] = dict(zip(categories, values))
                else:
                    self.log.debug('No heap stats available on osd.{}: {}'.format(osd_id, outs))
                    continue

        return result
455 | ||
456 | def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]: | |
457 | # Initialize result dict | |
458 | result: Dict[str, dict] = defaultdict(lambda: defaultdict(int)) | |
459 | ||
460 | # Get list of osd ids from the metadata | |
461 | osd_metadata = self.get('osd_metadata') | |
462 | ||
463 | # Grab output from the "osd.x dump_mempools" command | |
464 | for osd_id in osd_metadata: | |
465 | cmd_dict = { | |
466 | 'prefix': 'dump_mempools', | |
467 | 'id': str(osd_id), | |
468 | 'format': 'json' | |
469 | } | |
470 | r, outb, outs = self.osd_command(cmd_dict) | |
471 | if r != 0: | |
472 | self.log.debug("Invalid command dictionary.") | |
473 | continue | |
474 | else: | |
475 | try: | |
476 | # This is where the mempool will land. | |
477 | dump = json.loads(outb) | |
478 | if mode == 'separated': | |
479 | result["osd." + str(osd_id)] = dump['mempool']['by_pool'] | |
480 | elif mode == 'aggregated': | |
481 | for mem_type in dump['mempool']['by_pool']: | |
482 | result[mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes'] | |
483 | result[mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items'] | |
484 | else: | |
485 | self.log.debug("Incorrect mode specified in get_mempool") | |
486 | except (json.decoder.JSONDecodeError, KeyError) as e: | |
487 | self.log.debug("Error caught on osd.{}: {}".format(osd_id, e)) | |
488 | continue | |
489 | ||
490 | return result | |
491 | ||
    def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
        """Collect 2D perf histograms from every OSD.

        mode='separated' keeps a per-OSD list of value grids for each
        histogram; mode='aggregated' sums the grids element-wise across
        OSDs.  Histograms are grouped by the string form of their axis
        configuration so differently-configured axes are never combined;
        those grouping keys are dropped from the returned list.
        """
        # Initialize result dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
                                              lambda: defaultdict(
                                              lambda: defaultdict(
                                              lambda: defaultdict(
                                              lambda: defaultdict(int))))))

        # Get list of osd ids from the metadata
        osd_metadata = self.get('osd_metadata')

        # Grab output from the "osd.x perf histogram dump" command
        for osd_id in osd_metadata:
            cmd_dict = {
                'prefix': 'perf histogram dump',
                'id': str(osd_id),
                'format': 'json'
            }
            r, outb, outs = self.osd_command(cmd_dict)
            # Check for invalid calls
            if r != 0:
                self.log.debug("Invalid command dictionary.")
                continue
            else:
                try:
                    # This is where the histograms will land if there are any.
                    dump = json.loads(outb)

                    for histogram in dump['osd']:
                        # Log axis information. There are two axes, each represented
                        # as a dictionary. Both dictionaries are contained inside a
                        # list called 'axes'.
                        axes = []
                        for axis in dump['osd'][histogram]['axes']:

                            # This is the dict that contains information for an individual
                            # axis. It will be appended to the 'axes' list at the end.
                            axis_dict: Dict[str, Any] = defaultdict()

                            # Collecting information for buckets, min, name, etc.
                            axis_dict['buckets'] = axis['buckets']
                            axis_dict['min'] = axis['min']
                            axis_dict['name'] = axis['name']
                            axis_dict['quant_size'] = axis['quant_size']
                            axis_dict['scale_type'] = axis['scale_type']

                            # Collecting ranges; placing them in lists to
                            # improve readability later on.
                            ranges = []
                            for _range in axis['ranges']:
                                _max, _min = None, None
                                if 'max' in _range:
                                    _max = _range['max']
                                if 'min' in _range:
                                    _min = _range['min']
                                ranges.append([_min, _max])
                            axis_dict['ranges'] = ranges

                            # Now that 'axis_dict' contains all the appropriate
                            # information for the current axis, append it to the 'axes' list.
                            # There will end up being two axes in the 'axes' list, since the
                            # histograms are 2D.
                            axes.append(axis_dict)

                        # Add the 'axes' list, containing both axes, to result.
                        # At this point, you will see that the name of the key is the string
                        # form of our axes list (str(axes)). This is there so that histograms
                        # with different axis configs will not be combined.
                        # These key names are later dropped when only the values are returned.
                        result[str(axes)][histogram]['axes'] = axes

                        # Collect current values and make sure they are in
                        # integer form.
                        values = []
                        for value_list in dump['osd'][histogram]['values']:
                            values.append([int(v) for v in value_list])

                        if mode == 'separated':
                            if 'osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['osds'] = []
                            result[str(axes)][histogram]['osds'].append({'osd_id': int(osd_id), 'values': values})

                        elif mode == 'aggregated':
                            # Aggregate values. If 'values' have already been initialized,
                            # we can safely add.
                            if 'values' in result[str(axes)][histogram]:
                                for i in range (0, len(values)):
                                    for j in range (0, len(values[i])):
                                        values[i][j] += result[str(axes)][histogram]['values'][i][j]

                            # Add the values to result.
                            result[str(axes)][histogram]['values'] = values

                            # Update num_combined_osds
                            if 'num_combined_osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['num_combined_osds'] = 1
                            else:
                                result[str(axes)][histogram]['num_combined_osds'] += 1
                        else:
                            self.log.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode))
                            return list()

                # Sometimes, json errors occur if you give it an empty string.
                # I am also putting in a catch for a KeyError since it could
                # happen where the code is assuming that a key exists in the
                # schema when it doesn't. In either case, we'll handle that
                # by continuing and collecting what we can from other osds.
                except (json.decoder.JSONDecodeError, KeyError) as e:
                    self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
                    continue

        return list(result.values())
604 | ||
605 | def get_io_rate(self) -> dict: | |
606 | return self.get('io_rate') | |
607 | ||
608 | def get_stats_per_pool(self) -> dict: | |
609 | result = self.get('pg_dump')['pool_stats'] | |
610 | ||
611 | # collect application metadata from osd_map | |
612 | osd_map = self.get('osd_map') | |
613 | application_metadata = {pool['pool']: pool['application_metadata'] for pool in osd_map['pools']} | |
614 | ||
615 | # add application to each pool from pg_dump | |
616 | for pool in result: | |
617 | pool['application'] = [] | |
618 | # Only include default applications | |
619 | for application in application_metadata[pool['poolid']]: | |
620 | if application in ['cephfs', 'mgr', 'rbd', 'rgw']: | |
621 | pool['application'].append(application) | |
622 | ||
623 | return result | |
624 | ||
625 | def get_stats_per_pg(self) -> dict: | |
626 | return self.get('pg_dump')['pg_stats'] | |
627 | ||
628 | def get_rocksdb_stats(self) -> Dict[str, str]: | |
629 | # Initalizers | |
630 | result: Dict[str, str] = defaultdict() | |
631 | version = self.get_rocksdb_version() | |
632 | ||
633 | # Update result | |
634 | result['version'] = version | |
635 | ||
636 | return result | |
637 | ||
638 | def gather_crashinfo(self) -> List[Dict[str, str]]: | |
639 | crashlist: List[Dict[str, str]] = list() | |
eafe8130 | 640 | errno, crashids, err = self.remote('crash', 'ls') |
11fdf7f2 | 641 | if errno: |
20effc67 | 642 | return crashlist |
11fdf7f2 | 643 | for crashid in crashids.split(): |
20effc67 | 644 | errno, crashinfo, err = self.remote('crash', 'do_info', crashid) |
11fdf7f2 TL |
645 | if errno: |
646 | continue | |
647 | c = json.loads(crashinfo) | |
20effc67 TL |
648 | |
649 | # redact hostname | |
11fdf7f2 | 650 | del c['utsname_hostname'] |
20effc67 | 651 | |
92f5a8d4 TL |
652 | # entity_name might have more than one '.', beware |
653 | (etype, eid) = c.get('entity_name', '').split('.', 1) | |
eafe8130 | 654 | m = hashlib.sha1() |
20effc67 | 655 | assert self.salt |
eafe8130 TL |
656 | m.update(self.salt.encode('utf-8')) |
657 | m.update(eid.encode('utf-8')) | |
658 | m.update(self.salt.encode('utf-8')) | |
659 | c['entity_name'] = etype + '.' + m.hexdigest() | |
20effc67 TL |
660 | |
661 | # redact final line of python tracebacks, as the exception | |
662 | # payload may contain identifying information | |
663 | if 'mgr_module' in c and 'backtrace' in c: | |
664 | # backtrace might be empty | |
665 | if len(c['backtrace']) > 0: | |
666 | c['backtrace'][-1] = '<redacted>' | |
667 | ||
11fdf7f2 TL |
668 | crashlist.append(c) |
669 | return crashlist | |
670 | ||
20effc67 TL |
    def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
        """Collect perf counters from all daemons.

        mode='separated' keys results per daemon; mode='aggregated' sums
        counters per daemon type (the first three characters of the
        daemon name).  Returns {} on an invalid mode.
        """
        # Extract perf counter data with get_all_perf_counters(), a method
        # from mgr/mgr_module.py. This method returns a nested dictionary that
        # looks a lot like perf schema, except with some additional fields.
        #
        # Example of output, a snapshot of a mon daemon:
        #   "mon.b": {
        #       "bluestore.kv_flush_lat": {
        #           "count": 2431,
        #           "description": "Average kv_thread flush latency",
        #           "nick": "fl_l",
        #           "priority": 8,
        #           "type": 5,
        #           "units": 1,
        #           "value": 88814109
        #       },
        #   },
        all_perf_counters = self.get_all_perf_counters()

        # Initialize 'result' dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
            lambda: defaultdict(lambda: defaultdict(int))))

        for daemon in all_perf_counters:

            # Calculate num combined daemon types if in aggregated mode
            if mode == 'aggregated':
                daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'
                if 'num_combined_daemons' not in result[daemon_type]:
                    result[daemon_type]['num_combined_daemons'] = 1
                else:
                    result[daemon_type]['num_combined_daemons'] += 1

            for collection in all_perf_counters[daemon]:
                # Split the collection to avoid redundancy in final report; i.e.:
                #   bluestore.kv_flush_lat, bluestore.kv_final_lat -->
                #   bluestore: kv_flush_lat, kv_final_lat
                #
                # NOTE(review): this assumes every counter path contains
                # exactly one '.'; more or fewer dots would raise
                # ValueError here — TODO confirm against
                # get_all_perf_counters() output.
                col_0, col_1 = collection.split('.')

                # Debug log for empty keys. This initially was a problem for prioritycache
                # perf counters, where the col_0 was empty for certain mon counters:
                #
                # "mon.a": {                      instead of    "mon.a": {
                #      "": {                                        "prioritycache": {
                #        "cache_bytes": {...},                          "cache_bytes": {...},
                #
                # This log is here to detect any future instances of a similar issue.
                if (daemon == "") or (col_0 == "") or (col_1 == ""):
                    self.log.debug("Instance of an empty key: {}{}".format(daemon, collection))

                if mode == 'separated':
                    # Add value to result
                    result[daemon][col_0][col_1]['value'] = \
                        all_perf_counters[daemon][collection]['value']

                    # Check that 'count' exists, as not all counters have a count field.
                    if 'count' in all_perf_counters[daemon][collection]:
                        result[daemon][col_0][col_1]['count'] = \
                            all_perf_counters[daemon][collection]['count']
                elif mode == 'aggregated':
                    # Not every rgw daemon has the same schema. Specifically, each rgw daemon
                    # has a uniquely-named collection that starts off identically (i.e.
                    # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
                    # This bit of code combines these unique counters all under one rgw instance.
                    # Without this check, the schema would remain separeted out in the final report.
                    if col_0[0:11] == "objecter-0x":
                        col_0 = "objecter-0x"

                    # Check that the value can be incremented. In some cases,
                    # the files are of type 'pair' (real-integer-pair, integer-integer pair).
                    # In those cases, the value is a dictionary, and not a number.
                    #   i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
                    if isinstance(all_perf_counters[daemon][collection]['value'], numbers.Number):
                        result[daemon_type][col_0][col_1]['value'] += \
                            all_perf_counters[daemon][collection]['value']

                    # Check that 'count' exists, as not all counters have a count field.
                    if 'count' in all_perf_counters[daemon][collection]:
                        result[daemon_type][col_0][col_1]['count'] += \
                            all_perf_counters[daemon][collection]['count']
                else:
                    self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
                    return {}

        return result
756 | ||
757 | def get_active_channels(self) -> List[str]: | |
eafe8130 TL |
758 | r = [] |
759 | if self.channel_basic: | |
760 | r.append('basic') | |
761 | if self.channel_crash: | |
762 | r.append('crash') | |
763 | if self.channel_device: | |
764 | r.append('device') | |
f67539c2 TL |
765 | if self.channel_ident: |
766 | r.append('ident') | |
20effc67 TL |
767 | if self.channel_perf: |
768 | r.append('perf') | |
eafe8130 TL |
769 | return r |
770 | ||
    def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
        """Collect recent device health metrics, anonymized.

        Host names and device ids are replaced with random UUIDs that
        are persisted in the mgr store (so the same host/device keeps
        the same opaque id across reports), and the device serial number
        is scrubbed from the metric records.  Returns
        anon-host-id -> anon-devid -> {timestamp -> record}, or {} when
        device data is unavailable.
        """
        try:
            time_format = self.remote('devicehealth', 'get_time_format')
        except Exception as e:
            self.log.debug('Unable to format time: {}'.format(e))
            return {}
        # only consider samples newer than two reporting intervals ago
        cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
        min_sample = cutoff.strftime(time_format)

        devices = self.get('devices')['devices']
        if not devices:
            self.log.debug('Unable to get device info from the mgr.')
            return {}

        # anon-host-id -> anon-devid -> { timestamp -> record }
        res: Dict[str, Dict[str, Dict[str, str]]] = {}
        for d in devices:
            devid = d['devid']
            try:
                # this is a map of stamp -> {device info}
                m = self.remote('devicehealth', 'get_recent_device_metrics',
                                devid, min_sample)
            except Exception as e:
                self.log.debug('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
                continue

            # anonymize host id
            try:
                host = d['location'][0]['host']
            except (KeyError, IndexError) as e:
                self.log.debug('Unable to get host from device with id "{}": {}'.format(devid, e))
                continue
            anon_host = self.get_store('host-id/%s' % host)
            if not anon_host:
                anon_host = str(uuid.uuid1())
                self.set_store('host-id/%s' % host, anon_host)
            # tag every record with the anonymized host and remember the
            # first serial number seen so it can be scrubbed below
            serial = None
            for dev, rep in m.items():
                rep['host_id'] = anon_host
                if serial is None and 'serial_number' in rep:
                    serial = rep['serial_number']

            # anonymize device id
            anon_devid = self.get_store('devid-id/%s' % devid)
            if not anon_devid:
                # ideally devid is 'vendor_model_serial',
                # but can also be 'model_serial', 'serial'
                if '_' in devid:
                    # keep vendor/model prefix, replace only the serial part
                    anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
                else:
                    anon_devid = str(uuid.uuid1())
                self.set_store('devid-id/%s' % devid, anon_devid)
            self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
                                                           host, anon_host))

            # anonymize the smartctl report itself
            if serial:
                m_str = json.dumps(m)
                m = json.loads(m_str.replace(serial, 'deleted'))

            if anon_host not in res:
                res[anon_host] = {}
            res[anon_host][anon_devid] = m
        return res
835 | ||
20effc67 | 836 | def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int: |
eafe8130 | 837 | data = self.get_counter(daemon_type, daemon_name, stat)[stat] |
eafe8130 TL |
838 | if data: |
839 | return data[-1][1] | |
840 | else: | |
841 | return 0 | |
842 | ||
    def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
        """Assemble the telemetry report for the given channels.

        If `channels` is not given, the currently active channels are used.
        The 'device' channel is intentionally NOT part of this report; its
        data is gathered by gather_device_report() and sent to a separate
        endpoint.
        """
        if not channels:
            channels = self.get_active_channels()
        report = {
            'leaderboard': self.leaderboard,
            'report_version': 1,
            'report_timestamp': datetime.utcnow().isoformat(),
            'report_id': self.report_id,
            'channels': channels,
            'channels_available': ALL_CHANNELS,
            'license': LICENSE,
            'collections_available': [c['name'].name for c in MODULE_COLLECTION],
            'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])],
        }

        # optional, free-form identification supplied by the operator
        if 'ident' in channels:
            for option in ['description', 'contact', 'organization']:
                report[option] = getattr(self, option)

        if 'basic' in channels:
            mon_map = self.get('mon_map')
            osd_map = self.get('osd_map')
            service_map = self.get('service_map')
            fs_map = self.get('fs_map')
            df = self.get('df')
            # index pool usage stats by pool id for the per-pool section below
            df_pools = {pool['id']: pool for pool in df['pools']}

            report['created'] = mon_map['created']

            # mons: count address versions (v1/v2) and families (IPv4/IPv6)
            v1_mons = 0
            v2_mons = 0
            ipv4_mons = 0
            ipv6_mons = 0
            for mon in mon_map['mons']:
                for a in mon['public_addrs']['addrvec']:
                    if a['type'] == 'v2':
                        v2_mons += 1
                    elif a['type'] == 'v1':
                        v1_mons += 1
                    # bracketed address literal indicates IPv6
                    if a['addr'].startswith('['):
                        ipv6_mons += 1
                    else:
                        ipv4_mons += 1
            report['mon'] = {
                'count': len(mon_map['mons']),
                'features': mon_map['features'],
                'min_mon_release': mon_map['min_mon_release'],
                'v1_addr_mons': v1_mons,
                'v2_addr_mons': v2_mons,
                'ipv4_addr_mons': ipv4_mons,
                'ipv6_addr_mons': ipv6_mons,
            }

            report['config'] = self.gather_configs()

            # pools

            rbd_num_pools = 0
            rbd_num_images_by_pool = []
            rbd_mirroring_by_pool = []
            num_pg = 0
            report['pools'] = list()
            for pool in osd_map['pools']:
                num_pg += pool['pg_num']
                ec_profile = {}
                if pool['erasure_code_profile']:
                    orig = osd_map['erasure_code_profiles'].get(
                        pool['erasure_code_profile'], {})
                    # keep only the generic, non-identifying EC profile fields
                    ec_profile = {
                        k: orig[k] for k in orig.keys()
                        if k in ['k', 'm', 'plugin', 'technique',
                                 'crush-failure-domain', 'l']
                    }
                pool_data = {
                    'pool': pool['pool'],
                    'pg_num': pool['pg_num'],
                    'pgp_num': pool['pg_placement_num'],
                    'size': pool['size'],
                    'min_size': pool['min_size'],
                    'pg_autoscale_mode': pool['pg_autoscale_mode'],
                    'target_max_bytes': pool['target_max_bytes'],
                    'target_max_objects': pool['target_max_objects'],
                    'type': ['', 'replicated', '', 'erasure'][pool['type']],
                    'erasure_code_profile': ec_profile,
                    'cache_mode': pool['cache_mode'],
                }

                # basic_pool_usage collection
                if self.is_enabled_collection(Collection.basic_pool_usage):
                    pool_data['application'] = []
                    for application in pool['application_metadata']:
                        # Only include default applications
                        if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
                            pool_data['application'].append(application)
                    pool_stats = df_pools[pool['pool']]['stats']
                    pool_data['stats'] = {  # filter out kb_used
                        'avail_raw': pool_stats['avail_raw'],
                        'bytes_used': pool_stats['bytes_used'],
                        'compress_bytes_used': pool_stats['compress_bytes_used'],
                        'compress_under_bytes': pool_stats['compress_under_bytes'],
                        'data_bytes_used': pool_stats['data_bytes_used'],
                        'dirty': pool_stats['dirty'],
                        'max_avail': pool_stats['max_avail'],
                        'objects': pool_stats['objects'],
                        'omap_bytes_used': pool_stats['omap_bytes_used'],
                        'percent_used': pool_stats['percent_used'],
                        'quota_bytes': pool_stats['quota_bytes'],
                        'quota_objects': pool_stats['quota_objects'],
                        'rd': pool_stats['rd'],
                        'rd_bytes': pool_stats['rd_bytes'],
                        'stored': pool_stats['stored'],
                        'stored_data': pool_stats['stored_data'],
                        'stored_omap': pool_stats['stored_omap'],
                        'stored_raw': pool_stats['stored_raw'],
                        'wr': pool_stats['wr'],
                        'wr_bytes': pool_stats['wr_bytes']
                    }

                cast(List[Dict[str, Any]], report['pools']).append(pool_data)
                # per-pool RBD image/mirroring counts for pools tagged 'rbd'
                if 'rbd' in pool['application_metadata']:
                    rbd_num_pools += 1
                    ioctx = self.rados.open_ioctx(pool['pool_name'])
                    rbd_num_images_by_pool.append(
                        sum(1 for _ in rbd.RBD().list2(ioctx)))
                    rbd_mirroring_by_pool.append(
                        rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
            report['rbd'] = {
                'num_pools': rbd_num_pools,
                'num_images_by_pool': rbd_num_images_by_pool,
                'mirroring_by_pool': rbd_mirroring_by_pool}

            # osds: detect whether a separate cluster (back) network is in use
            # by comparing the first public and cluster address of any up OSD
            cluster_network = False
            for osd in osd_map['osds']:
                if osd['up'] and not cluster_network:
                    front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
                    back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
                    if front_ip != back_ip:
                        cluster_network = True
            report['osd'] = {
                'count': len(osd_map['osds']),
                'require_osd_release': osd_map['require_osd_release'],
                'require_min_compat_client': osd_map['require_min_compat_client'],
                'cluster_network': cluster_network,
            }

            # crush
            report['crush'] = self.gather_crush_info()

            # cephfs
            report['fs'] = {
                'count': len(fs_map['filesystems']),
                'feature_flags': fs_map['feature_flags'],
                'num_standby_mds': len(fs_map['standbys']),
                'filesystems': [],
            }
            num_mds = len(fs_map['standbys'])
            for fsm in fs_map['filesystems']:
                fs = fsm['mdsmap']
                num_sessions = 0
                cached_ino = 0
                cached_dn = 0
                cached_cap = 0
                subtrees = 0
                rfiles = 0
                rbytes = 0
                rsnaps = 0
                # aggregate per-MDS perf counters across all ranks of this fs
                for gid, mds in fs['info'].items():
                    num_sessions += self.get_latest('mds', mds['name'],
                                                    'mds_sessions.session_count')
                    cached_ino += self.get_latest('mds', mds['name'],
                                                  'mds_mem.ino')
                    cached_dn += self.get_latest('mds', mds['name'],
                                                 'mds_mem.dn')
                    cached_cap += self.get_latest('mds', mds['name'],
                                                  'mds_mem.cap')
                    subtrees += self.get_latest('mds', mds['name'],
                                                'mds.subtrees')
                    # root recursive stats live on rank 0 only
                    if mds['rank'] == 0:
                        rfiles = self.get_latest('mds', mds['name'],
                                                 'mds.root_rfiles')
                        rbytes = self.get_latest('mds', mds['name'],
                                                 'mds.root_rbytes')
                        rsnaps = self.get_latest('mds', mds['name'],
                                                 'mds.root_rsnaps')
                report['fs']['filesystems'].append({  # type: ignore
                    'max_mds': fs['max_mds'],
                    'ever_allowed_features': fs['ever_allowed_features'],
                    'explicitly_allowed_features': fs['explicitly_allowed_features'],
                    'num_in': len(fs['in']),
                    'num_up': len(fs['up']),
                    'num_standby_replay': len(
                        [mds for gid, mds in fs['info'].items()
                         if mds['state'] == 'up:standby-replay']),
                    'num_mds': len(fs['info']),
                    'num_sessions': num_sessions,
                    'cached_inos': cached_ino,
                    'cached_dns': cached_dn,
                    'cached_caps': cached_cap,
                    'cached_subtrees': subtrees,
                    'balancer_enabled': len(fs['balancer']) > 0,
                    'num_data_pools': len(fs['data_pools']),
                    'standby_count_wanted': fs['standby_count_wanted'],
                    # year-month only ('YYYY-MM'), to limit identifiability
                    'approx_ctime': fs['created'][0:7],
                    'files': rfiles,
                    'bytes': rbytes,
                    'snaps': rsnaps,
                })
                num_mds += len(fs['info'])
            report['fs']['total_num_mds'] = num_mds  # type: ignore

            # daemons
            report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
                                      mon=self.gather_mon_metadata(mon_map))

            if self.is_enabled_collection(Collection.basic_mds_metadata):
                report['metadata']['mds'] = self.gather_mds_metadata()  # type: ignore

            # host counts
            servers = self.list_servers()
            self.log.debug('servers %s' % servers)
            hosts = {
                'num': len([h for h in servers if h['hostname']]),
            }
            # per daemon type, count hosts running at least one such daemon
            for t in ['mon', 'mds', 'osd', 'mgr']:
                nr_services = sum(1 for host in servers if
                                  any(service for service in cast(List[ServiceInfoT],
                                                                  host['services'])
                                      if service['type'] == t))
                hosts['num_with_' + t] = nr_services
            report['hosts'] = hosts

            report['usage'] = {
                'pools': len(df['pools']),
                'pg_num': num_pg,
                'total_used_bytes': df['stats']['total_used_bytes'],
                'total_bytes': df['stats']['total_bytes'],
                'total_avail_bytes': df['stats']['total_avail_bytes']
            }
            # basic_usage_by_class collection
            if self.is_enabled_collection(Collection.basic_usage_by_class):
                report['usage']['stats_by_class'] = {}  # type: ignore
                for device_class in df['stats_by_class']:
                    # only well-known device classes, to avoid leaking
                    # operator-defined class names
                    if device_class in ['hdd', 'ssd', 'nvme']:
                        report['usage']['stats_by_class'][device_class] = df['stats_by_class'][device_class]  # type: ignore

            # service map: one entry per registered service type, plus an
            # expanded summary for rgw (zones/zonegroups/frontends)
            services: DefaultDict[str, int] = defaultdict(int)
            for key, value in service_map['services'].items():
                services[key] += 1
                if key == 'rgw':
                    rgw = {}
                    zones = set()
                    zonegroups = set()
                    frontends = set()
                    count = 0
                    d = value.get('daemons', dict())
                    for k, v in d.items():
                        if k == 'summary' and v:
                            rgw[k] = v
                        elif isinstance(v, dict) and 'metadata' in v:
                            count += 1
                            zones.add(v['metadata']['zone_id'])
                            zonegroups.add(v['metadata']['zonegroup_id'])
                            frontends.add(v['metadata']['frontend_type#0'])

                            # we could actually iterate over all the keys of
                            # the dict and check for how many frontends there
                            # are, but it is unlikely that one would be running
                            # more than 2 supported ones
                            f2 = v['metadata'].get('frontend_type#1', None)
                            if f2:
                                frontends.add(f2)

                    rgw['count'] = count
                    rgw['zones'] = len(zones)
                    rgw['zonegroups'] = len(zonegroups)
                    rgw['frontends'] = list(frontends)  # sets aren't json-serializable
                    report['rgw'] = rgw
            report['services'] = services

            try:
                report['balancer'] = self.remote('balancer', 'gather_telemetry')
            except ImportError:
                # balancer module not loaded; report it as inactive
                report['balancer'] = {
                    'active': False
                }

        if 'crash' in channels:
            report['crashes'] = self.gather_crashinfo()

        if 'perf' in channels:
            # perf data is gated on the perf_perf collection opt-in, not just
            # the channel toggle
            if self.is_enabled_collection(Collection.perf_perf):
                report['perf_counters'] = self.gather_perf_counters('separated')
                report['stats_per_pool'] = self.get_stats_per_pool()
                report['stats_per_pg'] = self.get_stats_per_pg()
                report['io_rate'] = self.get_io_rate()
                report['osd_perf_histograms'] = self.get_osd_histograms('separated')
                report['mempool'] = self.get_mempool('separated')
                report['heap_stats'] = self.get_heap_stats()
                report['rocksdb_stats'] = self.get_rocksdb_stats()

        # NOTE: We do not include the 'device' channel in this report; it is
        # sent to a different endpoint.

        return report
1149 | ||
20effc67 | 1150 | def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]: |
9f95a23c TL |
1151 | self.log.info('Sending %s to: %s' % (what, url)) |
1152 | proxies = dict() | |
1153 | if self.proxy: | |
1154 | self.log.info('Send using HTTP(S) proxy: %s', self.proxy) | |
1155 | proxies['http'] = self.proxy | |
1156 | proxies['https'] = self.proxy | |
1157 | try: | |
1158 | resp = requests.put(url=url, json=report, proxies=proxies) | |
1159 | resp.raise_for_status() | |
1160 | except Exception as e: | |
1161 | fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e)) | |
1162 | self.log.error(fail_reason) | |
1163 | return fail_reason | |
1164 | return None | |
1165 | ||
20effc67 TL |
    class EndPoint(enum.Enum):
        # Destinations a report can be sent to: the main telemetry endpoint
        # ('ceph') and the separate device-metrics endpoint ('device').
        ceph = 'ceph'
        device = 'device'
1169 | ||
1170 | def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]: | |
1171 | ''' | |
1172 | Find collections that are available in the module, but are not in the db | |
1173 | ''' | |
1174 | if self.db_collection is None: | |
1175 | return None | |
1176 | ||
1177 | if not channels: | |
1178 | channels = ALL_CHANNELS | |
1179 | else: | |
1180 | for ch in channels: | |
1181 | if ch not in ALL_CHANNELS: | |
1182 | self.log.debug(f"invalid channel name: {ch}") | |
1183 | return None | |
1184 | ||
1185 | new_collection : List[Collection] = [] | |
1186 | ||
1187 | for c in MODULE_COLLECTION: | |
1188 | if c['name'].name not in self.db_collection: | |
1189 | if c['channel'] in channels: | |
1190 | new_collection.append(c['name']) | |
1191 | ||
1192 | return new_collection | |
1193 | ||
1194 | def is_major_upgrade(self) -> bool: | |
1195 | ''' | |
1196 | Returns True only if the user last opted-in to an older major | |
1197 | ''' | |
1198 | if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0: | |
1199 | # we do not know what Ceph version was when the user last opted-in, | |
1200 | # thus we do not wish to nag in case of a major upgrade | |
1201 | return False | |
1202 | ||
1203 | mon_map = self.get('mon_map') | |
1204 | mon_min = mon_map.get("min_mon_release", 0) | |
1205 | ||
1206 | if mon_min - self.last_opted_in_ceph_version > 0: | |
1207 | self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}") | |
1208 | return True | |
1209 | ||
1210 | return False | |
1211 | ||
1212 | def is_opted_in(self) -> bool: | |
1213 | # If len is 0 it means that the user is either opted-out (never | |
1214 | # opted-in, or invoked `telemetry off`), or they upgraded from a | |
1215 | # telemetry revision 1 or 2, which required to re-opt in to revision 3, | |
1216 | # regardless, hence is considered as opted-out | |
1217 | if self.db_collection is None: | |
1218 | return False | |
1219 | return len(self.db_collection) > 0 | |
1220 | ||
1221 | def should_nag(self) -> bool: | |
1222 | # Find delta between opted-in collections and module collections; | |
1223 | # nag only if module has a collection which is not in db, and nag == True. | |
1224 | ||
1225 | # We currently do not nag if the user is opted-out (or never opted-in). | |
1226 | # If we wish to do this in the future, we need to have a tri-mode state | |
1227 | # (opted in, opted out, no action yet), and it needs to be guarded by a | |
1228 | # config option (so that nagging can be turned off via config). | |
1229 | # We also need to add a last_opted_out_ceph_version variable, for the | |
1230 | # major upgrade check. | |
1231 | ||
1232 | # check if there are collections the user is not opt-in to | |
1233 | # that we should nag about | |
1234 | if self.db_collection is not None: | |
1235 | for c in MODULE_COLLECTION: | |
1236 | if c['name'].name not in self.db_collection: | |
1237 | if c['nag'] == True: | |
1238 | self.log.debug(f"The collection: {c['name']} is not reported") | |
1239 | return True | |
1240 | ||
1241 | # user might be opted-in to the most recent collection, or there is no | |
1242 | # new collection which requires nagging about; thus nag in case it's a | |
1243 | # major upgrade and there are new collections | |
1244 | # (which their own nag == False): | |
1245 | new_collections = False | |
1246 | col_delta = self.collection_delta() | |
1247 | if col_delta is not None and len(col_delta) > 0: | |
1248 | new_collections = True | |
1249 | ||
1250 | return self.is_major_upgrade() and new_collections | |
1251 | ||
    def init_collection(self) -> None:
        """Initialize `db_collection` from the mon store, migrating users from
        the pre-collection (revision-based) opt-in scheme on first run after
        upgrade."""
        # We fetch from db the collections the user had already opted-in to.
        # During the transition the results will be empty, but the user might
        # be opted-in to an older version (e.g. revision = 3)

        collection = self.get_store('collection')

        if collection is not None:
            self.db_collection = json.loads(collection)

        if self.db_collection is None:
            # happens once on upgrade
            if not self.enabled:
                # user is not opted-in
                self.set_store('collection', json.dumps([]))
                self.log.debug("user is not opted-in")
            else:
                # user is opted-in, verify the revision:
                if self.last_opt_revision == REVISION:
                    # current revision: seed the db with the base collections
                    # the user was implicitly reporting already
                    self.log.debug(f"telemetry revision is {REVISION}")
                    base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
                    self.set_store('collection', json.dumps(base_collection))
                else:
                    # user is opted-in to an older version, meaning they need
                    # to re-opt in regardless
                    self.set_store('collection', json.dumps([]))
                    self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")

            # reload collection after setting
            collection = self.get_store('collection')
            if collection is not None:
                self.db_collection = json.loads(collection)
            else:
                raise RuntimeError('collection is None after initial setting')
        else:
            # user has already upgraded
            self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
1289 | ||
1290 | def is_enabled_collection(self, collection: Collection) -> bool: | |
1291 | if self.db_collection is None: | |
1292 | return False | |
1293 | return collection.name in self.db_collection | |
1294 | ||
1295 | def opt_in_all_collections(self) -> None: | |
1296 | """ | |
1297 | Opt-in to all collections; Update db with the currently available collections in the module | |
1298 | """ | |
1299 | if self.db_collection is None: | |
1300 | raise RuntimeError('db_collection is None after initial setting') | |
1301 | ||
1302 | for c in MODULE_COLLECTION: | |
1303 | if c['name'].name not in self.db_collection: | |
1304 | self.db_collection.append(c['name']) | |
1305 | ||
1306 | self.set_store('collection', json.dumps(self.db_collection)) | |
1307 | ||
    def send(self,
             report: Dict[str, Dict[str, str]],
             endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
        """Send `report` to the given endpoints (both ceph and device by
        default).

        Returns a (retval, stdout, stderr)-style tuple: (0, '', summary) when
        everything succeeded, or (1, '', successes + failure reasons) when any
        post failed.
        """
        if not endpoint:
            endpoint = [self.EndPoint.ceph, self.EndPoint.device]
        failed = []
        success = []
        self.log.debug('Send endpoints %s' % endpoint)
        for e in endpoint:
            if e == self.EndPoint.ceph:
                fail_reason = self._try_post('ceph report', self.url, report)
                if fail_reason:
                    failed.append(fail_reason)
                else:
                    # remember when we last uploaded successfully, both in
                    # memory and in the mon store
                    now = int(time.time())
                    self.last_upload = now
                    self.set_store('last_upload', str(now))
                    success.append('Ceph report sent to {0}'.format(self.url))
                    self.log.info('Sent report to {0}'.format(self.url))
            elif e == self.EndPoint.device:
                if 'device' in self.get_active_channels():
                    devices = self.gather_device_report()
                    if devices:
                        num_devs = 0
                        num_hosts = 0
                        # device reports are posted one host at a time
                        for host, ls in devices.items():
                            self.log.debug('host %s devices %s' % (host, ls))
                            if not len(ls):
                                continue
                            fail_reason = self._try_post('devices', self.device_url,
                                                         ls)
                            if fail_reason:
                                failed.append(fail_reason)
                            else:
                                num_devs += len(ls)
                                num_hosts += 1
                        if num_devs:
                            success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
                                num_devs, num_hosts, len(devices)))
                    else:
                        fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
                        failed.append(fail_reason)
                        self.log.error(fail_reason)
        if failed:
            return 1, '', '\n'.join(success + failed)
        return 0, '', '\n'.join(success)
11fdf7f2 | 1354 | |
20effc67 TL |
1355 | def format_perf_histogram(self, report: Dict[str, Any]) -> None: |
1356 | # Formatting the perf histograms so they are human-readable. This will change the | |
1357 | # ranges and values, which are currently in list form, into strings so that | |
1358 | # they are displayed horizontally instead of vertically. | |
1359 | try: | |
1360 | # Formatting ranges and values in osd_perf_histograms | |
1361 | mode = 'osd_perf_histograms' | |
1362 | for config in report[mode]: | |
1363 | for histogram in config: | |
1364 | # Adjust ranges by converting lists into strings | |
1365 | for axis in config[histogram]['axes']: | |
1366 | for i in range(0, len(axis['ranges'])): | |
1367 | axis['ranges'][i] = str(axis['ranges'][i]) | |
1368 | ||
1369 | for osd in config[histogram]['osds']: | |
1370 | for i in range(0, len(osd['values'])): | |
1371 | osd['values'][i] = str(osd['values'][i]) | |
1372 | except KeyError: | |
1373 | # If the perf channel is not enabled, there should be a KeyError since | |
1374 | # 'osd_perf_histograms' would not be present in the report. In that case, | |
1375 | # the show function should pass as usual without trying to format the | |
1376 | # histograms. | |
1377 | pass | |
1378 | ||
1379 | def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: | |
1380 | ''' | |
1381 | Enable or disable a list of channels | |
1382 | ''' | |
1383 | if not self.enabled: | |
1384 | # telemetry should be on for channels to be toggled | |
1385 | msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \ | |
1386 | 'Preview sample reports with `ceph telemetry preview`.' | |
1387 | return 0, msg, '' | |
1388 | ||
1389 | if channels is None: | |
1390 | msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.' | |
1391 | return 0, msg, '' | |
1392 | ||
1393 | state = action == 'enable' | |
1394 | msg = '' | |
1395 | for c in channels: | |
1396 | if c not in ALL_CHANNELS: | |
1397 | msg = f"{msg}{c} is not a valid channel name. "\ | |
1398 | f"Available channels: {ALL_CHANNELS}.\n" | |
1399 | else: | |
1400 | self.set_module_option(f"channel_{c}", state) | |
1401 | setattr(self, | |
1402 | f"channel_{c}", | |
1403 | state) | |
1404 | msg = f"{msg}channel_{c} is {action}d\n" | |
1405 | ||
1406 | return 0, msg, '' | |
1407 | ||
1408 | @CLIReadCommand('telemetry status') | |
1409 | def status(self) -> Tuple[int, str, str]: | |
1410 | ''' | |
1411 | Show current configuration | |
1412 | ''' | |
1413 | r = {} | |
1414 | for opt in self.MODULE_OPTIONS: | |
1415 | r[opt['name']] = getattr(self, opt['name']) | |
1416 | r['last_upload'] = (time.ctime(self.last_upload) | |
1417 | if self.last_upload else self.last_upload) | |
1418 | return 0, json.dumps(r, indent=4, sort_keys=True), '' | |
1419 | ||
1420 | @CLIReadCommand('telemetry diff') | |
1421 | def diff(self) -> Tuple[int, str, str]: | |
1422 | ''' | |
1423 | Show the diff between opted-in collection and available collection | |
1424 | ''' | |
1425 | diff = [] | |
1426 | keys = ['nag'] | |
1427 | ||
1428 | for c in MODULE_COLLECTION: | |
1429 | if not self.is_enabled_collection(c['name']): | |
1430 | diff.append({key: val for key, val in c.items() if key not in keys}) | |
1431 | ||
1432 | r = None | |
1433 | if diff == []: | |
1434 | r = "Telemetry is up to date" | |
1435 | else: | |
1436 | r = json.dumps(diff, indent=4, sort_keys=True) | |
1437 | ||
1438 | return 0, r, '' | |
1439 | ||
    @CLICommand('telemetry on')
    def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
        '''
        Enable telemetry reports from this cluster
        '''
        # require explicit license acknowledgement before opting in
        if license != LICENSE:
            return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
        else:
            self.set_module_option('enabled', True)
            self.enabled = True
            self.opt_in_all_collections()

            # for major releases upgrade nagging
            mon_map = self.get('mon_map')
            mon_min = mon_map.get("min_mon_release", 0)
            self.set_store('last_opted_in_ceph_version', str(mon_min))
            self.last_opted_in_ceph_version = mon_min

            msg = 'Telemetry is on.'
            # list channels that are still off, so we can tell the user how
            # to enable them ('ident' is intentionally excluded -- it is
            # off by default)
            disabled_channels = ''
            active_channels = self.get_active_channels()
            for c in ALL_CHANNELS:
                if c not in active_channels and c != 'ident':
                    disabled_channels = f"{disabled_channels} {c}"

            if len(disabled_channels) > 0:
                msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
                      f"`ceph telemetry enable channel{disabled_channels}`"

            return 0, msg, ''
1471 | ||
1472 | @CLICommand('telemetry off') | |
1473 | def off(self) -> Tuple[int, str, str]: | |
1474 | ''' | |
1475 | Disable telemetry reports from this cluster | |
1476 | ''' | |
1477 | if not self.enabled: | |
1478 | # telemetry is already off | |
1479 | msg = 'Telemetry is currently not enabled, nothing to turn off. '\ | |
1480 | 'Please consider opting-in with `ceph telemetry on`.\n' \ | |
1481 | 'Preview sample reports with `ceph telemetry preview`.' | |
1482 | return 0, msg, '' | |
e306af50 | 1483 | |
e306af50 | 1484 | self.set_module_option('enabled', False) |
20effc67 TL |
1485 | self.enabled = False |
1486 | self.set_store('collection', json.dumps([])) | |
1487 | self.db_collection = [] | |
1488 | ||
1489 | # we might need this info in the future, in case | |
1490 | # of nagging when user is opted-out | |
1491 | mon_map = self.get('mon_map') | |
1492 | mon_min = mon_map.get("min_mon_release", 0) | |
1493 | self.set_store('last_opted_out_ceph_version', str(mon_min)) | |
1494 | self.last_opted_out_ceph_version = mon_min | |
1495 | ||
1496 | msg = 'Telemetry is now disabled.' | |
1497 | return 0, msg, '' | |
1498 | ||
    @CLIReadCommand('telemetry enable channel all')
    def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Enable all channels
        '''
        # Thin wrapper: toggles every known channel on via toggle_channel().
        return self.toggle_channel('enable', channels)
1505 | ||
    @CLIReadCommand('telemetry enable channel')
    def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Enable a list of channels
        '''
        # Thin wrapper: toggle_channel() validates the names and reports back.
        return self.toggle_channel('enable', channels)
1512 | ||
    @CLIReadCommand('telemetry disable channel all')
    def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
        '''
        Disable all channels
        '''
        # Thin wrapper: toggles every known channel off via toggle_channel().
        return self.toggle_channel('disable', channels)
1519 | ||
    @CLIReadCommand('telemetry disable channel')
    def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Disable a list of channels
        '''
        # Thin wrapper: toggle_channel() validates the names and reports back.
        return self.toggle_channel('disable', channels)
1526 | ||
    @CLIReadCommand('telemetry channel ls')
    def channel_ls(self) -> Tuple[int, str, str]:
        '''
        List all channels
        '''
        # Render a name/enabled/default/description table for every channel.
        table = PrettyTable(
            [
                'NAME', 'ENABLED', 'DEFAULT', 'DESC',
            ],
            border=False)
        table.align['NAME'] = 'l'
        table.align['ENABLED'] = 'l'
        table.align['DEFAULT'] = 'l'
        table.align['DESC'] = 'l'
        table.left_padding_width = 0
        table.right_padding_width = 4

        for c in ALL_CHANNELS:
            enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
            # look up this channel's module option for its default/description
            # NOTE(review): assumes every channel has a matching
            # 'channel_<name>' entry in MODULE_OPTIONS; otherwise `default`
            # and `desc` would be unbound below -- confirm.
            for o in self.MODULE_OPTIONS:
                if o['name'] == f"channel_{c}":
                    default = "ON" if o.get('default', None) else "OFF"
                    desc = o.get('desc', None)

            table.add_row((
                c,
                enabled,
                default,
                desc,
            ))

        return 0, table.get_string(sortby="NAME"), ''
1559 | ||
1560 | @CLIReadCommand('telemetry collection ls') | |
1561 | def collection_ls(self) -> Tuple[int, str, str]: | |
1562 | ''' | |
1563 | List all collections | |
1564 | ''' | |
1565 | col_delta = self.collection_delta() | |
1566 | msg = '' | |
1567 | if col_delta is not None and len(col_delta) > 0: | |
1568 | msg = f"New collections are available:\n" \ | |
1569 | f"{sorted([c.name for c in col_delta])}\n" \ | |
1570 | f"Run `ceph telemetry on` to opt-in to these collections.\n" | |
1571 | ||
1572 | table = PrettyTable( | |
1573 | [ | |
1574 | 'NAME', 'STATUS', 'DESC', | |
1575 | ], | |
1576 | border=False) | |
1577 | table.align['NAME'] = 'l' | |
1578 | table.align['STATUS'] = 'l' | |
1579 | table.align['DESC'] = 'l' | |
1580 | table.left_padding_width = 0 | |
1581 | table.right_padding_width = 4 | |
1582 | ||
1583 | for c in MODULE_COLLECTION: | |
1584 | name = c['name'] | |
1585 | opted_in = self.is_enabled_collection(name) | |
1586 | channel_enabled = getattr(self, f"channel_{c['channel']}") | |
1587 | ||
1588 | status = '' | |
1589 | if channel_enabled and opted_in: | |
1590 | status = "REPORTING" | |
1591 | else: | |
1592 | why = '' | |
1593 | delimiter = '' | |
1594 | ||
1595 | if not opted_in: | |
1596 | why += "NOT OPTED-IN" | |
1597 | delimiter = ', ' | |
1598 | if not channel_enabled: | |
1599 | why += f"{delimiter}CHANNEL {c['channel']} IS OFF" | |
1600 | ||
1601 | status = f"NOT REPORTING: {why}" | |
1602 | ||
1603 | desc = c['description'] | |
1604 | ||
1605 | table.add_row(( | |
1606 | name, | |
1607 | status, | |
1608 | desc, | |
1609 | )) | |
1610 | ||
1611 | if len(msg): | |
1612 | # add a new line between message and table output | |
1613 | msg = f"{msg} \n" | |
1614 | ||
1615 | return 0, f'{msg}{table.get_string(sortby="NAME")}', '' | |
1616 | ||
1617 | @CLICommand('telemetry send') | |
1618 | def do_send(self, | |
1619 | endpoint: Optional[List[EndPoint]] = None, | |
1620 | license: Optional[str] = None) -> Tuple[int, str, str]: | |
1621 | ''' | |
1622 | Send a sample report | |
1623 | ''' | |
1624 | if not self.is_opted_in() and license != LICENSE: | |
1625 | self.log.debug(('A telemetry send attempt while opted-out. ' | |
1626 | 'Asking for license agreement')) | |
1627 | return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}). | |
1628 | To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command. | |
1629 | Please consider enabling the telemetry module with 'ceph telemetry on'.''' | |
1630 | else: | |
1631 | self.last_report = self.compile_report() | |
1632 | return self.send(self.last_report, endpoint) | |
1633 | ||
1634 | @CLIReadCommand('telemetry show') | |
1635 | def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: | |
1636 | ''' | |
1637 | Show a sample report of opted-in collections (except for 'device') | |
1638 | ''' | |
1639 | if not self.enabled: | |
1640 | # if telemetry is off, no report is being sent, hence nothing to show | |
1641 | msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \ | |
1642 | 'Preview sample reports with `ceph telemetry preview`.' | |
1643 | return 0, msg, '' | |
1644 | ||
1645 | report = self.get_report_locked(channels=channels) | |
1646 | self.format_perf_histogram(report) | |
1647 | report = json.dumps(report, indent=4, sort_keys=True) | |
1648 | ||
1649 | if self.channel_device: | |
1650 | report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.''' | |
1651 | ||
1652 | return 0, report, '' | |
1653 | ||
    @CLIReadCommand('telemetry preview')
    def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
        '''
        Preview a sample report of the most recent collections available (except for 'device')
        '''
        report = {}

        # We use a lock to prevent a scenario where the user wishes to preview
        # the report, and at the same time the module hits the interval of
        # sending a report with the opted-in collection, which has less data
        # than in the preview report.
        col_delta = self.collection_delta()
        with self.get_report_lock:
            if col_delta is not None and len(col_delta) == 0:
                # user is already opted-in to the most recent collection
                msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
                return 0, msg, ''
            else:
                # there are collections the user is not opted-in to
                next_collection = []

                for c in MODULE_COLLECTION:
                    next_collection.append(c['name'].name)

                # Temporarily pretend the user is opted-in to every known
                # collection so get_report() compiles the fullest possible
                # report, then restore the real opt-in list.
                # NOTE(review): if get_report() raises, db_collection is left
                # holding next_collection — a try/finally would be safer;
                # confirm whether callers tolerate the current behavior.
                opted_in_collection = self.db_collection
                self.db_collection = next_collection
                report = self.get_report(channels=channels)
                self.db_collection = opted_in_collection

        self.format_perf_histogram(report)
        report = json.dumps(report, indent=4, sort_keys=True)

        if self.channel_device:
            report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''

        return 0, report, ''
1690 | ||
1691 | @CLIReadCommand('telemetry show-device') | |
1692 | def show_device(self) -> Tuple[int, str, str]: | |
1693 | ''' | |
1694 | Show a sample device report | |
1695 | ''' | |
1696 | if not self.enabled: | |
1697 | # if telemetry is off, no report is being sent, hence nothing to show | |
1698 | msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \ | |
1699 | 'Preview sample device reports with `ceph telemetry preview-device`.' | |
1700 | return 0, msg, '' | |
1701 | ||
1702 | if not self.channel_device: | |
1703 | # if device channel is off, device report is not being sent, hence nothing to show | |
1704 | msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \ | |
1705 | 'Preview sample device reports with `ceph telemetry preview-device`.' | |
1706 | return 0, msg, '' | |
1707 | ||
1708 | return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), '' | |
1709 | ||
    @CLIReadCommand('telemetry preview-device')
    def preview_device(self) -> Tuple[int, str, str]:
        '''
        Preview a sample device report of the most recent device collection
        '''
        report = {}

        # lock against the periodic sender, same rationale as in preview()
        device_col_delta = self.collection_delta(['device'])
        with self.get_report_lock:
            if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
                # user is already opted-in to the most recent device collection,
                # and device channel is on, thus `show-device` should be called
                msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
                return 0, msg, ''

            # either the user is not opted-in at all, or there are collections
            # they are not opted-in to
            next_collection = []

            for c in MODULE_COLLECTION:
                next_collection.append(c['name'].name)

            # Temporarily opt-in to every known collection so the device
            # report is compiled in full, then restore the real list.
            # NOTE(review): not exception-safe — if get_report() raises,
            # db_collection stays clobbered; confirm this is acceptable.
            opted_in_collection = self.db_collection
            self.db_collection = next_collection
            report = self.get_report('device')
            self.db_collection = opted_in_collection

        report = json.dumps(report, indent=4, sort_keys=True)
        return 0, report, ''
1739 | ||
1740 | @CLIReadCommand('telemetry show-all') | |
1741 | def show_all(self) -> Tuple[int, str, str]: | |
1742 | ''' | |
1743 | Show a sample report of all enabled channels (including 'device' channel) | |
1744 | ''' | |
1745 | if not self.enabled: | |
1746 | # if telemetry is off, no report is being sent, hence nothing to show | |
1747 | msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \ | |
1748 | 'Preview sample reports with `ceph telemetry preview`.' | |
1749 | return 0, msg, '' | |
1750 | ||
1751 | if not self.channel_device: | |
1752 | # device channel is off, no need to display its report | |
1753 | return 0, json.dumps(self.get_report_locked('default'), indent=4, sort_keys=True), '' | |
1754 | ||
1755 | # telemetry is on and device channel is enabled, show both | |
1756 | return 0, json.dumps(self.get_report_locked('all'), indent=4, sort_keys=True), '' | |
1757 | ||
    @CLIReadCommand('telemetry preview-all')
    def preview_all(self) -> Tuple[int, str, str]:
        '''
        Preview a sample report of the most recent collections available of all channels (including 'device')
        '''
        report = {}

        # lock against the periodic sender, same rationale as in preview()
        col_delta = self.collection_delta()
        with self.get_report_lock:
            if col_delta is not None and len(col_delta) == 0:
                # user is already opted-in to the most recent collection
                msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
                return 0, msg, ''

            # there are collections the user is not opted-in to
            next_collection = []

            for c in MODULE_COLLECTION:
                next_collection.append(c['name'].name)

            # Temporarily opt-in to every known collection, compile the
            # combined report, then restore the real opt-in list.
            # NOTE(review): not exception-safe if get_report() raises.
            opted_in_collection = self.db_collection
            self.db_collection = next_collection
            report = self.get_report('all')
            self.db_collection = opted_in_collection

        # NOTE(review): here report is the nested {'report': ..., 'device_report': ...}
        # structure from get_report('all'); confirm format_perf_histogram
        # handles that shape (show()/preview() pass the flat report).
        self.format_perf_histogram(report)
        report = json.dumps(report, indent=4, sort_keys=True)

        return 0, report, ''
1787 | ||
    def get_report_locked(self,
                          report_type: str = 'default',
                          channels: Optional[List[str]] = None) -> Dict[str, Any]:
        '''
        A wrapper around get_report to allow for compiling a report of the most recent module collections
        '''
        # The lock serializes report compilation against preview commands,
        # which temporarily mutate db_collection (see preview()).
        with self.get_report_lock:
            return self.get_report(report_type, channels)
1796 | ||
1797 | def get_report(self, | |
1798 | report_type: str = 'default', | |
1799 | channels: Optional[List[str]] = None) -> Dict[str, Any]: | |
e306af50 TL |
1800 | if report_type == 'default': |
1801 | return self.compile_report(channels=channels) | |
1802 | elif report_type == 'device': | |
1803 | return self.gather_device_report() | |
1804 | elif report_type == 'all': | |
1805 | return {'report': self.compile_report(channels=channels), | |
1806 | 'device_report': self.gather_device_report()} | |
1807 | return {} | |
1808 | ||
20effc67 | 1809 | def self_test(self) -> None: |
11fdf7f2 TL |
1810 | report = self.compile_report() |
1811 | if len(report) == 0: | |
1812 | raise RuntimeError('Report is empty') | |
1813 | ||
1814 | if 'report_id' not in report: | |
1815 | raise RuntimeError('report_id not found in report') | |
1816 | ||
    def shutdown(self) -> None:
        '''
        Stop the serve() loop: clear the run flag first, then set the event
        so a sleeping serve() wakes immediately and observes run == False.
        '''
        self.run = False
        self.event.set()
1820 | ||
20effc67 | 1821 | def refresh_health_checks(self) -> None: |
eafe8130 | 1822 | health_checks = {} |
20effc67 TL |
1823 | # TODO do we want to nag also in case the user is not opted-in? |
1824 | if self.enabled and self.should_nag(): | |
eafe8130 TL |
1825 | health_checks['TELEMETRY_CHANGED'] = { |
1826 | 'severity': 'warning', | |
1827 | 'summary': 'Telemetry requires re-opt-in', | |
1828 | 'detail': [ | |
20effc67 | 1829 | 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`' |
eafe8130 TL |
1830 | ] |
1831 | } | |
1832 | self.set_health_checks(health_checks) | |
1833 | ||
    def serve(self) -> None:
        '''
        Module main loop: periodically compile and send the telemetry report
        while the module is enabled and the user is opted-in.
        '''
        self.load()
        self.run = True

        self.log.debug('Waiting for mgr to warm up')
        time.sleep(10)

        while self.run:
            # clear first so shutdown()'s set() during this iteration still
            # wakes the wait() below
            self.event.clear()

            self.refresh_health_checks()

            if not self.is_opted_in():
                self.log.debug('Not sending report until user re-opts-in')
                self.event.wait(1800)
                continue
            if not self.enabled:
                self.log.debug('Not sending report until configured to do so')
                self.event.wait(1800)
                continue

            now = int(time.time())
            # self.interval is in hours; send on first run or once the
            # interval has elapsed since the last upload
            if not self.last_upload or \
               (now - self.last_upload) > self.interval * 3600:
                self.log.info('Compiling and sending report to %s',
                              self.url)

                try:
                    self.last_report = self.compile_report()
                except Exception:
                    self.log.exception('Exception while compiling report:')

                # NOTE(review): if compile_report() raised above, this sends
                # the previous (possibly stale) self.last_report — confirm
                # that is intentional rather than skipping the send.
                self.send(self.last_report)
            else:
                self.log.debug('Interval for sending new report has not expired')

            sleep = 3600
            self.log.debug('Sleeping for %d seconds', sleep)
            self.event.wait(sleep)
1873 | ||
    @staticmethod
    def can_run() -> Tuple[bool, str]:
        # The telemetry module has no special runtime prerequisites.
        return True, ''