1"""
2Telemetry module for ceph-mgr
3
4Collect statistics from Ceph cluster and send this back to the Ceph project
5when user has opted-in
6"""
import errno
import hashlib
import json
import rbd
import re
import requests
import uuid
import time
from datetime import datetime, timedelta
from threading import Event
from collections import defaultdict

from mgr_module import MgrModule


ALL_CHANNELS = ['basic', 'ident', 'crash', 'device']

LICENSE = 'sharing-1-0'
LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL = 'https://cdla.io/sharing-1-0/'

# If the telemetry revision has changed since this point, re-require
# an opt-in. This should happen each time we add new information to
# the telemetry report.
LAST_REVISION_RE_OPT_IN = 2

# Latest revision of the telemetry report. Bump this each time we make
# *any* change.
REVISION = 3

# History of revisions
# --------------------
#
# Version 1:
#   Mimic and/or nautilus are lumped together here, since
#   we didn't track revisions yet.
#
# Version 2:
#   - added revision tracking, nagging, etc.
#   - added config option changes
#   - added channels
#   - added explicit license acknowledgement to the opt-in process
#
# Version 3:
#   - added device health metrics (i.e., SMART data, minus serial number)
#   - remove crush_rule
#   - added CephFS metadata (how many MDSs, fs features, how many data pools,
#     how much metadata is cached, rfiles, rbytes, rsnapshots)
#   - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
#   - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
#   - whether an OSD cluster network is in use
#   - rbd pool and image count, and rbd mirror mode (pool-level)
#   - rgw daemons, zones, zonegroups; which rgw frontends
#   - crush map stats

class Module(MgrModule):
    config = dict()

    metadata_keys = [
        "arch",
        "ceph_version",
        "os",
        "cpu",
        "kernel_description",
        "kernel_version",
        "distro_description",
        "distro"
    ]

    MODULE_OPTIONS = [
        {
            'name': 'url',
            'type': 'str',
            'default': 'https://telemetry.ceph.com/report'
        },
        {
            'name': 'device_url',
            'type': 'str',
            'default': 'https://telemetry.ceph.com/device'
        },
        {
            'name': 'enabled',
            'type': 'bool',
            'default': False
        },
        {
            'name': 'last_opt_revision',
            'type': 'int',
            'default': 1,
        },
        {
            'name': 'leaderboard',
            'type': 'bool',
            'default': False
        },
        {
            'name': 'description',
            'type': 'str',
            'default': None
        },
        {
            'name': 'contact',
            'type': 'str',
            'default': None
        },
        {
            'name': 'organization',
            'type': 'str',
            'default': None
        },
        {
            'name': 'proxy',
            'type': 'str',
            'default': None
        },
        {
            'name': 'interval',
            'type': 'int',
            'default': 24,
            'min': 8
        },
        {
            'name': 'channel_basic',
            'type': 'bool',
            'default': True,
            'desc': 'Share basic cluster information (size, version)',
        },
        {
            'name': 'channel_ident',
            'type': 'bool',
            'default': False,
            'description': 'Share a user-provided description and/or contact email for the cluster',
        },
        {
            'name': 'channel_crash',
            'type': 'bool',
            'default': True,
            'description': 'Share metadata about Ceph daemon crashes (version, stack traces, etc)',
        },
        {
            'name': 'channel_device',
            'type': 'bool',
            'default': True,
            'description': 'Share device health metrics (e.g., SMART data, minus potentially identifying info like serial numbers)',
        },
    ]
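    # The options above are read via get_module_option() in config_notify();
    # on a typical deployment they are set through the mgr config interface,
    # e.g. (illustrative):
    #
    #   ceph config set mgr mgr/telemetry/interval 24
    #   ceph config set mgr mgr/telemetry/channel_ident true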

    COMMANDS = [
        {
            "cmd": "telemetry status",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "telemetry send "
                   "name=endpoint,type=CephChoices,strings=ceph|device,n=N,req=false "
                   "name=license,type=CephString,req=false",
            "desc": "Force sending data to Ceph telemetry",
            "perm": "rw"
        },
        {
            "cmd": "telemetry show "
                   "name=channels,type=CephString,n=N,req=False",
            "desc": "Show last report or report to be sent",
            "perm": "r"
        },
        {
            "cmd": "telemetry show-device",
            "desc": "Show last device report or device report to be sent",
            "perm": "r"
        },
        {
            "cmd": "telemetry show-all",
            "desc": "Show report of all channels",
            "perm": "r"
        },
        {
            "cmd": "telemetry on name=license,type=CephString,req=false",
            "desc": "Enable telemetry reports from this cluster",
            "perm": "rw",
        },
        {
            "cmd": "telemetry off",
            "desc": "Disable telemetry reports from this cluster",
            "perm": "rw",
        },
    ]

    @property
    def config_keys(self):
        return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()
        self.run = False
        self.last_upload = None
        self.last_report = dict()
        self.report_id = None
        self.salt = None

    def config_notify(self):
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))
            self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
        # wake up serve() thread
        self.event.set()

    def load(self):
        self.last_upload = self.get_store('last_upload', None)
        if self.last_upload is not None:
            self.last_upload = int(self.last_upload)

        self.report_id = self.get_store('report_id', None)
        if self.report_id is None:
            self.report_id = str(uuid.uuid4())
            self.set_store('report_id', self.report_id)

        self.salt = self.get_store('salt', None)
        if not self.salt:
            self.salt = str(uuid.uuid4())
            self.set_store('salt', self.salt)
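        # Note: the salt is persisted in the mgr key/value store but is never
        # included in any report; it is only mixed into the SHA-1 digests used
        # to anonymize crash entity names in gather_crashinfo().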

    def gather_osd_metadata(self, osd_map):
        keys = ["osd_objectstore", "rotational"]
        keys += self.metadata_keys

        metadata = dict()
        for key in keys:
            metadata[key] = defaultdict(int)

        for osd in osd_map['osds']:
            res = self.get_metadata('osd', str(osd['osd']))
            if res is None:
                self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
                continue
            for k, v in res.items():
                if k not in keys:
                    continue

                metadata[k][v] += 1

        return metadata

    def gather_mon_metadata(self, mon_map):
        keys = list()
        keys += self.metadata_keys

        metadata = dict()
        for key in keys:
            metadata[key] = defaultdict(int)

        for mon in mon_map['mons']:
            res = self.get_metadata('mon', mon['name'])
            if res is None:
                self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
                continue
            for k, v in res.items():
                if k not in keys:
                    continue

                metadata[k][v] += 1

        return metadata

    def gather_crush_info(self):
        osdmap = self.get_osdmap()
        crush_raw = osdmap.get_crush()
        crush = crush_raw.dump()

        def inc(d, k):
            if k in d:
                d[k] += 1
            else:
                d[k] = 1

        device_classes = {}
        for dev in crush['devices']:
            inc(device_classes, dev.get('class', ''))

        bucket_algs = {}
        bucket_types = {}
        bucket_sizes = {}
        for bucket in crush['buckets']:
            if '~' in bucket['name']:  # ignore shadow buckets
                continue
            inc(bucket_algs, bucket['alg'])
            inc(bucket_types, bucket['type_id'])
            inc(bucket_sizes, len(bucket['items']))

        return {
            'num_devices': len(crush['devices']),
            'num_types': len(crush['types']),
            'num_buckets': len(crush['buckets']),
            'num_rules': len(crush['rules']),
            'device_classes': list(device_classes.values()),
            'tunables': crush['tunables'],
            'compat_weight_set': '-1' in crush['choose_args'],
            'num_weight_sets': len(crush['choose_args']),
            'bucket_algs': bucket_algs,
            'bucket_sizes': bucket_sizes,
            'bucket_types': bucket_types,
        }

    def gather_configs(self):
        # cluster config options
        cluster = set()
        r, outb, outs = self.mon_command({
            'prefix': 'config dump',
            'format': 'json'
        })
        if r != 0:
            return {}
        try:
            dump = json.loads(outb)
        except json.decoder.JSONDecodeError:
            return {}
        for opt in dump:
            name = opt.get('name')
            if name:
                cluster.add(name)
        # daemon-reported options (which may include ceph.conf)
        active = set()
        ls = self.get("modified_config_options")
        for opt in ls.get('options', {}):
            active.add(opt)
        return {
            'cluster_changed': sorted(list(cluster)),
            'active_changed': sorted(list(active)),
        }

    def gather_crashinfo(self):
        crashlist = list()
        errno, crashids, err = self.remote('crash', 'ls')
        if errno:
            return ''
        for crashid in crashids.split():
            cmd = {'id': crashid}
            errno, crashinfo, err = self.remote('crash', 'do_info', cmd, '')
            if errno:
                continue
            c = json.loads(crashinfo)
            del c['utsname_hostname']
            # entity_name might have more than one '.', beware
            (etype, eid) = c.get('entity_name', '').split('.', 1)
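            # Anonymize the entity id (the part after the first '.') by
            # hashing it together with the per-cluster salt, so the daemon
            # name is not exposed while crashes from the same daemon still
            # map to the same digest.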
            m = hashlib.sha1()
            m.update(self.salt.encode('utf-8'))
            m.update(eid.encode('utf-8'))
            m.update(self.salt.encode('utf-8'))
            c['entity_name'] = etype + '.' + m.hexdigest()
            crashlist.append(c)
        return crashlist

    def get_active_channels(self):
        r = []
        if self.channel_basic:
            r.append('basic')
        if self.channel_crash:
            r.append('crash')
        if self.channel_device:
            r.append('device')
        return r

    def gather_device_report(self):
        try:
            time_format = self.remote('devicehealth', 'get_time_format')
        except:
            return None
        cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
        min_sample = cutoff.strftime(time_format)

        devices = self.get('devices')['devices']

        res = {}  # anon-host-id -> anon-devid -> { timestamp -> record }
        for d in devices:
            devid = d['devid']
            try:
                # this is a map of stamp -> {device info}
                m = self.remote('devicehealth', 'get_recent_device_metrics',
                                devid, min_sample)
            except:
                continue

            # anonymize host id
            try:
                host = d['location'][0]['host']
            except:
                continue
            anon_host = self.get_store('host-id/%s' % host)
            if not anon_host:
                anon_host = str(uuid.uuid1())
                self.set_store('host-id/%s' % host, anon_host)
            serial = None
            for dev, rep in m.items():
                rep['host_id'] = anon_host
                if serial is None and 'serial_number' in rep:
                    serial = rep['serial_number']

            # anonymize device id
            anon_devid = self.get_store('devid-id/%s' % devid)
            if not anon_devid:
                # ideally devid is 'vendor_model_serial',
                # but can also be 'model_serial', 'serial'
                if '_' in devid:
                    anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
                else:
                    anon_devid = str(uuid.uuid1())
                self.set_store('devid-id/%s' % devid, anon_devid)
            self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
                                                           host, anon_host))

            # anonymize the smartctl report itself
            if serial:
                m_str = json.dumps(m)
                m = json.loads(m_str.replace(serial, 'deleted'))

            if anon_host not in res:
                res[anon_host] = {}
            res[anon_host][anon_devid] = m
        return res

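    # get_counter() returns {stat: [(timestamp, value), ...]}; get_latest()
    # picks the most recent value, or 0 when no samples are available.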
    def get_latest(self, daemon_type, daemon_name, stat):
        data = self.get_counter(daemon_type, daemon_name, stat)[stat]
        #self.log.error("get_latest {0} data={1}".format(stat, data))
        if data:
            return data[-1][1]
        else:
            return 0

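    # Build the telemetry report for the requested channels (defaulting to
    # whatever get_active_channels() returns). Device health data is
    # intentionally not embedded here; it is compiled separately by
    # gather_device_report() and sent to a different endpoint.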
    def compile_report(self, channels=[]):
        if not channels:
            channels = self.get_active_channels()
        report = {
            'leaderboard': False,
            'report_version': 1,
            'report_timestamp': datetime.utcnow().isoformat(),
            'report_id': self.report_id,
            'channels': channels,
            'channels_available': ALL_CHANNELS,
            'license': LICENSE,
        }

        if 'ident' in channels:
            if self.leaderboard:
                report['leaderboard'] = True
            for option in ['description', 'contact', 'organization']:
                report[option] = getattr(self, option)

        if 'basic' in channels:
            mon_map = self.get('mon_map')
            osd_map = self.get('osd_map')
            service_map = self.get('service_map')
            fs_map = self.get('fs_map')
            df = self.get('df')

            report['created'] = mon_map['created']

            # mons
            v1_mons = 0
            v2_mons = 0
            ipv4_mons = 0
            ipv6_mons = 0
            for mon in mon_map['mons']:
                for a in mon['public_addrs']['addrvec']:
                    if a['type'] == 'v2':
                        v2_mons += 1
                    elif a['type'] == 'v1':
                        v1_mons += 1
                    if a['addr'].startswith('['):
                        ipv6_mons += 1
                    else:
                        ipv4_mons += 1
            report['mon'] = {
                'count': len(mon_map['mons']),
                'features': mon_map['features'],
                'min_mon_release': mon_map['min_mon_release'],
                'v1_addr_mons': v1_mons,
                'v2_addr_mons': v2_mons,
                'ipv4_addr_mons': ipv4_mons,
                'ipv6_addr_mons': ipv6_mons,
            }

            report['config'] = self.gather_configs()

            # pools
            report['rbd'] = {
                'num_pools': 0,
                'num_images_by_pool': [],
                'mirroring_by_pool': [],
            }
            num_pg = 0
            report['pools'] = list()
            for pool in osd_map['pools']:
                num_pg += pool['pg_num']
                ec_profile = {}
                if pool['erasure_code_profile']:
                    orig = osd_map['erasure_code_profiles'].get(
                        pool['erasure_code_profile'], {})
                    ec_profile = {
                        k: orig[k] for k in orig.keys()
                        if k in ['k', 'm', 'plugin', 'technique',
                                 'crush-failure-domain', 'l']
                    }
                report['pools'].append(
                    {
                        'pool': pool['pool'],
                        'type': pool['type'],
                        'pg_num': pool['pg_num'],
                        'pgp_num': pool['pg_placement_num'],
                        'size': pool['size'],
                        'min_size': pool['min_size'],
                        'pg_autoscale_mode': pool['pg_autoscale_mode'],
                        'target_max_bytes': pool['target_max_bytes'],
                        'target_max_objects': pool['target_max_objects'],
                        'type': ['', 'replicated', '', 'erasure'][pool['type']],
                        'erasure_code_profile': ec_profile,
                        'cache_mode': pool['cache_mode'],
                    }
                )
                if 'rbd' in pool['application_metadata']:
                    report['rbd']['num_pools'] += 1
                    ioctx = self.rados.open_ioctx(pool['pool_name'])
                    report['rbd']['num_images_by_pool'].append(
                        sum(1 for _ in rbd.RBD().list2(ioctx)))
                    report['rbd']['mirroring_by_pool'].append(
                        rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)

            # osds
            cluster_network = False
            for osd in osd_map['osds']:
                if osd['up'] and not cluster_network:
                    front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
                    back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
                    if front_ip != back_ip:
                        cluster_network = True
            report['osd'] = {
                'count': len(osd_map['osds']),
                'require_osd_release': osd_map['require_osd_release'],
                'require_min_compat_client': osd_map['require_min_compat_client'],
                'cluster_network': cluster_network,
            }

            # crush
            report['crush'] = self.gather_crush_info()

            # cephfs
            report['fs'] = {
                'count': len(fs_map['filesystems']),
                'feature_flags': fs_map['feature_flags'],
                'num_standby_mds': len(fs_map['standbys']),
                'filesystems': [],
            }
            num_mds = len(fs_map['standbys'])
            for fsm in fs_map['filesystems']:
                fs = fsm['mdsmap']
                num_sessions = 0
                cached_ino = 0
                cached_dn = 0
                cached_cap = 0
                subtrees = 0
                rfiles = 0
                rbytes = 0
                rsnaps = 0
                for gid, mds in fs['info'].items():
                    num_sessions += self.get_latest('mds', mds['name'],
                                                    'mds_sessions.session_count')
                    cached_ino += self.get_latest('mds', mds['name'],
                                                  'mds_mem.ino')
                    cached_dn += self.get_latest('mds', mds['name'],
                                                 'mds_mem.dn')
                    cached_cap += self.get_latest('mds', mds['name'],
                                                  'mds_mem.cap')
                    subtrees += self.get_latest('mds', mds['name'],
                                                'mds.subtrees')
                    if mds['rank'] == 0:
                        rfiles = self.get_latest('mds', mds['name'],
                                                 'mds.root_rfiles')
                        rbytes = self.get_latest('mds', mds['name'],
                                                 'mds.root_rbytes')
                        rsnaps = self.get_latest('mds', mds['name'],
                                                 'mds.root_rsnaps')
                report['fs']['filesystems'].append({
                    'max_mds': fs['max_mds'],
                    'ever_allowed_features': fs['ever_allowed_features'],
                    'explicitly_allowed_features': fs['explicitly_allowed_features'],
                    'num_in': len(fs['in']),
                    'num_up': len(fs['up']),
                    'num_standby_replay': len(
                        [mds for gid, mds in fs['info'].items()
                         if mds['state'] == 'up:standby-replay']),
                    'num_mds': len(fs['info']),
                    'num_sessions': num_sessions,
                    'cached_inos': cached_ino,
                    'cached_dns': cached_dn,
                    'cached_caps': cached_cap,
                    'cached_subtrees': subtrees,
                    'balancer_enabled': len(fs['balancer']) > 0,
                    'num_data_pools': len(fs['data_pools']),
                    'standby_count_wanted': fs['standby_count_wanted'],
                    'approx_ctime': fs['created'][0:7],
                    'files': rfiles,
                    'bytes': rbytes,
                    'snaps': rsnaps,
                })
                num_mds += len(fs['info'])
            report['fs']['total_num_mds'] = num_mds

            # daemons
            report['metadata'] = dict()
            report['metadata']['osd'] = self.gather_osd_metadata(osd_map)
            report['metadata']['mon'] = self.gather_mon_metadata(mon_map)

            # host counts
            servers = self.list_servers()
            self.log.debug('servers %s' % servers)
            report['hosts'] = {
                'num': len([h for h in servers if h['hostname']]),
            }
            for t in ['mon', 'mds', 'osd', 'mgr']:
                report['hosts']['num_with_' + t] = len(
                    [h for h in servers
                     if len([s for s in h['services'] if s['type'] == t])]
                )

            report['usage'] = {
                'pools': len(df['pools']),
                'pg_num': num_pg,
                'total_used_bytes': df['stats']['total_used_bytes'],
                'total_bytes': df['stats']['total_bytes'],
                'total_avail_bytes': df['stats']['total_avail_bytes']
            }

            report['services'] = defaultdict(int)
            for key, value in service_map['services'].items():
                report['services'][key] += 1
                if key == 'rgw':
                    report['rgw'] = {
                        'count': 0,
                    }
                    zones = set()
                    realms = set()
                    zonegroups = set()
                    frontends = set()
                    d = value.get('daemons', dict())

                    for k, v in d.items():
                        if k == 'summary' and v:
                            report['rgw'][k] = v
                        elif isinstance(v, dict) and 'metadata' in v:
                            report['rgw']['count'] += 1
                            zones.add(v['metadata']['zone_id'])
                            zonegroups.add(v['metadata']['zonegroup_id'])
                            frontends.add(v['metadata']['frontend_type#0'])

                            # we could actually iterate over all the keys of
                            # the dict and check for how many frontends there
                            # are, but it is unlikely that one would be running
                            # more than 2 supported ones
                            f2 = v['metadata'].get('frontend_type#1', None)
                            if f2:
                                frontends.add(f2)

                    report['rgw']['zones'] = len(zones)
                    report['rgw']['zonegroups'] = len(zonegroups)
                    report['rgw']['frontends'] = list(frontends)  # sets aren't json-serializable

            try:
                report['balancer'] = self.remote('balancer', 'gather_telemetry')
            except ImportError:
                report['balancer'] = {
                    'active': False
                }

        if 'crash' in channels:
            report['crashes'] = self.gather_crashinfo()

        # NOTE: We do not include the 'device' channel in this report; it is
        # sent to a different endpoint.

        return report

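    # _try_post() PUTs the JSON payload to the given URL with requests,
    # honouring the optional HTTP(S) proxy; it returns None on success or a
    # failure-reason string on error.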
    def _try_post(self, what, url, report):
        self.log.info('Sending %s to: %s' % (what, url))
        proxies = dict()
        if self.proxy:
            self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
            proxies['http'] = self.proxy
            proxies['https'] = self.proxy
        try:
            resp = requests.put(url=url, json=report, proxies=proxies)
            resp.raise_for_status()
        except Exception as e:
            fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
            self.log.error(fail_reason)
            return fail_reason
        return None

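    # send() delivers the main report to self.url ('ceph' endpoint) and, when
    # the device channel is active, one device report per host to
    # self.device_url ('device' endpoint).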
    def send(self, report, endpoint=None):
        if not endpoint:
            endpoint = ['ceph', 'device']
        failed = []
        success = []
        self.log.debug('Send endpoints %s' % endpoint)
        for e in endpoint:
            if e == 'ceph':
                fail_reason = self._try_post('ceph report', self.url, report)
                if fail_reason:
                    failed.append(fail_reason)
                else:
                    now = int(time.time())
                    self.last_upload = now
                    self.set_store('last_upload', str(now))
                    success.append('Ceph report sent to {0}'.format(self.url))
                    self.log.info('Sent report to {0}'.format(self.url))
            elif e == 'device':
                if 'device' in self.get_active_channels():
                    devices = self.gather_device_report()
                    num_devs = 0
                    num_hosts = 0
                    for host, ls in devices.items():
                        self.log.debug('host %s devices %s' % (host, ls))
                        if not len(ls):
                            continue
                        fail_reason = self._try_post('devices', self.device_url,
                                                     ls)
                        if fail_reason:
                            failed.append(fail_reason)
                        else:
                            num_devs += len(ls)
                            num_hosts += 1
                    if num_devs:
                        success.append('Reported %d devices across %d hosts' % (
                            num_devs, len(devices)))
        if failed:
            return 1, '', '\n'.join(success + failed)
        return 0, '', '\n'.join(success)

    def handle_command(self, inbuf, command):
        if command['prefix'] == 'telemetry status':
            r = {}
            for opt in self.MODULE_OPTIONS:
                r[opt['name']] = getattr(self, opt['name'])
            r['last_upload'] = time.ctime(self.last_upload) if self.last_upload else self.last_upload
            return 0, json.dumps(r, indent=4, sort_keys=True), ''
        elif command['prefix'] == 'telemetry on':
            if command.get('license') != LICENSE:
                return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo enable, add '--license " + LICENSE + "' to the 'ceph telemetry on' command."
            self.on()
            return 0, '', ''
        elif command['prefix'] == 'telemetry off':
            self.off()
            return 0, '', ''
        elif command['prefix'] == 'telemetry send':
            if self.last_opt_revision < LAST_REVISION_RE_OPT_IN and command.get('license') != LICENSE:
                self.log.debug('A telemetry send attempt while opted-out. Asking for license agreement')
                return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo manually send telemetry data, add '--license " + LICENSE + "' to the 'ceph telemetry send' command.\nPlease consider enabling the telemetry module with 'ceph telemetry on'."
            self.last_report = self.compile_report()
            return self.send(self.last_report, command.get('endpoint'))

        elif command['prefix'] == 'telemetry show':
            report = self.get_report(channels=command.get('channels', None))
            report = json.dumps(report, indent=4, sort_keys=True)
            if self.channel_device:
                report += '\n \nDevice report is generated separately. To see it run \'ceph telemetry show-device\'.'
            return 0, report, ''
        elif command['prefix'] == 'telemetry show-device':
            return 0, json.dumps(self.get_report('device'), indent=4, sort_keys=True), ''
        elif command['prefix'] == 'telemetry show-all':
            return 0, json.dumps(self.get_report('all'), indent=4, sort_keys=True), ''
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))

    def on(self):
        self.set_module_option('enabled', True)
        self.set_module_option('last_opt_revision', REVISION)

    def off(self):
        self.set_module_option('enabled', False)
        self.set_module_option('last_opt_revision', 1)

    def get_report(self, report_type='default', channels=None):
        if report_type == 'default':
            return self.compile_report(channels=channels)
        elif report_type == 'device':
            return self.gather_device_report()
        elif report_type == 'all':
            return {'report': self.compile_report(channels=channels),
                    'device_report': self.gather_device_report()}
        return {}

    def self_test(self):
        report = self.compile_report()
        if len(report) == 0:
            raise RuntimeError('Report is empty')

        if 'report_id' not in report:
            raise RuntimeError('report_id not found in report')

    def shutdown(self):
        self.run = False
        self.event.set()

    def refresh_health_checks(self):
        health_checks = {}
        if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
            health_checks['TELEMETRY_CHANGED'] = {
                'severity': 'warning',
                'summary': 'Telemetry requires re-opt-in',
                'detail': [
                    'telemetry report includes new information; must re-opt-in (or out)'
                ]
            }
        self.set_health_checks(health_checks)

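    # Main loop: wake up roughly every hour (or whenever config_notify() sets
    # the event), refresh health checks, and compile/send a new report once
    # 'interval' hours have passed since the last successful upload.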
    def serve(self):
        self.load()
        self.config_notify()
        self.run = True

        self.log.debug('Waiting for mgr to warm up')
        self.event.wait(10)

        while self.run:
            self.event.clear()

            self.refresh_health_checks()

            if self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
                self.log.debug('Not sending report until user re-opts-in')
                self.event.wait(1800)
                continue
            if not self.enabled:
                self.log.debug('Not sending report until configured to do so')
                self.event.wait(1800)
                continue

            now = int(time.time())
            if not self.last_upload or (now - self.last_upload) > \
               self.interval * 3600:
                self.log.info('Compiling and sending report to %s',
                              self.url)

                try:
                    self.last_report = self.compile_report()
                except:
                    self.log.exception('Exception while compiling report:')

                self.send(self.last_report)
            else:
                self.log.debug('Interval for sending new report has not expired')

            sleep = 3600
            self.log.debug('Sleeping for %d seconds', sleep)
            self.event.wait(sleep)

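    # Note: this second self_test() definition shadows the more thorough one
    # defined earlier in the class; only this variant takes effect at runtime.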
    def self_test(self):
        self.compile_report()
        return True

    @staticmethod
    def can_run():
        return True, ''