]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/zabbix/module.py
d427038fa3107d2cbedf98c28798f4203196f696
[ceph.git] / ceph / src / pybind / mgr / zabbix / module.py
1 """
2 Zabbix module for ceph-mgr
3
4 Collect statistics from Ceph cluster and every X seconds send data to a Zabbix
5 server using the zabbix_sender executable.
6 """
7 import json
8 import errno
9 import re
10 from subprocess import Popen, PIPE
11 from threading import Event
12 from mgr_module import MgrModule
13
14
def avg(data):
    """Return the arithmetic mean of *data*, or 0 for an empty sequence."""
    return sum(data) / float(len(data)) if data else 0
20
21
class ZabbixSender(object):
    """Thin wrapper around the ``zabbix_sender`` executable.

    Items are piped to ``zabbix_sender`` on stdin (``-i -``) in the
    ``<host> <key> <value>`` line format it expects.
    """

    def __init__(self, sender, host, port, log):
        # Path to the zabbix_sender executable.
        self.sender = sender
        # Zabbix server address and trapper port.
        self.host = host
        self.port = port
        self.log = log

    def send(self, hostname, data):
        """Send the mapping *data* (item -> value) for *hostname* to Zabbix.

        Keys are prefixed with ``ceph.`` on the wire. Does nothing for an
        empty mapping. Raises RuntimeError if zabbix_sender exits non-zero;
        propagates OSError if the executable cannot be started.
        """
        if len(data) == 0:
            return

        cmd = [self.sender, '-z', self.host, '-p', str(self.port), '-s',
               hostname, '-vv', '-i', '-']

        self.log.debug('Executing: %s', cmd)

        proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)

        # Build the whole payload up front and hand it to communicate():
        # writing to proc.stdin line-by-line can deadlock once the child
        # fills its stdout/stderr pipe buffers, and raises BrokenPipeError
        # if the child exits early. communicate() handles both safely.
        payload = ''.join('{0} ceph.{1} {2}\n'.format(hostname, key, value)
                          for key, value in data.items()).encode('utf-8')

        stdout, stderr = proc.communicate(payload)
        if proc.returncode != 0:
            raise RuntimeError('%s exited non-zero: %s' % (self.sender,
                                                           stderr))

        self.log.debug('Zabbix Sender: %s', stdout.rstrip())
49
50
class Module(MgrModule):
    """ceph-mgr module that periodically ships cluster statistics to Zabbix."""
    # Set True by serve(); shutdown() flips it to False to stop the loop.
    run = False
    # In-memory copy of the module configuration options.
    # NOTE(review): mutable *class* attribute shared by all instances —
    # presumably safe because ceph-mgr creates a single module instance.
    config = dict()
    # Mapping from Ceph health status strings to the integers sent to Zabbix.
    ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}
    # Parsed list of {'zabbix_host': ..., 'zabbix_port': ...} targets
    # (filled by _parse_zabbix_hosts()). Also a shared class attribute.
    _zabbix_hosts = list()
56
57 @property
58 def config_keys(self):
59 return dict((o['name'], o.get('default', None))
60 for o in self.MODULE_OPTIONS)
61
    MODULE_OPTIONS = [
        {
            # Path to the zabbix_sender executable used to ship data.
            'name': 'zabbix_sender',
            'default': '/usr/bin/zabbix_sender'
        },
        {
            # Comma-separated Zabbix servers; each 'host' or 'host:port'.
            'name': 'zabbix_host',
            'default': None
        },
        {
            # Port used for servers listed without an explicit port.
            'name': 'zabbix_port',
            'type': 'int',
            'default': 10051
        },
        {
            # Identifier reported to Zabbix; empty means 'ceph-<fsid>'.
            'name': 'identifier',
            'default': ""
        },
        {
            # Seconds between data transmissions (minimum 10, see
            # set_config_option).
            'name': 'interval',
            'type': 'secs',
            'default': 60
        },
        {
            # Number of regular intervals between discovery transmissions
            # (minimum 10, see set_config_option).
            'name': 'discovery_interval',
            'type': 'count',
            'default': 100
        }
    ]
91
    # CLI commands exposed as 'ceph zabbix ...'; dispatched in
    # handle_command().
    COMMANDS = [
        {
            "cmd": "zabbix config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "zabbix config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "zabbix send",
            "desc": "Force sending data to Zabbix",
            "perm": "rw"
        },
        {
            "cmd": "zabbix discovery",
            "desc": "Discovering Zabbix data",
            "perm": "r"
        },
    ]
115
116 def __init__(self, *args, **kwargs):
117 super(Module, self).__init__(*args, **kwargs)
118 self.event = Event()
119
120 def init_module_config(self):
121 self.fsid = self.get('mon_map')['fsid']
122 self.log.debug('Found Ceph fsid %s', self.fsid)
123
124 for key, default in self.config_keys.items():
125 self.set_config_option(key, self.get_module_option(key, default))
126
127 if self.config['zabbix_host']:
128 self._parse_zabbix_hosts()
129
130 def set_config_option(self, option, value):
131 if option not in self.config_keys.keys():
132 raise RuntimeError('{0} is a unknown configuration '
133 'option'.format(option))
134
135 if option in ['zabbix_port', 'interval', 'discovery_interval']:
136 try:
137 value = int(value)
138 except (ValueError, TypeError):
139 raise RuntimeError('invalid {0} configured. Please specify '
140 'a valid integer'.format(option))
141
142 if option == 'interval' and value < 10:
143 raise RuntimeError('interval should be set to at least 10 seconds')
144
145 if option == 'discovery_interval' and value < 10:
146 raise RuntimeError(
147 "discovery_interval should not be more frequent "
148 "than once in 10 regular data collection"
149 )
150
151 self.log.debug('Setting in-memory config option %s to: %s', option,
152 value)
153 self.config[option] = value
154 return True
155
156 def _parse_zabbix_hosts(self):
157 self._zabbix_hosts = list()
158 servers = self.config['zabbix_host'].split(",")
159 for server in servers:
160 uri = re.match("(?:(?:\[?)([a-z0-9-\.]+|[a-f0-9:\.]+)(?:\]?))(?:((?::))([0-9]{1,5}))?$", server)
161 if uri:
162 zabbix_host, sep, zabbix_port = uri.groups()
163 zabbix_port = zabbix_port if sep == ':' else self.config['zabbix_port']
164 self._zabbix_hosts.append({'zabbix_host': zabbix_host, 'zabbix_port': zabbix_port})
165 else:
166 self.log.error('Zabbix host "%s" is not valid', server)
167
168 self.log.error('Parsed Zabbix hosts: %s', self._zabbix_hosts)
169
170 def get_pg_stats(self):
171 stats = dict()
172
173 pg_states = ['active', 'peering', 'clean', 'scrubbing', 'undersized',
174 'backfilling', 'recovering', 'degraded', 'inconsistent',
175 'remapped', 'backfill_toofull', 'backfill_wait',
176 'recovery_wait']
177
178 for state in pg_states:
179 stats['num_pg_{0}'.format(state)] = 0
180
181 pg_status = self.get('pg_status')
182
183 stats['num_pg'] = pg_status['num_pgs']
184
185 for state in pg_status['pgs_by_state']:
186 states = state['state_name'].split('+')
187 for s in pg_states:
188 key = 'num_pg_{0}'.format(s)
189 if s in states:
190 stats[key] += state['count']
191
192 return stats
193
    def get_data(self):
        """Collect all cluster statistics to send to Zabbix as one flat dict.

        Keys are either plain item names ('num_osd') or Zabbix
        discovery-style keys ('[<pool>,rd_bytes]', '[osd.N,up]').
        """
        data = dict()

        health = json.loads(self.get('health')['json'])
        # 'status' is luminous+, 'overall_status' is legacy mode.
        data['overall_status'] = health.get('status',
                                           health.get('overall_status'))
        # Integer form via ceph_health_mapping; None for unknown statuses.
        data['overall_status_int'] = \
            self.ceph_health_mapping.get(data['overall_status'])

        mon_status = json.loads(self.get('mon_status')['json'])
        data['num_mon'] = len(mon_status['monmap']['mons'])

        df = self.get('df')
        data['num_pools'] = len(df['pools'])
        data['total_used_bytes'] = df['stats']['total_used_bytes']
        data['total_bytes'] = df['stats']['total_bytes']
        data['total_avail_bytes'] = df['stats']['total_avail_bytes']

        # Cluster-wide I/O totals accumulated alongside per-pool items.
        wr_ops = 0
        rd_ops = 0
        wr_bytes = 0
        rd_bytes = 0

        for pool in df['pools']:
            wr_ops += pool['stats']['wr']
            rd_ops += pool['stats']['rd']
            wr_bytes += pool['stats']['wr_bytes']
            rd_bytes += pool['stats']['rd_bytes']
            data['[{0},rd_bytes]'.format(pool['name'])] = pool['stats']['rd_bytes']
            data['[{0},wr_bytes]'.format(pool['name'])] = pool['stats']['wr_bytes']
            data['[{0},rd_ops]'.format(pool['name'])] = pool['stats']['rd']
            data['[{0},wr_ops]'.format(pool['name'])] = pool['stats']['wr']
            data['[{0},bytes_used]'.format(pool['name'])] = pool['stats']['bytes_used']
            data['[{0},stored_raw]'.format(pool['name'])] = pool['stats']['stored_raw']
            data['[{0},percent_used]'.format(pool['name'])] = pool['stats']['percent_used']

        data['wr_ops'] = wr_ops
        data['rd_ops'] = rd_ops
        data['wr_bytes'] = wr_bytes
        data['rd_bytes'] = rd_bytes

        osd_map = self.get('osd_map')
        data['num_osd'] = len(osd_map['osds'])
        data['osd_nearfull_ratio'] = osd_map['nearfull_ratio']
        data['osd_full_ratio'] = osd_map['full_ratio']
        data['osd_backfillfull_ratio'] = osd_map['backfillfull_ratio']

        data['num_pg_temp'] = len(osd_map['pg_temp'])

        # Per-OSD up/in flags plus cluster-wide up/in counts.
        num_up = 0
        num_in = 0
        for osd in osd_map['osds']:
            data['[osd.{0},up]'.format(int(osd['osd']))] = osd['up']
            if osd['up'] == 1:
                num_up += 1

            data['[osd.{0},in]'.format(int(osd['osd']))] = osd['in']
            if osd['in'] == 1:
                num_in += 1

        data['num_osd_up'] = num_up
        data['num_osd_in'] = num_in

        # Per-OSD samples collected for the min/max/avg aggregates below.
        osd_fill = list()
        osd_pgs = list()
        osd_apply_latency_ns = list()
        osd_commit_latency_ns = list()

        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            try:
                osd_fill.append((float(osd['kb_used']) / float(osd['kb'])) * 100)
                data['[osd.{0},osd_fill]'.format(osd['osd'])] = (
                    float(osd['kb_used']) / float(osd['kb'])) * 100
            except ZeroDivisionError:
                # kb == 0 (e.g. OSD without usage data): skip this OSD
                # entirely, including its PG and latency samples.
                continue
            osd_pgs.append(osd['num_pgs'])
            osd_apply_latency_ns.append(osd['perf_stat']['apply_latency_ns'])
            osd_commit_latency_ns.append(osd['perf_stat']['commit_latency_ns'])
            data['[osd.{0},num_pgs]'.format(osd['osd'])] = osd['num_pgs']
            data[
                '[osd.{0},osd_latency_apply]'.format(osd['osd'])
            ] = osd['perf_stat']['apply_latency_ns'] / 1000000.0  # ns -> ms
            data[
                '[osd.{0},osd_latency_commit]'.format(osd['osd'])
            ] = osd['perf_stat']['commit_latency_ns'] / 1000000.0  # ns -> ms

        # max()/min() raise ValueError on empty sequences (no OSD samples).
        try:
            data['osd_max_fill'] = max(osd_fill)
            data['osd_min_fill'] = min(osd_fill)
            data['osd_avg_fill'] = avg(osd_fill)
            data['osd_max_pgs'] = max(osd_pgs)
            data['osd_min_pgs'] = min(osd_pgs)
            data['osd_avg_pgs'] = avg(osd_pgs)
        except ValueError:
            pass

        try:
            data['osd_latency_apply_max'] = max(osd_apply_latency_ns) / 1000000.0  # ns -> ms
            data['osd_latency_apply_min'] = min(osd_apply_latency_ns) / 1000000.0  # ns -> ms
            data['osd_latency_apply_avg'] = avg(osd_apply_latency_ns) / 1000000.0  # ns -> ms

            data['osd_latency_commit_max'] = max(osd_commit_latency_ns) / 1000000.0  # ns -> ms
            data['osd_latency_commit_min'] = min(osd_commit_latency_ns) / 1000000.0  # ns -> ms
            data['osd_latency_commit_avg'] = avg(osd_commit_latency_ns) / 1000000.0  # ns -> ms
        except ValueError:
            pass

        data.update(self.get_pg_stats())

        return data
306
307 def send(self, data):
308 identifier = self.config['identifier']
309 if identifier is None or len(identifier) == 0:
310 identifier = 'ceph-{0}'.format(self.fsid)
311
312 if not self.config['zabbix_host'] or not self._zabbix_hosts:
313 self.log.error('Zabbix server not set, please configure using: '
314 'ceph zabbix config-set zabbix_host <zabbix_host>')
315 self.set_health_checks({
316 'MGR_ZABBIX_NO_SERVER': {
317 'severity': 'warning',
318 'summary': 'No Zabbix server configured',
319 'detail': ['Configuration value zabbix_host not configured']
320 }
321 })
322 return
323
324 result = True
325
326 for server in self._zabbix_hosts:
327 self.log.info(
328 'Sending data to Zabbix server %s, port %s as host/identifier %s',
329 server['zabbix_host'], server['zabbix_port'], identifier)
330 self.log.debug(data)
331
332 try:
333 zabbix = ZabbixSender(self.config['zabbix_sender'],
334 server['zabbix_host'],
335 server['zabbix_port'], self.log)
336 zabbix.send(identifier, data)
337 except Exception as exc:
338 self.log.exception('Failed to send.')
339 self.set_health_checks({
340 'MGR_ZABBIX_SEND_FAILED': {
341 'severity': 'warning',
342 'summary': 'Failed to send data to Zabbix',
343 'detail': [str(exc)]
344 }
345 })
346 result = False
347
348 self.set_health_checks(dict())
349 return result
350
351 def discovery(self):
352 osd_map = self.get('osd_map')
353 osd_map_crush = self.get('osd_map_crush')
354
355 # Discovering ceph pools
356 pool_discovery = {
357 pool['pool_name']: step['item_name']
358 for pool in osd_map['pools']
359 for rule in osd_map_crush['rules'] if rule['rule_id'] == pool['crush_rule']
360 for step in rule['steps'] if step['op'] == "take"
361 }
362 pools_discovery_data = {"data": [
363 {
364 "{#POOL}": pool,
365 "{#CRUSH_RULE}": rule
366 }
367 for pool, rule in pool_discovery.items()
368 ]}
369
370 # Discovering OSDs
371 # Getting hosts for found crush rules
372 osd_roots = {
373 step['item_name']: [
374 item['id']
375 for item in root_bucket['items']
376 ]
377 for rule in osd_map_crush['rules']
378 for step in rule['steps'] if step['op'] == "take"
379 for root_bucket in osd_map_crush['buckets']
380 if root_bucket['id'] == step['item']
381 }
382 # Getting osds for hosts with map to crush_rule
383 osd_discovery = {
384 item['id']: crush_rule
385 for crush_rule, roots in osd_roots.items()
386 for root in roots
387 for bucket in osd_map_crush['buckets']
388 if bucket['id'] == root
389 for item in bucket['items']
390 }
391 osd_discovery_data = {"data": [
392 {
393 "{#OSD}": osd,
394 "{#CRUSH_RULE}": rule
395 }
396 for osd, rule in osd_discovery.items()
397 ]}
398 # Preparing recieved data for sending
399 data = {
400 "zabbix.pool.discovery": json.dumps(pools_discovery_data),
401 "zabbix.osd.discovery": json.dumps(osd_discovery_data)
402 }
403 return bool(self.send(data))
404
    def handle_command(self, inbuf, command):
        """Dispatch 'ceph zabbix ...' CLI commands.

        Returns the (retcode, stdout, stderr) tuple expected by ceph-mgr.
        """
        if command['prefix'] == 'zabbix config-show':
            return 0, json.dumps(self.config, indent=4, sort_keys=True), ''
        elif command['prefix'] == 'zabbix config-set':
            key = command['key']
            value = command['value']
            if not value:
                return -errno.EINVAL, '', 'Value should not be empty or None'

            self.log.debug('Setting configuration option %s to %s', key, value)
            if self.set_config_option(key, value):
                # Persist the validated option, and re-parse the server
                # list when the target host/port changed.
                self.set_module_option(key, value)
                if key == 'zabbix_host' or key == 'zabbix_port':
                    self._parse_zabbix_hosts()
                return 0, 'Configuration option {0} updated'.format(key), ''

            return 1,\
                'Failed to update configuration option {0}'.format(key), ''

        elif command['prefix'] == 'zabbix send':
            data = self.get_data()
            if self.send(data):
                return 0, 'Sending data to Zabbix', ''

            return 1, 'Failed to send data to Zabbix', ''

        elif command['prefix'] == 'zabbix discovery':
            if self.discovery():
                return 0, 'Sending discovery data to Zabbix', ''

            return 1, 'Failed to send discovery data to Zabbix', ''

        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))
440
    def shutdown(self):
        """Stop the serve() loop and wake it so it exits promptly."""
        self.log.info('Stopping zabbix')
        self.run = False
        # Interrupt the event.wait() sleep in serve().
        self.event.set()
445
446 def serve(self):
447 self.log.info('Zabbix module starting up')
448 self.run = True
449
450 self.init_module_config()
451
452 discovery_interval = self.config['discovery_interval']
453 # We are sending discovery once plugin is loaded
454 discovery_counter = discovery_interval
455 while self.run:
456 self.log.debug('Waking up for new iteration')
457
458 if discovery_counter == discovery_interval:
459 try:
460 self.discovery()
461 except Exception as exc:
462 # Shouldn't happen, but let's log it and retry next interval,
463 # rather than dying completely.
464 self.log.exception("Unexpected error during discovery():")
465 finally:
466 discovery_counter = 0
467
468 try:
469 data = self.get_data()
470 self.send(data)
471 except Exception as exc:
472 # Shouldn't happen, but let's log it and retry next interval,
473 # rather than dying completely.
474 self.log.exception("Unexpected error during send():")
475
476 interval = self.config['interval']
477 self.log.debug('Sleeping for %d seconds', interval)
478 discovery_counter += 1
479 self.event.wait(interval)
480
    def self_test(self):
        """Sanity check for 'ceph mgr self-test': verify collected data has
        a known health status and at least one monitor. Raises on failure."""
        data = self.get_data()

        if data['overall_status'] not in self.ceph_health_mapping:
            raise RuntimeError('No valid overall_status found in data')

        # Must be convertible to int; raises ValueError/TypeError otherwise.
        int(data['overall_status_int'])

        if data['num_mon'] < 1:
            raise RuntimeError('num_mon is smaller than 1')