]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/telemetry/module.py
f025e94013a81d05951e2bbea413ca2ba1ea83d0
[ceph.git] / ceph / src / pybind / mgr / telemetry / module.py
1 """
2 Telemetry module for ceph-mgr
3
4 Collect statistics from Ceph cluster and send this back to the Ceph project
5 when user has opted-in
6 """
7 import errno
8 import hashlib
9 import json
10 import rbd
11 import re
12 import requests
13 import uuid
14 import time
15 from datetime import datetime, timedelta
16 from threading import Event
17 from collections import defaultdict
18
19 from mgr_module import MgrModule
20
21
# Channels a cluster may opt in to; see the channel_* module options below.
ALL_CHANNELS = ['basic', 'ident', 'crash', 'device']

LICENSE = 'sharing-1-0'
LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL = 'https://cdla.io/sharing-1-0/'

# If the telemetry revision has changed since this point, re-require
# an opt-in.  This should happen each time we add new information to
# the telemetry report.
LAST_REVISION_RE_OPT_IN = 2

# Latest revision of the telemetry report.  Bump this each time we make
# *any* change.
REVISION = 3
36
37 # History of revisions
38 # --------------------
39 #
40 # Version 1:
41 # Mimic and/or nautilus are lumped together here, since
42 # we didn't track revisions yet.
43 #
44 # Version 2:
45 # - added revision tracking, nagging, etc.
46 # - added config option changes
47 # - added channels
48 # - added explicit license acknowledgement to the opt-in process
49 #
50 # Version 3:
51 # - added device health metrics (i.e., SMART data, minus serial number)
52 # - remove crush_rule
53 # - added CephFS metadata (how many MDSs, fs features, how many data pools,
54 # how much metadata is cached, rfiles, rbytes, rsnapshots)
55 # - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
56 # - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
57 # - whether an OSD cluster network is in use
58 # - rbd pool and image count, and rbd mirror mode (pool-level)
59 # - rgw daemons, zones, zonegroups; which rgw frontends
60 # - crush map stats
61
class Module(MgrModule):
    """Telemetry mgr module: compiles an anonymized cluster report and
    periodically submits it to telemetry.ceph.com once the user opts in."""

    # last-seen option values; populated/refreshed by config_notify()
    config = dict()

    # per-daemon metadata fields whose value distribution is aggregated
    # into the report (see gather_osd_metadata / gather_mon_metadata)
    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]

    # BUG FIX: three options below used the key 'description' while
    # channel_basic used 'desc'; the MgrModule option schema key is
    # 'desc', so the inconsistent entries were silently ignored.
    MODULE_OPTIONS = [
        {
            'name': 'url',
            'type': 'str',
            'default': 'https://telemetry.ceph.com/report'
        },
        {
            'name': 'device_url',
            'type': 'str',
            'default': 'https://telemetry.ceph.com/device'
        },
        {
            'name': 'enabled',
            'type': 'bool',
            'default': False
        },
        {
            'name': 'last_opt_revision',
            'type': 'int',
            'default': 1,
        },
        {
            'name': 'leaderboard',
            'type': 'bool',
            'default': False
        },
        {
            'name': 'description',
            'type': 'str',
            'default': None
        },
        {
            'name': 'contact',
            'type': 'str',
            'default': None
        },
        {
            'name': 'organization',
            'type': 'str',
            'default': None
        },
        {
            'name': 'proxy',
            'type': 'str',
            'default': None
        },
        {
            'name': 'interval',
            'type': 'int',
            'default': 24,
            'min': 8
        },
        {
            'name': 'channel_basic',
            'type': 'bool',
            'default': True,
            'desc': 'Share basic cluster information (size, version)',
        },
        {
            'name': 'channel_ident',
            'type': 'bool',
            'default': False,
            'desc': 'Share a user-provided description and/or contact email for the cluster',
        },
        {
            'name': 'channel_crash',
            'type': 'bool',
            'default': True,
            'desc': 'Share metadata about Ceph daemon crashes (version, stack straces, etc)',
        },
        {
            'name': 'channel_device',
            'type': 'bool',
            'default': True,
            'desc': 'Share device health metrics (e.g., SMART data, minus potentially identifying info like serial numbers)',
        },
    ]

    COMMANDS = [
        {
            "cmd": "telemetry status",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "telemetry send "
                   "name=endpoint,type=CephChoices,strings=ceph|device,n=N,req=false "
                   "name=license,type=CephString,req=false",
            "desc": "Force sending data to Ceph telemetry",
            "perm": "rw"
        },
        {
            "cmd": "telemetry show "
                   "name=channels,type=CephString,n=N,req=False",
            "desc": "Show last report or report to be sent",
            "perm": "r"
        },
        {
            "cmd": "telemetry show-device",
            "desc": "Show last device report or device report to be sent",
            "perm": "r"
        },
        {
            "cmd": "telemetry show-all",
            "desc": "Show report of all channels",
            "perm": "r"
        },
        {
            "cmd": "telemetry on name=license,type=CephString,req=false",
            "desc": "Enable telemetry reports from this cluster",
            "perm": "rw",
        },
        {
            "cmd": "telemetry off",
            "desc": "Disable telemetry reports from this cluster",
            "perm": "rw",
        },
    ]
194
195 @property
196 def config_keys(self):
197 return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
198
199 def __init__(self, *args, **kwargs):
200 super(Module, self).__init__(*args, **kwargs)
201 self.event = Event()
202 self.run = False
203 self.last_upload = None
204 self.last_report = dict()
205 self.report_id = None
206 self.salt = None
207
208 def config_notify(self):
209 for opt in self.MODULE_OPTIONS:
210 setattr(self,
211 opt['name'],
212 self.get_module_option(opt['name']))
213 self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
214 # wake up serve() thread
215 self.event.set()
216
217 def load(self):
218 self.last_upload = self.get_store('last_upload', None)
219 if self.last_upload is not None:
220 self.last_upload = int(self.last_upload)
221
222 self.report_id = self.get_store('report_id', None)
223 if self.report_id is None:
224 self.report_id = str(uuid.uuid4())
225 self.set_store('report_id', self.report_id)
226
227 self.salt = self.get_store('salt', None)
228 if not self.salt:
229 self.salt = str(uuid.uuid4())
230 self.set_store('salt', self.salt)
231
232 def gather_osd_metadata(self, osd_map):
233 keys = ["osd_objectstore", "rotational"]
234 keys += self.metadata_keys
235
236 metadata = dict()
237 for key in keys:
238 metadata[key] = defaultdict(int)
239
240 for osd in osd_map['osds']:
241 res = self.get_metadata('osd', str(osd['osd'])).items()
242 if res is None:
243 self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
244 continue
245 for k, v in res:
246 if k not in keys:
247 continue
248
249 metadata[k][v] += 1
250
251 return metadata
252
253 def gather_mon_metadata(self, mon_map):
254 keys = list()
255 keys += self.metadata_keys
256
257 metadata = dict()
258 for key in keys:
259 metadata[key] = defaultdict(int)
260
261 for mon in mon_map['mons']:
262 res = self.get_metadata('mon', mon['name']).items()
263 if res is None:
264 self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
265 continue
266 for k, v in res:
267 if k not in keys:
268 continue
269
270 metadata[k][v] += 1
271
272 return metadata
273
274 def gather_crush_info(self):
275 osdmap = self.get_osdmap()
276 crush_raw = osdmap.get_crush()
277 crush = crush_raw.dump()
278
279 def inc(d, k):
280 if k in d:
281 d[k] += 1
282 else:
283 d[k] = 1
284
285 device_classes = {}
286 for dev in crush['devices']:
287 inc(device_classes, dev.get('class', ''))
288
289 bucket_algs = {}
290 bucket_types = {}
291 bucket_sizes = {}
292 for bucket in crush['buckets']:
293 if '~' in bucket['name']: # ignore shadow buckets
294 continue
295 inc(bucket_algs, bucket['alg'])
296 inc(bucket_types, bucket['type_id'])
297 inc(bucket_sizes, len(bucket['items']))
298
299 return {
300 'num_devices': len(crush['devices']),
301 'num_types': len(crush['types']),
302 'num_buckets': len(crush['buckets']),
303 'num_rules': len(crush['rules']),
304 'device_classes': list(device_classes.values()),
305 'tunables': crush['tunables'],
306 'compat_weight_set': '-1' in crush['choose_args'],
307 'num_weight_sets': len(crush['choose_args']),
308 'bucket_algs': bucket_algs,
309 'bucket_sizes': bucket_sizes,
310 'bucket_types': bucket_types,
311 }
312
313 def gather_configs(self):
314 # cluster config options
315 cluster = set()
316 r, outb, outs = self.mon_command({
317 'prefix': 'config dump',
318 'format': 'json'
319 });
320 if r != 0:
321 return {}
322 try:
323 dump = json.loads(outb)
324 except json.decoder.JSONDecodeError:
325 return {}
326 for opt in dump:
327 name = opt.get('name')
328 if name:
329 cluster.add(name)
330 # daemon-reported options (which may include ceph.conf)
331 active = set()
332 ls = self.get("modified_config_options");
333 for opt in ls.get('options', {}):
334 active.add(opt)
335 return {
336 'cluster_changed': sorted(list(cluster)),
337 'active_changed': sorted(list(active)),
338 }
339
340 def gather_crashinfo(self):
341 crashlist = list()
342 errno, crashids, err = self.remote('crash', 'ls')
343 if errno:
344 return ''
345 for crashid in crashids.split():
346 cmd = {'id': crashid}
347 errno, crashinfo, err = self.remote('crash', 'do_info', cmd, '')
348 if errno:
349 continue
350 c = json.loads(crashinfo)
351 del c['utsname_hostname']
352 # entity_name might have more than one '.', beware
353 (etype, eid) = c.get('entity_name', '').split('.', 1)
354 m = hashlib.sha1()
355 m.update(self.salt.encode('utf-8'))
356 m.update(eid.encode('utf-8'))
357 m.update(self.salt.encode('utf-8'))
358 c['entity_name'] = etype + '.' + m.hexdigest()
359 crashlist.append(c)
360 return crashlist
361
362 def get_active_channels(self):
363 r = []
364 if self.channel_basic:
365 r.append('basic')
366 if self.channel_crash:
367 r.append('crash')
368 if self.channel_device:
369 r.append('device')
370 if self.channel_ident:
371 r.append('ident')
372 return r
373
374 def gather_device_report(self):
375 try:
376 time_format = self.remote('devicehealth', 'get_time_format')
377 except:
378 return None
379 cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
380 min_sample = cutoff.strftime(time_format)
381
382 devices = self.get('devices')['devices']
383
384 res = {} # anon-host-id -> anon-devid -> { timestamp -> record }
385 for d in devices:
386 devid = d['devid']
387 try:
388 # this is a map of stamp -> {device info}
389 m = self.remote('devicehealth', 'get_recent_device_metrics',
390 devid, min_sample)
391 except:
392 continue
393
394 # anonymize host id
395 try:
396 host = d['location'][0]['host']
397 except:
398 continue
399 anon_host = self.get_store('host-id/%s' % host)
400 if not anon_host:
401 anon_host = str(uuid.uuid1())
402 self.set_store('host-id/%s' % host, anon_host)
403 serial = None
404 for dev, rep in m.items():
405 rep['host_id'] = anon_host
406 if serial is None and 'serial_number' in rep:
407 serial = rep['serial_number']
408
409 # anonymize device id
410 anon_devid = self.get_store('devid-id/%s' % devid)
411 if not anon_devid:
412 # ideally devid is 'vendor_model_serial',
413 # but can also be 'model_serial', 'serial'
414 if '_' in devid:
415 anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
416 else:
417 anon_devid = str(uuid.uuid1())
418 self.set_store('devid-id/%s' % devid, anon_devid)
419 self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
420 host, anon_host))
421
422 # anonymize the smartctl report itself
423 if serial:
424 m_str = json.dumps(m)
425 m = json.loads(m_str.replace(serial, 'deleted'))
426
427 if anon_host not in res:
428 res[anon_host] = {}
429 res[anon_host][anon_devid] = m
430 return res
431
432 def get_latest(self, daemon_type, daemon_name, stat):
433 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
434 #self.log.error("get_latest {0} data={1}".format(stat, data))
435 if data:
436 return data[-1][1]
437 else:
438 return 0
439
440 def compile_report(self, channels=[]):
441 if not channels:
442 channels = self.get_active_channels()
443 report = {
444 'leaderboard': self.leaderboard,
445 'report_version': 1,
446 'report_timestamp': datetime.utcnow().isoformat(),
447 'report_id': self.report_id,
448 'channels': channels,
449 'channels_available': ALL_CHANNELS,
450 'license': LICENSE,
451 }
452
453 if 'ident' in channels:
454 for option in ['description', 'contact', 'organization']:
455 report[option] = getattr(self, option)
456
457 if 'basic' in channels:
458 mon_map = self.get('mon_map')
459 osd_map = self.get('osd_map')
460 service_map = self.get('service_map')
461 fs_map = self.get('fs_map')
462 df = self.get('df')
463
464 report['created'] = mon_map['created']
465
466 # mons
467 v1_mons = 0
468 v2_mons = 0
469 ipv4_mons = 0
470 ipv6_mons = 0
471 for mon in mon_map['mons']:
472 for a in mon['public_addrs']['addrvec']:
473 if a['type'] == 'v2':
474 v2_mons += 1
475 elif a['type'] == 'v1':
476 v1_mons += 1
477 if a['addr'].startswith('['):
478 ipv6_mons += 1
479 else:
480 ipv4_mons += 1
481 report['mon'] = {
482 'count': len(mon_map['mons']),
483 'features': mon_map['features'],
484 'min_mon_release': mon_map['min_mon_release'],
485 'v1_addr_mons': v1_mons,
486 'v2_addr_mons': v2_mons,
487 'ipv4_addr_mons': ipv4_mons,
488 'ipv6_addr_mons': ipv6_mons,
489 }
490
491 report['config'] = self.gather_configs()
492
493 # pools
494 report['rbd'] = {
495 'num_pools': 0,
496 'num_images_by_pool': [],
497 'mirroring_by_pool': [],
498 }
499 num_pg = 0
500 report['pools'] = list()
501 for pool in osd_map['pools']:
502 num_pg += pool['pg_num']
503 ec_profile = {}
504 if pool['erasure_code_profile']:
505 orig = osd_map['erasure_code_profiles'].get(
506 pool['erasure_code_profile'], {})
507 ec_profile = {
508 k: orig[k] for k in orig.keys()
509 if k in ['k', 'm', 'plugin', 'technique',
510 'crush-failure-domain', 'l']
511 }
512 report['pools'].append(
513 {
514 'pool': pool['pool'],
515 'type': pool['type'],
516 'pg_num': pool['pg_num'],
517 'pgp_num': pool['pg_placement_num'],
518 'size': pool['size'],
519 'min_size': pool['min_size'],
520 'pg_autoscale_mode': pool['pg_autoscale_mode'],
521 'target_max_bytes': pool['target_max_bytes'],
522 'target_max_objects': pool['target_max_objects'],
523 'type': ['', 'replicated', '', 'erasure'][pool['type']],
524 'erasure_code_profile': ec_profile,
525 'cache_mode': pool['cache_mode'],
526 }
527 )
528 if 'rbd' in pool['application_metadata']:
529 report['rbd']['num_pools'] += 1
530 ioctx = self.rados.open_ioctx(pool['pool_name'])
531 report['rbd']['num_images_by_pool'].append(
532 sum(1 for _ in rbd.RBD().list2(ioctx)))
533 report['rbd']['mirroring_by_pool'].append(
534 rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
535
536 # osds
537 cluster_network = False
538 for osd in osd_map['osds']:
539 if osd['up'] and not cluster_network:
540 front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
541 back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
542 if front_ip != back_ip:
543 cluster_network = True
544 report['osd'] = {
545 'count': len(osd_map['osds']),
546 'require_osd_release': osd_map['require_osd_release'],
547 'require_min_compat_client': osd_map['require_min_compat_client'],
548 'cluster_network': cluster_network,
549 }
550
551 # crush
552 report['crush'] = self.gather_crush_info()
553
554 # cephfs
555 report['fs'] = {
556 'count': len(fs_map['filesystems']),
557 'feature_flags': fs_map['feature_flags'],
558 'num_standby_mds': len(fs_map['standbys']),
559 'filesystems': [],
560 }
561 num_mds = len(fs_map['standbys'])
562 for fsm in fs_map['filesystems']:
563 fs = fsm['mdsmap']
564 num_sessions = 0
565 cached_ino = 0
566 cached_dn = 0
567 cached_cap = 0
568 subtrees = 0
569 rfiles = 0
570 rbytes = 0
571 rsnaps = 0
572 for gid, mds in fs['info'].items():
573 num_sessions += self.get_latest('mds', mds['name'],
574 'mds_sessions.session_count')
575 cached_ino += self.get_latest('mds', mds['name'],
576 'mds_mem.ino')
577 cached_dn += self.get_latest('mds', mds['name'],
578 'mds_mem.dn')
579 cached_cap += self.get_latest('mds', mds['name'],
580 'mds_mem.cap')
581 subtrees += self.get_latest('mds', mds['name'],
582 'mds.subtrees')
583 if mds['rank'] == 0:
584 rfiles = self.get_latest('mds', mds['name'],
585 'mds.root_rfiles')
586 rbytes = self.get_latest('mds', mds['name'],
587 'mds.root_rbytes')
588 rsnaps = self.get_latest('mds', mds['name'],
589 'mds.root_rsnaps')
590 report['fs']['filesystems'].append({
591 'max_mds': fs['max_mds'],
592 'ever_allowed_features': fs['ever_allowed_features'],
593 'explicitly_allowed_features': fs['explicitly_allowed_features'],
594 'num_in': len(fs['in']),
595 'num_up': len(fs['up']),
596 'num_standby_replay': len(
597 [mds for gid, mds in fs['info'].items()
598 if mds['state'] == 'up:standby-replay']),
599 'num_mds': len(fs['info']),
600 'num_sessions': num_sessions,
601 'cached_inos': cached_ino,
602 'cached_dns': cached_dn,
603 'cached_caps': cached_cap,
604 'cached_subtrees': subtrees,
605 'balancer_enabled': len(fs['balancer']) > 0,
606 'num_data_pools': len(fs['data_pools']),
607 'standby_count_wanted': fs['standby_count_wanted'],
608 'approx_ctime': fs['created'][0:7],
609 'files': rfiles,
610 'bytes': rbytes,
611 'snaps': rsnaps,
612 })
613 num_mds += len(fs['info'])
614 report['fs']['total_num_mds'] = num_mds
615
616 # daemons
617 report['metadata'] = dict()
618 report['metadata']['osd'] = self.gather_osd_metadata(osd_map)
619 report['metadata']['mon'] = self.gather_mon_metadata(mon_map)
620
621 # host counts
622 servers = self.list_servers()
623 self.log.debug('servers %s' % servers)
624 report['hosts'] = {
625 'num': len([h for h in servers if h['hostname']]),
626 }
627 for t in ['mon', 'mds', 'osd', 'mgr']:
628 report['hosts']['num_with_' + t] = len(
629 [h for h in servers
630 if len([s for s in h['services'] if s['type'] == t])]
631 )
632
633 report['usage'] = {
634 'pools': len(df['pools']),
635 'pg_num': num_pg,
636 'total_used_bytes': df['stats']['total_used_bytes'],
637 'total_bytes': df['stats']['total_bytes'],
638 'total_avail_bytes': df['stats']['total_avail_bytes']
639 }
640
641 report['services'] = defaultdict(int)
642 for key, value in service_map['services'].items():
643 report['services'][key] += 1
644 if key == 'rgw':
645 report['rgw'] = {
646 'count': 0,
647 }
648 zones = set()
649 realms = set()
650 zonegroups = set()
651 frontends = set()
652 d = value.get('daemons', dict())
653
654 for k,v in d.items():
655 if k == 'summary' and v:
656 report['rgw'][k] = v
657 elif isinstance(v, dict) and 'metadata' in v:
658 report['rgw']['count'] += 1
659 zones.add(v['metadata']['zone_id'])
660 zonegroups.add(v['metadata']['zonegroup_id'])
661 frontends.add(v['metadata']['frontend_type#0'])
662
663 # we could actually iterate over all the keys of
664 # the dict and check for how many frontends there
665 # are, but it is unlikely that one would be running
666 # more than 2 supported ones
667 f2 = v['metadata'].get('frontend_type#1', None)
668 if f2:
669 frontends.add(f2)
670
671 report['rgw']['zones'] = len(zones)
672 report['rgw']['zonegroups'] = len(zonegroups)
673 report['rgw']['frontends'] = list(frontends) # sets aren't json-serializable
674
675 try:
676 report['balancer'] = self.remote('balancer', 'gather_telemetry')
677 except ImportError:
678 report['balancer'] = {
679 'active': False
680 }
681
682 if 'crash' in channels:
683 report['crashes'] = self.gather_crashinfo()
684
685 # NOTE: We do not include the 'device' channel in this report; it is
686 # sent to a different endpoint.
687
688 return report
689
690 def _try_post(self, what, url, report):
691 self.log.info('Sending %s to: %s' % (what, url))
692 proxies = dict()
693 if self.proxy:
694 self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
695 proxies['http'] = self.proxy
696 proxies['https'] = self.proxy
697 try:
698 resp = requests.put(url=url, json=report, proxies=proxies)
699 resp.raise_for_status()
700 except Exception as e:
701 fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
702 self.log.error(fail_reason)
703 return fail_reason
704 return None
705
706 def send(self, report, endpoint=None):
707 if not endpoint:
708 endpoint = ['ceph', 'device']
709 failed = []
710 success = []
711 self.log.debug('Send endpoints %s' % endpoint)
712 for e in endpoint:
713 if e == 'ceph':
714 fail_reason = self._try_post('ceph report', self.url, report)
715 if fail_reason:
716 failed.append(fail_reason)
717 else:
718 now = int(time.time())
719 self.last_upload = now
720 self.set_store('last_upload', str(now))
721 success.append('Ceph report sent to {0}'.format(self.url))
722 self.log.info('Sent report to {0}'.format(self.url))
723 elif e == 'device':
724 if 'device' in self.get_active_channels():
725 devices = self.gather_device_report()
726 num_devs = 0
727 num_hosts = 0
728 for host, ls in devices.items():
729 self.log.debug('host %s devices %s' % (host, ls))
730 if not len(ls):
731 continue
732 fail_reason = self._try_post('devices', self.device_url,
733 ls)
734 if fail_reason:
735 failed.append(fail_reason)
736 else:
737 num_devs += len(ls)
738 num_hosts += 1
739 if num_devs:
740 success.append('Reported %d devices across %d hosts' % (
741 num_devs, len(devices)))
742 if failed:
743 return 1, '', '\n'.join(success + failed)
744 return 0, '', '\n'.join(success)
745
746 def handle_command(self, inbuf, command):
747 if command['prefix'] == 'telemetry status':
748 r = {}
749 for opt in self.MODULE_OPTIONS:
750 r[opt['name']] = getattr(self, opt['name'])
751 r['last_upload'] = time.ctime(self.last_upload) if self.last_upload else self.last_upload
752 return 0, json.dumps(r, indent=4, sort_keys=True), ''
753 elif command['prefix'] == 'telemetry on':
754 if command.get('license') != LICENSE:
755 return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo enable, add '--license " + LICENSE + "' to the 'ceph telemetry on' command."
756 self.on()
757 return 0, '', ''
758 elif command['prefix'] == 'telemetry off':
759 self.off()
760 return 0, '', ''
761 elif command['prefix'] == 'telemetry send':
762 if self.last_opt_revision < LAST_REVISION_RE_OPT_IN and command.get('license') != LICENSE:
763 self.log.debug('A telemetry send attempt while opted-out. Asking for license agreement')
764 return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo manually send telemetry data, add '--license " + LICENSE + "' to the 'ceph telemetry send' command.\nPlease consider enabling the telemetry module with 'ceph telemetry on'."
765 self.last_report = self.compile_report()
766 return self.send(self.last_report, command.get('endpoint'))
767
768 elif command['prefix'] == 'telemetry show':
769 report = self.get_report(channels=command.get('channels', None))
770 report = json.dumps(report, indent=4, sort_keys=True)
771 if self.channel_device:
772 report += '\n \nDevice report is generated separately. To see it run \'ceph telemetry show-device\'.'
773 return 0, report, ''
774 elif command['prefix'] == 'telemetry show-device':
775 return 0, json.dumps(self.get_report('device'), indent=4, sort_keys=True), ''
776 elif command['prefix'] == 'telemetry show-all':
777 return 0, json.dumps(self.get_report('all'), indent=4, sort_keys=True), ''
778 else:
779 return (-errno.EINVAL, '',
780 "Command not found '{0}'".format(command['prefix']))
781
782 def on(self):
783 self.set_module_option('enabled', True)
784 self.set_module_option('last_opt_revision', REVISION)
785
786 def off(self):
787 self.set_module_option('enabled', False)
788 self.set_module_option('last_opt_revision', 1)
789
790 def get_report(self, report_type='default', channels=None):
791 if report_type == 'default':
792 return self.compile_report(channels=channels)
793 elif report_type == 'device':
794 return self.gather_device_report()
795 elif report_type == 'all':
796 return {'report': self.compile_report(channels=channels),
797 'device_report': self.gather_device_report()}
798 return {}
799
    # NOTE(review): this definition is dead code — a second, simpler
    # self_test() defined later in this class shadows it, so these checks
    # never run.  The two definitions should be merged into one.
    def self_test(self):
        report = self.compile_report()
        if len(report) == 0:
            raise RuntimeError('Report is empty')

        if 'report_id' not in report:
            raise RuntimeError('report_id not found in report')
807
    def shutdown(self):
        """Request serve() to exit: clear the run flag and wake the thread."""
        self.run = False
        self.event.set()
811
812 def refresh_health_checks(self):
813 health_checks = {}
814 if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
815 health_checks['TELEMETRY_CHANGED'] = {
816 'severity': 'warning',
817 'summary': 'Telemetry requires re-opt-in',
818 'detail': [
819 'telemetry report includes new information; must re-opt-in (or out)'
820 ]
821 }
822 self.set_health_checks(health_checks)
823
824 def serve(self):
825 self.load()
826 self.config_notify()
827 self.run = True
828
829 self.log.debug('Waiting for mgr to warm up')
830 self.event.wait(10)
831
832 while self.run:
833 self.event.clear()
834
835 self.refresh_health_checks()
836
837 if self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
838 self.log.debug('Not sending report until user re-opts-in')
839 self.event.wait(1800)
840 continue
841 if not self.enabled:
842 self.log.debug('Not sending report until configured to do so')
843 self.event.wait(1800)
844 continue
845
846 now = int(time.time())
847 if not self.last_upload or (now - self.last_upload) > \
848 self.interval * 3600:
849 self.log.info('Compiling and sending report to %s',
850 self.url)
851
852 try:
853 self.last_report = self.compile_report()
854 except:
855 self.log.exception('Exception while compiling report:')
856
857 self.send(self.last_report)
858 else:
859 self.log.debug('Interval for sending new report has not expired')
860
861 sleep = 3600
862 self.log.debug('Sleeping for %d seconds', sleep)
863 self.event.wait(sleep)
864
865 def self_test(self):
866 self.compile_report()
867 return True
868
869 @staticmethod
870 def can_run():
871 return True, ''