]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/telemetry/module.py
import ceph 14.2.5
[ceph.git] / ceph / src / pybind / mgr / telemetry / module.py
1 """
2 Telemetry module for ceph-mgr
3
4 Collect statistics from Ceph cluster and send this back to the Ceph project
5 when user has opted-in
6 """
7 import errno
8 import hashlib
9 import json
10 import rbd
11 import re
12 import requests
13 import uuid
14 import time
15 from datetime import datetime, timedelta
16 from threading import Event
17 from collections import defaultdict
18
19 from mgr_module import MgrModule
20
21
# The full set of report channels a user may opt in to.
ALL_CHANNELS = ['basic', 'ident', 'crash', 'device']

# License under which shared telemetry data is published.
LICENSE = 'sharing-1-0'
LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL = 'https://cdla.io/sharing-1-0/'

# If the telemetry revision has changed since this point, re-require
# an opt-in.  This should happen each time we add new information to
# the telemetry report.
LAST_REVISION_RE_OPT_IN = 2

# Latest revision of the telemetry report.  Bump this each time we make
# *any* change.
REVISION = 3

# History of revisions
# --------------------
#
# Version 1:
#   Mimic and/or nautilus are lumped together here, since
#   we didn't track revisions yet.
#
# Version 2:
#   - added revision tracking, nagging, etc.
#   - added config option changes
#   - added channels
#   - added explicit license acknowledgement to the opt-in process
#
# Version 3:
#   - added device health metrics (i.e., SMART data, minus serial number)
#   - remove crush_rule
#   - added CephFS metadata (how many MDSs, fs features, how many data pools,
#     how much metadata is cached, rfiles, rbytes, rsnapshots)
#   - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
#   - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
#   - whether an OSD cluster network is in use
#   - rbd pool and image count, and rbd mirror mode (pool-level)
#   - rgw daemons, zones, zonegroups; which rgw frontends
#   - crush map stats
class Module(MgrModule):
    """Telemetry mgr module: compiles cluster statistics into a report and
    uploads it to the Ceph project when the user has opted in."""

    # NOTE(review): `config` is never referenced elsewhere in this module
    # (options are set directly as attributes in config_notify) -- confirm
    # it is an unused placeholder before removing.
    config = dict()

    # Daemon metadata fields histogrammed for both mons and osds
    # (see gather_osd_metadata / gather_mon_metadata).
    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]
75
76 MODULE_OPTIONS = [
77 {
78 'name': 'url',
79 'type': 'str',
80 'default': 'https://telemetry.ceph.com/report'
81 },
82 {
83 'name': 'device_url',
84 'type': 'str',
85 'default': 'https://telemetry.ceph.com/device'
86 },
87 {
88 'name': 'enabled',
89 'type': 'bool',
90 'default': False
91 },
92 {
93 'name': 'last_opt_revision',
94 'type': 'int',
95 'default': 1,
96 },
97 {
98 'name': 'leaderboard',
99 'type': 'bool',
100 'default': False
101 },
102 {
103 'name': 'description',
104 'type': 'str',
105 'default': None
106 },
107 {
108 'name': 'contact',
109 'type': 'str',
110 'default': None
111 },
112 {
113 'name': 'organization',
114 'type': 'str',
115 'default': None
116 },
117 {
118 'name': 'proxy',
119 'type': 'str',
120 'default': None
121 },
122 {
123 'name': 'interval',
124 'type': 'int',
125 'default': 24,
126 'min': 8
127 },
128 {
129 'name': 'channel_basic',
130 'type': 'bool',
131 'default': True,
132 'desc': 'Share basic cluster information (size, version)',
133 },
134 {
135 'name': 'channel_ident',
136 'type': 'bool',
137 'default': False,
138 'description': 'Share a user-provided description and/or contact email for the cluster',
139 },
140 {
141 'name': 'channel_crash',
142 'type': 'bool',
143 'default': True,
144 'description': 'Share metadata about Ceph daemon crashes (version, stack straces, etc)',
145 },
146 {
147 'name': 'channel_device',
148 'type': 'bool',
149 'default': True,
150 'description': 'Share device health metrics (e.g., SMART data, minus potentially identifying info like serial numbers)',
151 },
152 ]
153
    # CLI command descriptors registered with the mgr; dispatched in
    # handle_command() below.
    COMMANDS = [
        {
            "cmd": "telemetry status",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            # optional endpoint(s): 'ceph' and/or 'device'
            "cmd": "telemetry send "
                   "name=endpoint,type=CephChoices,strings=ceph|device,n=N,req=false",
            "desc": "Force sending data to Ceph telemetry",
            "perm": "rw"
        },
        {
            # NOTE(review): 'req=False' (capitalized) differs from the
            # lowercase 'req=false' used in the other command specs --
            # confirm the command parser treats both spellings the same.
            "cmd": "telemetry show "
                   "name=channels,type=CephString,n=N,req=False",
            "desc": "Show last report or report to be sent",
            "perm": "r"
        },
        {
            # requires '--license sharing-1-0' as explicit acknowledgement
            "cmd": "telemetry on name=license,type=CephString,req=false",
            "desc": "Enable telemetry reports from this cluster",
            "perm": "rw",
        },
        {
            "cmd": "telemetry off",
            "desc": "Disable telemetry reports from this cluster",
            "perm": "rw",
        },
    ]
183
184 @property
185 def config_keys(self):
186 return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
187
188 def __init__(self, *args, **kwargs):
189 super(Module, self).__init__(*args, **kwargs)
190 self.event = Event()
191 self.run = False
192 self.last_upload = None
193 self.last_report = dict()
194 self.report_id = None
195 self.salt = None
196
197 def config_notify(self):
198 for opt in self.MODULE_OPTIONS:
199 setattr(self,
200 opt['name'],
201 self.get_module_option(opt['name']))
202 self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
203 # wake up serve() thread
204 self.event.set()
205
206 @staticmethod
207 def parse_timestamp(timestamp):
208 return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
209
210 def load(self):
211 self.last_upload = self.get_store('last_upload', None)
212 if self.last_upload is not None:
213 self.last_upload = int(self.last_upload)
214
215 self.report_id = self.get_store('report_id', None)
216 if self.report_id is None:
217 self.report_id = str(uuid.uuid4())
218 self.set_store('report_id', self.report_id)
219
220 self.salt = self.get_store('salt', None)
221 if not self.salt:
222 self.salt = str(uuid.uuid4())
223 self.set_store('salt', self.salt)
224
225 def gather_osd_metadata(self, osd_map):
226 keys = ["osd_objectstore", "rotational"]
227 keys += self.metadata_keys
228
229 metadata = dict()
230 for key in keys:
231 metadata[key] = defaultdict(int)
232
233 for osd in osd_map['osds']:
234 for k, v in self.get_metadata('osd', str(osd['osd'])).items():
235 if k not in keys:
236 continue
237
238 metadata[k][v] += 1
239
240 return metadata
241
242 def gather_mon_metadata(self, mon_map):
243 keys = list()
244 keys += self.metadata_keys
245
246 metadata = dict()
247 for key in keys:
248 metadata[key] = defaultdict(int)
249
250 for mon in mon_map['mons']:
251 for k, v in self.get_metadata('mon', mon['name']).items():
252 if k not in keys:
253 continue
254
255 metadata[k][v] += 1
256
257 return metadata
258
259 def gather_crush_info(self):
260 osdmap = self.get_osdmap()
261 crush_raw = osdmap.get_crush()
262 crush = crush_raw.dump()
263
264 def inc(d, k):
265 if k in d:
266 d[k] += 1
267 else:
268 d[k] = 1
269
270 device_classes = {}
271 for dev in crush['devices']:
272 inc(device_classes, dev.get('class', ''))
273
274 bucket_algs = {}
275 bucket_types = {}
276 bucket_sizes = {}
277 for bucket in crush['buckets']:
278 if '~' in bucket['name']: # ignore shadow buckets
279 continue
280 inc(bucket_algs, bucket['alg'])
281 inc(bucket_types, bucket['type_id'])
282 inc(bucket_sizes, len(bucket['items']))
283
284 return {
285 'num_devices': len(crush['devices']),
286 'num_types': len(crush['types']),
287 'num_buckets': len(crush['buckets']),
288 'num_rules': len(crush['rules']),
289 'device_classes': list(device_classes.values()),
290 'tunables': crush['tunables'],
291 'compat_weight_set': '-1' in crush['choose_args'],
292 'num_weight_sets': len(crush['choose_args']),
293 'bucket_algs': bucket_algs,
294 'bucket_sizes': bucket_sizes,
295 'bucket_types': bucket_types,
296 }
297
298 def gather_configs(self):
299 # cluster config options
300 cluster = set()
301 r, outb, outs = self.mon_command({
302 'prefix': 'config dump',
303 'format': 'json'
304 });
305 if r != 0:
306 return {}
307 try:
308 dump = json.loads(outb)
309 except json.decoder.JSONDecodeError:
310 return {}
311 for opt in dump:
312 name = opt.get('name')
313 if name:
314 cluster.add(name)
315 # daemon-reported options (which may include ceph.conf)
316 active = set()
317 ls = self.get("modified_config_options");
318 for opt in ls.get('options', {}):
319 active.add(opt)
320 return {
321 'cluster_changed': sorted(list(cluster)),
322 'active_changed': sorted(list(active)),
323 }
324
325 def gather_crashinfo(self):
326 crashlist = list()
327 errno, crashids, err = self.remote('crash', 'ls')
328 if errno:
329 return ''
330 for crashid in crashids.split():
331 cmd = {'id': crashid}
332 errno, crashinfo, err = self.remote('crash', 'do_info', cmd, '')
333 if errno:
334 continue
335 c = json.loads(crashinfo)
336 del c['utsname_hostname']
337 (etype, eid) = c.get('entity_name', '').split('.')
338 m = hashlib.sha1()
339 m.update(self.salt.encode('utf-8'))
340 m.update(eid.encode('utf-8'))
341 m.update(self.salt.encode('utf-8'))
342 c['entity_name'] = etype + '.' + m.hexdigest()
343 crashlist.append(c)
344 return crashlist
345
346 def get_active_channels(self):
347 r = []
348 if self.channel_basic:
349 r.append('basic')
350 if self.channel_crash:
351 r.append('crash')
352 if self.channel_device:
353 r.append('device')
354 return r
355
356 def gather_device_report(self):
357 try:
358 time_format = self.remote('devicehealth', 'get_time_format')
359 except:
360 return None
361 cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
362 min_sample = cutoff.strftime(time_format)
363
364 devices = self.get('devices')['devices']
365
366 res = {} # anon-host-id -> anon-devid -> { timestamp -> record }
367 for d in devices:
368 devid = d['devid']
369 try:
370 # this is a map of stamp -> {device info}
371 m = self.remote('devicehealth', 'get_recent_device_metrics',
372 devid, min_sample)
373 except:
374 continue
375
376 # anonymize host id
377 try:
378 host = d['location'][0]['host']
379 except:
380 continue
381 anon_host = self.get_store('host-id/%s' % host)
382 if not anon_host:
383 anon_host = str(uuid.uuid1())
384 self.set_store('host-id/%s' % host, anon_host)
385 for dev, rep in m.items():
386 rep['host_id'] = anon_host
387
388 # anonymize device id
389 (vendor, model, serial) = devid.split('_')
390 anon_devid = self.get_store('devid-id/%s' % devid)
391 if not anon_devid:
392 anon_devid = '%s_%s_%s' % (vendor, model, uuid.uuid1())
393 self.set_store('devid-id/%s' % devid, anon_devid)
394 self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
395 host, anon_host))
396
397 # anonymize the smartctl report itself
398 for k in ['serial_number']:
399 if k in m:
400 m.pop(k)
401
402 if anon_host not in res:
403 res[anon_host] = {}
404 res[anon_host][anon_devid] = m
405 return res
406
407 def get_latest(self, daemon_type, daemon_name, stat):
408 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
409 #self.log.error("get_latest {0} data={1}".format(stat, data))
410 if data:
411 return data[-1][1]
412 else:
413 return 0
414
415 def compile_report(self, channels=[]):
416 if not channels:
417 channels = self.get_active_channels()
418 report = {
419 'leaderboard': False,
420 'report_version': 1,
421 'report_timestamp': datetime.utcnow().isoformat(),
422 'report_id': self.report_id,
423 'channels': channels,
424 'channels_available': ALL_CHANNELS,
425 'license': LICENSE,
426 }
427
428 if 'ident' in channels:
429 if self.leaderboard:
430 report['leaderboard'] = True
431 for option in ['description', 'contact', 'organization']:
432 report[option] = getattr(self, option)
433
434 if 'basic' in channels:
435 mon_map = self.get('mon_map')
436 osd_map = self.get('osd_map')
437 service_map = self.get('service_map')
438 fs_map = self.get('fs_map')
439 df = self.get('df')
440
441 report['created'] = self.parse_timestamp(mon_map['created']).isoformat()
442
443 # mons
444 v1_mons = 0
445 v2_mons = 0
446 ipv4_mons = 0
447 ipv6_mons = 0
448 for mon in mon_map['mons']:
449 for a in mon['public_addrs']['addrvec']:
450 if a['type'] == 'v2':
451 v2_mons += 1
452 elif a['type'] == 'v1':
453 v1_mons += 1
454 if a['addr'].startswith('['):
455 ipv6_mons += 1
456 else:
457 ipv4_mons += 1
458 report['mon'] = {
459 'count': len(mon_map['mons']),
460 'features': mon_map['features'],
461 'min_mon_release': mon_map['min_mon_release'],
462 'v1_addr_mons': v1_mons,
463 'v2_addr_mons': v2_mons,
464 'ipv4_addr_mons': ipv4_mons,
465 'ipv6_addr_mons': ipv6_mons,
466 }
467
468 report['config'] = self.gather_configs()
469
470 # pools
471 report['rbd'] = {
472 'num_pools': 0,
473 'num_images_by_pool': [],
474 'mirroring_by_pool': [],
475 }
476 num_pg = 0
477 report['pools'] = list()
478 for pool in osd_map['pools']:
479 num_pg += pool['pg_num']
480 ec_profile = {}
481 if pool['erasure_code_profile']:
482 orig = osd_map['erasure_code_profiles'].get(
483 pool['erasure_code_profile'], {})
484 ec_profile = {
485 k: orig[k] for k in orig.keys()
486 if k in ['k', 'm', 'plugin', 'technique',
487 'crush-failure-domain', 'l']
488 }
489 report['pools'].append(
490 {
491 'pool': pool['pool'],
492 'type': pool['type'],
493 'pg_num': pool['pg_num'],
494 'pgp_num': pool['pg_placement_num'],
495 'size': pool['size'],
496 'min_size': pool['min_size'],
497 'pg_autoscale_mode': pool['pg_autoscale_mode'],
498 'target_max_bytes': pool['target_max_bytes'],
499 'target_max_objects': pool['target_max_objects'],
500 'type': ['', 'replicated', '', 'erasure'][pool['type']],
501 'erasure_code_profile': ec_profile,
502 'cache_mode': pool['cache_mode'],
503 }
504 )
505 if 'rbd' in pool['application_metadata']:
506 report['rbd']['num_pools'] += 1
507 ioctx = self.rados.open_ioctx(pool['pool_name'])
508 report['rbd']['num_images_by_pool'].append(
509 sum(1 for _ in rbd.RBD().list2(ioctx)))
510 report['rbd']['mirroring_by_pool'].append(
511 rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
512
513 # osds
514 cluster_network = False
515 for osd in osd_map['osds']:
516 if osd['up'] and not cluster_network:
517 front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
518 back_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
519 if front_ip != back_ip:
520 cluster_network = True
521 report['osd'] = {
522 'count': len(osd_map['osds']),
523 'require_osd_release': osd_map['require_osd_release'],
524 'require_min_compat_client': osd_map['require_min_compat_client'],
525 'cluster_network': cluster_network,
526 }
527
528 # crush
529 report['crush'] = self.gather_crush_info()
530
531 # cephfs
532 report['fs'] = {
533 'count': len(fs_map['filesystems']),
534 'feature_flags': fs_map['feature_flags'],
535 'num_standby_mds': len(fs_map['standbys']),
536 'filesystems': [],
537 }
538 num_mds = len(fs_map['standbys'])
539 for fsm in fs_map['filesystems']:
540 fs = fsm['mdsmap']
541 num_sessions = 0
542 cached_ino = 0
543 cached_dn = 0
544 cached_cap = 0
545 subtrees = 0
546 rfiles = 0
547 rbytes = 0
548 rsnaps = 0
549 for gid, mds in fs['info'].items():
550 num_sessions += self.get_latest('mds', mds['name'],
551 'mds_sessions.session_count')
552 cached_ino += self.get_latest('mds', mds['name'],
553 'mds_mem.ino')
554 cached_dn += self.get_latest('mds', mds['name'],
555 'mds_mem.dn')
556 cached_cap += self.get_latest('mds', mds['name'],
557 'mds_mem.cap')
558 subtrees += self.get_latest('mds', mds['name'],
559 'mds.subtrees')
560 if mds['rank'] == 0:
561 rfiles = self.get_latest('mds', mds['name'],
562 'mds.root_rfiles')
563 rbytes = self.get_latest('mds', mds['name'],
564 'mds.root_rbytes')
565 rsnaps = self.get_latest('mds', mds['name'],
566 'mds.root_rsnaps')
567 report['fs']['filesystems'].append({
568 'max_mds': fs['max_mds'],
569 'ever_allowed_features': fs['ever_allowed_features'],
570 'explicitly_allowed_features': fs['explicitly_allowed_features'],
571 'num_in': len(fs['in']),
572 'num_up': len(fs['up']),
573 'num_standby_replay': len(
574 [mds for gid, mds in fs['info'].items()
575 if mds['state'] == 'up:standby-replay']),
576 'num_mds': len(fs['info']),
577 'num_sessions': num_sessions,
578 'cached_inos': cached_ino,
579 'cached_dns': cached_dn,
580 'cached_caps': cached_cap,
581 'cached_subtrees': subtrees,
582 'balancer_enabled': len(fs['balancer']) > 0,
583 'num_data_pools': len(fs['data_pools']),
584 'standby_count_wanted': fs['standby_count_wanted'],
585 'approx_ctime': fs['created'][0:7],
586 'files': rfiles,
587 'bytes': rbytes,
588 'snaps': rsnaps,
589 })
590 num_mds += len(fs['info'])
591 report['fs']['total_num_mds'] = num_mds
592
593 # daemons
594 report['metadata'] = dict()
595 report['metadata']['osd'] = self.gather_osd_metadata(osd_map)
596 report['metadata']['mon'] = self.gather_mon_metadata(mon_map)
597
598 # host counts
599 servers = self.list_servers()
600 self.log.debug('servers %s' % servers)
601 report['hosts'] = {
602 'num': len([h for h in servers if h['hostname']]),
603 }
604 for t in ['mon', 'mds', 'osd', 'mgr']:
605 report['hosts']['num_with_' + t] = len(
606 [h for h in servers
607 if len([s for s in h['services'] if s['type'] == t])]
608 )
609
610 report['usage'] = {
611 'pools': len(df['pools']),
612 'pg_num:': num_pg,
613 'total_used_bytes': df['stats']['total_used_bytes'],
614 'total_bytes': df['stats']['total_bytes'],
615 'total_avail_bytes': df['stats']['total_avail_bytes']
616 }
617
618 report['services'] = defaultdict(int)
619 for key, value in service_map['services'].items():
620 report['services'][key] += 1
621 if key == 'rgw':
622 report['rgw'] = {
623 'count': 0,
624 }
625 zones = set()
626 realms = set()
627 zonegroups = set()
628 frontends = set()
629 d = value.get('daemons', dict())
630
631 for k,v in d.items():
632 if k == 'summary' and v:
633 report['rgw'][k] = v
634 elif isinstance(v, dict) and 'metadata' in v:
635 report['rgw']['count'] += 1
636 zones.add(v['metadata']['zone_id'])
637 zonegroups.add(v['metadata']['zonegroup_id'])
638 frontends.add(v['metadata']['frontend_type#0'])
639
640 # we could actually iterate over all the keys of
641 # the dict and check for how many frontends there
642 # are, but it is unlikely that one would be running
643 # more than 2 supported ones
644 f2 = v['metadata'].get('frontend_type#1', None)
645 if f2:
646 frontends.add(f2)
647
648 report['rgw']['zones'] = len(zones)
649 report['rgw']['zonegroups'] = len(zonegroups)
650 report['rgw']['frontends'] = list(frontends) # sets aren't json-serializable
651
652 try:
653 report['balancer'] = self.remote('balancer', 'gather_telemetry')
654 except ImportError:
655 report['balancer'] = {
656 'active': False
657 }
658
659 if 'crash' in channels:
660 report['crashes'] = self.gather_crashinfo()
661
662 # NOTE: We do not include the 'device' channel in this report; it is
663 # sent to a different endpoint.
664
665 return report
666
667 def send(self, report, endpoint=None):
668 if not endpoint:
669 endpoint = ['ceph', 'device']
670 failed = []
671 success = []
672 proxies = dict()
673 self.log.debug('Send endpoints %s' % endpoint)
674 if self.proxy:
675 self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
676 proxies['http'] = self.proxy
677 proxies['https'] = self.proxy
678 for e in endpoint:
679 if e == 'ceph':
680 self.log.info('Sending ceph report to: %s', self.url)
681 resp = requests.put(url=self.url, json=report, proxies=proxies)
682 if not resp.ok:
683 self.log.error("Report send failed: %d %s %s" %
684 (resp.status_code, resp.reason, resp.text))
685 failed.append('Failed to send report to %s: %d %s %s' % (
686 self.url,
687 resp.status_code,
688 resp.reason,
689 resp.text
690 ))
691 else:
692 now = int(time.time())
693 self.last_upload = now
694 self.set_store('last_upload', str(now))
695 success.append('Ceph report sent to {0}'.format(self.url))
696 self.log.info('Sent report to {0}'.format(self.url))
697 elif e == 'device':
698 if 'device' in self.get_active_channels():
699 self.log.info('hi')
700 self.log.info('Sending device report to: %s',
701 self.device_url)
702 devices = self.gather_device_report()
703 num_devs = 0
704 num_hosts = 0
705 for host, ls in devices.items():
706 self.log.debug('host %s devices %s' % (host, ls))
707 if not len(ls):
708 continue
709 resp = requests.put(url=self.device_url, json=ls,
710 proxies=proxies)
711 if not resp.ok:
712 self.log.error(
713 "Device report failed: %d %s %s" %
714 (resp.status_code, resp.reason, resp.text))
715 failed.append(
716 'Failed to send devices to %s: %d %s %s' % (
717 self.device_url,
718 resp.status_code,
719 resp.reason,
720 resp.text
721 ))
722 else:
723 num_devs += len(ls)
724 num_hosts += 1
725 if num_devs:
726 success.append('Reported %d devices across %d hosts' % (
727 num_devs, len(devices)))
728 if failed:
729 return 1, '', '\n'.join(success + failed)
730 return 0, '', '\n'.join(success)
731
732 def handle_command(self, inbuf, command):
733 if command['prefix'] == 'telemetry status':
734 r = {}
735 for opt in self.MODULE_OPTIONS:
736 r[opt['name']] = getattr(self, opt['name'])
737 return 0, json.dumps(r, indent=4), ''
738 elif command['prefix'] == 'telemetry on':
739 if command.get('license') != LICENSE:
740 return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo enable, add '--license " + LICENSE + "' to the 'ceph telemetry on' command."
741 self.set_module_option('enabled', True)
742 self.set_module_option('last_opt_revision', REVISION)
743 return 0, '', ''
744 elif command['prefix'] == 'telemetry off':
745 self.set_module_option('enabled', False)
746 self.set_module_option('last_opt_revision', REVISION)
747 return 0, '', ''
748 elif command['prefix'] == 'telemetry send':
749 self.last_report = self.compile_report()
750 return self.send(self.last_report, command.get('endpoint'))
751
752 elif command['prefix'] == 'telemetry show':
753 report = self.compile_report(
754 channels=command.get('channels', None)
755 )
756 return 0, json.dumps(report, indent=4), ''
757 else:
758 return (-errno.EINVAL, '',
759 "Command not found '{0}'".format(command['prefix']))
760
    def self_test(self):
        """Sanity-check that a report can be compiled.

        NOTE(review): this definition is shadowed by a second ``self_test``
        defined later in this class, so this body is dead code -- the later
        definition wins at class-creation time.
        """
        report = self.compile_report()
        if len(report) == 0:
            raise RuntimeError('Report is empty')

        if 'report_id' not in report:
            raise RuntimeError('report_id not found in report')
768
    def shutdown(self):
        """Stop the serve() loop: clear the run flag and wake the thread."""
        self.run = False
        self.event.set()
772
773 def refresh_health_checks(self):
774 health_checks = {}
775 if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
776 health_checks['TELEMETRY_CHANGED'] = {
777 'severity': 'warning',
778 'summary': 'Telemetry requires re-opt-in',
779 'detail': [
780 'telemetry report includes new information; must re-opt-in (or out)'
781 ]
782 }
783 self.set_health_checks(health_checks)
784
785 def serve(self):
786 self.load()
787 self.config_notify()
788 self.run = True
789
790 self.log.debug('Waiting for mgr to warm up')
791 self.event.wait(10)
792
793 while self.run:
794 self.event.clear()
795
796 self.refresh_health_checks()
797
798 if self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
799 self.log.debug('Not sending report until user re-opts-in')
800 self.event.wait(1800)
801 continue
802 if not self.enabled:
803 self.log.debug('Not sending report until configured to do so')
804 self.event.wait(1800)
805 continue
806
807 now = int(time.time())
808 if not self.last_upload or (now - self.last_upload) > \
809 self.interval * 3600:
810 self.log.info('Compiling and sending report to %s',
811 self.url)
812
813 try:
814 self.last_report = self.compile_report()
815 except:
816 self.log.exception('Exception while compiling report:')
817
818 self.send(self.last_report)
819 else:
820 self.log.debug('Interval for sending new report has not expired')
821
822 sleep = 3600
823 self.log.debug('Sleeping for %d seconds', sleep)
824 self.event.wait(sleep)
825
826 def self_test(self):
827 self.compile_report()
828 return True
829
    @staticmethod
    def can_run():
        # Telemetry has no special runtime requirements; always runnable.
        return True, ''