]>
Commit | Line | Data |
---|---|---|
1 | """ | |
2 | Telemetry module for ceph-mgr | |
3 | ||
4 | Collect statistics from Ceph cluster and send this back to the Ceph project | |
5 | when user has opted-in | |
6 | """ | |
7 | import logging | |
8 | import numbers | |
9 | import enum | |
10 | import errno | |
11 | import hashlib | |
12 | import json | |
13 | import rbd | |
14 | import requests | |
15 | import uuid | |
16 | import time | |
17 | from datetime import datetime, timedelta | |
18 | from prettytable import PrettyTable | |
19 | from threading import Event, Lock | |
20 | from collections import defaultdict | |
21 | from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union | |
22 | ||
23 | from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, ServiceInfoT | |
24 | ||
25 | ||
26 | ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf'] | |
27 | ||
28 | LICENSE = 'sharing-1-0' | |
29 | LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0' | |
30 | LICENSE_URL = 'https://cdla.io/sharing-1-0/' | |
31 | NO_SALT_CNT = 0 | |
32 | ||
33 | # Latest revision of the telemetry report. Bump this each time we make | |
34 | # *any* change. | |
35 | REVISION = 3 | |
36 | ||
37 | # History of revisions | |
38 | # -------------------- | |
39 | # | |
40 | # Version 1: | |
41 | # Mimic and/or nautilus are lumped together here, since | |
42 | # we didn't track revisions yet. | |
43 | # | |
44 | # Version 2: | |
45 | # - added revision tracking, nagging, etc. | |
46 | # - added config option changes | |
47 | # - added channels | |
48 | # - added explicit license acknowledgement to the opt-in process | |
49 | # | |
50 | # Version 3: | |
51 | # - added device health metrics (i.e., SMART data, minus serial number) | |
52 | # - remove crush_rule | |
53 | # - added CephFS metadata (how many MDSs, fs features, how many data pools, | |
54 | # how much metadata is cached, rfiles, rbytes, rsnapshots) | |
55 | # - added more pool metadata (rep vs ec, cache tiering mode, ec profile) | |
56 | # - added host count, and counts for hosts with each of (mon, osd, mds, mgr) | |
57 | # - whether an OSD cluster network is in use | |
58 | # - rbd pool and image count, and rbd mirror mode (pool-level) | |
59 | # - rgw daemons, zones, zonegroups; which rgw frontends | |
60 | # - crush map stats | |
61 | ||
class Collection(str, enum.Enum):
    """Identifiers for the individual data collections a cluster can opt
    in to.

    Each collection belongs to exactly one channel (see
    MODULE_COLLECTION); the string value doubles as the name persisted
    in the module store.  Subclassing ``str`` makes members compare and
    serialize as plain strings.
    """
    basic_base = 'basic_base'
    device_base = 'device_base'
    crash_base = 'crash_base'
    ident_base = 'ident_base'
    perf_perf = 'perf_perf'
    basic_mds_metadata = 'basic_mds_metadata'
    basic_pool_usage = 'basic_pool_usage'
    basic_usage_by_class = 'basic_usage_by_class'
    basic_rook_v01 = 'basic_rook_v01'
    perf_memory_metrics = 'perf_memory_metrics'
    basic_pool_options_bluestore = 'basic_pool_options_bluestore'
    basic_pool_flags = 'basic_pool_flags'
75 | ||
# Registry of all known collections.  Each entry carries:
#   name:        the Collection enum member
#   description: human-readable text shown in `telemetry collection ls`
#   channel:     the opt-in channel the collection belongs to
#   nag:         whether users should be re-prompted (nagged) to opt in
#                to this collection after an upgrade adds it
MODULE_COLLECTION : List[Dict] = [
    {
        "name": Collection.basic_base,
        "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.device_base,
        "description": "Information about device health metrics",
        "channel": "device",
        "nag": False
    },
    {
        "name": Collection.crash_base,
        "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
        "channel": "crash",
        "nag": False
    },
    {
        "name": Collection.ident_base,
        "description": "User-provided identifying information about the cluster",
        "channel": "ident",
        "nag": False
    },
    {
        "name": Collection.perf_perf,
        "description": "Information about performance counters of the cluster",
        "channel": "perf",
        "nag": True
    },
    {
        "name": Collection.basic_mds_metadata,
        "description": "MDS metadata",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_pool_usage,
        "description": "Default pool application and usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_usage_by_class,
        "description": "Default device class usage statistics",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_rook_v01,
        "description": "Basic Rook deployment data",
        "channel": "basic",
        "nag": True
    },
    {
        "name": Collection.perf_memory_metrics,
        "description": "Heap stats and mempools for mon and mds",
        "channel": "perf",
        "nag": False
    },
    {
        "name": Collection.basic_pool_options_bluestore,
        "description": "Per-pool bluestore config options",
        "channel": "basic",
        "nag": False
    },
    {
        "name": Collection.basic_pool_flags,
        "description": "Per-pool flags",
        "channel": "basic",
        "nag": False
    },
]
150 | ||
# KV-store keys written by Rook that telemetry reports, each tagged with
# the collection that gates its inclusion in the report.
ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [
    # Note: a key cannot be both a node and a leaf, e.g.
    # "rook/a/b"
    # "rook/a/b/c"
    ("rook/version", Collection.basic_rook_v01),
    ("rook/kubernetes/version", Collection.basic_rook_v01),
    ("rook/csi/version", Collection.basic_rook_v01),
    ("rook/node/count/kubernetes-total", Collection.basic_rook_v01),
    ("rook/node/count/with-ceph-daemons", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-rbd-plugin", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-cephfs-plugin", Collection.basic_rook_v01),
    ("rook/node/count/with-csi-nfs-plugin", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/total", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/rbd", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/cephfs", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/nfs", Collection.basic_rook_v01),
    ("rook/usage/storage-class/count/bucket", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/total", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/portable", Collection.basic_rook_v01),
    ("rook/cluster/storage/device-set/count/non-portable", Collection.basic_rook_v01),
    ("rook/cluster/mon/count", Collection.basic_rook_v01),
    ("rook/cluster/mon/allow-multiple-per-node", Collection.basic_rook_v01),
    ("rook/cluster/mon/max-id", Collection.basic_rook_v01),
    ("rook/cluster/mon/pvc/enabled", Collection.basic_rook_v01),
    ("rook/cluster/mon/stretch/enabled", Collection.basic_rook_v01),
    ("rook/cluster/network/provider", Collection.basic_rook_v01),
    ("rook/cluster/external-mode", Collection.basic_rook_v01),
]
179 | ||
180 | class Module(MgrModule): | |
    # Daemon metadata fields that the gather_*_metadata() helpers fold
    # into per-value counts (e.g. how many OSDs run each ceph_version).
    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]
191 | ||
    # Declarative module options; each becomes an instance attribute via
    # config_update_module_option().  The channel_* booleans gate the
    # corresponding report channels.
    MODULE_OPTIONS = [
        Option(name='url',
               type='str',
               default='https://telemetry.ceph.com/report'),
        Option(name='device_url',
               type='str',
               default='https://telemetry.ceph.com/device'),
        Option(name='enabled',
               type='bool',
               default=False),
        Option(name='last_opt_revision',
               type='int',
               default=1),
        Option(name='leaderboard',
               type='bool',
               default=False),
        Option(name='leaderboard_description',
               type='str',
               default=None),
        Option(name='description',
               type='str',
               default=None),
        Option(name='contact',
               type='str',
               default=None),
        Option(name='organization',
               type='str',
               default=None),
        Option(name='proxy',
               type='str',
               default=None),
        # Hours between uploads; floor of 8 prevents hammering the endpoint.
        Option(name='interval',
               type='int',
               default=24,
               min=8),
        Option(name='channel_basic',
               type='bool',
               default=True,
               desc='Share basic cluster information (size, version)'),
        Option(name='channel_ident',
               type='bool',
               default=False,
               desc='Share a user-provided description and/or contact email for the cluster'),
        Option(name='channel_crash',
               type='bool',
               default=True,
               desc='Share metadata about Ceph daemon crashes (version, stack straces, etc)'),
        Option(name='channel_device',
               type='bool',
               default=True,
               desc=('Share device health metrics '
                     '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
        Option(name='channel_perf',
               type='bool',
               default=False,
               desc='Share various performance metrics of a cluster'),
    ]
249 | ||
250 | @property | |
251 | def config_keys(self) -> Dict[str, OptionValue]: | |
252 | return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS) | |
253 | ||
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize module state; persisted values are loaded later in load()."""
        super(Module, self).__init__(*args, **kwargs)
        # Event used to wake the serve() loop on config changes/shutdown.
        self.event = Event()
        self.run = False
        # Collections this cluster has opted in to (None until load()).
        self.db_collection: Optional[List[str]] = None
        self.last_opted_in_ceph_version: Optional[int] = None
        self.last_opted_out_ceph_version: Optional[int] = None
        # Unix timestamp of the last successful upload, if any.
        self.last_upload: Optional[int] = None
        self.last_report: Dict[str, Any] = dict()
        # Random UUID identifying this cluster across reports.
        self.report_id: Optional[str] = None
        # Random salt used to anonymize entity names consistently.
        self.salt: Optional[str] = None
        self.get_report_lock = Lock()
        self.config_update_module_option()
        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.url = ''
            self.device_url = ''
            self.enabled = False
            self.last_opt_revision = 0
            self.leaderboard = ''
            self.leaderboard_description = ''
            self.interval = 0
            self.proxy = ''
            self.channel_basic = True
            self.channel_ident = False
            self.channel_crash = True
            self.channel_device = True
            self.channel_perf = False
            self.db_collection = ['basic_base', 'device_base']
            self.last_opted_in_ceph_version = 17
            self.last_opted_out_ceph_version = 0
285 | ||
286 | def config_update_module_option(self) -> None: | |
287 | for opt in self.MODULE_OPTIONS: | |
288 | setattr(self, | |
289 | opt['name'], | |
290 | self.get_module_option(opt['name'])) | |
291 | self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name'])) | |
292 | ||
293 | def config_notify(self) -> None: | |
294 | self.config_update_module_option() | |
295 | # wake up serve() thread | |
296 | self.event.set() | |
297 | ||
298 | def load(self) -> None: | |
299 | last_upload = self.get_store('last_upload', None) | |
300 | if last_upload is None: | |
301 | self.last_upload = None | |
302 | else: | |
303 | self.last_upload = int(last_upload) | |
304 | ||
305 | report_id = self.get_store('report_id', None) | |
306 | if report_id is None: | |
307 | self.report_id = str(uuid.uuid4()) | |
308 | self.set_store('report_id', self.report_id) | |
309 | else: | |
310 | self.report_id = report_id | |
311 | ||
312 | salt = self.get_store('salt', None) | |
313 | if salt is None: | |
314 | self.salt = str(uuid.uuid4()) | |
315 | self.set_store('salt', self.salt) | |
316 | else: | |
317 | self.salt = salt | |
318 | ||
319 | self.init_collection() | |
320 | ||
321 | last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None) | |
322 | if last_opted_in_ceph_version is None: | |
323 | self.last_opted_in_ceph_version = None | |
324 | else: | |
325 | self.last_opted_in_ceph_version = int(last_opted_in_ceph_version) | |
326 | ||
327 | last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None) | |
328 | if last_opted_out_ceph_version is None: | |
329 | self.last_opted_out_ceph_version = None | |
330 | else: | |
331 | self.last_opted_out_ceph_version = int(last_opted_out_ceph_version) | |
332 | ||
333 | def gather_osd_metadata(self, | |
334 | osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]: | |
335 | keys = ["osd_objectstore", "rotational"] | |
336 | keys += self.metadata_keys | |
337 | ||
338 | metadata: Dict[str, Dict[str, int]] = dict() | |
339 | for key in keys: | |
340 | metadata[key] = defaultdict(int) | |
341 | ||
342 | for osd in osd_map['osds']: | |
343 | res = self.get_metadata('osd', str(osd['osd'])) | |
344 | if res is None: | |
345 | self.log.debug('Could not get metadata for osd.%s' % str(osd['osd'])) | |
346 | continue | |
347 | for k, v in res.items(): | |
348 | if k not in keys: | |
349 | continue | |
350 | ||
351 | metadata[k][v] += 1 | |
352 | ||
353 | return metadata | |
354 | ||
355 | def gather_mon_metadata(self, | |
356 | mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]: | |
357 | keys = list() | |
358 | keys += self.metadata_keys | |
359 | ||
360 | metadata: Dict[str, Dict[str, int]] = dict() | |
361 | for key in keys: | |
362 | metadata[key] = defaultdict(int) | |
363 | ||
364 | for mon in mon_map['mons']: | |
365 | res = self.get_metadata('mon', mon['name']) | |
366 | if res is None: | |
367 | self.log.debug('Could not get metadata for mon.%s' % (mon['name'])) | |
368 | continue | |
369 | for k, v in res.items(): | |
370 | if k not in keys: | |
371 | continue | |
372 | ||
373 | metadata[k][v] += 1 | |
374 | ||
375 | return metadata | |
376 | ||
377 | def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]: | |
378 | metadata: Dict[str, Dict[str, int]] = dict() | |
379 | ||
380 | res = self.get('mds_metadata') # metadata of *all* mds daemons | |
381 | if res is None or not res: | |
382 | self.log.debug('Could not get metadata for mds daemons') | |
383 | return metadata | |
384 | ||
385 | keys = list() | |
386 | keys += self.metadata_keys | |
387 | ||
388 | for key in keys: | |
389 | metadata[key] = defaultdict(int) | |
390 | ||
391 | for mds in res.values(): | |
392 | for k, v in mds.items(): | |
393 | if k not in keys: | |
394 | continue | |
395 | ||
396 | metadata[k][v] += 1 | |
397 | ||
398 | return metadata | |
399 | ||
400 | def gather_crush_info(self) -> Dict[str, Union[int, | |
401 | bool, | |
402 | List[int], | |
403 | Dict[str, int], | |
404 | Dict[int, int]]]: | |
405 | osdmap = self.get_osdmap() | |
406 | crush_raw = osdmap.get_crush() | |
407 | crush = crush_raw.dump() | |
408 | ||
409 | BucketKeyT = TypeVar('BucketKeyT', int, str) | |
410 | ||
411 | def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None: | |
412 | if k in d: | |
413 | d[k] += 1 | |
414 | else: | |
415 | d[k] = 1 | |
416 | ||
417 | device_classes: Dict[str, int] = {} | |
418 | for dev in crush['devices']: | |
419 | inc(device_classes, dev.get('class', '')) | |
420 | ||
421 | bucket_algs: Dict[str, int] = {} | |
422 | bucket_types: Dict[str, int] = {} | |
423 | bucket_sizes: Dict[int, int] = {} | |
424 | for bucket in crush['buckets']: | |
425 | if '~' in bucket['name']: # ignore shadow buckets | |
426 | continue | |
427 | inc(bucket_algs, bucket['alg']) | |
428 | inc(bucket_types, bucket['type_id']) | |
429 | inc(bucket_sizes, len(bucket['items'])) | |
430 | ||
431 | return { | |
432 | 'num_devices': len(crush['devices']), | |
433 | 'num_types': len(crush['types']), | |
434 | 'num_buckets': len(crush['buckets']), | |
435 | 'num_rules': len(crush['rules']), | |
436 | 'device_classes': list(device_classes.values()), | |
437 | 'tunables': crush['tunables'], | |
438 | 'compat_weight_set': '-1' in crush['choose_args'], | |
439 | 'num_weight_sets': len(crush['choose_args']), | |
440 | 'bucket_algs': bucket_algs, | |
441 | 'bucket_sizes': bucket_sizes, | |
442 | 'bucket_types': bucket_types, | |
443 | } | |
444 | ||
445 | def gather_configs(self) -> Dict[str, List[str]]: | |
446 | # cluster config options | |
447 | cluster = set() | |
448 | r, outb, outs = self.mon_command({ | |
449 | 'prefix': 'config dump', | |
450 | 'format': 'json' | |
451 | }) | |
452 | if r != 0: | |
453 | return {} | |
454 | try: | |
455 | dump = json.loads(outb) | |
456 | except json.decoder.JSONDecodeError: | |
457 | return {} | |
458 | for opt in dump: | |
459 | name = opt.get('name') | |
460 | if name: | |
461 | cluster.add(name) | |
462 | # daemon-reported options (which may include ceph.conf) | |
463 | active = set() | |
464 | ls = self.get("modified_config_options") | |
465 | for opt in ls.get('options', {}): | |
466 | active.add(opt) | |
467 | return { | |
468 | 'cluster_changed': sorted(list(cluster)), | |
469 | 'active_changed': sorted(list(active)), | |
470 | } | |
471 | ||
472 | def anonymize_entity_name(self, entity_name:str) -> str: | |
473 | if '.' not in entity_name: | |
474 | self.log.debug(f"Cannot split entity name ({entity_name}), no '.' is found") | |
475 | return entity_name | |
476 | ||
477 | (etype, eid) = entity_name.split('.', 1) | |
478 | m = hashlib.sha1() | |
479 | salt = '' | |
480 | if self.salt is not None: | |
481 | salt = self.salt | |
482 | # avoid asserting that salt exists | |
483 | if not self.salt: | |
484 | # do not set self.salt to a temp value | |
485 | salt = f"no_salt_found_{NO_SALT_CNT}" | |
486 | NO_SALT_CNT += 1 | |
487 | self.log.debug(f"No salt found, created a temp one: {salt}") | |
488 | m.update(salt.encode('utf-8')) | |
489 | m.update(eid.encode('utf-8')) | |
490 | m.update(salt.encode('utf-8')) | |
491 | ||
492 | return etype + '.' + m.hexdigest() | |
493 | ||
494 | def get_heap_stats(self) -> Dict[str, dict]: | |
495 | result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int))) | |
496 | anonymized_daemons = {} | |
497 | osd_map = self.get('osd_map') | |
498 | ||
499 | # Combine available daemons | |
500 | daemons = [] | |
501 | for osd in osd_map['osds']: | |
502 | daemons.append('osd'+'.'+str(osd['osd'])) | |
503 | # perf_memory_metrics collection (1/2) | |
504 | if self.is_enabled_collection(Collection.perf_memory_metrics): | |
505 | mon_map = self.get('mon_map') | |
506 | mds_metadata = self.get('mds_metadata') | |
507 | for mon in mon_map['mons']: | |
508 | daemons.append('mon'+'.'+mon['name']) | |
509 | for mds in mds_metadata: | |
510 | daemons.append('mds'+'.'+mds) | |
511 | ||
512 | # Grab output from the "daemon.x heap stats" command | |
513 | for daemon in daemons: | |
514 | daemon_type, daemon_id = daemon.split('.', 1) | |
515 | heap_stats = self.parse_heap_stats(daemon_type, daemon_id) | |
516 | if heap_stats: | |
517 | if (daemon_type != 'osd'): | |
518 | # Anonymize mon and mds | |
519 | anonymized_daemons[daemon] = self.anonymize_entity_name(daemon) | |
520 | daemon = anonymized_daemons[daemon] | |
521 | result[daemon_type][daemon] = heap_stats | |
522 | else: | |
523 | continue | |
524 | ||
525 | if anonymized_daemons: | |
526 | # for debugging purposes only, this data is never reported | |
527 | self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons)) | |
528 | return result | |
529 | ||
530 | def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]: | |
531 | parsed_output = {} | |
532 | ||
533 | cmd_dict = { | |
534 | 'prefix': 'heap', | |
535 | 'heapcmd': 'stats' | |
536 | } | |
537 | r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict) | |
538 | ||
539 | if r != 0: | |
540 | self.log.error("Invalid command dictionary: {}".format(cmd_dict)) | |
541 | else: | |
542 | if 'tcmalloc heap stats' in outb: | |
543 | values = [int(i) for i in outb.split() if i.isdigit()] | |
544 | # `categories` must be ordered this way for the correct output to be parsed | |
545 | categories = ['use_by_application', | |
546 | 'page_heap_freelist', | |
547 | 'central_cache_freelist', | |
548 | 'transfer_cache_freelist', | |
549 | 'thread_cache_freelists', | |
550 | 'malloc_metadata', | |
551 | 'actual_memory_used', | |
552 | 'released_to_os', | |
553 | 'virtual_address_space_used', | |
554 | 'spans_in_use', | |
555 | 'thread_heaps_in_use', | |
556 | 'tcmalloc_page_size'] | |
557 | if len(values) != len(categories): | |
558 | self.log.error('Received unexpected output from {}.{}; ' \ | |
559 | 'number of values should match the number' \ | |
560 | 'of expected categories:\n values: len={} {} '\ | |
561 | '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs)) | |
562 | else: | |
563 | parsed_output = dict(zip(categories, values)) | |
564 | else: | |
565 | self.log.error('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs)) | |
566 | ||
567 | return parsed_output | |
568 | ||
    def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
        """Collect mempool usage per daemon via 'dump_mempools'.

        mode='separated' keeps one entry per daemon (mon/mds names are
        anonymized); mode='aggregated' sums bytes/items per daemon type.
        OSDs are always queried; mons and MDSs only when the
        perf_memory_metrics collection is enabled.
        """
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
        anonymized_daemons = {}
        osd_map = self.get('osd_map')

        # Combine available daemons
        daemons = []
        for osd in osd_map['osds']:
            daemons.append('osd'+'.'+str(osd['osd']))
        # perf_memory_metrics collection (2/2)
        if self.is_enabled_collection(Collection.perf_memory_metrics):
            mon_map = self.get('mon_map')
            mds_metadata = self.get('mds_metadata')
            for mon in mon_map['mons']:
                daemons.append('mon'+'.'+mon['name'])
            for mds in mds_metadata:
                daemons.append('mds'+'.'+mds)

        # Grab output from the "dump_mempools" command
        for daemon in daemons:
            daemon_type, daemon_id = daemon.split('.', 1)
            cmd_dict = {
                'prefix': 'dump_mempools',
                'format': 'json'
            }
            r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict)
            if r != 0:
                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
                continue
            else:
                try:
                    # This is where the mempool will land.
                    dump = json.loads(outb)
                    if mode == 'separated':
                        # Anonymize mon and mds
                        if daemon_type != 'osd':
                            anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
                            daemon = anonymized_daemons[daemon]
                        result[daemon_type][daemon] = dump['mempool']['by_pool']
                    elif mode == 'aggregated':
                        for mem_type in dump['mempool']['by_pool']:
                            result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
                            result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
                    else:
                        self.log.error("Incorrect mode specified in get_mempool: {}".format(mode))
                # A daemon may return empty/partial JSON; skip it and keep
                # collecting from the rest.
                except (json.decoder.JSONDecodeError, KeyError) as e:
                    self.log.exception("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e))
                    continue

        if anonymized_daemons:
            # for debugging purposes only, this data is never reported
            self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons))

        return result
623 | ||
    def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
        """Collect 2D perf histograms from every OSD.

        Histograms are grouped by their axis configuration (str(axes) is
        used as a grouping key and dropped from the returned values).
        mode='separated' keeps per-OSD value grids; mode='aggregated'
        element-wise sums the grids and tracks how many OSDs contributed.
        """
        # Initialize result dict
        result: Dict[str, dict] = defaultdict(lambda: defaultdict(
                                  lambda: defaultdict(
                                  lambda: defaultdict(
                                  lambda: defaultdict(
                                  lambda: defaultdict(int))))))

        # Get list of osd ids from the metadata
        osd_metadata = self.get('osd_metadata')

        # Grab output from the "osd.x perf histogram dump" command
        for osd_id in osd_metadata:
            cmd_dict = {
                'prefix': 'perf histogram dump',
                'id': str(osd_id),
                'format': 'json'
            }
            r, outb, outs = self.osd_command(cmd_dict)
            # Check for invalid calls
            if r != 0:
                self.log.error("Invalid command dictionary: {}".format(cmd_dict))
                continue
            else:
                try:
                    # This is where the histograms will land if there are any.
                    dump = json.loads(outb)

                    for histogram in dump['osd']:
                        # Log axis information. There are two axes, each represented
                        # as a dictionary. Both dictionaries are contained inside a
                        # list called 'axes'.
                        axes = []
                        for axis in dump['osd'][histogram]['axes']:

                            # This is the dict that contains information for an individual
                            # axis. It will be appended to the 'axes' list at the end.
                            axis_dict: Dict[str, Any] = defaultdict()

                            # Collecting information for buckets, min, name, etc.
                            axis_dict['buckets'] = axis['buckets']
                            axis_dict['min'] = axis['min']
                            axis_dict['name'] = axis['name']
                            axis_dict['quant_size'] = axis['quant_size']
                            axis_dict['scale_type'] = axis['scale_type']

                            # Collecting ranges; placing them in lists to
                            # improve readability later on.
                            ranges = []
                            for _range in axis['ranges']:
                                _max, _min = None, None
                                if 'max' in _range:
                                    _max = _range['max']
                                if 'min' in _range:
                                    _min = _range['min']
                                ranges.append([_min, _max])
                            axis_dict['ranges'] = ranges

                            # Now that 'axis_dict' contains all the appropriate
                            # information for the current axis, append it to the 'axes' list.
                            # There will end up being two axes in the 'axes' list, since the
                            # histograms are 2D.
                            axes.append(axis_dict)

                        # Add the 'axes' list, containing both axes, to result.
                        # At this point, you will see that the name of the key is the string
                        # form of our axes list (str(axes)). This is there so that histograms
                        # with different axis configs will not be combined.
                        # These key names are later dropped when only the values are returned.
                        result[str(axes)][histogram]['axes'] = axes

                        # Collect current values and make sure they are in
                        # integer form.
                        values = []
                        for value_list in dump['osd'][histogram]['values']:
                            values.append([int(v) for v in value_list])

                        if mode == 'separated':
                            if 'osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['osds'] = []
                            result[str(axes)][histogram]['osds'].append({'osd_id': int(osd_id), 'values': values})

                        elif mode == 'aggregated':
                            # Aggregate values. If 'values' have already been initialized,
                            # we can safely add.
                            if 'values' in result[str(axes)][histogram]:
                                for i in range (0, len(values)):
                                    for j in range (0, len(values[i])):
                                        values[i][j] += result[str(axes)][histogram]['values'][i][j]

                            # Add the values to result.
                            result[str(axes)][histogram]['values'] = values

                            # Update num_combined_osds
                            if 'num_combined_osds' not in result[str(axes)][histogram]:
                                result[str(axes)][histogram]['num_combined_osds'] = 1
                            else:
                                result[str(axes)][histogram]['num_combined_osds'] += 1
                        else:
                            self.log.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode))
                            return list()

                    # Sometimes, json errors occur if you give it an empty string.
                    # I am also putting in a catch for a KeyError since it could
                    # happen where the code is assuming that a key exists in the
                    # schema when it doesn't. In either case, we'll handle that
                    # by continuing and collecting what we can from other osds.
                except (json.decoder.JSONDecodeError, KeyError) as e:
                    self.log.exception("Error caught on osd.{}: {}".format(osd_id, e))
                    continue

        return list(result.values())
736 | ||
    def get_io_rate(self) -> dict:
        """Return the cluster-wide I/O rate snapshot from the mgr."""
        return self.get('io_rate')
739 | ||
740 | def get_stats_per_pool(self) -> dict: | |
741 | result = self.get('pg_dump')['pool_stats'] | |
742 | ||
743 | # collect application metadata from osd_map | |
744 | osd_map = self.get('osd_map') | |
745 | application_metadata = {pool['pool']: pool['application_metadata'] for pool in osd_map['pools']} | |
746 | ||
747 | # add application to each pool from pg_dump | |
748 | for pool in result: | |
749 | pool['application'] = [] | |
750 | # Only include default applications | |
751 | for application in application_metadata[pool['poolid']]: | |
752 | if application in ['cephfs', 'mgr', 'rbd', 'rgw']: | |
753 | pool['application'].append(application) | |
754 | ||
755 | return result | |
756 | ||
    def get_stats_per_pg(self) -> dict:
        """Return per-PG statistics from pg_dump."""
        return self.get('pg_dump')['pg_stats']
759 | ||
760 | def get_rocksdb_stats(self) -> Dict[str, str]: | |
761 | # Initalizers | |
762 | result: Dict[str, str] = defaultdict() | |
763 | version = self.get_rocksdb_version() | |
764 | ||
765 | # Update result | |
766 | result['version'] = version | |
767 | ||
768 | return result | |
769 | ||
770 | def gather_crashinfo(self) -> List[Dict[str, str]]: | |
771 | crashlist: List[Dict[str, str]] = list() | |
772 | errno, crashids, err = self.remote('crash', 'ls') | |
773 | if errno: | |
774 | return crashlist | |
775 | for crashid in crashids.split(): | |
776 | errno, crashinfo, err = self.remote('crash', 'do_info', crashid) | |
777 | if errno: | |
778 | continue | |
779 | c = json.loads(crashinfo) | |
780 | ||
781 | # redact hostname | |
782 | del c['utsname_hostname'] | |
783 | ||
784 | # entity_name might have more than one '.', beware | |
785 | (etype, eid) = c.get('entity_name', '').split('.', 1) | |
786 | m = hashlib.sha1() | |
787 | assert self.salt | |
788 | m.update(self.salt.encode('utf-8')) | |
789 | m.update(eid.encode('utf-8')) | |
790 | m.update(self.salt.encode('utf-8')) | |
791 | c['entity_name'] = etype + '.' + m.hexdigest() | |
792 | ||
793 | # redact final line of python tracebacks, as the exception | |
794 | # payload may contain identifying information | |
795 | if 'mgr_module' in c and 'backtrace' in c: | |
796 | # backtrace might be empty | |
797 | if len(c['backtrace']) > 0: | |
798 | c['backtrace'][-1] = '<redacted>' | |
799 | ||
800 | crashlist.append(c) | |
801 | return crashlist | |
802 | ||
def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
    """Collect perf counters from all daemons known to the mgr.

    :param mode: 'separated' keeps one entry per daemon (daemon names
        anonymized except for osds) with each counter's own 'value'
        (and 'count' where present); 'aggregated' keeps one entry per
        daemon type with numeric values/counts summed across daemons.
    :return: nested dict of counters, or {} for an unknown mode.
    """
    # Extract perf counter data with get_unlabeled_perf_counters(), a method
    # from mgr/mgr_module.py. This method returns a nested dictionary that
    # looks a lot like perf schema, except with some additional fields.
    #
    # Example of output, a snapshot of a mon daemon:
    #   "mon.b": {
    #       "bluestore.kv_flush_lat": {
    #           "count": 2431,
    #           "description": "Average kv_thread flush latency",
    #           "nick": "fl_l",
    #           "priority": 8,
    #           "type": 5,
    #           "units": 1,
    #           "value": 88814109
    #       },
    #   },
    perf_counters = self.get_unlabeled_perf_counters()

    # Initialize 'result' dict: four levels of defaultdicts so aggregated
    # mode can '+=' into leaves without pre-initializing each key.
    result: Dict[str, dict] = defaultdict(lambda: defaultdict(
        lambda: defaultdict(lambda: defaultdict(int))))

    # 'separated' mode: anonymized-name -> real-name map, kept only for a
    # debug log at the end (never reported).
    anonymized_daemon_dict = {}

    for daemon, perf_counters_by_daemon in perf_counters.items():
        daemon_type = daemon[0:3]  # i.e. 'mds', 'osd', 'rgw'

        if mode == 'separated':
            # anonymize individual daemon names except osds
            if (daemon_type != 'osd'):
                anonymized_daemon = self.anonymize_entity_name(daemon)
                anonymized_daemon_dict[anonymized_daemon] = daemon
                daemon = anonymized_daemon

        # Calculate num combined daemon types if in aggregated mode
        if mode == 'aggregated':
            if 'num_combined_daemons' not in result[daemon_type]:
                result[daemon_type]['num_combined_daemons'] = 1
            else:
                result[daemon_type]['num_combined_daemons'] += 1

        for collection in perf_counters_by_daemon:
            # Split the collection to avoid redundancy in final report; i.e.:
            #   bluestore.kv_flush_lat, bluestore.kv_final_lat -->
            #   bluestore: kv_flush_lat, kv_final_lat
            col_0, col_1 = collection.split('.')

            # Debug log for empty keys. This initially was a problem for prioritycache
            # perf counters, where the col_0 was empty for certain mon counters:
            #
            # "mon.a": {                 instead of    "mon.a": {
            #     "": {                                    "prioritycache": {
            #       "cache_bytes": {...},                      "cache_bytes": {...},
            #
            # This log is here to detect any future instances of a similar issue.
            if (daemon == "") or (col_0 == "") or (col_1 == ""):
                self.log.debug("Instance of an empty key: {}{}".format(daemon, collection))

            if mode == 'separated':
                # Add value to result
                result[daemon][col_0][col_1]['value'] = \
                    perf_counters_by_daemon[collection]['value']

                # Check that 'count' exists, as not all counters have a count field.
                if 'count' in perf_counters_by_daemon[collection]:
                    result[daemon][col_0][col_1]['count'] = \
                        perf_counters_by_daemon[collection]['count']
            elif mode == 'aggregated':
                # Not every rgw daemon has the same schema. Specifically, each rgw daemon
                # has a uniquely-named collection that starts off identically (i.e.
                # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
                # This bit of code combines these unique counters all under one rgw instance.
                # Without this check, the schema would remain separated out in the final report.
                if col_0[0:11] == "objecter-0x":
                    col_0 = "objecter-0x"

                # Check that the value can be incremented. In some cases,
                # the files are of type 'pair' (real-integer-pair, integer-integer pair).
                # In those cases, the value is a dictionary, and not a number.
                # i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
                if isinstance(perf_counters_by_daemon[collection]['value'], numbers.Number):
                    result[daemon_type][col_0][col_1]['value'] += \
                        perf_counters_by_daemon[collection]['value']

                    # Check that 'count' exists, as not all counters have a count field.
                    if 'count' in perf_counters_by_daemon[collection]:
                        result[daemon_type][col_0][col_1]['count'] += \
                            perf_counters_by_daemon[collection]['count']
            else:
                self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
                return {}

    if mode == 'separated':
        # for debugging purposes only, this data is never reported
        self.log.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict))

    return result
902 | ||
def get_active_channels(self) -> List[str]:
    """Return the names of all telemetry channels that are switched on."""
    # Pair each channel name with the module option controlling it; the
    # order matches the historical order of the hand-written if-chain.
    channel_flags = [
        ('basic', self.channel_basic),
        ('crash', self.channel_crash),
        ('device', self.channel_device),
        ('ident', self.channel_ident),
        ('perf', self.channel_perf),
    ]
    return [name for name, enabled in channel_flags if enabled]
916 | ||
def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
    """Collect recent device health (SMART) metrics, anonymized.

    :return: mapping of anonymized host id -> anonymized device id ->
        {timestamp -> record}; hostnames, device ids, and serial numbers
        are replaced with stable anonymous identifiers persisted in the
        mgr store. Returns {} if the devicehealth module or the device
        list is unavailable.
    """
    try:
        time_format = self.remote('devicehealth', 'get_time_format')
    except Exception as e:
        self.log.debug('Unable to format time: {}'.format(e))
        return {}
    # Only consider samples newer than two reporting intervals ago.
    cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
    min_sample = cutoff.strftime(time_format)

    devices = self.get('devices')['devices']
    if not devices:
        self.log.debug('Unable to get device info from the mgr.')
        return {}

    # anon-host-id -> anon-devid -> { timestamp -> record }
    res: Dict[str, Dict[str, Dict[str, str]]] = {}
    for d in devices:
        devid = d['devid']
        try:
            # this is a map of stamp -> {device info}
            m = self.remote('devicehealth', 'get_recent_device_metrics',
                            devid, min_sample)
        except Exception as e:
            self.log.error('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
            continue

        # anonymize host id
        try:
            host = d['location'][0]['host']
        except (KeyError, IndexError) as e:
            self.log.exception('Unable to get host from device with id "{}": {}'.format(devid, e))
            continue
        # Reuse a previously generated anonymous host id (kept in the mgr
        # store) so reports stay stable across runs; mint one otherwise.
        anon_host = self.get_store('host-id/%s' % host)
        if not anon_host:
            anon_host = str(uuid.uuid1())
            self.set_store('host-id/%s' % host, anon_host)
        serial = None
        for dev, rep in m.items():
            rep['host_id'] = anon_host
            # remember the serial so it can be scrubbed from the raw
            # report below
            if serial is None and 'serial_number' in rep:
                serial = rep['serial_number']

        # anonymize device id
        anon_devid = self.get_store('devid-id/%s' % devid)
        if not anon_devid:
            # ideally devid is 'vendor_model_serial',
            # but can also be 'model_serial', 'serial'
            if '_' in devid:
                # keep the vendor/model prefix, replace only the serial part
                anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
            else:
                anon_devid = str(uuid.uuid1())
            self.set_store('devid-id/%s' % devid, anon_devid)
        self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
                                                       host, anon_host))

        # anonymize the smartctl report itself
        if serial:
            m_str = json.dumps(m)
            m = json.loads(m_str.replace(serial, 'deleted'))

        if anon_host not in res:
            res[anon_host] = {}
        res[anon_host][anon_devid] = m
    return res
981 | ||
def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
    """Return the newest sample value of *stat* for the given daemon.

    Samples are (timestamp, value) pairs; returns 0 when no samples exist.
    """
    samples = self.get_counter(daemon_type, daemon_name, stat)[stat]
    return samples[-1][1] if samples else 0
988 | ||
def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
    """Build the telemetry report for the given channels.

    :param channels: channel names to include; defaults to the currently
        active channels.
    :return: the report dict. The 'device' channel is deliberately NOT
        compiled here; gather_device_report() builds it and send() posts
        it to a separate endpoint.
    """
    if not channels:
        channels = self.get_active_channels()
    # Fixed header present in every report, regardless of channels.
    report = {
        'leaderboard': self.leaderboard,
        'leaderboard_description': self.leaderboard_description,
        'report_version': 1,
        'report_timestamp': datetime.utcnow().isoformat(),
        'report_id': self.report_id,
        'channels': channels,
        'channels_available': ALL_CHANNELS,
        'license': LICENSE,
        'collections_available': [c['name'].name for c in MODULE_COLLECTION],
        'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])],
    }

    if 'ident' in channels:
        # user-supplied free-text identification fields
        for option in ['description', 'contact', 'organization']:
            report[option] = getattr(self, option)

    if 'basic' in channels:
        mon_map = self.get('mon_map')
        osd_map = self.get('osd_map')
        service_map = self.get('service_map')
        fs_map = self.get('fs_map')
        df = self.get('df')
        # index df pool stats by pool id for quick lookup below
        df_pools = {pool['id']: pool for pool in df['pools']}

        report['created'] = mon_map['created']

        # mons: tally address protocol versions and IP families
        v1_mons = 0
        v2_mons = 0
        ipv4_mons = 0
        ipv6_mons = 0
        for mon in mon_map['mons']:
            for a in mon['public_addrs']['addrvec']:
                if a['type'] == 'v2':
                    v2_mons += 1
                elif a['type'] == 'v1':
                    v1_mons += 1
                # bracketed addresses are IPv6
                if a['addr'].startswith('['):
                    ipv6_mons += 1
                else:
                    ipv4_mons += 1
        report['mon'] = {
            'count': len(mon_map['mons']),
            'features': mon_map['features'],
            'min_mon_release': mon_map['min_mon_release'],
            'v1_addr_mons': v1_mons,
            'v2_addr_mons': v2_mons,
            'ipv4_addr_mons': ipv4_mons,
            'ipv6_addr_mons': ipv6_mons,
        }

        report['config'] = self.gather_configs()

        # pools

        rbd_num_pools = 0
        rbd_num_images_by_pool = []
        rbd_mirroring_by_pool = []
        num_pg = 0
        report['pools'] = list()
        for pool in osd_map['pools']:
            num_pg += pool['pg_num']
            ec_profile = {}
            if pool['erasure_code_profile']:
                orig = osd_map['erasure_code_profiles'].get(
                    pool['erasure_code_profile'], {})
                # only keep the non-identifying EC profile fields
                ec_profile = {
                    k: orig[k] for k in orig.keys()
                    if k in ['k', 'm', 'plugin', 'technique',
                             'crush-failure-domain', 'l']
                }
            pool_data = {
                'pool': pool['pool'],
                'pg_num': pool['pg_num'],
                'pgp_num': pool['pg_placement_num'],
                'size': pool['size'],
                'min_size': pool['min_size'],
                'pg_autoscale_mode': pool['pg_autoscale_mode'],
                'target_max_bytes': pool['target_max_bytes'],
                'target_max_objects': pool['target_max_objects'],
                'type': ['', 'replicated', '', 'erasure'][pool['type']],
                'erasure_code_profile': ec_profile,
                'cache_mode': pool['cache_mode'],
            }

            # basic_pool_usage collection
            if self.is_enabled_collection(Collection.basic_pool_usage):
                pool_data['application'] = []
                for application in pool['application_metadata']:
                    # Only include default applications
                    if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
                        pool_data['application'].append(application)
                pool_stats = df_pools[pool['pool']]['stats']
                pool_data['stats'] = {  # filter out kb_used
                    'avail_raw': pool_stats['avail_raw'],
                    'bytes_used': pool_stats['bytes_used'],
                    'compress_bytes_used': pool_stats['compress_bytes_used'],
                    'compress_under_bytes': pool_stats['compress_under_bytes'],
                    'data_bytes_used': pool_stats['data_bytes_used'],
                    'dirty': pool_stats['dirty'],
                    'max_avail': pool_stats['max_avail'],
                    'objects': pool_stats['objects'],
                    'omap_bytes_used': pool_stats['omap_bytes_used'],
                    'percent_used': pool_stats['percent_used'],
                    'quota_bytes': pool_stats['quota_bytes'],
                    'quota_objects': pool_stats['quota_objects'],
                    'rd': pool_stats['rd'],
                    'rd_bytes': pool_stats['rd_bytes'],
                    'stored': pool_stats['stored'],
                    'stored_data': pool_stats['stored_data'],
                    'stored_omap': pool_stats['stored_omap'],
                    'stored_raw': pool_stats['stored_raw'],
                    'wr': pool_stats['wr'],
                    'wr_bytes': pool_stats['wr_bytes']
                }
            pool_data['options'] = {}
            # basic_pool_options_bluestore collection
            if self.is_enabled_collection(Collection.basic_pool_options_bluestore):
                bluestore_options = ['compression_algorithm',
                                     'compression_mode',
                                     'compression_required_ratio',
                                     'compression_min_blob_size',
                                     'compression_max_blob_size']
                for option in bluestore_options:
                    if option in pool['options']:
                        pool_data['options'][option] = pool['options'][option]

            # basic_pool_flags collection
            if self.is_enabled_collection(Collection.basic_pool_flags):
                if 'flags_names' in pool and pool['flags_names'] is not None:
                    # flags are defined in pg_pool_t (src/osd/osd_types.h)
                    flags_to_report = [
                        'hashpspool',
                        'full',
                        'ec_overwrites',
                        'incomplete_clones',
                        'nodelete',
                        'nopgchange',
                        'nosizechange',
                        'write_fadvise_dontneed',
                        'noscrub',
                        'nodeep-scrub',
                        'full_quota',
                        'nearfull',
                        'backfillfull',
                        'selfmanaged_snaps',
                        'pool_snaps',
                        'creating',
                        'eio',
                        'bulk',
                        'crimson',
                    ]

                    pool_data['flags_names'] = [flag for flag in pool['flags_names'].split(',') if flag in flags_to_report]

            cast(List[Dict[str, Any]], report['pools']).append(pool_data)

            if 'rbd' in pool['application_metadata']:
                rbd_num_pools += 1
                ioctx = self.rados.open_ioctx(pool['pool_name'])
                rbd_num_images_by_pool.append(
                    sum(1 for _ in rbd.RBD().list2(ioctx)))
                rbd_mirroring_by_pool.append(
                    rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
        report['rbd'] = {
            'num_pools': rbd_num_pools,
            'num_images_by_pool': rbd_num_images_by_pool,
            'mirroring_by_pool': rbd_mirroring_by_pool}

        # osds: detect a separate cluster (backend) network by comparing
        # the public and cluster addresses of the first up osd found
        cluster_network = False
        for osd in osd_map['osds']:
            if osd['up'] and not cluster_network:
                front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
                back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
                if front_ip != back_ip:
                    cluster_network = True
        report['osd'] = {
            'count': len(osd_map['osds']),
            'require_osd_release': osd_map['require_osd_release'],
            'require_min_compat_client': osd_map['require_min_compat_client'],
            'cluster_network': cluster_network,
        }

        # crush
        report['crush'] = self.gather_crush_info()

        # cephfs
        report['fs'] = {
            'count': len(fs_map['filesystems']),
            'feature_flags': fs_map['feature_flags'],
            'num_standby_mds': len(fs_map['standbys']),
            'filesystems': [],
        }
        num_mds = len(fs_map['standbys'])
        for fsm in fs_map['filesystems']:
            fs = fsm['mdsmap']
            num_sessions = 0
            cached_ino = 0
            cached_dn = 0
            cached_cap = 0
            subtrees = 0
            rfiles = 0
            rbytes = 0
            rsnaps = 0
            # sum cache/session counters across all active MDS daemons
            for gid, mds in fs['info'].items():
                num_sessions += self.get_latest('mds', mds['name'],
                                                'mds_sessions.session_count')
                cached_ino += self.get_latest('mds', mds['name'],
                                              'mds_mem.ino')
                cached_dn += self.get_latest('mds', mds['name'],
                                             'mds_mem.dn')
                cached_cap += self.get_latest('mds', mds['name'],
                                              'mds_mem.cap')
                subtrees += self.get_latest('mds', mds['name'],
                                            'mds.subtrees')
                # root recursive stats only live on rank 0
                if mds['rank'] == 0:
                    rfiles = self.get_latest('mds', mds['name'],
                                             'mds.root_rfiles')
                    rbytes = self.get_latest('mds', mds['name'],
                                             'mds.root_rbytes')
                    rsnaps = self.get_latest('mds', mds['name'],
                                             'mds.root_rsnaps')
            report['fs']['filesystems'].append({  # type: ignore
                'max_mds': fs['max_mds'],
                'ever_allowed_features': fs['ever_allowed_features'],
                'explicitly_allowed_features': fs['explicitly_allowed_features'],
                'num_in': len(fs['in']),
                'num_up': len(fs['up']),
                'num_standby_replay': len(
                    [mds for gid, mds in fs['info'].items()
                     if mds['state'] == 'up:standby-replay']),
                'num_mds': len(fs['info']),
                'num_sessions': num_sessions,
                'cached_inos': cached_ino,
                'cached_dns': cached_dn,
                'cached_caps': cached_cap,
                'cached_subtrees': subtrees,
                'balancer_enabled': len(fs['balancer']) > 0,
                'num_data_pools': len(fs['data_pools']),
                'standby_count_wanted': fs['standby_count_wanted'],
                # only year-month, to avoid an identifying timestamp
                'approx_ctime': fs['created'][0:7],
                'files': rfiles,
                'bytes': rbytes,
                'snaps': rsnaps,
            })
            num_mds += len(fs['info'])
        report['fs']['total_num_mds'] = num_mds  # type: ignore

        # daemons
        report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
                                  mon=self.gather_mon_metadata(mon_map))

        if self.is_enabled_collection(Collection.basic_mds_metadata):
            report['metadata']['mds'] = self.gather_mds_metadata()  # type: ignore

        # host counts
        servers = self.list_servers()
        self.log.debug('servers %s' % servers)
        hosts = {
            'num': len([h for h in servers if h['hostname']]),
        }
        for t in ['mon', 'mds', 'osd', 'mgr']:
            nr_services = sum(1 for host in servers if
                              any(service for service in cast(List[ServiceInfoT],
                                                              host['services'])
                                  if service['type'] == t))
            hosts['num_with_' + t] = nr_services
        report['hosts'] = hosts

        report['usage'] = {
            'pools': len(df['pools']),
            'pg_num': num_pg,
            'total_used_bytes': df['stats']['total_used_bytes'],
            'total_bytes': df['stats']['total_bytes'],
            'total_avail_bytes': df['stats']['total_avail_bytes']
        }
        # basic_usage_by_class collection
        if self.is_enabled_collection(Collection.basic_usage_by_class):
            report['usage']['stats_by_class'] = {}  # type: ignore
            for device_class in df['stats_by_class']:
                # only the well-known device classes, to avoid leaking
                # custom (potentially identifying) class names
                if device_class in ['hdd', 'ssd', 'nvme']:
                    report['usage']['stats_by_class'][device_class] = df['stats_by_class'][device_class]  # type: ignore

        services: DefaultDict[str, int] = defaultdict(int)
        for key, value in service_map['services'].items():
            services[key] += 1
            if key == 'rgw':
                rgw = {}
                zones = set()
                zonegroups = set()
                frontends = set()
                count = 0
                d = value.get('daemons', dict())
                for k, v in d.items():
                    if k == 'summary' and v:
                        rgw[k] = v
                    elif isinstance(v, dict) and 'metadata' in v:
                        count += 1
                        zones.add(v['metadata']['zone_id'])
                        zonegroups.add(v['metadata']['zonegroup_id'])
                        frontends.add(v['metadata']['frontend_type#0'])

                        # we could actually iterate over all the keys of
                        # the dict and check for how many frontends there
                        # are, but it is unlikely that one would be running
                        # more than 2 supported ones
                        f2 = v['metadata'].get('frontend_type#1', None)
                        if f2:
                            frontends.add(f2)

                rgw['count'] = count
                rgw['zones'] = len(zones)
                rgw['zonegroups'] = len(zonegroups)
                rgw['frontends'] = list(frontends)  # sets aren't json-serializable
                report['rgw'] = rgw
        report['services'] = services

        try:
            report['balancer'] = self.remote('balancer', 'gather_telemetry')
        except ImportError:
            # balancer module not loaded
            report['balancer'] = {
                'active': False
            }

        # Rook
        self.get_rook_data(report)

    if 'crash' in channels:
        report['crashes'] = self.gather_crashinfo()

    if 'perf' in channels:
        if self.is_enabled_collection(Collection.perf_perf):
            report['perf_counters'] = self.gather_perf_counters('separated')
            report['stats_per_pool'] = self.get_stats_per_pool()
            report['stats_per_pg'] = self.get_stats_per_pg()
            report['io_rate'] = self.get_io_rate()
            report['osd_perf_histograms'] = self.get_osd_histograms('separated')
            report['mempool'] = self.get_mempool('separated')
            report['heap_stats'] = self.get_heap_stats()
            report['rocksdb_stats'] = self.get_rocksdb_stats()

    # NOTE: We do not include the 'device' channel in this report; it is
    # sent to a different endpoint.

    return report
1339 | ||
def get_rook_data(self, report: Dict[str, object]) -> None:
    """Merge Rook-related config-key values into *report*, best effort.

    Silently returns if the config-key dump cannot be fetched or parsed.
    """
    ret, outb, outs = self.mon_command({
        'prefix': 'config-key dump',
        'format': 'json'
    })
    if ret != 0:
        return
    try:
        kv_dump = json.loads(outb)
    except json.decoder.JSONDecodeError:
        return

    # Each entry pairs a full key path (e.g.
    # "rook/node/count/with-csi-nfs-plugin") with the Collection it
    # belongs to; only report keys whose collection is opted-in.
    for key_path, owning_collection in ROOK_KEYS_BY_COLLECTION:
        if self.is_enabled_collection(owning_collection):
            self.add_kv_to_report(report, key_path, kv_dump.get(key_path))
1357 | ||
def add_kv_to_report(self, report: Dict[str, object], key_path: str, value: Any) -> None:
    """Insert *value* into *report* at the nested location named by the
    slash-separated *key_path*, creating intermediate dicts as needed.

    Logs an error and leaves *report* untouched if the path lands on a
    non-dict node or the final key already exists.
    """
    parts = key_path.split('/')
    leaf = parts[-1]
    node = report
    # Walk/create every intermediate level.
    for part in parts[:-1]:
        if part not in node:
            node[part] = {}
        node = node[part]  # type: ignore

    # sanity check of keys correctness
    if not isinstance(node, dict):
        self.log.error(f"'{key_path}' is an invalid key, expected type 'dict' but got {type(node)}")
        return

    if leaf in node:
        self.log.error(f"'{key_path}' is an invalid key, last part must not exist at this point")
        return

    node[leaf] = value
1375 | ||
def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
    """PUT *report* as JSON to *url*.

    :return: None on success, otherwise a human-readable failure reason
        (which is also logged).
    """
    self.log.info('Sending %s to: %s' % (what, url))
    proxies = {}
    if self.proxy:
        # route both schemes through the configured proxy
        self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
        proxies = {'http': self.proxy, 'https': self.proxy}
    fail_reason = None
    try:
        resp = requests.put(url=url, json=report, proxies=proxies)
        resp.raise_for_status()
    except Exception as e:
        fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
        self.log.error(fail_reason)
    return fail_reason
1391 | ||
# Upload destinations: the main telemetry endpoint ('ceph') and the
# separate device-metrics endpoint ('device'); see send().
class EndPoint(enum.Enum):
    ceph = 'ceph'
    device = 'device'
1395 | ||
def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
    '''
    Find collections that are available in the module, but are not in the db.

    Returns None if the db has not been initialized yet or if an invalid
    channel name was supplied; otherwise the list of missing collections
    restricted to *channels* (all channels by default).
    '''
    if self.db_collection is None:
        return None

    if not channels:
        channels = ALL_CHANNELS
    else:
        for ch in channels:
            if ch not in ALL_CHANNELS:
                self.log.debug(f"invalid channel name: {ch}")
                return None

    # Everything the module offers in the requested channels that the
    # user has not opted-in to yet.
    return [c['name'] for c in MODULE_COLLECTION
            if c['name'].name not in self.db_collection
            and c['channel'] in channels]
1419 | ||
def is_major_upgrade(self) -> bool:
    '''
    Returns True only if the user last opted-in to an older major.
    '''
    last_version = self.last_opted_in_ceph_version
    if not last_version:
        # We do not know what Ceph version was when the user last
        # opted-in (None or 0), thus we do not wish to nag in case of a
        # major upgrade.
        return False

    mon_min = self.get('mon_map').get("min_mon_release", 0)

    if mon_min > last_version:
        self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {last_version}")
        return True

    return False
1437 | ||
def is_opted_in(self) -> bool:
    """Whether the user is currently opted-in to telemetry.

    An empty collection list means the user is either opted-out (never
    opted-in, or invoked `telemetry off`), or upgraded from telemetry
    revision 1 or 2, which required re-opting-in to revision 3 --
    either way, treated as opted-out. None means not yet initialized.
    """
    return bool(self.db_collection)
1446 | ||
def should_nag(self) -> bool:
    """Return True if the user should be nagged to (re-)opt-in.

    Nag when the module carries a collection missing from the db whose
    nag flag is set; failing that, nag after a major upgrade when any
    new collections exist at all.
    """
    # We currently do not nag if the user is opted-out (or never opted-in).
    # If we wish to do this in the future, we need to have a tri-mode state
    # (opted in, opted out, no action yet), and it needs to be guarded by a
    # config option (so that nagging can be turned off via config).
    # We also need to add a last_opted_out_ceph_version variable, for the
    # major upgrade check.

    # check if there are collections the user is not opted-in to
    # that we should nag about
    if self.db_collection is not None:
        for c in MODULE_COLLECTION:
            # fix: truthiness test instead of comparing '== True' (PEP 8)
            if c['name'].name not in self.db_collection and c['nag']:
                self.log.debug(f"The collection: {c['name']} is not reported")
                return True

    # user might be opted-in to the most recent collection, or there is no
    # new collection which requires nagging about; thus nag in case it's a
    # major upgrade and there are new collections
    # (which their own nag == False):
    col_delta = self.collection_delta()
    new_collections = col_delta is not None and len(col_delta) > 0

    return self.is_major_upgrade() and new_collections
1477 | ||
def init_collection(self) -> None:
    """Initialize db_collection from the mgr store, migrating legacy
    (pre-collection, revision-based) opt-in state on first run after an
    upgrade.
    """
    # We fetch from db the collections the user had already opted-in to.
    # During the transition the results will be empty, but the user might
    # be opted-in to an older version (e.g. revision = 3)

    collection = self.get_store('collection')

    if collection is not None:
        self.db_collection = json.loads(collection)

    if self.db_collection is None:
        # happens once on upgrade: no 'collection' key in the store yet
        if not self.enabled:
            # user is not opted-in
            self.set_store('collection', json.dumps([]))
            self.log.debug("user is not opted-in")
        else:
            # user is opted-in, verify the revision:
            if self.last_opt_revision == REVISION:
                # current revision: grandfather the user into the base
                # collections of every channel
                self.log.debug(f"telemetry revision is {REVISION}")
                base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
                self.set_store('collection', json.dumps(base_collection))
            else:
                # user is opted-in to an older version, meaning they need
                # to re-opt in regardless
                self.set_store('collection', json.dumps([]))
                self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")

        # reload collection after setting
        collection = self.get_store('collection')
        if collection is not None:
            self.db_collection = json.loads(collection)
        else:
            raise RuntimeError('collection is None after initial setting')
    else:
        # user has already upgraded
        self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
1515 | ||
def is_enabled_collection(self, collection: Collection) -> bool:
    """Whether the given collection has been opted-in to (i.e. its name
    is present in the db). False if the db is not initialized yet."""
    db = self.db_collection
    return db is not None and collection.name in db
1520 | ||
def opt_in_all_collections(self) -> None:
    """
    Opt-in to all collections; Update db with the currently available collections in the module
    """
    # init_collection() must have run first; None means it never did.
    if self.db_collection is None:
        raise RuntimeError('db_collection is None after initial setting')

    for c in MODULE_COLLECTION:
        if c['name'].name not in self.db_collection:
            # NOTE(review): this appends the Collection member itself,
            # while existing db entries are name strings and the
            # membership check above uses .name; json.dumps below
            # appears to rely on Collection serializing like a string --
            # confirm against the Collection definition.
            self.db_collection.append(c['name'])

    # persist the updated opt-in list
    self.set_store('collection', json.dumps(self.db_collection))
1533 | ||
def send(self,
         report: Dict[str, Dict[str, str]],
         endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
    """Upload *report* to the given endpoints (both by default).

    The 'ceph' endpoint receives the compiled telemetry report; the
    'device' endpoint receives per-host device reports (only if the
    device channel is active).

    :return: (retval, stdout, stderr)-style tuple: (0, '', summary) on
        full success, (1, '', summary) if any upload failed.
    """
    if not endpoint:
        endpoint = [self.EndPoint.ceph, self.EndPoint.device]
    failed = []
    success = []
    self.log.debug('Send endpoints %s' % endpoint)
    for e in endpoint:
        if e == self.EndPoint.ceph:
            fail_reason = self._try_post('ceph report', self.url, report)
            if fail_reason:
                failed.append(fail_reason)
            else:
                # record the upload time for interval/status reporting
                now = int(time.time())
                self.last_upload = now
                self.set_store('last_upload', str(now))
                success.append('Ceph report sent to {0}'.format(self.url))
                self.log.info('Sent report to {0}'.format(self.url))
        elif e == self.EndPoint.device:
            if 'device' in self.get_active_channels():
                devices = self.gather_device_report()
                if devices:
                    num_devs = 0
                    num_hosts = 0
                    # one POST per host's device map; skip empty hosts
                    for host, ls in devices.items():
                        self.log.debug('host %s devices %s' % (host, ls))
                        if not len(ls):
                            continue
                        fail_reason = self._try_post('devices', self.device_url,
                                                     ls)
                        if fail_reason:
                            failed.append(fail_reason)
                        else:
                            num_devs += len(ls)
                            num_hosts += 1
                    if num_devs:
                        success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
                            num_devs, num_hosts, len(devices)))
                else:
                    fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
                    failed.append(fail_reason)
                    self.log.error(fail_reason)
    if failed:
        return 1, '', '\n'.join(success + failed)
    return 0, '', '\n'.join(success)
1580 | ||
def format_perf_histogram(self, report: Dict[str, Any]) -> None:
    """Make the osd perf histograms human-readable, in place.

    Converts the list-valued axis 'ranges' and osd 'values' entries into
    strings so they display horizontally instead of vertically. A report
    without 'osd_perf_histograms' (perf channel off) is left untouched.
    """
    # Preview reports wrap everything under a top-level 'report' key.
    if 'report' in report:
        report = report['report']
    try:
        for config in report['osd_perf_histograms']:
            for histogram in config:
                entry = config[histogram]
                # Stringify in place (slice assignment keeps the same
                # list objects) so any aliases stay consistent.
                for axis in entry['axes']:
                    axis['ranges'][:] = map(str, axis['ranges'])
                for osd in entry['osds']:
                    osd['values'][:] = map(str, osd['values'])
    except KeyError:
        # If the perf channel is not enabled, 'osd_perf_histograms' is
        # absent and there is nothing to format.
        pass
1606 | ||
1607 | def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: | |
1608 | ''' | |
1609 | Enable or disable a list of channels | |
1610 | ''' | |
1611 | if not self.enabled: | |
1612 | # telemetry should be on for channels to be toggled | |
1613 | msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \ | |
1614 | 'Preview sample reports with `ceph telemetry preview`.' | |
1615 | return 0, msg, '' | |
1616 | ||
1617 | if channels is None: | |
1618 | msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.' | |
1619 | return 0, msg, '' | |
1620 | ||
1621 | state = action == 'enable' | |
1622 | msg = '' | |
1623 | for c in channels: | |
1624 | if c not in ALL_CHANNELS: | |
1625 | msg = f"{msg}{c} is not a valid channel name. "\ | |
1626 | f"Available channels: {ALL_CHANNELS}.\n" | |
1627 | else: | |
1628 | self.set_module_option(f"channel_{c}", state) | |
1629 | setattr(self, | |
1630 | f"channel_{c}", | |
1631 | state) | |
1632 | msg = f"{msg}channel_{c} is {action}d\n" | |
1633 | ||
1634 | return 0, msg, '' | |
1635 | ||
1636 | @CLIReadCommand('telemetry status') | |
1637 | def status(self) -> Tuple[int, str, str]: | |
1638 | ''' | |
1639 | Show current configuration | |
1640 | ''' | |
1641 | r = {} | |
1642 | for opt in self.MODULE_OPTIONS: | |
1643 | r[opt['name']] = getattr(self, opt['name']) | |
1644 | r['last_upload'] = (time.ctime(self.last_upload) | |
1645 | if self.last_upload else self.last_upload) | |
1646 | return 0, json.dumps(r, indent=4, sort_keys=True), '' | |
1647 | ||
1648 | @CLIReadCommand('telemetry diff') | |
1649 | def diff(self) -> Tuple[int, str, str]: | |
1650 | ''' | |
1651 | Show the diff between opted-in collection and available collection | |
1652 | ''' | |
1653 | diff = [] | |
1654 | keys = ['nag'] | |
1655 | ||
1656 | for c in MODULE_COLLECTION: | |
1657 | if not self.is_enabled_collection(c['name']): | |
1658 | diff.append({key: val for key, val in c.items() if key not in keys}) | |
1659 | ||
1660 | r = None | |
1661 | if diff == []: | |
1662 | r = "Telemetry is up to date" | |
1663 | else: | |
1664 | r = json.dumps(diff, indent=4, sort_keys=True) | |
1665 | ||
1666 | return 0, r, '' | |
1667 | ||
    @CLICommand('telemetry on')
    def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
        '''
        Enable telemetry reports from this cluster
        '''
        # Opting in requires explicit acknowledgement of the data license.
        if license != LICENSE:
            return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
        else:
            self.set_module_option('enabled', True)
            self.enabled = True
            # opting in also opts in to every collection currently available
            self.opt_in_all_collections()

            # for major releases upgrade nagging
            mon_map = self.get('mon_map')
            mon_min = mon_map.get("min_mon_release", 0)
            self.set_store('last_opted_in_ceph_version', str(mon_min))
            self.last_opted_in_ceph_version = mon_min

            msg = 'Telemetry is on.'
            disabled_channels = ''
            active_channels = self.get_active_channels()
            # build an enable hint for inactive channels; 'ident' is
            # excluded from the hint
            for c in ALL_CHANNELS:
                if c not in active_channels and c != 'ident':
                    disabled_channels = f"{disabled_channels} {c}"

            if len(disabled_channels) > 0:
                msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
                      f"`ceph telemetry enable channel{disabled_channels}`"

            # wake up serve() to reset health warning
            self.event.set()

            return 0, msg, ''
1702 | ||
    @CLICommand('telemetry off')
    def off(self) -> Tuple[int, str, str]:
        '''
        Disable telemetry reports from this cluster
        '''
        if not self.enabled:
            # telemetry is already off
            msg = 'Telemetry is currently not enabled, nothing to turn off. '\
                  'Please consider opting-in with `ceph telemetry on`.\n' \
                  'Preview sample reports with `ceph telemetry preview`.'
            return 0, msg, ''

        self.set_module_option('enabled', False)
        self.enabled = False
        # clear the persisted opt-in collection list as well
        self.set_store('collection', json.dumps([]))
        self.db_collection = []

        # we might need this info in the future, in case
        # of nagging when user is opted-out
        mon_map = self.get('mon_map')
        mon_min = mon_map.get("min_mon_release", 0)
        self.set_store('last_opted_out_ceph_version', str(mon_min))
        self.last_opted_out_ceph_version = mon_min

        msg = 'Telemetry is now disabled.'
        return 0, msg, ''
1729 | ||
1730 | @CLIReadCommand('telemetry enable channel all') | |
1731 | def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]: | |
1732 | ''' | |
1733 | Enable all channels | |
1734 | ''' | |
1735 | return self.toggle_channel('enable', channels) | |
1736 | ||
1737 | @CLIReadCommand('telemetry enable channel') | |
1738 | def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: | |
1739 | ''' | |
1740 | Enable a list of channels | |
1741 | ''' | |
1742 | return self.toggle_channel('enable', channels) | |
1743 | ||
1744 | @CLIReadCommand('telemetry disable channel all') | |
1745 | def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]: | |
1746 | ''' | |
1747 | Disable all channels | |
1748 | ''' | |
1749 | return self.toggle_channel('disable', channels) | |
1750 | ||
1751 | @CLIReadCommand('telemetry disable channel') | |
1752 | def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: | |
1753 | ''' | |
1754 | Disable a list of channels | |
1755 | ''' | |
1756 | return self.toggle_channel('disable', channels) | |
1757 | ||
1758 | @CLIReadCommand('telemetry channel ls') | |
1759 | def channel_ls(self) -> Tuple[int, str, str]: | |
1760 | ''' | |
1761 | List all channels | |
1762 | ''' | |
1763 | table = PrettyTable( | |
1764 | [ | |
1765 | 'NAME', 'ENABLED', 'DEFAULT', 'DESC', | |
1766 | ], | |
1767 | border=False) | |
1768 | table.align['NAME'] = 'l' | |
1769 | table.align['ENABLED'] = 'l' | |
1770 | table.align['DEFAULT'] = 'l' | |
1771 | table.align['DESC'] = 'l' | |
1772 | table.left_padding_width = 0 | |
1773 | table.right_padding_width = 4 | |
1774 | ||
1775 | for c in ALL_CHANNELS: | |
1776 | enabled = "ON" if getattr(self, f"channel_{c}") else "OFF" | |
1777 | for o in self.MODULE_OPTIONS: | |
1778 | if o['name'] == f"channel_{c}": | |
1779 | default = "ON" if o.get('default', None) else "OFF" | |
1780 | desc = o.get('desc', None) | |
1781 | ||
1782 | table.add_row(( | |
1783 | c, | |
1784 | enabled, | |
1785 | default, | |
1786 | desc, | |
1787 | )) | |
1788 | ||
1789 | return 0, table.get_string(sortby="NAME"), '' | |
1790 | ||
    @CLIReadCommand('telemetry collection ls')
    def collection_ls(self) -> Tuple[int, str, str]:
        '''
        List all collections
        '''
        # If collections exist that the user has not opted in to, prepend
        # a hint asking them to re-opt-in.
        col_delta = self.collection_delta()
        msg = ''
        if col_delta is not None and len(col_delta) > 0:
            msg = f"New collections are available:\n" \
                  f"{sorted([c.name for c in col_delta])}\n" \
                  f"Run `ceph telemetry on` to opt-in to these collections.\n"

        table = PrettyTable(
            [
                'NAME', 'STATUS', 'DESC',
            ],
            border=False)
        table.align['NAME'] = 'l'
        table.align['STATUS'] = 'l'
        table.align['DESC'] = 'l'
        table.left_padding_width = 0
        table.right_padding_width = 4

        for c in MODULE_COLLECTION:
            name = c['name']
            opted_in = self.is_enabled_collection(name)
            channel_enabled = getattr(self, f"channel_{c['channel']}")

            # A collection is reported only when the user opted in to it
            # AND its channel is enabled; otherwise spell out which of the
            # two conditions is missing.
            status = ''
            if channel_enabled and opted_in:
                status = "REPORTING"
            else:
                why = ''
                delimiter = ''

                if not opted_in:
                    why += "NOT OPTED-IN"
                    delimiter = ', '
                if not channel_enabled:
                    why += f"{delimiter}CHANNEL {c['channel']} IS OFF"

                status = f"NOT REPORTING: {why}"

            desc = c['description']

            table.add_row((
                name,
                status,
                desc,
            ))

        if len(msg):
            # add a new line between message and table output
            msg = f"{msg} \n"

        return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
1847 | ||
1848 | @CLICommand('telemetry send') | |
1849 | def do_send(self, | |
1850 | endpoint: Optional[List[EndPoint]] = None, | |
1851 | license: Optional[str] = None) -> Tuple[int, str, str]: | |
1852 | ''' | |
1853 | Send a sample report | |
1854 | ''' | |
1855 | if not self.is_opted_in() and license != LICENSE: | |
1856 | self.log.debug(('A telemetry send attempt while opted-out. ' | |
1857 | 'Asking for license agreement')) | |
1858 | return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}). | |
1859 | To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command. | |
1860 | Please consider enabling the telemetry module with 'ceph telemetry on'.''' | |
1861 | else: | |
1862 | self.last_report = self.compile_report() | |
1863 | return self.send(self.last_report, endpoint) | |
1864 | ||
1865 | @CLIReadCommand('telemetry show') | |
1866 | def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: | |
1867 | ''' | |
1868 | Show a sample report of opted-in collections (except for 'device') | |
1869 | ''' | |
1870 | if not self.enabled: | |
1871 | # if telemetry is off, no report is being sent, hence nothing to show | |
1872 | msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \ | |
1873 | 'Preview sample reports with `ceph telemetry preview`.' | |
1874 | return 0, msg, '' | |
1875 | ||
1876 | report = self.get_report_locked(channels=channels) | |
1877 | self.format_perf_histogram(report) | |
1878 | report = json.dumps(report, indent=4, sort_keys=True) | |
1879 | ||
1880 | if self.channel_device: | |
1881 | report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.''' | |
1882 | ||
1883 | return 0, report, '' | |
1884 | ||
1885 | @CLIReadCommand('telemetry preview') | |
1886 | def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: | |
1887 | ''' | |
1888 | Preview a sample report of the most recent collections available (except for 'device') | |
1889 | ''' | |
1890 | report = {} | |
1891 | ||
1892 | # We use a lock to prevent a scenario where the user wishes to preview | |
1893 | # the report, and at the same time the module hits the interval of | |
1894 | # sending a report with the opted-in collection, which has less data | |
1895 | # than in the preview report. | |
1896 | col_delta = self.collection_delta() | |
1897 | with self.get_report_lock: | |
1898 | if col_delta is not None and len(col_delta) == 0: | |
1899 | # user is already opted-in to the most recent collection | |
1900 | msg = 'Telemetry is up to date, see report with `ceph telemetry show`.' | |
1901 | return 0, msg, '' | |
1902 | else: | |
1903 | # there are collections the user is not opted-in to | |
1904 | next_collection = [] | |
1905 | ||
1906 | for c in MODULE_COLLECTION: | |
1907 | next_collection.append(c['name'].name) | |
1908 | ||
1909 | opted_in_collection = self.db_collection | |
1910 | self.db_collection = next_collection | |
1911 | report = self.get_report(channels=channels) | |
1912 | self.db_collection = opted_in_collection | |
1913 | ||
1914 | self.format_perf_histogram(report) | |
1915 | report = json.dumps(report, indent=4, sort_keys=True) | |
1916 | ||
1917 | if self.channel_device: | |
1918 | report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.''' | |
1919 | ||
1920 | return 0, report, '' | |
1921 | ||
1922 | @CLIReadCommand('telemetry show-device') | |
1923 | def show_device(self) -> Tuple[int, str, str]: | |
1924 | ''' | |
1925 | Show a sample device report | |
1926 | ''' | |
1927 | if not self.enabled: | |
1928 | # if telemetry is off, no report is being sent, hence nothing to show | |
1929 | msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \ | |
1930 | 'Preview sample device reports with `ceph telemetry preview-device`.' | |
1931 | return 0, msg, '' | |
1932 | ||
1933 | if not self.channel_device: | |
1934 | # if device channel is off, device report is not being sent, hence nothing to show | |
1935 | msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \ | |
1936 | 'Preview sample device reports with `ceph telemetry preview-device`.' | |
1937 | return 0, msg, '' | |
1938 | ||
1939 | return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), '' | |
1940 | ||
1941 | @CLIReadCommand('telemetry preview-device') | |
1942 | def preview_device(self) -> Tuple[int, str, str]: | |
1943 | ''' | |
1944 | Preview a sample device report of the most recent device collection | |
1945 | ''' | |
1946 | report = {} | |
1947 | ||
1948 | device_col_delta = self.collection_delta(['device']) | |
1949 | with self.get_report_lock: | |
1950 | if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device: | |
1951 | # user is already opted-in to the most recent device collection, | |
1952 | # and device channel is on, thus `show-device` should be called | |
1953 | msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.' | |
1954 | return 0, msg, '' | |
1955 | ||
1956 | # either the user is not opted-in at all, or there are collections | |
1957 | # they are not opted-in to | |
1958 | next_collection = [] | |
1959 | ||
1960 | for c in MODULE_COLLECTION: | |
1961 | next_collection.append(c['name'].name) | |
1962 | ||
1963 | opted_in_collection = self.db_collection | |
1964 | self.db_collection = next_collection | |
1965 | report = self.get_report('device') | |
1966 | self.db_collection = opted_in_collection | |
1967 | ||
1968 | report = json.dumps(report, indent=4, sort_keys=True) | |
1969 | return 0, report, '' | |
1970 | ||
1971 | @CLIReadCommand('telemetry show-all') | |
1972 | def show_all(self) -> Tuple[int, str, str]: | |
1973 | ''' | |
1974 | Show a sample report of all enabled channels (including 'device' channel) | |
1975 | ''' | |
1976 | if not self.enabled: | |
1977 | # if telemetry is off, no report is being sent, hence nothing to show | |
1978 | msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \ | |
1979 | 'Preview sample reports with `ceph telemetry preview`.' | |
1980 | return 0, msg, '' | |
1981 | ||
1982 | if not self.channel_device: | |
1983 | # device channel is off, no need to display its report | |
1984 | report = self.get_report_locked('default') | |
1985 | else: | |
1986 | # telemetry is on and device channel is enabled, show both | |
1987 | report = self.get_report_locked('all') | |
1988 | ||
1989 | self.format_perf_histogram(report) | |
1990 | return 0, json.dumps(report, indent=4, sort_keys=True), '' | |
1991 | ||
1992 | @CLIReadCommand('telemetry preview-all') | |
1993 | def preview_all(self) -> Tuple[int, str, str]: | |
1994 | ''' | |
1995 | Preview a sample report of the most recent collections available of all channels (including 'device') | |
1996 | ''' | |
1997 | report = {} | |
1998 | ||
1999 | col_delta = self.collection_delta() | |
2000 | with self.get_report_lock: | |
2001 | if col_delta is not None and len(col_delta) == 0: | |
2002 | # user is already opted-in to the most recent collection | |
2003 | msg = 'Telemetry is up to date, see report with `ceph telemetry show`.' | |
2004 | return 0, msg, '' | |
2005 | ||
2006 | # there are collections the user is not opted-in to | |
2007 | next_collection = [] | |
2008 | ||
2009 | for c in MODULE_COLLECTION: | |
2010 | next_collection.append(c['name'].name) | |
2011 | ||
2012 | opted_in_collection = self.db_collection | |
2013 | self.db_collection = next_collection | |
2014 | report = self.get_report('all') | |
2015 | self.db_collection = opted_in_collection | |
2016 | ||
2017 | self.format_perf_histogram(report) | |
2018 | report = json.dumps(report, indent=4, sort_keys=True) | |
2019 | ||
2020 | return 0, report, '' | |
2021 | ||
    def get_report_locked(self,
                          report_type: str = 'default',
                          channels: Optional[List[str]] = None) -> Dict[str, Any]:
        '''
        A wrapper around get_report to allow for compiling a report of the most recent module collections
        '''
        # Serializes report generation with the preview commands, which
        # temporarily swap self.db_collection while holding the same lock.
        with self.get_report_lock:
            return self.get_report(report_type, channels)
2030 | ||
2031 | def get_report(self, | |
2032 | report_type: str = 'default', | |
2033 | channels: Optional[List[str]] = None) -> Dict[str, Any]: | |
2034 | if report_type == 'default': | |
2035 | return self.compile_report(channels=channels) | |
2036 | elif report_type == 'device': | |
2037 | return self.gather_device_report() | |
2038 | elif report_type == 'all': | |
2039 | return {'report': self.compile_report(channels=channels), | |
2040 | 'device_report': self.gather_device_report()} | |
2041 | return {} | |
2042 | ||
2043 | def self_test(self) -> None: | |
2044 | self.opt_in_all_collections() | |
2045 | report = self.compile_report(channels=ALL_CHANNELS) | |
2046 | if len(report) == 0: | |
2047 | raise RuntimeError('Report is empty') | |
2048 | ||
2049 | if 'report_id' not in report: | |
2050 | raise RuntimeError('report_id not found in report') | |
2051 | ||
    def shutdown(self) -> None:
        # Stop the serve() loop: clear the run flag first, then wake the
        # loop so it observes the flag and exits promptly.
        self.run = False
        self.event.set()
2055 | ||
2056 | def refresh_health_checks(self) -> None: | |
2057 | health_checks = {} | |
2058 | # TODO do we want to nag also in case the user is not opted-in? | |
2059 | if self.enabled and self.should_nag(): | |
2060 | health_checks['TELEMETRY_CHANGED'] = { | |
2061 | 'severity': 'warning', | |
2062 | 'summary': 'Telemetry requires re-opt-in', | |
2063 | 'detail': [ | |
2064 | 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`' | |
2065 | ] | |
2066 | } | |
2067 | self.set_health_checks(health_checks) | |
2068 | ||
    def serve(self) -> None:
        # Main module thread: periodically compile and send the telemetry
        # report while the module is enabled and the user is opted in.
        self.load()
        self.run = True

        self.log.debug('Waiting for mgr to warm up')
        time.sleep(10)

        while self.run:
            self.event.clear()

            self.refresh_health_checks()

            # Either condition idles the loop for 30 minutes; event.wait()
            # returns early if on()/off()/shutdown() set the event.
            if not self.is_opted_in():
                self.log.debug('Not sending report until user re-opts-in')
                self.event.wait(1800)
                continue
            if not self.enabled:
                self.log.debug('Not sending report until configured to do so')
                self.event.wait(1800)
                continue

            # self.interval is in hours; send when it has elapsed since the
            # last upload, or when no upload has happened yet.
            now = int(time.time())
            if not self.last_upload or \
               (now - self.last_upload) > self.interval * 3600:
                self.log.info('Compiling and sending report to %s',
                              self.url)

                try:
                    self.last_report = self.compile_report()
                except Exception:
                    self.log.exception('Exception while compiling report:')

                # NOTE(review): send() runs even if compile_report() raised
                # above, in which case self.last_report still holds its
                # previous value — confirm this fallback is intended.
                self.send(self.last_report)
            else:
                self.log.debug('Interval for sending new report has not expired')

            sleep = 3600
            self.log.debug('Sleeping for %d seconds', sleep)
            self.event.wait(sleep)
2108 | ||
    @staticmethod
    def can_run() -> Tuple[bool, str]:
        # This module has no special runtime requirements; always runnable.
        return True, ''