]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/telegraf/module.py
8264fdf32573075afd02c5f0016e03b90cdb651e
7 from threading
import Event
9 from telegraf
.basesocket
import BaseSocket
10 from telegraf
.protocol
import Line
11 from mgr_module
import MgrModule
, PG_STATES
14 from urllib
.parse
import urlparse
16 from urlparse
import urlparse
19 class Module(MgrModule
):
22 "cmd": "telegraf config-set name=key,type=CephString "
23 "name=value,type=CephString",
24 "desc": "Set a configuration value",
28 "cmd": "telegraf config-show",
29 "desc": "Show current configuration",
33 "cmd": "telegraf send",
34 "desc": "Force sending data to Telegraf",
42 'default': 'unixgram:///tmp/telegraf.sock',
51 ceph_health_mapping
= {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}
@property
def config_keys(self):
    """Map every module option name to its declared default value.

    The mapping is built from ``self.MODULE_OPTIONS``. An option that
    has no ``'default'`` entry maps to ``None``.

    This is a property because the rest of the module reads it as an
    attribute (``self.config_keys.keys()`` and
    ``self.config_keys['address']``), which would fail on a plain method.

    :return: dict of option name -> default value (or ``None``)
    """
    return {opt['name']: opt.get('default') for opt in self.MODULE_OPTIONS}
57 def __init__(self
, *args
, **kwargs
):
58 super(Module
, self
).__init
__(*args
, **kwargs
)
66 self
.fsid
= self
.get('mon_map')['fsid']
70 def get_pool_stats(self
):
88 for df_type
in df_types
:
89 for pool
in df
['pools']:
91 'measurement': 'ceph_pool_stats',
93 'pool_name': pool
['name'],
94 'pool_id': pool
['id'],
95 'type_instance': df_type
,
96 'fsid': self
.get_fsid()
98 'value': pool
['stats'][df_type
],
101 def get_daemon_stats(self
):
102 for daemon
, counters
in six
.iteritems(self
.get_all_perf_counters()):
103 svc_type
, svc_id
= daemon
.split('.', 1)
104 metadata
= self
.get_metadata(svc_type
, svc_id
)
108 for path
, counter_info
in counters
.items():
109 if counter_info
['type'] & self
.PERFCOUNTER_HISTOGRAM
:
113 'measurement': 'ceph_daemon_stats',
115 'ceph_daemon': daemon
,
116 'type_instance': path
,
117 'host': metadata
['hostname'],
118 'fsid': self
.get_fsid()
120 'value': counter_info
['value']
123 def get_pg_stats(self
):
126 pg_status
= self
.get('pg_status')
127 for key
in ['bytes_total', 'data_bytes', 'bytes_used', 'bytes_avail',
128 'num_pgs', 'num_objects', 'num_pools']:
129 stats
[key
] = pg_status
[key
]
131 for state
in PG_STATES
:
132 stats
['num_pgs_{0}'.format(state
)] = 0
134 stats
['num_pgs'] = pg_status
['num_pgs']
135 for state
in pg_status
['pgs_by_state']:
136 states
= state
['state_name'].split('+')
138 key
= 'num_pgs_{0}'.format(s
)
140 stats
[key
] += state
['count']
144 def get_cluster_stats(self
):
147 health
= json
.loads(self
.get('health')['json'])
148 stats
['health'] = self
.ceph_health_mapping
.get(health
['status'])
150 mon_status
= json
.loads(self
.get('mon_status')['json'])
151 stats
['num_mon'] = len(mon_status
['monmap']['mons'])
153 stats
['mon_election_epoch'] = mon_status
['election_epoch']
154 stats
['mon_outside_quorum'] = len(mon_status
['outside_quorum'])
155 stats
['mon_quorum'] = len(mon_status
['quorum'])
157 osd_map
= self
.get('osd_map')
158 stats
['num_osd'] = len(osd_map
['osds'])
159 stats
['num_pg_temp'] = len(osd_map
['pg_temp'])
160 stats
['osd_epoch'] = osd_map
['epoch']
162 mgr_map
= self
.get('mgr_map')
163 stats
['mgr_available'] = int(mgr_map
['available'])
164 stats
['num_mgr_standby'] = len(mgr_map
['standbys'])
165 stats
['mgr_epoch'] = mgr_map
['epoch']
169 for osd
in osd_map
['osds']:
176 stats
['num_osd_up'] = num_up
177 stats
['num_osd_in'] = num_in
179 fs_map
= self
.get('fs_map')
180 stats
['num_mds_standby'] = len(fs_map
['standbys'])
181 stats
['num_fs'] = len(fs_map
['filesystems'])
182 stats
['mds_epoch'] = fs_map
['epoch']
185 for fs
in fs_map
['filesystems']:
186 num_mds_up
+= len(fs
['mdsmap']['up'])
188 stats
['num_mds_up'] = num_mds_up
189 stats
['num_mds'] = num_mds_up
+ stats
['num_mds_standby']
191 stats
.update(self
.get_pg_stats())
193 for key
, value
in stats
.items():
195 'measurement': 'ceph_cluster_stats',
197 'type_instance': key
,
198 'fsid': self
.get_fsid()
203 def set_config_option(self
, option
, value
):
204 if option
not in self
.config_keys
.keys():
205 raise RuntimeError('{0} is a unknown configuration '
206 'option'.format(option
))
208 if option
in ['interval']:
211 except (ValueError, TypeError):
212 raise RuntimeError('invalid {0} configured. Please specify '
213 'a valid integer'.format(option
))
215 if option
== 'interval' and value
< 5:
216 raise RuntimeError('interval should be set to at least 5 seconds')
218 self
.config
[option
] = value
def init_module_config(self):
    """Populate ``self.config`` with the 'address' and 'interval' options.

    Each value is fetched through ``self.get_module_option``, using the
    matching entry of ``self.config_keys`` as the default. The interval
    is coerced to ``int`` before it is stored.
    """
    defaults = self.config_keys

    # Address is stored exactly as returned by get_module_option.
    self.config['address'] = self.get_module_option(
        "address", default=defaults['address'])

    # Interval is converted to an integer before being stored.
    raw_interval = self.get_module_option(
        "interval", default=defaults['interval'])
    self.config['interval'] = int(raw_interval)
228 return int(round(time
.time() * 1000000000))
def gather_measurements(self):
    """Return one iterator over all measurements.

    The result chains the output of ``get_pool_stats()``,
    ``get_daemon_stats()`` and ``get_cluster_stats()``, in that order.
    """
    pool_stats = self.get_pool_stats()
    daemon_stats = self.get_daemon_stats()
    cluster_stats = self.get_cluster_stats()
    return itertools.chain(pool_stats, daemon_stats, cluster_stats)
237 def send_to_telegraf(self
):
238 url
= urlparse(self
.config
['address'])
240 sock
= BaseSocket(url
)
241 self
.log
.debug('Sending data to Telegraf at %s', sock
.address
)
245 for measurement
in self
.gather_measurements():
246 self
.log
.debug(measurement
)
247 line
= Line(measurement
['measurement'],
248 measurement
['value'],
249 measurement
['tags'], now
)
250 self
.log
.debug(line
.to_line_protocol())
251 s
.send(line
.to_line_protocol())
252 except (socket
.error
, RuntimeError, IOError, OSError):
253 self
.log
.exception('Failed to send statistics to Telegraf:')
254 except FileNotFoundError
:
255 self
.log
.exception('Failed to open Telegraf at: %s', url
.geturl())
258 self
.log
.info('Stopping Telegraf module')
262 def handle_command(self
, inbuf
, cmd
):
263 if cmd
['prefix'] == 'telegraf config-show':
264 return 0, json
.dumps(self
.config
), ''
265 elif cmd
['prefix'] == 'telegraf config-set':
269 return -errno
.EINVAL
, '', 'Value should not be empty or None'
271 self
.log
.debug('Setting configuration option %s to %s', key
, value
)
272 self
.set_config_option(key
, value
)
273 self
.set_module_option(key
, value
)
274 return 0, 'Configuration option {0} updated'.format(key
), ''
275 elif cmd
['prefix'] == 'telegraf send':
276 self
.send_to_telegraf()
277 return 0, 'Sending data to Telegraf', ''
279 return (-errno
.EINVAL
, '',
280 "Command not found '{0}'".format(cmd
['prefix']))
283 measurements
= list(self
.gather_measurements())
284 if len(measurements
) == 0:
285 raise RuntimeError('No measurements found')
288 self
.log
.info('Starting Telegraf module')
289 self
.init_module_config()
292 self
.log
.debug('Waiting 10 seconds before starting')
297 self
.send_to_telegraf()
298 runtime
= (self
.now() - start
) / 1000000
299 self
.log
.debug('Sending data to Telegraf took %d ms', runtime
)
300 self
.log
.debug("Sleeping for %d seconds", self
.config
['interval'])
301 self
.event
.wait(self
.config
['interval'])