# ceph/src/pybind/mgr/telemetry/module.py (ceph v15.2.4)
1 """
2 Telemetry module for ceph-mgr
3
4 Collect statistics from Ceph cluster and send this back to the Ceph project
5 when user has opted-in
6 """
7 import errno
8 import hashlib
9 import json
10 import rbd
11 import re
12 import requests
13 import uuid
14 import time
15 from datetime import datetime, timedelta
16 from threading import Event
17 from collections import defaultdict
18
19 from mgr_module import MgrModule
20
21
# Channels a report may contain; each one is individually enabled or
# disabled via the channel_* module options below.
ALL_CHANNELS = ['basic', 'ident', 'crash', 'device']

# Data license the user must acknowledge when opting in.
LICENSE='sharing-1-0'
LICENSE_NAME='Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL='https://cdla.io/sharing-1-0/'

# If the telemetry revision has changed since this point, re-require
# an opt-in.  This should happen each time we add new information to
# the telemetry report.
LAST_REVISION_RE_OPT_IN = 2

# Latest revision of the telemetry report.  Bump this each time we make
# *any* change.
REVISION = 3

# History of revisions
# --------------------
#
# Version 1:
#   Mimic and/or nautilus are lumped together here, since
#   we didn't track revisions yet.
#
# Version 2:
#   - added revision tracking, nagging, etc.
#   - added config option changes
#   - added channels
#   - added explicit license acknowledgement to the opt-in process
#
# Version 3:
#   - added device health metrics (i.e., SMART data, minus serial number)
#   - remove crush_rule
#   - added CephFS metadata (how many MDSs, fs features, how many data pools,
#     how much metadata is cached, rfiles, rbytes, rsnapshots)
#   - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
#   - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
#   - whether an OSD cluster network is in use
#   - rbd pool and image count, and rbd mirror mode (pool-level)
#   - rgw daemons, zones, zonegroups; which rgw frontends
#   - crush map stats
class Module(MgrModule):
    """Telemetry mgr module.

    Collects statistics from the Ceph cluster and, once the user has
    opted in ('ceph telemetry on'), periodically sends them to the Ceph
    project (see serve()).
    """
    # NOTE(review): not referenced anywhere in this file — confirm it is
    # part of the external interface before removing.
    config = dict()

    # daemon metadata fields histogrammed for both mons and osds
    # (see gather_osd_metadata / gather_mon_metadata)
    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]

    MODULE_OPTIONS = [
        {
            'name': 'url',
            'type': 'str',
            'default': 'https://telemetry.ceph.com/report'
        },
        {
            'name': 'device_url',
            'type': 'str',
            'default': 'https://telemetry.ceph.com/device'
        },
        {
            'name': 'enabled',
            'type': 'bool',
            'default': False
        },
        {
            'name': 'last_opt_revision',
            'type': 'int',
            'default': 1,
        },
        {
            'name': 'leaderboard',
            'type': 'bool',
            'default': False
        },
        {
            'name': 'description',
            'type': 'str',
            'default': None
        },
        {
            'name': 'contact',
            'type': 'str',
            'default': None
        },
        {
            'name': 'organization',
            'type': 'str',
            'default': None
        },
        {
            'name': 'proxy',
            'type': 'str',
            'default': None
        },
        {
            'name': 'interval',
            'type': 'int',
            'default': 24,
            'min': 8
        },
        {
            'name': 'channel_basic',
            'type': 'bool',
            'default': True,
            'desc': 'Share basic cluster information (size, version)',
        },
        {
            # the three options below used the unrecognized key
            # 'description'; the option schema expects 'desc' (as used by
            # channel_basic above)
            'name': 'channel_ident',
            'type': 'bool',
            'default': False,
            'desc': 'Share a user-provided description and/or contact email for the cluster',
        },
        {
            'name': 'channel_crash',
            'type': 'bool',
            'default': True,
            'desc': 'Share metadata about Ceph daemon crashes (version, stack straces, etc)',
        },
        {
            'name': 'channel_device',
            'type': 'bool',
            'default': True,
            'desc': 'Share device health metrics (e.g., SMART data, minus potentially identifying info like serial numbers)',
        },
    ]

    COMMANDS = [
        {
            "cmd": "telemetry status",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "telemetry send "
                   "name=endpoint,type=CephChoices,strings=ceph|device,n=N,req=false "
                   "name=license,type=CephString,req=false",
            "desc": "Force sending data to Ceph telemetry",
            "perm": "rw"
        },
        {
            "cmd": "telemetry show "
                   "name=channels,type=CephString,n=N,req=False",
            "desc": "Show last report or report to be sent",
            "perm": "r"
        },
        {
            "cmd": "telemetry show-device",
            "desc": "Show last device report or device report to be sent",
            "perm": "r"
        },
        {
            "cmd": "telemetry show-all",
            "desc": "Show report of all channels",
            "perm": "r"
        },
        {
            "cmd": "telemetry on name=license,type=CephString,req=false",
            "desc": "Enable telemetry reports from this cluster",
            "perm": "rw",
        },
        {
            "cmd": "telemetry off",
            "desc": "Disable telemetry reports from this cluster",
            "perm": "rw",
        },
    ]

    @property
    def config_keys(self):
        """Map option name -> default value for every module option."""
        return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
198
199 def __init__(self, *args, **kwargs):
200 super(Module, self).__init__(*args, **kwargs)
201 self.event = Event()
202 self.run = False
203 self.last_upload = None
204 self.last_report = dict()
205 self.report_id = None
206 self.salt = None
207
208 def config_notify(self):
209 for opt in self.MODULE_OPTIONS:
210 setattr(self,
211 opt['name'],
212 self.get_module_option(opt['name']))
213 self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
214 # wake up serve() thread
215 self.event.set()
216
217 def load(self):
218 self.last_upload = self.get_store('last_upload', None)
219 if self.last_upload is not None:
220 self.last_upload = int(self.last_upload)
221
222 self.report_id = self.get_store('report_id', None)
223 if self.report_id is None:
224 self.report_id = str(uuid.uuid4())
225 self.set_store('report_id', self.report_id)
226
227 self.salt = self.get_store('salt', None)
228 if not self.salt:
229 self.salt = str(uuid.uuid4())
230 self.set_store('salt', self.salt)
231
232 def gather_osd_metadata(self, osd_map):
233 keys = ["osd_objectstore", "rotational"]
234 keys += self.metadata_keys
235
236 metadata = dict()
237 for key in keys:
238 metadata[key] = defaultdict(int)
239
240 for osd in osd_map['osds']:
241 res = self.get_metadata('osd', str(osd['osd'])).items()
242 if res is None:
243 self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
244 continue
245 for k, v in res:
246 if k not in keys:
247 continue
248
249 metadata[k][v] += 1
250
251 return metadata
252
253 def gather_mon_metadata(self, mon_map):
254 keys = list()
255 keys += self.metadata_keys
256
257 metadata = dict()
258 for key in keys:
259 metadata[key] = defaultdict(int)
260
261 for mon in mon_map['mons']:
262 res = self.get_metadata('mon', mon['name']).items()
263 if res is None:
264 self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
265 continue
266 for k, v in res:
267 if k not in keys:
268 continue
269
270 metadata[k][v] += 1
271
272 return metadata
273
274 def gather_crush_info(self):
275 osdmap = self.get_osdmap()
276 crush_raw = osdmap.get_crush()
277 crush = crush_raw.dump()
278
279 def inc(d, k):
280 if k in d:
281 d[k] += 1
282 else:
283 d[k] = 1
284
285 device_classes = {}
286 for dev in crush['devices']:
287 inc(device_classes, dev.get('class', ''))
288
289 bucket_algs = {}
290 bucket_types = {}
291 bucket_sizes = {}
292 for bucket in crush['buckets']:
293 if '~' in bucket['name']: # ignore shadow buckets
294 continue
295 inc(bucket_algs, bucket['alg'])
296 inc(bucket_types, bucket['type_id'])
297 inc(bucket_sizes, len(bucket['items']))
298
299 return {
300 'num_devices': len(crush['devices']),
301 'num_types': len(crush['types']),
302 'num_buckets': len(crush['buckets']),
303 'num_rules': len(crush['rules']),
304 'device_classes': list(device_classes.values()),
305 'tunables': crush['tunables'],
306 'compat_weight_set': '-1' in crush['choose_args'],
307 'num_weight_sets': len(crush['choose_args']),
308 'bucket_algs': bucket_algs,
309 'bucket_sizes': bucket_sizes,
310 'bucket_types': bucket_types,
311 }
312
313 def gather_configs(self):
314 # cluster config options
315 cluster = set()
316 r, outb, outs = self.mon_command({
317 'prefix': 'config dump',
318 'format': 'json'
319 });
320 if r != 0:
321 return {}
322 try:
323 dump = json.loads(outb)
324 except json.decoder.JSONDecodeError:
325 return {}
326 for opt in dump:
327 name = opt.get('name')
328 if name:
329 cluster.add(name)
330 # daemon-reported options (which may include ceph.conf)
331 active = set()
332 ls = self.get("modified_config_options");
333 for opt in ls.get('options', {}):
334 active.add(opt)
335 return {
336 'cluster_changed': sorted(list(cluster)),
337 'active_changed': sorted(list(active)),
338 }
339
340 def gather_crashinfo(self):
341 crashlist = list()
342 errno, crashids, err = self.remote('crash', 'ls')
343 if errno:
344 return ''
345 for crashid in crashids.split():
346 cmd = {'id': crashid}
347 errno, crashinfo, err = self.remote('crash', 'do_info', cmd, '')
348 if errno:
349 continue
350 c = json.loads(crashinfo)
351 del c['utsname_hostname']
352 # entity_name might have more than one '.', beware
353 (etype, eid) = c.get('entity_name', '').split('.', 1)
354 m = hashlib.sha1()
355 m.update(self.salt.encode('utf-8'))
356 m.update(eid.encode('utf-8'))
357 m.update(self.salt.encode('utf-8'))
358 c['entity_name'] = etype + '.' + m.hexdigest()
359 crashlist.append(c)
360 return crashlist
361
362 def get_active_channels(self):
363 r = []
364 if self.channel_basic:
365 r.append('basic')
366 if self.channel_crash:
367 r.append('crash')
368 if self.channel_device:
369 r.append('device')
370 return r
371
372 def gather_device_report(self):
373 try:
374 time_format = self.remote('devicehealth', 'get_time_format')
375 except:
376 return None
377 cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
378 min_sample = cutoff.strftime(time_format)
379
380 devices = self.get('devices')['devices']
381
382 res = {} # anon-host-id -> anon-devid -> { timestamp -> record }
383 for d in devices:
384 devid = d['devid']
385 try:
386 # this is a map of stamp -> {device info}
387 m = self.remote('devicehealth', 'get_recent_device_metrics',
388 devid, min_sample)
389 except:
390 continue
391
392 # anonymize host id
393 try:
394 host = d['location'][0]['host']
395 except:
396 continue
397 anon_host = self.get_store('host-id/%s' % host)
398 if not anon_host:
399 anon_host = str(uuid.uuid1())
400 self.set_store('host-id/%s' % host, anon_host)
401 for dev, rep in m.items():
402 rep['host_id'] = anon_host
403
404 # anonymize device id
405 anon_devid = self.get_store('devid-id/%s' % devid)
406 if not anon_devid:
407 anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
408 self.set_store('devid-id/%s' % devid, anon_devid)
409 self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
410 host, anon_host))
411
412 # anonymize the smartctl report itself
413 serial = devid.rsplit('_', 1)[1]
414 m_str = json.dumps(m)
415 m = json.loads(m_str.replace(serial, 'deleted'))
416
417 if anon_host not in res:
418 res[anon_host] = {}
419 res[anon_host][anon_devid] = m
420 return res
421
422 def get_latest(self, daemon_type, daemon_name, stat):
423 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
424 #self.log.error("get_latest {0} data={1}".format(stat, data))
425 if data:
426 return data[-1][1]
427 else:
428 return 0
429
430 def compile_report(self, channels=[]):
431 if not channels:
432 channels = self.get_active_channels()
433 report = {
434 'leaderboard': False,
435 'report_version': 1,
436 'report_timestamp': datetime.utcnow().isoformat(),
437 'report_id': self.report_id,
438 'channels': channels,
439 'channels_available': ALL_CHANNELS,
440 'license': LICENSE,
441 }
442
443 if 'ident' in channels:
444 if self.leaderboard:
445 report['leaderboard'] = True
446 for option in ['description', 'contact', 'organization']:
447 report[option] = getattr(self, option)
448
449 if 'basic' in channels:
450 mon_map = self.get('mon_map')
451 osd_map = self.get('osd_map')
452 service_map = self.get('service_map')
453 fs_map = self.get('fs_map')
454 df = self.get('df')
455
456 report['created'] = mon_map['created']
457
458 # mons
459 v1_mons = 0
460 v2_mons = 0
461 ipv4_mons = 0
462 ipv6_mons = 0
463 for mon in mon_map['mons']:
464 for a in mon['public_addrs']['addrvec']:
465 if a['type'] == 'v2':
466 v2_mons += 1
467 elif a['type'] == 'v1':
468 v1_mons += 1
469 if a['addr'].startswith('['):
470 ipv6_mons += 1
471 else:
472 ipv4_mons += 1
473 report['mon'] = {
474 'count': len(mon_map['mons']),
475 'features': mon_map['features'],
476 'min_mon_release': mon_map['min_mon_release'],
477 'v1_addr_mons': v1_mons,
478 'v2_addr_mons': v2_mons,
479 'ipv4_addr_mons': ipv4_mons,
480 'ipv6_addr_mons': ipv6_mons,
481 }
482
483 report['config'] = self.gather_configs()
484
485 # pools
486 report['rbd'] = {
487 'num_pools': 0,
488 'num_images_by_pool': [],
489 'mirroring_by_pool': [],
490 }
491 num_pg = 0
492 report['pools'] = list()
493 for pool in osd_map['pools']:
494 num_pg += pool['pg_num']
495 ec_profile = {}
496 if pool['erasure_code_profile']:
497 orig = osd_map['erasure_code_profiles'].get(
498 pool['erasure_code_profile'], {})
499 ec_profile = {
500 k: orig[k] for k in orig.keys()
501 if k in ['k', 'm', 'plugin', 'technique',
502 'crush-failure-domain', 'l']
503 }
504 report['pools'].append(
505 {
506 'pool': pool['pool'],
507 'type': pool['type'],
508 'pg_num': pool['pg_num'],
509 'pgp_num': pool['pg_placement_num'],
510 'size': pool['size'],
511 'min_size': pool['min_size'],
512 'pg_autoscale_mode': pool['pg_autoscale_mode'],
513 'target_max_bytes': pool['target_max_bytes'],
514 'target_max_objects': pool['target_max_objects'],
515 'type': ['', 'replicated', '', 'erasure'][pool['type']],
516 'erasure_code_profile': ec_profile,
517 'cache_mode': pool['cache_mode'],
518 }
519 )
520 if 'rbd' in pool['application_metadata']:
521 report['rbd']['num_pools'] += 1
522 ioctx = self.rados.open_ioctx(pool['pool_name'])
523 report['rbd']['num_images_by_pool'].append(
524 sum(1 for _ in rbd.RBD().list2(ioctx)))
525 report['rbd']['mirroring_by_pool'].append(
526 rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
527
528 # osds
529 cluster_network = False
530 for osd in osd_map['osds']:
531 if osd['up'] and not cluster_network:
532 front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
533 back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
534 if front_ip != back_ip:
535 cluster_network = True
536 report['osd'] = {
537 'count': len(osd_map['osds']),
538 'require_osd_release': osd_map['require_osd_release'],
539 'require_min_compat_client': osd_map['require_min_compat_client'],
540 'cluster_network': cluster_network,
541 }
542
543 # crush
544 report['crush'] = self.gather_crush_info()
545
546 # cephfs
547 report['fs'] = {
548 'count': len(fs_map['filesystems']),
549 'feature_flags': fs_map['feature_flags'],
550 'num_standby_mds': len(fs_map['standbys']),
551 'filesystems': [],
552 }
553 num_mds = len(fs_map['standbys'])
554 for fsm in fs_map['filesystems']:
555 fs = fsm['mdsmap']
556 num_sessions = 0
557 cached_ino = 0
558 cached_dn = 0
559 cached_cap = 0
560 subtrees = 0
561 rfiles = 0
562 rbytes = 0
563 rsnaps = 0
564 for gid, mds in fs['info'].items():
565 num_sessions += self.get_latest('mds', mds['name'],
566 'mds_sessions.session_count')
567 cached_ino += self.get_latest('mds', mds['name'],
568 'mds_mem.ino')
569 cached_dn += self.get_latest('mds', mds['name'],
570 'mds_mem.dn')
571 cached_cap += self.get_latest('mds', mds['name'],
572 'mds_mem.cap')
573 subtrees += self.get_latest('mds', mds['name'],
574 'mds.subtrees')
575 if mds['rank'] == 0:
576 rfiles = self.get_latest('mds', mds['name'],
577 'mds.root_rfiles')
578 rbytes = self.get_latest('mds', mds['name'],
579 'mds.root_rbytes')
580 rsnaps = self.get_latest('mds', mds['name'],
581 'mds.root_rsnaps')
582 report['fs']['filesystems'].append({
583 'max_mds': fs['max_mds'],
584 'ever_allowed_features': fs['ever_allowed_features'],
585 'explicitly_allowed_features': fs['explicitly_allowed_features'],
586 'num_in': len(fs['in']),
587 'num_up': len(fs['up']),
588 'num_standby_replay': len(
589 [mds for gid, mds in fs['info'].items()
590 if mds['state'] == 'up:standby-replay']),
591 'num_mds': len(fs['info']),
592 'num_sessions': num_sessions,
593 'cached_inos': cached_ino,
594 'cached_dns': cached_dn,
595 'cached_caps': cached_cap,
596 'cached_subtrees': subtrees,
597 'balancer_enabled': len(fs['balancer']) > 0,
598 'num_data_pools': len(fs['data_pools']),
599 'standby_count_wanted': fs['standby_count_wanted'],
600 'approx_ctime': fs['created'][0:7],
601 'files': rfiles,
602 'bytes': rbytes,
603 'snaps': rsnaps,
604 })
605 num_mds += len(fs['info'])
606 report['fs']['total_num_mds'] = num_mds
607
608 # daemons
609 report['metadata'] = dict()
610 report['metadata']['osd'] = self.gather_osd_metadata(osd_map)
611 report['metadata']['mon'] = self.gather_mon_metadata(mon_map)
612
613 # host counts
614 servers = self.list_servers()
615 self.log.debug('servers %s' % servers)
616 report['hosts'] = {
617 'num': len([h for h in servers if h['hostname']]),
618 }
619 for t in ['mon', 'mds', 'osd', 'mgr']:
620 report['hosts']['num_with_' + t] = len(
621 [h for h in servers
622 if len([s for s in h['services'] if s['type'] == t])]
623 )
624
625 report['usage'] = {
626 'pools': len(df['pools']),
627 'pg_num': num_pg,
628 'total_used_bytes': df['stats']['total_used_bytes'],
629 'total_bytes': df['stats']['total_bytes'],
630 'total_avail_bytes': df['stats']['total_avail_bytes']
631 }
632
633 report['services'] = defaultdict(int)
634 for key, value in service_map['services'].items():
635 report['services'][key] += 1
636 if key == 'rgw':
637 report['rgw'] = {
638 'count': 0,
639 }
640 zones = set()
641 realms = set()
642 zonegroups = set()
643 frontends = set()
644 d = value.get('daemons', dict())
645
646 for k,v in d.items():
647 if k == 'summary' and v:
648 report['rgw'][k] = v
649 elif isinstance(v, dict) and 'metadata' in v:
650 report['rgw']['count'] += 1
651 zones.add(v['metadata']['zone_id'])
652 zonegroups.add(v['metadata']['zonegroup_id'])
653 frontends.add(v['metadata']['frontend_type#0'])
654
655 # we could actually iterate over all the keys of
656 # the dict and check for how many frontends there
657 # are, but it is unlikely that one would be running
658 # more than 2 supported ones
659 f2 = v['metadata'].get('frontend_type#1', None)
660 if f2:
661 frontends.add(f2)
662
663 report['rgw']['zones'] = len(zones)
664 report['rgw']['zonegroups'] = len(zonegroups)
665 report['rgw']['frontends'] = list(frontends) # sets aren't json-serializable
666
667 try:
668 report['balancer'] = self.remote('balancer', 'gather_telemetry')
669 except ImportError:
670 report['balancer'] = {
671 'active': False
672 }
673
674 if 'crash' in channels:
675 report['crashes'] = self.gather_crashinfo()
676
677 # NOTE: We do not include the 'device' channel in this report; it is
678 # sent to a different endpoint.
679
680 return report
681
682 def _try_post(self, what, url, report):
683 self.log.info('Sending %s to: %s' % (what, url))
684 proxies = dict()
685 if self.proxy:
686 self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
687 proxies['http'] = self.proxy
688 proxies['https'] = self.proxy
689 try:
690 resp = requests.put(url=url, json=report, proxies=proxies)
691 resp.raise_for_status()
692 except Exception as e:
693 fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
694 self.log.error(fail_reason)
695 return fail_reason
696 return None
697
698 def send(self, report, endpoint=None):
699 if not endpoint:
700 endpoint = ['ceph', 'device']
701 failed = []
702 success = []
703 self.log.debug('Send endpoints %s' % endpoint)
704 for e in endpoint:
705 if e == 'ceph':
706 fail_reason = self._try_post('ceph report', self.url, report)
707 if fail_reason:
708 failed.append(fail_reason)
709 else:
710 now = int(time.time())
711 self.last_upload = now
712 self.set_store('last_upload', str(now))
713 success.append('Ceph report sent to {0}'.format(self.url))
714 self.log.info('Sent report to {0}'.format(self.url))
715 elif e == 'device':
716 if 'device' in self.get_active_channels():
717 devices = self.gather_device_report()
718 num_devs = 0
719 num_hosts = 0
720 for host, ls in devices.items():
721 self.log.debug('host %s devices %s' % (host, ls))
722 if not len(ls):
723 continue
724 fail_reason = self._try_post('devices', self.device_url,
725 ls)
726 if fail_reason:
727 failed.append(fail_reason)
728 else:
729 num_devs += len(ls)
730 num_hosts += 1
731 if num_devs:
732 success.append('Reported %d devices across %d hosts' % (
733 num_devs, len(devices)))
734 if failed:
735 return 1, '', '\n'.join(success + failed)
736 return 0, '', '\n'.join(success)
737
    def handle_command(self, inbuf, command):
        """Dispatch the 'telemetry ...' CLI commands (see COMMANDS).

        Returns the (retval, stdout, stderr) triple expected by the mgr.
        """
        if command['prefix'] == 'telemetry status':
            # dump every module option plus the last upload time
            r = {}
            for opt in self.MODULE_OPTIONS:
                r[opt['name']] = getattr(self, opt['name'])
            r['last_upload'] = time.ctime(self.last_upload) if self.last_upload else self.last_upload
            return 0, json.dumps(r, indent=4, sort_keys=True), ''
        elif command['prefix'] == 'telemetry on':
            # opting in requires explicit license acknowledgement
            if command.get('license') != LICENSE:
                return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo enable, add '--license " + LICENSE + "' to the 'ceph telemetry on' command."
            self.on()
            return 0, '', ''
        elif command['prefix'] == 'telemetry off':
            self.off()
            return 0, '', ''
        elif command['prefix'] == 'telemetry send':
            # a manual send while effectively opted-out also requires the
            # license flag
            if self.last_opt_revision < LAST_REVISION_RE_OPT_IN and command.get('license') != LICENSE:
                self.log.debug('A telemetry send attempt while opted-out. Asking for license agreement')
                return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo manually send telemetry data, add '--license " + LICENSE + "' to the 'ceph telemetry send' command.\nPlease consider enabling the telemetry module with 'ceph telemetry on'."
            self.last_report = self.compile_report()
            return self.send(self.last_report, command.get('endpoint'))

        elif command['prefix'] == 'telemetry show':
            report = self.get_report(channels=command.get('channels', None))
            report = json.dumps(report, indent=4, sort_keys=True)
            if self.channel_device:
                report += '\n \nDevice report is generated separately. To see it run \'ceph telemetry show-device\'.'
            return 0, report, ''
        elif command['prefix'] == 'telemetry show-device':
            return 0, json.dumps(self.get_report('device'), indent=4, sort_keys=True), ''
        elif command['prefix'] == 'telemetry show-all':
            return 0, json.dumps(self.get_report('all'), indent=4, sort_keys=True), ''
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))
773
    def on(self):
        """Opt in: enable reporting and record the current revision as
        acknowledged."""
        self.set_module_option('enabled', True)
        self.set_module_option('last_opt_revision', REVISION)
777
    def off(self):
        """Opt out: disable reporting and reset the acknowledged revision."""
        self.set_module_option('enabled', False)
        self.set_module_option('last_opt_revision', 1)
781
782 def get_report(self, report_type='default', channels=None):
783 if report_type == 'default':
784 return self.compile_report(channels=channels)
785 elif report_type == 'device':
786 return self.gather_device_report()
787 elif report_type == 'all':
788 return {'report': self.compile_report(channels=channels),
789 'device_report': self.gather_device_report()}
790 return {}
791
    def self_test(self):
        """Sanity-check that a report can be compiled.

        NOTE(review): this definition is shadowed by a second ``self_test``
        defined later in this class, so its validation never runs —
        confirm which one is intended and delete the other.
        """
        report = self.compile_report()
        if len(report) == 0:
            raise RuntimeError('Report is empty')

        if 'report_id' not in report:
            raise RuntimeError('report_id not found in report')
799
    def shutdown(self):
        """Stop the serve() loop: clear the run flag and wake the thread."""
        self.run = False
        self.event.set()
803
804 def refresh_health_checks(self):
805 health_checks = {}
806 if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
807 health_checks['TELEMETRY_CHANGED'] = {
808 'severity': 'warning',
809 'summary': 'Telemetry requires re-opt-in',
810 'detail': [
811 'telemetry report includes new information; must re-opt-in (or out)'
812 ]
813 }
814 self.set_health_checks(health_checks)
815
816 def serve(self):
817 self.load()
818 self.config_notify()
819 self.run = True
820
821 self.log.debug('Waiting for mgr to warm up')
822 self.event.wait(10)
823
824 while self.run:
825 self.event.clear()
826
827 self.refresh_health_checks()
828
829 if self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
830 self.log.debug('Not sending report until user re-opts-in')
831 self.event.wait(1800)
832 continue
833 if not self.enabled:
834 self.log.debug('Not sending report until configured to do so')
835 self.event.wait(1800)
836 continue
837
838 now = int(time.time())
839 if not self.last_upload or (now - self.last_upload) > \
840 self.interval * 3600:
841 self.log.info('Compiling and sending report to %s',
842 self.url)
843
844 try:
845 self.last_report = self.compile_report()
846 except:
847 self.log.exception('Exception while compiling report:')
848
849 self.send(self.last_report)
850 else:
851 self.log.debug('Interval for sending new report has not expired')
852
853 sleep = 3600
854 self.log.debug('Sleeping for %d seconds', sleep)
855 self.event.wait(sleep)
856
857 def self_test(self):
858 self.compile_report()
859 return True
860
861 @staticmethod
862 def can_run():
863 return True, ''