]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/telemetry/module.py
2 Telemetry module for ceph-mgr
4 Collect statistics from Ceph cluster and send this back to the Ceph project
15 from datetime
import datetime
, timedelta
16 from threading
import Event
17 from collections
import defaultdict
19 from mgr_module
import MgrModule
22 ALL_CHANNELS
= [ 'basic' , 'ident' , 'crash' , 'device' ]
25 LICENSE_NAME
= 'Community Data License Agreement - Sharing - Version 1.0'
26 LICENSE_URL
= 'https://cdla.io/sharing-1-0/'
# If the telemetry revision has changed since this point, re-require
# an opt-in. This should happen each time we add new information to
# the telemetry report.
31 LAST_REVISION_RE_OPT_IN
= 2
33 # Latest revision of the telemetry report. Bump this each time we make
# History of revisions
# --------------------
# Mimic and/or nautilus are lumped together here, since
# we didn't track revisions yet.
# - added revision tracking, nagging, etc.
# - added config option changes
# - added explicit license acknowledgement to the opt-in process
# - added device health metrics (i.e., SMART data, minus serial number)
# - added CephFS metadata (how many MDSs, fs features, how many data pools,
#   how much metadata is cached, rfiles, rbytes, rsnapshots)
# - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
# - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
# - whether an OSD cluster network is in use
# - rbd pool and image count, and rbd mirror mode (pool-level)
# - rgw daemons, zones, zonegroups; which rgw frontends
62 class Module ( MgrModule
):
80 'default' : 'https://telemetry.ceph.com/report'
85 'default' : 'https://telemetry.ceph.com/device'
93 'name' : 'last_opt_revision' ,
98 'name' : 'leaderboard' ,
103 'name' : 'description' ,
113 'name' : 'organization' ,
129 'name' : 'channel_basic' ,
132 'desc' : 'Share basic cluster information (size, version)' ,
135 'name' : 'channel_ident' ,
138 'description' : 'Share a user-provided description and/or contact email for the cluster' ,
141 'name' : 'channel_crash' ,
144 'description' : 'Share metadata about Ceph daemon crashes (version, stack straces, etc)' ,
147 'name' : 'channel_device' ,
150 'description' : 'Share device health metrics (e.g., SMART data, minus potentially identifying info like serial numbers)' ,
156 "cmd" : "telemetry status" ,
157 "desc" : "Show current configuration" ,
161 "cmd" : "telemetry send "
162 "name=endpoint,type=CephChoices,strings=ceph|device,n=N,req=false" ,
163 "desc" : "Force sending data to Ceph telemetry" ,
167 "cmd" : "telemetry show "
168 "name=channels,type=CephString,n=N,req=False" ,
169 "desc" : "Show last report or report to be sent" ,
173 "cmd" : "telemetry on name=license,type=CephString,req=false" ,
174 "desc" : "Enable telemetry reports from this cluster" ,
178 "cmd" : "telemetry off" ,
179 "desc" : "Disable telemetry reports from this cluster" ,
def config_keys(self):
    """Return a dict mapping each module option name to its default.

    Walks ``self.MODULE_OPTIONS`` and, for every option dict, uses its
    'name' entry as the key and its 'default' entry as the value.
    Options that declare no 'default' map to None.
    """
    return {
        opt['name']: opt.get('default', None)
        for opt in self.MODULE_OPTIONS
    }
188 def __init__ ( self
, * args
, ** kwargs
):
189 super ( Module
, self
) .__ init
__ (* args
, ** kwargs
)
192 self
. last_upload
= None
193 self
. last_report
= dict ()
194 self
. report_id
= None
197 def config_notify ( self
):
198 for opt
in self
. MODULE_OPTIONS
:
201 self
. get_module_option ( opt
[ 'name' ]))
202 self
. log
. debug ( ' %s = %s ' , opt
[ 'name' ], getattr ( self
, opt
[ 'name' ]))
203 # wake up serve() thread
def parse_timestamp(timestamp):
    """Parse a 'YYYY-MM-DD HH:MM:SS.ffffff' string into a datetime.

    The format string must not contain stray whitespace around the
    ``%d`` / ``%f`` directives or at the end: strptime treats format
    whitespace as required input whitespace, so a padded format rejects
    ordinary timestamps.

    Raises ValueError (from strptime) if the string does not match.
    """
    # NOTE(review): this function takes no ``self`` but is invoked as
    # ``self.parse_timestamp(...)`` elsewhere in this file, so it is
    # presumably decorated with @staticmethod just above -- confirm.
    return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
211 self
. last_upload
= self
. get_store ( 'last_upload' , None )
212 if self
. last_upload
is not None :
213 self
. last_upload
= int ( self
. last_upload
)
215 self
. report_id
= self
. get_store ( 'report_id' , None )
216 if self
. report_id
is None :
217 self
. report_id
= str ( uuid
. uuid4 ())
218 self
. set_store ( 'report_id' , self
. report_id
)
220 self
. salt
= self
. get_store ( 'salt' , None )
222 self
. salt
= str ( uuid
. uuid4 ())
223 self
. set_store ( 'salt' , self
. salt
)
225 def gather_osd_metadata ( self
, osd_map
):
226 keys
= [ "osd_objectstore" , "rotational" ]
227 keys
+= self
. metadata_keys
231 metadata
[ key
] = defaultdict ( int )
233 for osd
in osd_map
[ 'osds' ]:
234 for k
, v
in self
. get_metadata ( 'osd' , str ( osd
[ 'osd' ])). items ():
242 def gather_mon_metadata ( self
, mon_map
):
244 keys
+= self
. metadata_keys
248 metadata
[ key
] = defaultdict ( int )
250 for mon
in mon_map
[ 'mons' ]:
251 for k
, v
in self
. get_metadata ( 'mon' , mon
[ 'name' ]). items ():
259 def gather_crush_info ( self
):
260 osdmap
= self
. get_osdmap ()
261 crush_raw
= osdmap
. get_crush ()
262 crush
= crush_raw
. dump ()
271 for dev
in crush
[ 'devices' ]:
272 inc ( device_classes
, dev
. get ( 'class' , '' ))
277 for bucket
in crush
[ 'buckets' ]:
278 if '~' in bucket
[ 'name' ]: # ignore shadow buckets
280 inc ( bucket_algs
, bucket
[ 'alg' ])
281 inc ( bucket_types
, bucket
[ 'type_id' ])
282 inc ( bucket_sizes
, len ( bucket
[ 'items' ]))
285 'num_devices' : len ( crush
[ 'devices' ]),
286 'num_types' : len ( crush
[ 'types' ]),
287 'num_buckets' : len ( crush
[ 'buckets' ]),
288 'num_rules' : len ( crush
[ 'rules' ]),
289 'device_classes' : list ( device_classes
. values ()),
290 'tunables' : crush
[ 'tunables' ],
291 'compat_weight_set' : '-1' in crush
[ 'choose_args' ],
292 'num_weight_sets' : len ( crush
[ 'choose_args' ]),
293 'bucket_algs' : bucket_algs
,
294 'bucket_sizes' : bucket_sizes
,
295 'bucket_types' : bucket_types
,
298 def gather_configs ( self
):
299 # cluster config options
301 r
, outb
, outs
= self
. mon_command ({
302 'prefix' : 'config dump' ,
308 dump
= json
. loads ( outb
)
309 except json
. decoder
. JSONDecodeError
:
312 name
= opt
. get ( 'name' )
315 # daemon-reported options (which may include ceph.conf)
317 ls
= self
. get ( "modified_config_options" );
318 for opt
in ls
. get ( 'options' , {}):
321 'cluster_changed' : sorted ( list ( cluster
)),
322 'active_changed' : sorted ( list ( active
)),
325 def gather_crashinfo ( self
):
327 errno
, crashids
, err
= self
. remote ( 'crash' , 'ls' )
330 for crashid
in crashids
. split ():
331 cmd
= { 'id' : crashid
}
332 errno
, crashinfo
, err
= self
. remote ( 'crash' , 'do_info' , cmd
, '' )
335 c
= json
. loads ( crashinfo
)
336 del c
[ 'utsname_hostname' ]
337 ( etype
, eid
) = c
. get ( 'entity_name' , '' ). split ( '.' )
339 m
. update ( self
. salt
. encode ( 'utf-8' ))
340 m
. update ( eid
. encode ( 'utf-8' ))
341 m
. update ( self
. salt
. encode ( 'utf-8' ))
342 c
[ 'entity_name' ] = etype
+ '.' + m
. hexdigest ()
346 def get_active_channels ( self
):
348 if self
. channel_basic
:
350 if self
. channel_crash
:
352 if self
. channel_device
:
356 def gather_device_report ( self
):
358 time_format
= self
. remote ( 'devicehealth' , 'get_time_format' )
361 cutoff
= datetime
. utcnow () - timedelta ( hours
= self
. interval
* 2 )
362 min_sample
= cutoff
. strftime ( time_format
)
364 devices
= self
. get ( 'devices' )[ 'devices' ]
366 res
= {} # anon-host-id -> anon-devid -> { timestamp -> record }
370 # this is a map of stamp -> {device info}
371 m
= self
. remote ( 'devicehealth' , 'get_recent_device_metrics' ,
378 host
= d
[ 'location' ][ 0 ][ 'host' ]
381 anon_host
= self
. get_store ( 'host-id/ %s ' % host
)
383 anon_host
= str ( uuid
. uuid1 ())
384 self
. set_store ( 'host-id/ %s ' % host
, anon_host
)
385 for dev
, rep
in m
. items ():
386 rep
[ 'host_id' ] = anon_host
388 # anonymize device id
389 ( vendor
, model
, serial
) = devid
. split ( '_' )
390 anon_devid
= self
. get_store ( 'devid-id/ %s ' % devid
)
392 anon_devid
= ' %s _ %s _ %s ' % ( vendor
, model
, uuid
. uuid1 ())
393 self
. set_store ( 'devid-id/ %s ' % devid
, anon_devid
)
394 self
. log
. info ( 'devid %s / %s , host %s / %s ' % ( devid
, anon_devid
,
397 # anonymize the smartctl report itself
398 for k
in [ 'serial_number' ]:
402 if anon_host
not in res
:
404 res
[ anon_host
][ anon_devid
] = m
407 def get_latest ( self
, daemon_type
, daemon_name
, stat
):
408 data
= self
. get_counter ( daemon_type
, daemon_name
, stat
)[ stat
]
409 #self.log.error("get_latest {0} data={1}".format(stat, data))
415 def compile_report ( self
, channels
=[]):
417 channels
= self
. get_active_channels ()
419 'leaderboard' : False ,
421 'report_timestamp' : datetime
. utcnow (). isoformat (),
422 'report_id' : self
. report_id
,
423 'channels' : channels
,
424 'channels_available' : ALL_CHANNELS
,
428 if 'ident' in channels
:
430 report
[ 'leaderboard' ] = True
431 for option
in [ 'description' , 'contact' , 'organization' ]:
432 report
[ option
] = getattr ( self
, option
)
434 if 'basic' in channels
:
435 mon_map
= self
. get ( 'mon_map' )
436 osd_map
= self
. get ( 'osd_map' )
437 service_map
= self
. get ( 'service_map' )
438 fs_map
= self
. get ( 'fs_map' )
441 report
[ 'created' ] = self
. parse_timestamp ( mon_map
[ 'created' ]). isoformat ()
448 for mon
in mon_map
[ 'mons' ]:
449 for a
in mon
[ 'public_addrs' ][ 'addrvec' ]:
450 if a
[ 'type' ] == 'v2' :
452 elif a
[ 'type' ] == 'v1' :
454 if a
[ 'addr' ]. startswith ( '[' ):
459 'count' : len ( mon_map
[ 'mons' ]),
460 'features' : mon_map
[ 'features' ],
461 'min_mon_release' : mon_map
[ 'min_mon_release' ],
462 'v1_addr_mons' : v1_mons
,
463 'v2_addr_mons' : v2_mons
,
464 'ipv4_addr_mons' : ipv4_mons
,
465 'ipv6_addr_mons' : ipv6_mons
,
468 report
[ 'config' ] = self
. gather_configs ()
473 'num_images_by_pool' : [],
474 'mirroring_by_pool' : [],
477 report
[ 'pools' ] = list ()
478 for pool
in osd_map
[ 'pools' ]:
479 num_pg
+= pool
[ 'pg_num' ]
481 if pool
[ 'erasure_code_profile' ]:
482 orig
= osd_map
[ 'erasure_code_profiles' ]. get (
483 pool
[ 'erasure_code_profile' ], {})
485 k
: orig
[ k
] for k
in orig
. keys ()
486 if k
in [ 'k' , 'm' , 'plugin' , 'technique' ,
487 'crush-failure-domain' , 'l' ]
489 report
[ 'pools' ]. append (
491 'pool' : pool
[ 'pool' ],
492 'type' : pool
[ 'type' ],
493 'pg_num' : pool
[ 'pg_num' ],
494 'pgp_num' : pool
[ 'pg_placement_num' ],
495 'size' : pool
[ 'size' ],
496 'min_size' : pool
[ 'min_size' ],
497 'pg_autoscale_mode' : pool
[ 'pg_autoscale_mode' ],
498 'target_max_bytes' : pool
[ 'target_max_bytes' ],
499 'target_max_objects' : pool
[ 'target_max_objects' ],
500 'type' : [ '' , 'replicated' , '' , 'erasure' ][ pool
[ 'type' ]],
501 'erasure_code_profile' : ec_profile
,
502 'cache_mode' : pool
[ 'cache_mode' ],
505 if 'rbd' in pool
[ 'application_metadata' ]:
506 report
[ 'rbd' ][ 'num_pools' ] += 1
507 ioctx
= self
. rados
. open_ioctx ( pool
[ 'pool_name' ])
508 report
[ 'rbd' ][ 'num_images_by_pool' ]. append (
509 sum ( 1 for _
in rbd
. RBD (). list2 ( ioctx
)))
510 report
[ 'rbd' ][ 'mirroring_by_pool' ]. append (
511 rbd
. RBD (). mirror_mode_get ( ioctx
) != rbd
. RBD_MIRROR_MODE_DISABLED
)
514 cluster_network
= False
515 for osd
in osd_map
[ 'osds' ]:
516 if osd
[ 'up' ] and not cluster_network
:
517 front_ip
= osd
[ 'public_addrs' ][ 'addrvec' ][ 0 ][ 'addr' ]. split ( ':' )[ 0 ]
518 back_ip
= osd
[ 'public_addrs' ][ 'addrvec' ][ 0 ][ 'addr' ]. split ( ':' )[ 0 ]
519 if front_ip
!= back_ip
:
520 cluster_network
= True
522 'count' : len ( osd_map
[ 'osds' ]),
523 'require_osd_release' : osd_map
[ 'require_osd_release' ],
524 'require_min_compat_client' : osd_map
[ 'require_min_compat_client' ],
525 'cluster_network' : cluster_network
,
529 report
[ 'crush' ] = self
. gather_crush_info ()
533 'count' : len ( fs_map
[ 'filesystems' ]),
534 'feature_flags' : fs_map
[ 'feature_flags' ],
535 'num_standby_mds' : len ( fs_map
[ 'standbys' ]),
538 num_mds
= len ( fs_map
[ 'standbys' ])
539 for fsm
in fs_map
[ 'filesystems' ]:
549 for gid
, mds
in fs
[ 'info' ]. items ():
550 num_sessions
+= self
. get_latest ( 'mds' , mds
[ 'name' ],
551 'mds_sessions.session_count' )
552 cached_ino
+= self
. get_latest ( 'mds' , mds
[ 'name' ],
554 cached_dn
+= self
. get_latest ( 'mds' , mds
[ 'name' ],
556 cached_cap
+= self
. get_latest ( 'mds' , mds
[ 'name' ],
558 subtrees
+= self
. get_latest ( 'mds' , mds
[ 'name' ],
561 rfiles
= self
. get_latest ( 'mds' , mds
[ 'name' ],
563 rbytes
= self
. get_latest ( 'mds' , mds
[ 'name' ],
565 rsnaps
= self
. get_latest ( 'mds' , mds
[ 'name' ],
567 report
[ 'fs' ][ 'filesystems' ]. append ({
568 'max_mds' : fs
[ 'max_mds' ],
569 'ever_allowed_features' : fs
[ 'ever_allowed_features' ],
570 'explicitly_allowed_features' : fs
[ 'explicitly_allowed_features' ],
571 'num_in' : len ( fs
[ 'in' ]),
572 'num_up' : len ( fs
[ 'up' ]),
573 'num_standby_replay' : len (
574 [ mds
for gid
, mds
in fs
[ 'info' ]. items ()
575 if mds
[ 'state' ] == 'up:standby-replay' ]),
576 'num_mds' : len ( fs
[ 'info' ]),
577 'num_sessions' : num_sessions
,
578 'cached_inos' : cached_ino
,
579 'cached_dns' : cached_dn
,
580 'cached_caps' : cached_cap
,
581 'cached_subtrees' : subtrees
,
582 'balancer_enabled' : len ( fs
[ 'balancer' ]) > 0 ,
583 'num_data_pools' : len ( fs
[ 'data_pools' ]),
584 'standby_count_wanted' : fs
[ 'standby_count_wanted' ],
585 'approx_ctime' : fs
[ 'created' ][ 0 : 7 ],
590 num_mds
+= len ( fs
[ 'info' ])
591 report
[ 'fs' ][ 'total_num_mds' ] = num_mds
594 report
[ 'metadata' ] = dict ()
595 report
[ 'metadata' ][ 'osd' ] = self
. gather_osd_metadata ( osd_map
)
596 report
[ 'metadata' ][ 'mon' ] = self
. gather_mon_metadata ( mon_map
)
599 servers
= self
. list_servers ()
600 self
. log
. debug ( 'servers %s ' % servers
)
602 'num' : len ([ h
for h
in servers
if h
[ 'hostname' ]]),
604 for t
in [ 'mon' , 'mds' , 'osd' , 'mgr' ]:
605 report
[ 'hosts' ][ 'num_with_' + t
] = len (
607 if len ([ s
for s
in h
[ 'services' ] if s
[ 'type' ] == t
])]
611 'pools' : len ( df
[ 'pools' ]),
613 'total_used_bytes' : df
[ 'stats' ][ 'total_used_bytes' ],
614 'total_bytes' : df
[ 'stats' ][ 'total_bytes' ],
615 'total_avail_bytes' : df
[ 'stats' ][ 'total_avail_bytes' ]
618 report
[ 'services' ] = defaultdict ( int )
619 for key
, value
in service_map
[ 'services' ]. items ():
620 report
[ 'services' ][ key
] += 1
629 d
= value
. get ( 'daemons' , dict ())
631 for k
, v
in d
. items ():
632 if k
== 'summary' and v
:
634 elif isinstance ( v
, dict ) and 'metadata' in v
:
635 report
[ 'rgw' ][ 'count' ] += 1
636 zones
. add ( v
[ 'metadata' ][ 'zone_id' ])
637 zonegroups
. add ( v
[ 'metadata' ][ 'zonegroup_id' ])
638 frontends
. add ( v
[ 'metadata' ][ 'frontend_type#0' ])
640 # we could actually iterate over all the keys of
641 # the dict and check for how many frontends there
642 # are, but it is unlikely that one would be running
643 # more than 2 supported ones
644 f2
= v
[ 'metadata' ]. get ( 'frontend_type#1' , None )
648 report
[ 'rgw' ][ 'zones' ] = len ( zones
)
649 report
[ 'rgw' ][ 'zonegroups' ] = len ( zonegroups
)
650 report
[ 'rgw' ][ 'frontends' ] = list ( frontends
) # sets aren't json-serializable
653 report
[ 'balancer' ] = self
. remote ( 'balancer' , 'gather_telemetry' )
655 report
[ 'balancer' ] = {
659 if 'crash' in channels
:
660 report
[ 'crashes' ] = self
. gather_crashinfo ()
662 # NOTE: We do not include the 'device' channel in this report; it is
663 # sent to a different endpoint.
667 def send ( self
, report
, endpoint
= None ):
669 endpoint
= [ 'ceph' , 'device' ]
673 self
. log
. debug ( 'Send endpoints %s ' % endpoint
)
675 self
. log
. info ( 'Send using HTTP(S) proxy: %s ' , self
. proxy
)
676 proxies
[ 'http' ] = self
. proxy
677 proxies
[ 'https' ] = self
. proxy
680 self
. log
. info ( 'Sending ceph report to: %s ' , self
. url
)
681 resp
= requests
. put ( url
= self
. url
, json
= report
, proxies
= proxies
)
683 self
. log
. error ( "Report send failed: %d %s %s " %
684 ( resp
. status_code
, resp
. reason
, resp
. text
))
685 failed
. append ( 'Failed to send report to %s : %d %s %s ' % (
692 now
= int ( time
. time ())
693 self
. last_upload
= now
694 self
. set_store ( 'last_upload' , str ( now
))
695 success
. append ( 'Ceph report sent to {0} ' . format ( self
. url
))
696 self
. log
. info ( 'Sent report to {0} ' . format ( self
. url
))
698 if 'device' in self
. get_active_channels ():
700 self
. log
. info ( 'Sending device report to: %s ' ,
702 devices
= self
. gather_device_report ()
705 for host
, ls
in devices
. items ():
706 self
. log
. debug ( 'host %s devices %s ' % ( host
, ls
))
709 resp
= requests
. put ( url
= self
. device_url
, json
= ls
,
713 "Device report failed: %d %s %s " %
714 ( resp
. status_code
, resp
. reason
, resp
. text
))
716 'Failed to send devices to %s : %d %s %s ' % (
726 success
. append ( 'Reported %d devices across %d hosts' % (
727 num_devs
, len ( devices
)))
729 return 1 , '' , ' \n ' . join ( success
+ failed
)
730 return 0 , '' , ' \n ' . join ( success
)
732 def handle_command ( self
, inbuf
, command
):
733 if command
[ 'prefix' ] == 'telemetry status' :
735 for opt
in self
. MODULE_OPTIONS
:
736 r
[ opt
[ 'name' ]] = getattr ( self
, opt
[ 'name' ])
737 return 0 , json
. dumps ( r
, indent
= 4 ), ''
738 elif command
[ 'prefix' ] == 'telemetry on' :
739 if command
. get ( 'license' ) != LICENSE
:
740 return - errno
. EPERM
, '' , "Telemetry data is licensed under the " + LICENSE_NAME
+ " (" + LICENSE_URL
+ "). \n To enable, add '--license " + LICENSE
+ "' to the 'ceph telemetry on' command."
741 self
. set_module_option ( 'enabled' , True )
742 self
. set_module_option ( 'last_opt_revision' , REVISION
)
744 elif command
[ 'prefix' ] == 'telemetry off' :
745 self
. set_module_option ( 'enabled' , False )
746 self
. set_module_option ( 'last_opt_revision' , REVISION
)
748 elif command
[ 'prefix' ] == 'telemetry send' :
749 self
. last_report
= self
. compile_report ()
750 return self
. send ( self
. last_report
, command
. get ( 'endpoint' ))
752 elif command
[ 'prefix' ] == 'telemetry show' :
753 report
= self
. compile_report (
754 channels
= command
. get ( 'channels' , None )
756 return 0 , json
. dumps ( report
, indent
= 4 ), ''
758 return (- errno
. EINVAL
, '' ,
759 "Command not found ' {0} '" . format ( command
[ 'prefix' ]))
762 report
= self
. compile_report ()
764 raise RuntimeError ( 'Report is empty' )
766 if 'report_id' not in report
:
767 raise RuntimeError ( 'report_id not found in report' )
773 def refresh_health_checks ( self
):
775 if self
. enabled
and self
. last_opt_revision
< LAST_REVISION_RE_OPT_IN
:
776 health_checks
[ 'TELEMETRY_CHANGED' ] = {
777 'severity' : 'warning' ,
778 'summary' : 'Telemetry requires re-opt-in' ,
780 'telemetry report includes new information; must re-opt-in (or out)'
783 self
. set_health_checks ( health_checks
)
790 self
. log
. debug ( 'Waiting for mgr to warm up' )
796 self
. refresh_health_checks ()
798 if self
. last_opt_revision
< LAST_REVISION_RE_OPT_IN
:
799 self
. log
. debug ( 'Not sending report until user re-opts-in' )
800 self
. event
. wait ( 1800 )
803 self
. log
. debug ( 'Not sending report until configured to do so' )
804 self
. event
. wait ( 1800 )
807 now
= int ( time
. time ())
808 if not self
. last_upload
or ( now
- self
. last_upload
) > \
809 self
. interval
* 3600 :
810 self
. log
. info ( 'Compiling and sending report to %s ' ,
814 self
. last_report
= self
. compile_report ()
816 self
. log
. exception ( 'Exception while compiling report:' )
818 self
. send ( self
. last_report
)
820 self
. log
. debug ( 'Interval for sending new report has not expired' )
823 self
. log
. debug ( 'Sleeping for %d seconds' , sleep
)
824 self
. event
. wait ( sleep
)
827 self
. compile_report ()