]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/telemetry/module.py
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / pybind / mgr / telemetry / module.py
1 """
2 Telemetry module for ceph-mgr
3
4 Collect statistics from Ceph cluster and send this back to the Ceph project
5 when the user has opted in
6 """
7 import errno
8 import hashlib
9 import json
10 import rbd
11 import re
12 import requests
13 import uuid
14 import time
15 from datetime import datetime, timedelta
16 from threading import Event
17 from collections import defaultdict
18
19 from mgr_module import MgrModule
20
21
22 ALL_CHANNELS = ['basic', 'ident', 'crash', 'device']
23
24 LICENSE='sharing-1-0'
25 LICENSE_NAME='Community Data License Agreement - Sharing - Version 1.0'
26 LICENSE_URL='https://cdla.io/sharing-1-0/'
27
28 # If the telemetry revision has changed since this point, re-require
29 # an opt-in. This should happen each time we add new information to
30 # the telemetry report.
31 LAST_REVISION_RE_OPT_IN = 2
32
33 # Latest revision of the telemetry report. Bump this each time we make
34 # *any* change.
35 REVISION = 3
36
37 # History of revisions
38 # --------------------
39 #
40 # Version 1:
41 # Mimic and/or nautilus are lumped together here, since
42 # we didn't track revisions yet.
43 #
44 # Version 2:
45 # - added revision tracking, nagging, etc.
46 # - added config option changes
47 # - added channels
48 # - added explicit license acknowledgement to the opt-in process
49 #
50 # Version 3:
51 # - added device health metrics (i.e., SMART data, minus serial number)
52 # - remove crush_rule
53 # - added CephFS metadata (how many MDSs, fs features, how many data pools,
54 # how much metadata is cached, rfiles, rbytes, rsnapshots)
55 # - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
56 # - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
57 # - whether an OSD cluster network is in use
58 # - rbd pool and image count, and rbd mirror mode (pool-level)
59 # - rgw daemons, zones, zonegroups; which rgw frontends
60 # - crush map stats
61
class Module(MgrModule):
    """Telemetry mgr module: periodically compiles an anonymized cluster
    report and uploads it to telemetry.ceph.com (opt-in only)."""

    # last-known module option values; refreshed by config_notify()
    config = dict()

    # per-daemon metadata fields aggregated (by value) for OSDs and mons
    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]

    MODULE_OPTIONS = [
        {
            'name': 'url',
            'type': 'str',
            'default': 'https://telemetry.ceph.com/report'
        },
        {
            'name': 'device_url',
            'type': 'str',
            'default': 'https://telemetry.ceph.com/device'
        },
        {
            'name': 'enabled',
            'type': 'bool',
            'default': False
        },
        {
            # revision the user last opted in to; compared against
            # LAST_REVISION_RE_OPT_IN to decide whether to nag/stop sending
            'name': 'last_opt_revision',
            'type': 'int',
            'default': 1,
        },
        {
            'name': 'leaderboard',
            'type': 'bool',
            'default': False
        },
        {
            'name': 'description',
            'type': 'str',
            'default': None
        },
        {
            'name': 'contact',
            'type': 'str',
            'default': None
        },
        {
            'name': 'organization',
            'type': 'str',
            'default': None
        },
        {
            'name': 'proxy',
            'type': 'str',
            'default': None
        },
        {
            # hours between uploads; serve() also uses interval*2 as the
            # device-metrics sampling window
            'name': 'interval',
            'type': 'int',
            'default': 24,
            'min': 8
        },
        {
            'name': 'channel_basic',
            'type': 'bool',
            'default': True,
            'desc': 'Share basic cluster information (size, version)',
        },
        {
            'name': 'channel_ident',
            'type': 'bool',
            'default': False,
            # fixed: key was 'description', which is not the recognized
            # option-schema key; 'desc' matches channel_basic above
            'desc': 'Share a user-provided description and/or contact email for the cluster',
        },
        {
            'name': 'channel_crash',
            'type': 'bool',
            'default': True,
            # fixed: 'description' -> 'desc', and 'straces' typo -> 'traces'
            'desc': 'Share metadata about Ceph daemon crashes (version, stack traces, etc)',
        },
        {
            'name': 'channel_device',
            'type': 'bool',
            'default': True,
            # fixed: 'description' -> 'desc'
            'desc': 'Share device health metrics (e.g., SMART data, minus potentially identifying info like serial numbers)',
        },
    ]

    COMMANDS = [
        {
            "cmd": "telemetry status",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "telemetry send "
                   "name=endpoint,type=CephChoices,strings=ceph|device,n=N,req=false",
            "desc": "Force sending data to Ceph telemetry",
            "perm": "rw"
        },
        {
            "cmd": "telemetry show "
                   "name=channels,type=CephString,n=N,req=False",
            "desc": "Show last report or report to be sent",
            "perm": "r"
        },
        {
            "cmd": "telemetry show-device",
            "desc": "Show last device report or device report to be sent",
            "perm": "r"
        },
        {
            "cmd": "telemetry on name=license,type=CephString,req=false",
            "desc": "Enable telemetry reports from this cluster",
            "perm": "rw",
        },
        {
            "cmd": "telemetry off",
            "desc": "Disable telemetry reports from this cluster",
            "perm": "rw",
        },
    ]
188
189 @property
190 def config_keys(self):
191 return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
192
193 def __init__(self, *args, **kwargs):
194 super(Module, self).__init__(*args, **kwargs)
195 self.event = Event()
196 self.run = False
197 self.last_upload = None
198 self.last_report = dict()
199 self.report_id = None
200 self.salt = None
201
202 def config_notify(self):
203 for opt in self.MODULE_OPTIONS:
204 setattr(self,
205 opt['name'],
206 self.get_module_option(opt['name']))
207 self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
208 # wake up serve() thread
209 self.event.set()
210
211 @staticmethod
212 def parse_timestamp(timestamp):
213 return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
214
215 def load(self):
216 self.last_upload = self.get_store('last_upload', None)
217 if self.last_upload is not None:
218 self.last_upload = int(self.last_upload)
219
220 self.report_id = self.get_store('report_id', None)
221 if self.report_id is None:
222 self.report_id = str(uuid.uuid4())
223 self.set_store('report_id', self.report_id)
224
225 self.salt = self.get_store('salt', None)
226 if not self.salt:
227 self.salt = str(uuid.uuid4())
228 self.set_store('salt', self.salt)
229
230 def gather_osd_metadata(self, osd_map):
231 keys = ["osd_objectstore", "rotational"]
232 keys += self.metadata_keys
233
234 metadata = dict()
235 for key in keys:
236 metadata[key] = defaultdict(int)
237
238 for osd in osd_map['osds']:
239 res = self.get_metadata('osd', str(osd['osd'])).items()
240 if res is None:
241 self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
242 continue
243 for k, v in res:
244 if k not in keys:
245 continue
246
247 metadata[k][v] += 1
248
249 return metadata
250
251 def gather_mon_metadata(self, mon_map):
252 keys = list()
253 keys += self.metadata_keys
254
255 metadata = dict()
256 for key in keys:
257 metadata[key] = defaultdict(int)
258
259 for mon in mon_map['mons']:
260 res = self.get_metadata('mon', mon['name']).items()
261 if res is None:
262 self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
263 continue
264 for k, v in res:
265 if k not in keys:
266 continue
267
268 metadata[k][v] += 1
269
270 return metadata
271
272 def gather_crush_info(self):
273 osdmap = self.get_osdmap()
274 crush_raw = osdmap.get_crush()
275 crush = crush_raw.dump()
276
277 def inc(d, k):
278 if k in d:
279 d[k] += 1
280 else:
281 d[k] = 1
282
283 device_classes = {}
284 for dev in crush['devices']:
285 inc(device_classes, dev.get('class', ''))
286
287 bucket_algs = {}
288 bucket_types = {}
289 bucket_sizes = {}
290 for bucket in crush['buckets']:
291 if '~' in bucket['name']: # ignore shadow buckets
292 continue
293 inc(bucket_algs, bucket['alg'])
294 inc(bucket_types, bucket['type_id'])
295 inc(bucket_sizes, len(bucket['items']))
296
297 return {
298 'num_devices': len(crush['devices']),
299 'num_types': len(crush['types']),
300 'num_buckets': len(crush['buckets']),
301 'num_rules': len(crush['rules']),
302 'device_classes': list(device_classes.values()),
303 'tunables': crush['tunables'],
304 'compat_weight_set': '-1' in crush['choose_args'],
305 'num_weight_sets': len(crush['choose_args']),
306 'bucket_algs': bucket_algs,
307 'bucket_sizes': bucket_sizes,
308 'bucket_types': bucket_types,
309 }
310
311 def gather_configs(self):
312 # cluster config options
313 cluster = set()
314 r, outb, outs = self.mon_command({
315 'prefix': 'config dump',
316 'format': 'json'
317 });
318 if r != 0:
319 return {}
320 try:
321 dump = json.loads(outb)
322 except json.decoder.JSONDecodeError:
323 return {}
324 for opt in dump:
325 name = opt.get('name')
326 if name:
327 cluster.add(name)
328 # daemon-reported options (which may include ceph.conf)
329 active = set()
330 ls = self.get("modified_config_options");
331 for opt in ls.get('options', {}):
332 active.add(opt)
333 return {
334 'cluster_changed': sorted(list(cluster)),
335 'active_changed': sorted(list(active)),
336 }
337
338 def gather_crashinfo(self):
339 crashlist = list()
340 errno, crashids, err = self.remote('crash', 'ls')
341 if errno:
342 return ''
343 for crashid in crashids.split():
344 cmd = {'id': crashid}
345 errno, crashinfo, err = self.remote('crash', 'do_info', cmd, '')
346 if errno:
347 continue
348 c = json.loads(crashinfo)
349 del c['utsname_hostname']
350 # entity_name might have more than one '.', beware
351 (etype, eid) = c.get('entity_name', '').split('.', 1)
352 m = hashlib.sha1()
353 m.update(self.salt.encode('utf-8'))
354 m.update(eid.encode('utf-8'))
355 m.update(self.salt.encode('utf-8'))
356 c['entity_name'] = etype + '.' + m.hexdigest()
357 crashlist.append(c)
358 return crashlist
359
360 def get_active_channels(self):
361 r = []
362 if self.channel_basic:
363 r.append('basic')
364 if self.channel_crash:
365 r.append('crash')
366 if self.channel_device:
367 r.append('device')
368 return r
369
370 def gather_device_report(self):
371 try:
372 time_format = self.remote('devicehealth', 'get_time_format')
373 except:
374 return None
375 cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
376 min_sample = cutoff.strftime(time_format)
377
378 devices = self.get('devices')['devices']
379
380 res = {} # anon-host-id -> anon-devid -> { timestamp -> record }
381 for d in devices:
382 devid = d['devid']
383 try:
384 # this is a map of stamp -> {device info}
385 m = self.remote('devicehealth', 'get_recent_device_metrics',
386 devid, min_sample)
387 except:
388 continue
389
390 # anonymize host id
391 try:
392 host = d['location'][0]['host']
393 except:
394 continue
395 anon_host = self.get_store('host-id/%s' % host)
396 if not anon_host:
397 anon_host = str(uuid.uuid1())
398 self.set_store('host-id/%s' % host, anon_host)
399 for dev, rep in m.items():
400 rep['host_id'] = anon_host
401
402 # anonymize device id
403 anon_devid = self.get_store('devid-id/%s' % devid)
404 if not anon_devid:
405 anon_devid = devid[:devid.rfind('_')] + '_' + uuid.uuid1()
406 self.set_store('devid-id/%s' % devid, anon_devid)
407 self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
408 host, anon_host))
409
410 # anonymize the smartctl report itself
411 serial = devid.rsplit('_', 1)[1]
412 m_str = json.dumps(m)
413 m = json.loads(m_str.replace(serial, 'deleted'))
414
415 if anon_host not in res:
416 res[anon_host] = {}
417 res[anon_host][anon_devid] = m
418 return res
419
420 def get_latest(self, daemon_type, daemon_name, stat):
421 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
422 #self.log.error("get_latest {0} data={1}".format(stat, data))
423 if data:
424 return data[-1][1]
425 else:
426 return 0
427
    def compile_report(self, channels=[]):
        """Assemble the telemetry report for the given channels.

        :param channels: channel names to include; when empty, the
            currently-enabled channels are used
        :return: dict report.  The 'device' channel is NOT included here;
            it is gathered and sent separately (see gather_device_report()).
        """
        # NOTE(review): mutable default argument; harmless here because the
        # list is only replaced, never mutated, but channels=None would be
        # the safer idiom.
        if not channels:
            channels = self.get_active_channels()
        report = {
            'leaderboard': False,
            'report_version': 1,
            'report_timestamp': datetime.utcnow().isoformat(),
            'report_id': self.report_id,
            'channels': channels,
            'channels_available': ALL_CHANNELS,
            'license': LICENSE,
        }

        # the 'ident' channel adds the user-provided identification fields
        if 'ident' in channels:
            if self.leaderboard:
                report['leaderboard'] = True
            for option in ['description', 'contact', 'organization']:
                report[option] = getattr(self, option)

        if 'basic' in channels:
            mon_map = self.get('mon_map')
            osd_map = self.get('osd_map')
            service_map = self.get('service_map')
            fs_map = self.get('fs_map')
            df = self.get('df')

            report['created'] = self.parse_timestamp(mon_map['created']).isoformat()

            # mons: count v1/v2 and IPv4/IPv6 addresses across all mons
            v1_mons = 0
            v2_mons = 0
            ipv4_mons = 0
            ipv6_mons = 0
            for mon in mon_map['mons']:
                for a in mon['public_addrs']['addrvec']:
                    if a['type'] == 'v2':
                        v2_mons += 1
                    elif a['type'] == 'v1':
                        v1_mons += 1
                    # IPv6 addrs are bracketed, e.g. '[::1]:6789'
                    if a['addr'].startswith('['):
                        ipv6_mons += 1
                    else:
                        ipv4_mons += 1
            report['mon'] = {
                'count': len(mon_map['mons']),
                'features': mon_map['features'],
                'min_mon_release': mon_map['min_mon_release'],
                'v1_addr_mons': v1_mons,
                'v2_addr_mons': v2_mons,
                'ipv4_addr_mons': ipv4_mons,
                'ipv6_addr_mons': ipv6_mons,
            }

            # names (not values) of changed config options
            report['config'] = self.gather_configs()

            # pools
            report['rbd'] = {
                'num_pools': 0,
                'num_images_by_pool': [],
                'mirroring_by_pool': [],
            }
            num_pg = 0
            report['pools'] = list()
            for pool in osd_map['pools']:
                num_pg += pool['pg_num']
                ec_profile = {}
                if pool['erasure_code_profile']:
                    orig = osd_map['erasure_code_profiles'].get(
                        pool['erasure_code_profile'], {})
                    # only share the non-identifying profile fields
                    ec_profile = {
                        k: orig[k] for k in orig.keys()
                        if k in ['k', 'm', 'plugin', 'technique',
                                 'crush-failure-domain', 'l']
                    }
                report['pools'].append(
                    {
                        'pool': pool['pool'],
                        # NOTE(review): 'type' appears twice in this literal;
                        # the later string-valued entry below wins and this
                        # numeric one is discarded
                        'type': pool['type'],
                        'pg_num': pool['pg_num'],
                        'pgp_num': pool['pg_placement_num'],
                        'size': pool['size'],
                        'min_size': pool['min_size'],
                        'pg_autoscale_mode': pool['pg_autoscale_mode'],
                        'target_max_bytes': pool['target_max_bytes'],
                        'target_max_objects': pool['target_max_objects'],
                        'type': ['', 'replicated', '', 'erasure'][pool['type']],
                        'erasure_code_profile': ec_profile,
                        'cache_mode': pool['cache_mode'],
                    }
                )
                # rbd pool/image/mirroring stats for pools tagged 'rbd'
                if 'rbd' in pool['application_metadata']:
                    report['rbd']['num_pools'] += 1
                    ioctx = self.rados.open_ioctx(pool['pool_name'])
                    report['rbd']['num_images_by_pool'].append(
                        sum(1 for _ in rbd.RBD().list2(ioctx)))
                    report['rbd']['mirroring_by_pool'].append(
                        rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)

            # osds: detect whether a separate cluster (backend) network is in
            # use by comparing front/back IPs of the first up OSD found
            cluster_network = False
            for osd in osd_map['osds']:
                if osd['up'] and not cluster_network:
                    front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
                    back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
                    if front_ip != back_ip:
                        cluster_network = True
            report['osd'] = {
                'count': len(osd_map['osds']),
                'require_osd_release': osd_map['require_osd_release'],
                'require_min_compat_client': osd_map['require_min_compat_client'],
                'cluster_network': cluster_network,
            }

            # crush
            report['crush'] = self.gather_crush_info()

            # cephfs
            report['fs'] = {
                'count': len(fs_map['filesystems']),
                'feature_flags': fs_map['feature_flags'],
                'num_standby_mds': len(fs_map['standbys']),
                'filesystems': [],
            }
            num_mds = len(fs_map['standbys'])
            for fsm in fs_map['filesystems']:
                fs = fsm['mdsmap']
                num_sessions = 0
                cached_ino = 0
                cached_dn = 0
                cached_cap = 0
                subtrees = 0
                rfiles = 0
                rbytes = 0
                rsnaps = 0
                # sum cache/session counters over all active MDSs; the
                # recursive stats (rfiles etc.) come from rank 0 only
                for gid, mds in fs['info'].items():
                    num_sessions += self.get_latest('mds', mds['name'],
                                                    'mds_sessions.session_count')
                    cached_ino += self.get_latest('mds', mds['name'],
                                                  'mds_mem.ino')
                    cached_dn += self.get_latest('mds', mds['name'],
                                                 'mds_mem.dn')
                    cached_cap += self.get_latest('mds', mds['name'],
                                                  'mds_mem.cap')
                    subtrees += self.get_latest('mds', mds['name'],
                                                'mds.subtrees')
                    if mds['rank'] == 0:
                        rfiles = self.get_latest('mds', mds['name'],
                                                 'mds.root_rfiles')
                        rbytes = self.get_latest('mds', mds['name'],
                                                 'mds.root_rbytes')
                        rsnaps = self.get_latest('mds', mds['name'],
                                                 'mds.root_rsnaps')
                report['fs']['filesystems'].append({
                    'max_mds': fs['max_mds'],
                    'ever_allowed_features': fs['ever_allowed_features'],
                    'explicitly_allowed_features': fs['explicitly_allowed_features'],
                    'num_in': len(fs['in']),
                    'num_up': len(fs['up']),
                    'num_standby_replay': len(
                        [mds for gid, mds in fs['info'].items()
                         if mds['state'] == 'up:standby-replay']),
                    'num_mds': len(fs['info']),
                    'num_sessions': num_sessions,
                    'cached_inos': cached_ino,
                    'cached_dns': cached_dn,
                    'cached_caps': cached_cap,
                    'cached_subtrees': subtrees,
                    'balancer_enabled': len(fs['balancer']) > 0,
                    'num_data_pools': len(fs['data_pools']),
                    'standby_count_wanted': fs['standby_count_wanted'],
                    # year-month only, to avoid a unique timestamp
                    'approx_ctime': fs['created'][0:7],
                    'files': rfiles,
                    'bytes': rbytes,
                    'snaps': rsnaps,
                })
                num_mds += len(fs['info'])
            report['fs']['total_num_mds'] = num_mds

            # daemons
            report['metadata'] = dict()
            report['metadata']['osd'] = self.gather_osd_metadata(osd_map)
            report['metadata']['mon'] = self.gather_mon_metadata(mon_map)

            # host counts
            servers = self.list_servers()
            self.log.debug('servers %s' % servers)
            report['hosts'] = {
                'num': len([h for h in servers if h['hostname']]),
            }
            for t in ['mon', 'mds', 'osd', 'mgr']:
                report['hosts']['num_with_' + t] = len(
                    [h for h in servers
                     if len([s for s in h['services'] if s['type'] == t])]
                )

            report['usage'] = {
                'pools': len(df['pools']),
                'pg_num': num_pg,
                'total_used_bytes': df['stats']['total_used_bytes'],
                'total_bytes': df['stats']['total_bytes'],
                'total_avail_bytes': df['stats']['total_avail_bytes']
            }

            # service map: count instances per service type; rgw gets extra
            # per-daemon detail (zones, zonegroups, frontends)
            report['services'] = defaultdict(int)
            for key, value in service_map['services'].items():
                report['services'][key] += 1
                if key == 'rgw':
                    report['rgw'] = {
                        'count': 0,
                    }
                    zones = set()
                    # NOTE(review): 'realms' is collected nowhere and never
                    # reported; left as-is
                    realms = set()
                    zonegroups = set()
                    frontends = set()
                    d = value.get('daemons', dict())

                    for k,v in d.items():
                        if k == 'summary' and v:
                            report['rgw'][k] = v
                        elif isinstance(v, dict) and 'metadata' in v:
                            report['rgw']['count'] += 1
                            zones.add(v['metadata']['zone_id'])
                            zonegroups.add(v['metadata']['zonegroup_id'])
                            frontends.add(v['metadata']['frontend_type#0'])

                            # we could actually iterate over all the keys of
                            # the dict and check for how many frontends there
                            # are, but it is unlikely that one would be running
                            # more than 2 supported ones
                            f2 = v['metadata'].get('frontend_type#1', None)
                            if f2:
                                frontends.add(f2)

                    report['rgw']['zones'] = len(zones)
                    report['rgw']['zonegroups'] = len(zonegroups)
                    report['rgw']['frontends'] = list(frontends)  # sets aren't json-serializable

            try:
                report['balancer'] = self.remote('balancer', 'gather_telemetry')
            except ImportError:
                # balancer module not loaded
                report['balancer'] = {
                    'active': False
                }

        if 'crash' in channels:
            report['crashes'] = self.gather_crashinfo()

        # NOTE: We do not include the 'device' channel in this report; it is
        # sent to a different endpoint.

        return report
679
680 def send(self, report, endpoint=None):
681 if not endpoint:
682 endpoint = ['ceph', 'device']
683 failed = []
684 success = []
685 proxies = dict()
686 self.log.debug('Send endpoints %s' % endpoint)
687 if self.proxy:
688 self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
689 proxies['http'] = self.proxy
690 proxies['https'] = self.proxy
691 for e in endpoint:
692 if e == 'ceph':
693 self.log.info('Sending ceph report to: %s', self.url)
694 resp = requests.put(url=self.url, json=report, proxies=proxies)
695 if not resp.ok:
696 self.log.error("Report send failed: %d %s %s" %
697 (resp.status_code, resp.reason, resp.text))
698 failed.append('Failed to send report to %s: %d %s %s' % (
699 self.url,
700 resp.status_code,
701 resp.reason,
702 resp.text
703 ))
704 else:
705 now = int(time.time())
706 self.last_upload = now
707 self.set_store('last_upload', str(now))
708 success.append('Ceph report sent to {0}'.format(self.url))
709 self.log.info('Sent report to {0}'.format(self.url))
710 elif e == 'device':
711 if 'device' in self.get_active_channels():
712 self.log.info('hi')
713 self.log.info('Sending device report to: %s',
714 self.device_url)
715 devices = self.gather_device_report()
716 num_devs = 0
717 num_hosts = 0
718 for host, ls in devices.items():
719 self.log.debug('host %s devices %s' % (host, ls))
720 if not len(ls):
721 continue
722 resp = requests.put(url=self.device_url, json=ls,
723 proxies=proxies)
724 if not resp.ok:
725 self.log.error(
726 "Device report failed: %d %s %s" %
727 (resp.status_code, resp.reason, resp.text))
728 failed.append(
729 'Failed to send devices to %s: %d %s %s' % (
730 self.device_url,
731 resp.status_code,
732 resp.reason,
733 resp.text
734 ))
735 else:
736 num_devs += len(ls)
737 num_hosts += 1
738 if num_devs:
739 success.append('Reported %d devices across %d hosts' % (
740 num_devs, len(devices)))
741 if failed:
742 return 1, '', '\n'.join(success + failed)
743 return 0, '', '\n'.join(success)
744
    def handle_command(self, inbuf, command):
        """Dispatch the 'telemetry *' CLI commands declared in COMMANDS.

        Returns an mgr-command style (retval, stdout, stderr) tuple.
        """
        if command['prefix'] == 'telemetry status':
            # dump current values of all module options
            r = {}
            for opt in self.MODULE_OPTIONS:
                r[opt['name']] = getattr(self, opt['name'])
            return 0, json.dumps(r, indent=4), ''
        elif command['prefix'] == 'telemetry on':
            # opting in requires explicit acknowledgement of the data license
            if command.get('license') != LICENSE:
                return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo enable, add '--license " + LICENSE + "' to the 'ceph telemetry on' command."
            self.set_module_option('enabled', True)
            # record the revision acknowledged, clearing any re-opt-in nag
            self.set_module_option('last_opt_revision', REVISION)
            return 0, '', ''
        elif command['prefix'] == 'telemetry off':
            self.set_module_option('enabled', False)
            self.set_module_option('last_opt_revision', REVISION)
            return 0, '', ''
        elif command['prefix'] == 'telemetry send':
            # force an immediate report, regardless of interval/enabled state
            self.last_report = self.compile_report()
            return self.send(self.last_report, command.get('endpoint'))

        elif command['prefix'] == 'telemetry show':
            report = self.compile_report(
                channels=command.get('channels', None)
            )
            report = json.dumps(report, indent=4)
            if self.channel_device:
                report += '\n \nDevice report is generated separately. To see it run \'ceph telemetry show-device\'.'
            return 0, report, ''
        elif command['prefix'] == 'telemetry show-device':
            return 0, json.dumps(self.gather_device_report(), indent=4, sort_keys=True), ''
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))
778
    def self_test(self):
        # NOTE(review): this definition is silently shadowed by a second
        # self_test() defined later in this class, so these report sanity
        # checks never actually run; the duplicates should be merged.
        report = self.compile_report()
        if len(report) == 0:
            raise RuntimeError('Report is empty')

        if 'report_id' not in report:
            raise RuntimeError('report_id not found in report')
786
    def shutdown(self):
        """Stop the serve() loop: clear the run flag, then wake the thread
        so it observes the flag immediately (order matters)."""
        self.run = False
        self.event.set()
790
791 def refresh_health_checks(self):
792 health_checks = {}
793 if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
794 health_checks['TELEMETRY_CHANGED'] = {
795 'severity': 'warning',
796 'summary': 'Telemetry requires re-opt-in',
797 'detail': [
798 'telemetry report includes new information; must re-opt-in (or out)'
799 ]
800 }
801 self.set_health_checks(health_checks)
802
803 def serve(self):
804 self.load()
805 self.config_notify()
806 self.run = True
807
808 self.log.debug('Waiting for mgr to warm up')
809 self.event.wait(10)
810
811 while self.run:
812 self.event.clear()
813
814 self.refresh_health_checks()
815
816 if self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
817 self.log.debug('Not sending report until user re-opts-in')
818 self.event.wait(1800)
819 continue
820 if not self.enabled:
821 self.log.debug('Not sending report until configured to do so')
822 self.event.wait(1800)
823 continue
824
825 now = int(time.time())
826 if not self.last_upload or (now - self.last_upload) > \
827 self.interval * 3600:
828 self.log.info('Compiling and sending report to %s',
829 self.url)
830
831 try:
832 self.last_report = self.compile_report()
833 except:
834 self.log.exception('Exception while compiling report:')
835
836 self.send(self.last_report)
837 else:
838 self.log.debug('Interval for sending new report has not expired')
839
840 sleep = 3600
841 self.log.debug('Sleeping for %d seconds', sleep)
842 self.event.wait(sleep)
843
844 def self_test(self):
845 self.compile_report()
846 return True
847
848 @staticmethod
849 def can_run():
850 return True, ''