]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/telemetry/module.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / pybind / mgr / telemetry / module.py
1 """
2 Telemetry module for ceph-mgr
3
4 Collect statistics from Ceph cluster and send this back to the Ceph project
5 when user has opted-in
6 """
7 import errno
8 import hashlib
9 import json
10 import rbd
11 import re
12 import requests
13 import uuid
14 import time
15 from datetime import datetime, timedelta
16 from threading import Event
17 from collections import defaultdict
18
19 from mgr_module import MgrModule
20
21
# All channels a cluster may opt in to; each gates one section of the report.
ALL_CHANNELS = ['basic', 'ident', 'crash', 'device']

# License the shared telemetry data is published under; the user must
# acknowledge it explicitly when opting in (see 'telemetry on').
LICENSE='sharing-1-0'
LICENSE_NAME='Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL='https://cdla.io/sharing-1-0/'

# If the telemetry revision has changed since this point, re-require
# an opt-in.  This should happen each time we add new information to
# the telemetry report.
LAST_REVISION_RE_OPT_IN = 2

# Latest revision of the telemetry report.  Bump this each time we make
# *any* change.
REVISION = 3

# History of revisions
# --------------------
#
# Version 1:
#   Mimic and/or nautilus are lumped together here, since
#   we didn't track revisions yet.
#
# Version 2:
#   - added revision tracking, nagging, etc.
#   - added config option changes
#   - added channels
#   - added explicit license acknowledgement to the opt-in process
#
# Version 3:
#   - added device health metrics (i.e., SMART data, minus serial number)
#   - remove crush_rule
#   - added CephFS metadata (how many MDSs, fs features, how many data pools,
#     how much metadata is cached, rfiles, rbytes, rsnapshots)
#   - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
#   - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
#   - whether an OSD cluster network is in use
#   - rbd pool and image count, and rbd mirror mode (pool-level)
#   - rgw daemons, zones, zonegroups; which rgw frontends
#   - crush map stats
61
class Module(MgrModule):
    """ceph-mgr telemetry module: collects cluster stats and phones them home."""

    # cache of option values (refreshed via config_notify)
    config = dict()

    # host metadata fields aggregated for both mons and osds
    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]

    # NOTE: option description keys normalized to 'desc' throughout; three
    # entries previously used 'description', which does not match the key
    # used by channel_basic.  Also fixed the 'stack straces' typo.
    MODULE_OPTIONS = [
        {
            'name': 'url',
            'type': 'str',
            'default': 'https://telemetry.ceph.com/report'
        },
        {
            'name': 'device_url',
            'type': 'str',
            'default': 'https://telemetry.ceph.com/device'
        },
        {
            'name': 'enabled',
            'type': 'bool',
            'default': False
        },
        {
            'name': 'last_opt_revision',
            'type': 'int',
            'default': 1,
        },
        {
            'name': 'leaderboard',
            'type': 'bool',
            'default': False
        },
        {
            'name': 'description',
            'type': 'str',
            'default': None
        },
        {
            'name': 'contact',
            'type': 'str',
            'default': None
        },
        {
            'name': 'organization',
            'type': 'str',
            'default': None
        },
        {
            'name': 'proxy',
            'type': 'str',
            'default': None
        },
        {
            'name': 'interval',
            'type': 'int',
            'default': 24,
            'min': 8
        },
        {
            'name': 'channel_basic',
            'type': 'bool',
            'default': True,
            'desc': 'Share basic cluster information (size, version)',
        },
        {
            'name': 'channel_ident',
            'type': 'bool',
            'default': False,
            'desc': 'Share a user-provided description and/or contact email for the cluster',
        },
        {
            'name': 'channel_crash',
            'type': 'bool',
            'default': True,
            'desc': 'Share metadata about Ceph daemon crashes (version, stack traces, etc)',
        },
        {
            'name': 'channel_device',
            'type': 'bool',
            'default': True,
            'desc': 'Share device health metrics (e.g., SMART data, minus potentially identifying info like serial numbers)',
        },
    ]

    COMMANDS = [
        {
            "cmd": "telemetry status",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "telemetry send "
                   "name=endpoint,type=CephChoices,strings=ceph|device,n=N,req=false "
                   "name=license,type=CephString,req=false",
            "desc": "Force sending data to Ceph telemetry",
            "perm": "rw"
        },
        {
            # 'req=false' normalized to lowercase for consistency with the
            # other command specs above
            "cmd": "telemetry show "
                   "name=channels,type=CephString,n=N,req=false",
            "desc": "Show last report or report to be sent",
            "perm": "r"
        },
        {
            "cmd": "telemetry show-device",
            "desc": "Show last device report or device report to be sent",
            "perm": "r"
        },
        {
            "cmd": "telemetry show-all",
            "desc": "Show report of all channels",
            "perm": "r"
        },
        {
            "cmd": "telemetry on name=license,type=CephString,req=false",
            "desc": "Enable telemetry reports from this cluster",
            "perm": "rw",
        },
        {
            "cmd": "telemetry off",
            "desc": "Disable telemetry reports from this cluster",
            "perm": "rw",
        },
    ]
194
195 @property
196 def config_keys(self):
197 return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
198
    def __init__(self, *args, **kwargs):
        """Initialize in-memory state; persisted fields are filled in by load()."""
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()        # wakes serve() on config change / shutdown
        self.run = False            # serve() loop control flag
        self.last_upload = None     # epoch seconds of last successful upload
        self.last_report = dict()   # most recently compiled report (cache)
        self.report_id = None       # persistent anonymous cluster id (uuid)
        self.salt = None            # persistent salt used to hash entity names
207
208 def config_notify(self):
209 for opt in self.MODULE_OPTIONS:
210 setattr(self,
211 opt['name'],
212 self.get_module_option(opt['name']))
213 self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
214 # wake up serve() thread
215 self.event.set()
216
217 def load(self):
218 self.last_upload = self.get_store('last_upload', None)
219 if self.last_upload is not None:
220 self.last_upload = int(self.last_upload)
221
222 self.report_id = self.get_store('report_id', None)
223 if self.report_id is None:
224 self.report_id = str(uuid.uuid4())
225 self.set_store('report_id', self.report_id)
226
227 self.salt = self.get_store('salt', None)
228 if not self.salt:
229 self.salt = str(uuid.uuid4())
230 self.set_store('salt', self.salt)
231
232 def gather_osd_metadata(self, osd_map):
233 keys = ["osd_objectstore", "rotational"]
234 keys += self.metadata_keys
235
236 metadata = dict()
237 for key in keys:
238 metadata[key] = defaultdict(int)
239
240 for osd in osd_map['osds']:
241 res = self.get_metadata('osd', str(osd['osd'])).items()
242 if res is None:
243 self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
244 continue
245 for k, v in res:
246 if k not in keys:
247 continue
248
249 metadata[k][v] += 1
250
251 return metadata
252
253 def gather_mon_metadata(self, mon_map):
254 keys = list()
255 keys += self.metadata_keys
256
257 metadata = dict()
258 for key in keys:
259 metadata[key] = defaultdict(int)
260
261 for mon in mon_map['mons']:
262 res = self.get_metadata('mon', mon['name']).items()
263 if res is None:
264 self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
265 continue
266 for k, v in res:
267 if k not in keys:
268 continue
269
270 metadata[k][v] += 1
271
272 return metadata
273
274 def gather_crush_info(self):
275 osdmap = self.get_osdmap()
276 crush_raw = osdmap.get_crush()
277 crush = crush_raw.dump()
278
279 def inc(d, k):
280 if k in d:
281 d[k] += 1
282 else:
283 d[k] = 1
284
285 device_classes = {}
286 for dev in crush['devices']:
287 inc(device_classes, dev.get('class', ''))
288
289 bucket_algs = {}
290 bucket_types = {}
291 bucket_sizes = {}
292 for bucket in crush['buckets']:
293 if '~' in bucket['name']: # ignore shadow buckets
294 continue
295 inc(bucket_algs, bucket['alg'])
296 inc(bucket_types, bucket['type_id'])
297 inc(bucket_sizes, len(bucket['items']))
298
299 return {
300 'num_devices': len(crush['devices']),
301 'num_types': len(crush['types']),
302 'num_buckets': len(crush['buckets']),
303 'num_rules': len(crush['rules']),
304 'device_classes': list(device_classes.values()),
305 'tunables': crush['tunables'],
306 'compat_weight_set': '-1' in crush['choose_args'],
307 'num_weight_sets': len(crush['choose_args']),
308 'bucket_algs': bucket_algs,
309 'bucket_sizes': bucket_sizes,
310 'bucket_types': bucket_types,
311 }
312
313 def gather_configs(self):
314 # cluster config options
315 cluster = set()
316 r, outb, outs = self.mon_command({
317 'prefix': 'config dump',
318 'format': 'json'
319 });
320 if r != 0:
321 return {}
322 try:
323 dump = json.loads(outb)
324 except json.decoder.JSONDecodeError:
325 return {}
326 for opt in dump:
327 name = opt.get('name')
328 if name:
329 cluster.add(name)
330 # daemon-reported options (which may include ceph.conf)
331 active = set()
332 ls = self.get("modified_config_options");
333 for opt in ls.get('options', {}):
334 active.add(opt)
335 return {
336 'cluster_changed': sorted(list(cluster)),
337 'active_changed': sorted(list(active)),
338 }
339
340 def gather_crashinfo(self):
341 crashlist = list()
342 errno, crashids, err = self.remote('crash', 'ls')
343 if errno:
344 return ''
345 for crashid in crashids.split():
346 cmd = {'id': crashid}
347 errno, crashinfo, err = self.remote('crash', 'do_info', cmd, '')
348 if errno:
349 continue
350 c = json.loads(crashinfo)
351 del c['utsname_hostname']
352 # entity_name might have more than one '.', beware
353 (etype, eid) = c.get('entity_name', '').split('.', 1)
354 m = hashlib.sha1()
355 m.update(self.salt.encode('utf-8'))
356 m.update(eid.encode('utf-8'))
357 m.update(self.salt.encode('utf-8'))
358 c['entity_name'] = etype + '.' + m.hexdigest()
359 crashlist.append(c)
360 return crashlist
361
362 def get_active_channels(self):
363 r = []
364 if self.channel_basic:
365 r.append('basic')
366 if self.channel_crash:
367 r.append('crash')
368 if self.channel_device:
369 r.append('device')
370 if self.channel_ident:
371 r.append('ident')
372 return r
373
374 def gather_device_report(self):
375 try:
376 time_format = self.remote('devicehealth', 'get_time_format')
377 except:
378 return None
379 cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
380 min_sample = cutoff.strftime(time_format)
381
382 devices = self.get('devices')['devices']
383
384 res = {} # anon-host-id -> anon-devid -> { timestamp -> record }
385 for d in devices:
386 devid = d['devid']
387 try:
388 # this is a map of stamp -> {device info}
389 m = self.remote('devicehealth', 'get_recent_device_metrics',
390 devid, min_sample)
391 except:
392 continue
393
394 # anonymize host id
395 try:
396 host = d['location'][0]['host']
397 except:
398 continue
399 anon_host = self.get_store('host-id/%s' % host)
400 if not anon_host:
401 anon_host = str(uuid.uuid1())
402 self.set_store('host-id/%s' % host, anon_host)
403 serial = None
404 for dev, rep in m.items():
405 rep['host_id'] = anon_host
406 if serial is None and 'serial_number' in rep:
407 serial = rep['serial_number']
408
409 # anonymize device id
410 anon_devid = self.get_store('devid-id/%s' % devid)
411 if not anon_devid:
412 # ideally devid is 'vendor_model_serial',
413 # but can also be 'model_serial', 'serial'
414 if '_' in devid:
415 anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
416 else:
417 anon_devid = str(uuid.uuid1())
418 self.set_store('devid-id/%s' % devid, anon_devid)
419 self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
420 host, anon_host))
421
422 # anonymize the smartctl report itself
423 if serial:
424 m_str = json.dumps(m)
425 m = json.loads(m_str.replace(serial, 'deleted'))
426
427 if anon_host not in res:
428 res[anon_host] = {}
429 res[anon_host][anon_devid] = m
430 return res
431
432 def get_latest(self, daemon_type, daemon_name, stat):
433 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
434 #self.log.error("get_latest {0} data={1}".format(stat, data))
435 if data:
436 return data[-1][1]
437 else:
438 return 0
439
440 def compile_report(self, channels=[]):
441 if not channels:
442 channels = self.get_active_channels()
443 report = {
444 'leaderboard': False,
445 'report_version': 1,
446 'report_timestamp': datetime.utcnow().isoformat(),
447 'report_id': self.report_id,
448 'channels': channels,
449 'channels_available': ALL_CHANNELS,
450 'license': LICENSE,
451 }
452
453 if 'ident' in channels:
454 if self.leaderboard:
455 report['leaderboard'] = True
456 for option in ['description', 'contact', 'organization']:
457 report[option] = getattr(self, option)
458
459 if 'basic' in channels:
460 mon_map = self.get('mon_map')
461 osd_map = self.get('osd_map')
462 service_map = self.get('service_map')
463 fs_map = self.get('fs_map')
464 df = self.get('df')
465
466 report['created'] = mon_map['created']
467
468 # mons
469 v1_mons = 0
470 v2_mons = 0
471 ipv4_mons = 0
472 ipv6_mons = 0
473 for mon in mon_map['mons']:
474 for a in mon['public_addrs']['addrvec']:
475 if a['type'] == 'v2':
476 v2_mons += 1
477 elif a['type'] == 'v1':
478 v1_mons += 1
479 if a['addr'].startswith('['):
480 ipv6_mons += 1
481 else:
482 ipv4_mons += 1
483 report['mon'] = {
484 'count': len(mon_map['mons']),
485 'features': mon_map['features'],
486 'min_mon_release': mon_map['min_mon_release'],
487 'v1_addr_mons': v1_mons,
488 'v2_addr_mons': v2_mons,
489 'ipv4_addr_mons': ipv4_mons,
490 'ipv6_addr_mons': ipv6_mons,
491 }
492
493 report['config'] = self.gather_configs()
494
495 # pools
496 report['rbd'] = {
497 'num_pools': 0,
498 'num_images_by_pool': [],
499 'mirroring_by_pool': [],
500 }
501 num_pg = 0
502 report['pools'] = list()
503 for pool in osd_map['pools']:
504 num_pg += pool['pg_num']
505 ec_profile = {}
506 if pool['erasure_code_profile']:
507 orig = osd_map['erasure_code_profiles'].get(
508 pool['erasure_code_profile'], {})
509 ec_profile = {
510 k: orig[k] for k in orig.keys()
511 if k in ['k', 'm', 'plugin', 'technique',
512 'crush-failure-domain', 'l']
513 }
514 report['pools'].append(
515 {
516 'pool': pool['pool'],
517 'type': pool['type'],
518 'pg_num': pool['pg_num'],
519 'pgp_num': pool['pg_placement_num'],
520 'size': pool['size'],
521 'min_size': pool['min_size'],
522 'pg_autoscale_mode': pool['pg_autoscale_mode'],
523 'target_max_bytes': pool['target_max_bytes'],
524 'target_max_objects': pool['target_max_objects'],
525 'type': ['', 'replicated', '', 'erasure'][pool['type']],
526 'erasure_code_profile': ec_profile,
527 'cache_mode': pool['cache_mode'],
528 }
529 )
530 if 'rbd' in pool['application_metadata']:
531 report['rbd']['num_pools'] += 1
532 ioctx = self.rados.open_ioctx(pool['pool_name'])
533 report['rbd']['num_images_by_pool'].append(
534 sum(1 for _ in rbd.RBD().list2(ioctx)))
535 report['rbd']['mirroring_by_pool'].append(
536 rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
537
538 # osds
539 cluster_network = False
540 for osd in osd_map['osds']:
541 if osd['up'] and not cluster_network:
542 front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
543 back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
544 if front_ip != back_ip:
545 cluster_network = True
546 report['osd'] = {
547 'count': len(osd_map['osds']),
548 'require_osd_release': osd_map['require_osd_release'],
549 'require_min_compat_client': osd_map['require_min_compat_client'],
550 'cluster_network': cluster_network,
551 }
552
553 # crush
554 report['crush'] = self.gather_crush_info()
555
556 # cephfs
557 report['fs'] = {
558 'count': len(fs_map['filesystems']),
559 'feature_flags': fs_map['feature_flags'],
560 'num_standby_mds': len(fs_map['standbys']),
561 'filesystems': [],
562 }
563 num_mds = len(fs_map['standbys'])
564 for fsm in fs_map['filesystems']:
565 fs = fsm['mdsmap']
566 num_sessions = 0
567 cached_ino = 0
568 cached_dn = 0
569 cached_cap = 0
570 subtrees = 0
571 rfiles = 0
572 rbytes = 0
573 rsnaps = 0
574 for gid, mds in fs['info'].items():
575 num_sessions += self.get_latest('mds', mds['name'],
576 'mds_sessions.session_count')
577 cached_ino += self.get_latest('mds', mds['name'],
578 'mds_mem.ino')
579 cached_dn += self.get_latest('mds', mds['name'],
580 'mds_mem.dn')
581 cached_cap += self.get_latest('mds', mds['name'],
582 'mds_mem.cap')
583 subtrees += self.get_latest('mds', mds['name'],
584 'mds.subtrees')
585 if mds['rank'] == 0:
586 rfiles = self.get_latest('mds', mds['name'],
587 'mds.root_rfiles')
588 rbytes = self.get_latest('mds', mds['name'],
589 'mds.root_rbytes')
590 rsnaps = self.get_latest('mds', mds['name'],
591 'mds.root_rsnaps')
592 report['fs']['filesystems'].append({
593 'max_mds': fs['max_mds'],
594 'ever_allowed_features': fs['ever_allowed_features'],
595 'explicitly_allowed_features': fs['explicitly_allowed_features'],
596 'num_in': len(fs['in']),
597 'num_up': len(fs['up']),
598 'num_standby_replay': len(
599 [mds for gid, mds in fs['info'].items()
600 if mds['state'] == 'up:standby-replay']),
601 'num_mds': len(fs['info']),
602 'num_sessions': num_sessions,
603 'cached_inos': cached_ino,
604 'cached_dns': cached_dn,
605 'cached_caps': cached_cap,
606 'cached_subtrees': subtrees,
607 'balancer_enabled': len(fs['balancer']) > 0,
608 'num_data_pools': len(fs['data_pools']),
609 'standby_count_wanted': fs['standby_count_wanted'],
610 'approx_ctime': fs['created'][0:7],
611 'files': rfiles,
612 'bytes': rbytes,
613 'snaps': rsnaps,
614 })
615 num_mds += len(fs['info'])
616 report['fs']['total_num_mds'] = num_mds
617
618 # daemons
619 report['metadata'] = dict()
620 report['metadata']['osd'] = self.gather_osd_metadata(osd_map)
621 report['metadata']['mon'] = self.gather_mon_metadata(mon_map)
622
623 # host counts
624 servers = self.list_servers()
625 self.log.debug('servers %s' % servers)
626 report['hosts'] = {
627 'num': len([h for h in servers if h['hostname']]),
628 }
629 for t in ['mon', 'mds', 'osd', 'mgr']:
630 report['hosts']['num_with_' + t] = len(
631 [h for h in servers
632 if len([s for s in h['services'] if s['type'] == t])]
633 )
634
635 report['usage'] = {
636 'pools': len(df['pools']),
637 'pg_num': num_pg,
638 'total_used_bytes': df['stats']['total_used_bytes'],
639 'total_bytes': df['stats']['total_bytes'],
640 'total_avail_bytes': df['stats']['total_avail_bytes']
641 }
642
643 report['services'] = defaultdict(int)
644 for key, value in service_map['services'].items():
645 report['services'][key] += 1
646 if key == 'rgw':
647 report['rgw'] = {
648 'count': 0,
649 }
650 zones = set()
651 realms = set()
652 zonegroups = set()
653 frontends = set()
654 d = value.get('daemons', dict())
655
656 for k,v in d.items():
657 if k == 'summary' and v:
658 report['rgw'][k] = v
659 elif isinstance(v, dict) and 'metadata' in v:
660 report['rgw']['count'] += 1
661 zones.add(v['metadata']['zone_id'])
662 zonegroups.add(v['metadata']['zonegroup_id'])
663 frontends.add(v['metadata']['frontend_type#0'])
664
665 # we could actually iterate over all the keys of
666 # the dict and check for how many frontends there
667 # are, but it is unlikely that one would be running
668 # more than 2 supported ones
669 f2 = v['metadata'].get('frontend_type#1', None)
670 if f2:
671 frontends.add(f2)
672
673 report['rgw']['zones'] = len(zones)
674 report['rgw']['zonegroups'] = len(zonegroups)
675 report['rgw']['frontends'] = list(frontends) # sets aren't json-serializable
676
677 try:
678 report['balancer'] = self.remote('balancer', 'gather_telemetry')
679 except ImportError:
680 report['balancer'] = {
681 'active': False
682 }
683
684 if 'crash' in channels:
685 report['crashes'] = self.gather_crashinfo()
686
687 # NOTE: We do not include the 'device' channel in this report; it is
688 # sent to a different endpoint.
689
690 return report
691
692 def _try_post(self, what, url, report):
693 self.log.info('Sending %s to: %s' % (what, url))
694 proxies = dict()
695 if self.proxy:
696 self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
697 proxies['http'] = self.proxy
698 proxies['https'] = self.proxy
699 try:
700 resp = requests.put(url=url, json=report, proxies=proxies)
701 resp.raise_for_status()
702 except Exception as e:
703 fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
704 self.log.error(fail_reason)
705 return fail_reason
706 return None
707
708 def send(self, report, endpoint=None):
709 if not endpoint:
710 endpoint = ['ceph', 'device']
711 failed = []
712 success = []
713 self.log.debug('Send endpoints %s' % endpoint)
714 for e in endpoint:
715 if e == 'ceph':
716 fail_reason = self._try_post('ceph report', self.url, report)
717 if fail_reason:
718 failed.append(fail_reason)
719 else:
720 now = int(time.time())
721 self.last_upload = now
722 self.set_store('last_upload', str(now))
723 success.append('Ceph report sent to {0}'.format(self.url))
724 self.log.info('Sent report to {0}'.format(self.url))
725 elif e == 'device':
726 if 'device' in self.get_active_channels():
727 devices = self.gather_device_report()
728 num_devs = 0
729 num_hosts = 0
730 for host, ls in devices.items():
731 self.log.debug('host %s devices %s' % (host, ls))
732 if not len(ls):
733 continue
734 fail_reason = self._try_post('devices', self.device_url,
735 ls)
736 if fail_reason:
737 failed.append(fail_reason)
738 else:
739 num_devs += len(ls)
740 num_hosts += 1
741 if num_devs:
742 success.append('Reported %d devices across %d hosts' % (
743 num_devs, len(devices)))
744 if failed:
745 return 1, '', '\n'.join(success + failed)
746 return 0, '', '\n'.join(success)
747
    def handle_command(self, inbuf, command):
        """Dispatch the `ceph telemetry ...` CLI commands declared in COMMANDS.

        Returns the standard (retval, stdout, stderr) mgr command tuple.
        """
        if command['prefix'] == 'telemetry status':
            # dump all option values plus a human-readable last upload time
            r = {}
            for opt in self.MODULE_OPTIONS:
                r[opt['name']] = getattr(self, opt['name'])
            r['last_upload'] = time.ctime(self.last_upload) if self.last_upload else self.last_upload
            return 0, json.dumps(r, indent=4, sort_keys=True), ''
        elif command['prefix'] == 'telemetry on':
            # opting in requires explicit acknowledgement of the data license
            if command.get('license') != LICENSE:
                return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo enable, add '--license " + LICENSE + "' to the 'ceph telemetry on' command."
            self.on()
            return 0, '', ''
        elif command['prefix'] == 'telemetry off':
            self.off()
            return 0, '', ''
        elif command['prefix'] == 'telemetry send':
            # a manual send while effectively opted out also requires the
            # license flag
            if self.last_opt_revision < LAST_REVISION_RE_OPT_IN and command.get('license') != LICENSE:
                self.log.debug('A telemetry send attempt while opted-out. Asking for license agreement')
                return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo manually send telemetry data, add '--license " + LICENSE + "' to the 'ceph telemetry send' command.\nPlease consider enabling the telemetry module with 'ceph telemetry on'."
            self.last_report = self.compile_report()
            return self.send(self.last_report, command.get('endpoint'))

        elif command['prefix'] == 'telemetry show':
            report = self.get_report(channels=command.get('channels', None))
            report = json.dumps(report, indent=4, sort_keys=True)
            if self.channel_device:
                # the device payload goes to a separate endpoint, so it is
                # shown by a separate command
                report += '\n \nDevice report is generated separately. To see it run \'ceph telemetry show-device\'.'
            return 0, report, ''
        elif command['prefix'] == 'telemetry show-device':
            return 0, json.dumps(self.get_report('device'), indent=4, sort_keys=True), ''
        elif command['prefix'] == 'telemetry show-all':
            return 0, json.dumps(self.get_report('all'), indent=4, sort_keys=True), ''
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))
783
784 def on(self):
785 self.set_module_option('enabled', True)
786 self.set_module_option('last_opt_revision', REVISION)
787
788 def off(self):
789 self.set_module_option('enabled', False)
790 self.set_module_option('last_opt_revision', 1)
791
792 def get_report(self, report_type='default', channels=None):
793 if report_type == 'default':
794 return self.compile_report(channels=channels)
795 elif report_type == 'device':
796 return self.gather_device_report()
797 elif report_type == 'all':
798 return {'report': self.compile_report(channels=channels),
799 'device_report': self.gather_device_report()}
800 return {}
801
    def self_test(self):
        """Sanity-check that a report can be compiled.

        NOTE(review): this definition is shadowed by a second `self_test`
        defined later in this class, so these checks are currently dead code;
        the duplicate should be reconciled.
        """
        report = self.compile_report()
        if len(report) == 0:
            raise RuntimeError('Report is empty')

        if 'report_id' not in report:
            raise RuntimeError('report_id not found in report')
809
810 def shutdown(self):
811 self.run = False
812 self.event.set()
813
814 def refresh_health_checks(self):
815 health_checks = {}
816 if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
817 health_checks['TELEMETRY_CHANGED'] = {
818 'severity': 'warning',
819 'summary': 'Telemetry requires re-opt-in',
820 'detail': [
821 'telemetry report includes new information; must re-opt-in (or out)'
822 ]
823 }
824 self.set_health_checks(health_checks)
825
826 def serve(self):
827 self.load()
828 self.config_notify()
829 self.run = True
830
831 self.log.debug('Waiting for mgr to warm up')
832 self.event.wait(10)
833
834 while self.run:
835 self.event.clear()
836
837 self.refresh_health_checks()
838
839 if self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
840 self.log.debug('Not sending report until user re-opts-in')
841 self.event.wait(1800)
842 continue
843 if not self.enabled:
844 self.log.debug('Not sending report until configured to do so')
845 self.event.wait(1800)
846 continue
847
848 now = int(time.time())
849 if not self.last_upload or (now - self.last_upload) > \
850 self.interval * 3600:
851 self.log.info('Compiling and sending report to %s',
852 self.url)
853
854 try:
855 self.last_report = self.compile_report()
856 except:
857 self.log.exception('Exception while compiling report:')
858
859 self.send(self.last_report)
860 else:
861 self.log.debug('Interval for sending new report has not expired')
862
863 sleep = 3600
864 self.log.debug('Sleeping for %d seconds', sleep)
865 self.event.wait(sleep)
866
867 def self_test(self):
868 self.compile_report()
869 return True
870
871 @staticmethod
872 def can_run():
873 return True, ''