]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/telegraf/module.py
8264fdf32573075afd02c5f0016e03b90cdb651e
[ceph.git] / ceph / src / pybind / mgr / telegraf / module.py
1 import errno
2 import json
3 import itertools
4 import six
5 import socket
6 import time
7 from threading import Event
8
9 from telegraf.basesocket import BaseSocket
10 from telegraf.protocol import Line
11 from mgr_module import MgrModule, PG_STATES
12
13 try:
14 from urllib.parse import urlparse
15 except ImportError:
16 from urlparse import urlparse
17
18
class Module(MgrModule):
    """Ceph manager module that pushes cluster statistics to Telegraf.

    Measurements are gathered from the manager's view of the cluster,
    encoded with the ``Line`` protocol helper and written to the socket
    described by the ``address`` option. ``serve`` repeats this every
    ``interval`` seconds until ``shutdown`` is called.
    """

    COMMANDS = [
        {
            "cmd": "telegraf config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "telegraf config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "telegraf send",
            "desc": "Force sending data to Telegraf",
            "perm": "rw"
        },
    ]

    MODULE_OPTIONS = [
        {
            'name': 'address',
            'default': 'unixgram:///tmp/telegraf.sock',
        },
        {
            'name': 'interval',
            'type': 'secs',
            'default': 15
        }
    ]

    # Numeric encoding of the cluster health status sent to Telegraf.
    ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}

    @property
    def config_keys(self):
        """Return a dict mapping every module option name to its default."""
        return {
            opt['name']: opt.get('default', None)
            for opt in self.MODULE_OPTIONS
        }

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        # Used both to sleep between runs and to wake up on shutdown.
        self.event = Event()
        self.run = True
        # Cached lazily by get_fsid().
        self.fsid = None
        # Effective configuration; filled in by init_module_config().
        self.config = dict()

    def get_fsid(self):
        """Return the cluster fsid, reading the mon map on first use."""
        if not self.fsid:
            self.fsid = self.get('mon_map')['fsid']

        return self.fsid

    def get_pool_stats(self):
        """Yield one ``ceph_pool_stats`` measurement per pool and statistic.

        Raises:
            KeyError: if a pool's ``stats`` lacks one of the listed fields.
        """
        df = self.get('df')

        df_types = [
            'bytes_used',
            'kb_used',
            'dirty',
            'rd',
            'rd_bytes',
            'stored_raw',
            'wr',
            'wr_bytes',
            'objects',
            'max_avail',
            'quota_objects',
            'quota_bytes'
        ]

        for df_type in df_types:
            for pool in df['pools']:
                yield {
                    'measurement': 'ceph_pool_stats',
                    'tags': {
                        'pool_name': pool['name'],
                        'pool_id': pool['id'],
                        'type_instance': df_type,
                        'fsid': self.get_fsid()
                    },
                    'value': pool['stats'][df_type],
                }

    def get_daemon_stats(self):
        """Yield one ``ceph_daemon_stats`` measurement per perf counter.

        Daemons for which no metadata is available are skipped, as are
        counters flagged as histograms.
        """
        for daemon, counters in six.iteritems(self.get_all_perf_counters()):
            # Daemon names look like "<type>.<id>"; the id may contain dots.
            svc_type, svc_id = daemon.split('.', 1)
            metadata = self.get_metadata(svc_type, svc_id)
            if not metadata:
                continue

            for path, counter_info in counters.items():
                if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
                    continue

                yield {
                    'measurement': 'ceph_daemon_stats',
                    'tags': {
                        'ceph_daemon': daemon,
                        'type_instance': path,
                        'host': metadata['hostname'],
                        'fsid': self.get_fsid()
                    },
                    'value': counter_info['value']
                }

    def get_pg_stats(self):
        """Return a dict of placement group totals and per-state counts.

        Besides the totals copied from ``pg_status``, the result holds a
        ``num_pgs_<state>`` entry for every state in ``PG_STATES``. A PG
        whose state name combines several states (joined by ``+``) is
        counted once for each of them.
        """
        pg_status = self.get('pg_status')

        totals = ('bytes_total', 'data_bytes', 'bytes_used', 'bytes_avail',
                  'num_pgs', 'num_objects', 'num_pools')
        stats = {key: pg_status[key] for key in totals}

        # Make sure every known state is reported, even when no PG is in it.
        for state in PG_STATES:
            stats['num_pgs_{0}'.format(state)] = 0

        for entry in pg_status['pgs_by_state']:
            active_states = entry['state_name'].split('+')
            for state in PG_STATES:
                if state in active_states:
                    stats['num_pgs_{0}'.format(state)] += entry['count']

        return stats

    def get_cluster_stats(self):
        """Yield one ``ceph_cluster_stats`` measurement per cluster metric.

        The metrics cover health, monitors, OSDs, managers, MDS/filesystems
        and the placement group statistics from ``get_pg_stats``. Every
        value is converted with ``int`` before being yielded.
        """
        stats = dict()

        health = json.loads(self.get('health')['json'])
        stats['health'] = self.ceph_health_mapping.get(health['status'])

        mon_status = json.loads(self.get('mon_status')['json'])
        stats['num_mon'] = len(mon_status['monmap']['mons'])

        stats['mon_election_epoch'] = mon_status['election_epoch']
        stats['mon_outside_quorum'] = len(mon_status['outside_quorum'])
        stats['mon_quorum'] = len(mon_status['quorum'])

        osd_map = self.get('osd_map')
        osds = osd_map['osds']
        stats['num_osd'] = len(osds)
        stats['num_pg_temp'] = len(osd_map['pg_temp'])
        stats['osd_epoch'] = osd_map['epoch']

        mgr_map = self.get('mgr_map')
        stats['mgr_available'] = int(mgr_map['available'])
        stats['num_mgr_standby'] = len(mgr_map['standbys'])
        stats['mgr_epoch'] = mgr_map['epoch']

        stats['num_osd_up'] = sum(1 for osd in osds if osd['up'] == 1)
        stats['num_osd_in'] = sum(1 for osd in osds if osd['in'] == 1)

        fs_map = self.get('fs_map')
        stats['num_mds_standby'] = len(fs_map['standbys'])
        stats['num_fs'] = len(fs_map['filesystems'])
        stats['mds_epoch'] = fs_map['epoch']

        num_mds_up = sum(len(fs['mdsmap']['up'])
                         for fs in fs_map['filesystems'])

        stats['num_mds_up'] = num_mds_up
        stats['num_mds'] = num_mds_up + stats['num_mds_standby']

        stats.update(self.get_pg_stats())

        for key, value in stats.items():
            yield {
                'measurement': 'ceph_cluster_stats',
                'tags': {
                    'type_instance': key,
                    'fsid': self.get_fsid()
                },
                'value': int(value)
            }

    def set_config_option(self, option, value):
        """Validate ``value`` and store it in the in-memory configuration.

        Args:
            option: name of the option; must be one of ``MODULE_OPTIONS``.
            value: new value. ``interval`` is converted to ``int``.

        Raises:
            RuntimeError: if the option is unknown, or if ``interval`` is
                not an integer or is lower than 5.
        """
        if option not in self.config_keys:
            raise RuntimeError('{0} is a unknown configuration '
                               'option'.format(option))

        if option == 'interval':
            try:
                value = int(value)
            except (ValueError, TypeError):
                raise RuntimeError('invalid {0} configured. Please specify '
                                   'a valid integer'.format(option))

            if value < 5:
                raise RuntimeError('interval should be set to at least 5 seconds')

        self.config[option] = value

    def init_module_config(self):
        """Load ``address`` and ``interval`` from the stored module options."""
        self.config['address'] = \
            self.get_module_option("address", default=self.config_keys['address'])
        self.config['interval'] = \
            int(self.get_module_option("interval",
                                       default=self.config_keys['interval']))

    def now(self):
        """Return the current time as integer nanoseconds since the epoch."""
        return int(round(time.time() * 1000000000))

    def gather_measurements(self):
        """Return a lazy iterator over pool, daemon and cluster measurements."""
        return itertools.chain(
            self.get_pool_stats(),
            self.get_daemon_stats(),
            self.get_cluster_stats()
        )

    def send_to_telegraf(self):
        """Send all current measurements to the configured Telegraf socket.

        Every measurement of one run carries the same timestamp. Socket and
        I/O failures are logged and swallowed so that a temporarily
        unavailable Telegraf does not stop the module.
        """
        url = urlparse(self.config['address'])

        sock = BaseSocket(url)
        self.log.debug('Sending data to Telegraf at %s', sock.address)
        now = self.now()
        try:
            with sock as s:
                for measurement in self.gather_measurements():
                    self.log.debug(measurement)
                    line = Line(measurement['measurement'],
                                measurement['value'],
                                measurement['tags'], now)
                    payload = line.to_line_protocol()
                    self.log.debug(payload)
                    s.send(payload)
        except (socket.error, RuntimeError, IOError, OSError) as exc:
            # A separate ``except FileNotFoundError`` clause placed after
            # this one can never run: on Python 3 it is an OSError subclass,
            # and on Python 2 the name does not exist. Checking errno tells
            # a missing socket path apart on both versions.
            if getattr(exc, 'errno', None) == errno.ENOENT:
                self.log.exception('Failed to open Telegraf at: %s',
                                   url.geturl())
            else:
                self.log.exception('Failed to send statistics to Telegraf:')

    def shutdown(self):
        """Ask the ``serve`` loop to stop and wake it if it is sleeping."""
        self.log.info('Stopping Telegraf module')
        self.run = False
        self.event.set()

    def handle_command(self, inbuf, cmd):
        """Dispatch one of the ``telegraf`` commands listed in ``COMMANDS``.

        Returns:
            A ``(return code, output, error message)`` tuple. The return
            code is 0 on success and ``-errno.EINVAL`` for an unknown
            command or an invalid configuration key or value.
        """
        if cmd['prefix'] == 'telegraf config-show':
            return 0, json.dumps(self.config), ''
        elif cmd['prefix'] == 'telegraf config-set':
            key = cmd['key']
            value = cmd['value']
            if not value:
                return -errno.EINVAL, '', 'Value should not be empty or None'

            self.log.debug('Setting configuration option %s to %s', key, value)
            try:
                self.set_config_option(key, value)
            except RuntimeError as exc:
                # Report bad input the same way as the empty-value case
                # instead of letting the exception escape the handler.
                return -errno.EINVAL, '', str(exc)
            # Only persist the option once it has passed validation.
            self.set_module_option(key, value)
            return 0, 'Configuration option {0} updated'.format(key), ''
        elif cmd['prefix'] == 'telegraf send':
            self.send_to_telegraf()
            return 0, 'Sending data to Telegraf', ''

        return (-errno.EINVAL, '',
                "Command not found '{0}'".format(cmd['prefix']))

    def self_test(self):
        """Raise ``RuntimeError`` if no measurement can be gathered."""
        # list() drains every generator, so each collector is exercised.
        measurements = list(self.gather_measurements())
        if not measurements:
            raise RuntimeError('No measurements found')

    def serve(self):
        """Send measurements every ``interval`` seconds until shutdown."""
        self.log.info('Starting Telegraf module')
        self.init_module_config()
        self.run = True

        self.log.debug('Waiting 10 seconds before starting')
        self.event.wait(10)

        while self.run:
            start = self.now()
            self.send_to_telegraf()
            # now() is in nanoseconds; convert the elapsed time to ms.
            runtime = (self.now() - start) / 1000000
            self.log.debug('Sending data to Telegraf took %d ms', runtime)
            self.log.debug("Sleeping for %d seconds", self.config['interval'])
            self.event.wait(self.config['interval'])