]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/zabbix/module.py
d427038fa3107d2cbedf98c28798f4203196f696
2 Zabbix module for ceph-mgr
4 Collect statistics from Ceph cluster and every X seconds send data to a Zabbix
5 server using the zabbix_sender executable.
10 from subprocess
import Popen
, PIPE
11 from threading
import Event
12 from mgr_module
import MgrModule
17 return sum(data
) / float(len(data
))
class ZabbixSender(object):
    """Thin wrapper around the ``zabbix_sender`` CLI tool.

    Items are streamed to the tool on stdin in the
    ``<host> ceph.<key> <value>`` format (``-i -``).

    NOTE(review): the original file reached us through a lossy extraction;
    the ``__init__`` attribute assignments were reconstructed from their
    unambiguous use in ``send()``.
    """

    def __init__(self, sender, host, port, log):
        # Path to the zabbix_sender executable, target server coordinates
        # and the mgr module's logger.
        self.sender = sender
        self.host = host
        self.port = port
        self.log = log

    def send(self, hostname, data):
        """Send ``data`` (mapping of item key -> value) as ``hostname``.

        A no-op when ``data`` is empty. Raises RuntimeError when
        zabbix_sender exits non-zero.
        """
        if len(data) == 0:
            return

        cmd = [self.sender, '-z', self.host, '-p', str(self.port), '-s',
               hostname, '-vv', '-i', '-']

        self.log.debug('Executing: %s', cmd)

        # Build the whole stdin payload up front ...
        payload = ''.join('{0} ceph.{1} {2}\n'.format(hostname, key, value)
                          for key, value in data.items()).encode('utf-8')

        proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)

        # ... and hand it to communicate(), which writes stdin and drains
        # stdout/stderr concurrently. Looping over proc.stdin.write() before
        # communicate() (the original shape) can deadlock once a pipe buffer
        # fills; the subprocess docs recommend communicate(input=...) instead.
        stdout, stderr = proc.communicate(input=payload)
        if proc.returncode != 0:
            raise RuntimeError('%s exited non-zero: %s' % (self.sender,
                                                           stderr))

        self.log.debug('Zabbix Sender: %s', stdout.rstrip())
51 class Module(MgrModule
):
54 ceph_health_mapping
= {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}
55 _zabbix_hosts
= list()
def config_keys(self):
    """Map every declared option name to its default value.

    Method of Module. Returns a dict keyed by option name, with None for
    options that declare no default in MODULE_OPTIONS.
    """
    return {opt['name']: opt.get('default', None)
            for opt in self.MODULE_OPTIONS}
64 'name': 'zabbix_sender',
65 'default': '/usr/bin/zabbix_sender'
68 'name': 'zabbix_host',
72 'name': 'zabbix_port',
86 'name': 'discovery_interval',
94 "cmd": "zabbix config-set name=key,type=CephString "
95 "name=value,type=CephString",
96 "desc": "Set a configuration value",
100 "cmd": "zabbix config-show",
101 "desc": "Show current configuration",
105 "cmd": "zabbix send",
106 "desc": "Force sending data to Zabbix",
110 "cmd": "zabbix discovery",
111 "desc": "Discovering Zabbix data",
def __init__(self, *args, **kwargs):
    """Construct the Module; real configuration loading happens later in
    init_module_config()."""
    super(Module, self).__init__(*args, **kwargs)
    # serve() sleeps via self.event.wait(interval), so the event must be
    # created here. The extraction dropped this line, but the usage in
    # serve() makes the requirement unambiguous.
    self.event = Event()
def init_module_config(self):
    """Load cluster identity and all options into the in-memory config.

    Method of Module: records the cluster fsid from the mon map, pulls each
    option declared in config_keys from the module option store (falling
    back to its declared default), and parses the Zabbix host list when a
    zabbix_host value is configured.
    """
    self.fsid = self.get('mon_map')['fsid']
    self.log.debug('Found Ceph fsid %s', self.fsid)

    for name, fallback in self.config_keys.items():
        stored = self.get_module_option(name, fallback)
        self.set_config_option(name, stored)

    if self.config['zabbix_host']:
        self._parse_zabbix_hosts()
def set_config_option(self, option, value):
    """Validate one option and store it in the in-memory config dict.

    Method of Module. Raises RuntimeError for unknown option names,
    non-integer values for the integer-typed options, and intervals below
    10 (seconds / collection cycles). Returns True on success —
    handle_command branches on this truthy return.
    """
    if option not in self.config_keys.keys():
        raise RuntimeError('{0} is a unknown configuration '
                           'option'.format(option))

    if option in ['zabbix_port', 'interval', 'discovery_interval']:
        # NOTE(review): the try/int() lines were lost in extraction; the
        # dangling `except (ValueError, TypeError)` clause implies exactly
        # this coercion.
        try:
            value = int(value)
        except (ValueError, TypeError):
            raise RuntimeError('invalid {0} configured. Please specify '
                               'a valid integer'.format(option))

    if option == 'interval' and value < 10:
        raise RuntimeError('interval should be set to at least 10 seconds')

    if option == 'discovery_interval' and value < 10:
        raise RuntimeError(
            "discovery_interval should not be more frequent "
            "than once in 10 regular data collection"
        )

    self.log.debug('Setting in-memory config option %s to: %s', option,
                   value)
    self.config[option] = value

    return True
def _parse_zabbix_hosts(self):
    """Parse the comma-separated zabbix_host option into self._zabbix_hosts.

    Method of Module. Each entry becomes a dict with 'zabbix_host' and
    'zabbix_port' keys. Accepts hostnames, IPv4, or (optionally bracketed)
    IPv6 addresses with an optional :port suffix; entries without an
    explicit port fall back to config['zabbix_port']. Invalid entries are
    logged and skipped.
    """
    self._zabbix_hosts = list()
    servers = self.config['zabbix_host'].split(",")
    for server in servers:
        # Raw string: the original non-raw pattern relied on invalid escape
        # sequences (\[ and \.) that modern Python warns about.
        uri = re.match(r"(?:(?:\[?)([a-z0-9-\.]+|[a-f0-9:\.]+)(?:\]?))(?:((?::))([0-9]{1,5}))?$", server)
        # NOTE(review): the if/else around the match result was lost in
        # extraction; groups() is only valid on a successful match and the
        # 'is not valid' error is clearly the failure branch.
        if uri:
            zabbix_host, sep, zabbix_port = uri.groups()
            zabbix_port = zabbix_port if sep == ':' else self.config['zabbix_port']
            self._zabbix_hosts.append(
                {'zabbix_host': zabbix_host, 'zabbix_port': zabbix_port})
        else:
            self.log.error('Zabbix host "%s" is not valid', server)

    # NOTE(review): error level preserved from the original, though this is
    # an informational message — consider downgrading to debug.
    self.log.error('Parsed Zabbix hosts: %s', self._zabbix_hosts)
170 def get_pg_stats(self
):
173 pg_states
= ['active', 'peering', 'clean', 'scrubbing', 'undersized',
174 'backfilling', 'recovering', 'degraded', 'inconsistent',
175 'remapped', 'backfill_toofull', 'backfill_wait',
178 for state
in pg_states
:
179 stats
['num_pg_{0}'.format(state
)] = 0
181 pg_status
= self
.get('pg_status')
183 stats
['num_pg'] = pg_status
['num_pgs']
185 for state
in pg_status
['pgs_by_state']:
186 states
= state
['state_name'].split('+')
188 key
= 'num_pg_{0}'.format(s
)
190 stats
[key
] += state
['count']
197 health
= json
.loads(self
.get('health')['json'])
198 # 'status' is luminous+, 'overall_status' is legacy mode.
199 data
['overall_status'] = health
.get('status',
200 health
.get('overall_status'))
201 data
['overall_status_int'] = \
202 self
.ceph_health_mapping
.get(data
['overall_status'])
204 mon_status
= json
.loads(self
.get('mon_status')['json'])
205 data
['num_mon'] = len(mon_status
['monmap']['mons'])
208 data
['num_pools'] = len(df
['pools'])
209 data
['total_used_bytes'] = df
['stats']['total_used_bytes']
210 data
['total_bytes'] = df
['stats']['total_bytes']
211 data
['total_avail_bytes'] = df
['stats']['total_avail_bytes']
218 for pool
in df
['pools']:
219 wr_ops
+= pool
['stats']['wr']
220 rd_ops
+= pool
['stats']['rd']
221 wr_bytes
+= pool
['stats']['wr_bytes']
222 rd_bytes
+= pool
['stats']['rd_bytes']
223 data
['[{0},rd_bytes]'.format(pool
['name'])] = pool
['stats']['rd_bytes']
224 data
['[{0},wr_bytes]'.format(pool
['name'])] = pool
['stats']['wr_bytes']
225 data
['[{0},rd_ops]'.format(pool
['name'])] = pool
['stats']['rd']
226 data
['[{0},wr_ops]'.format(pool
['name'])] = pool
['stats']['wr']
227 data
['[{0},bytes_used]'.format(pool
['name'])] = pool
['stats']['bytes_used']
228 data
['[{0},stored_raw]'.format(pool
['name'])] = pool
['stats']['stored_raw']
229 data
['[{0},percent_used]'.format(pool
['name'])] = pool
['stats']['percent_used']
231 data
['wr_ops'] = wr_ops
232 data
['rd_ops'] = rd_ops
233 data
['wr_bytes'] = wr_bytes
234 data
['rd_bytes'] = rd_bytes
236 osd_map
= self
.get('osd_map')
237 data
['num_osd'] = len(osd_map
['osds'])
238 data
['osd_nearfull_ratio'] = osd_map
['nearfull_ratio']
239 data
['osd_full_ratio'] = osd_map
['full_ratio']
240 data
['osd_backfillfull_ratio'] = osd_map
['backfillfull_ratio']
242 data
['num_pg_temp'] = len(osd_map
['pg_temp'])
246 for osd
in osd_map
['osds']:
247 data
['[osd.{0},up]'.format(int(osd
['osd']))] = osd
['up']
251 data
['[osd.{0},in]'.format(int(osd
['osd']))] = osd
['in']
255 data
['num_osd_up'] = num_up
256 data
['num_osd_in'] = num_in
260 osd_apply_latency_ns
= list()
261 osd_commit_latency_ns
= list()
263 osd_stats
= self
.get('osd_stats')
264 for osd
in osd_stats
['osd_stats']:
266 osd_fill
.append((float(osd
['kb_used']) / float(osd
['kb'])) * 100)
267 data
['[osd.{0},osd_fill]'.format(osd
['osd'])] = (
268 float(osd
['kb_used']) / float(osd
['kb'])) * 100
269 except ZeroDivisionError:
271 osd_pgs
.append(osd
['num_pgs'])
272 osd_apply_latency_ns
.append(osd
['perf_stat']['apply_latency_ns'])
273 osd_commit_latency_ns
.append(osd
['perf_stat']['commit_latency_ns'])
274 data
['[osd.{0},num_pgs]'.format(osd
['osd'])] = osd
['num_pgs']
276 '[osd.{0},osd_latency_apply]'.format(osd
['osd'])
277 ] = osd
['perf_stat']['apply_latency_ns'] / 1000000.0 # ns -> ms
279 '[osd.{0},osd_latency_commit]'.format(osd
['osd'])
280 ] = osd
['perf_stat']['commit_latency_ns'] / 1000000.0 # ns -> ms
283 data
['osd_max_fill'] = max(osd_fill
)
284 data
['osd_min_fill'] = min(osd_fill
)
285 data
['osd_avg_fill'] = avg(osd_fill
)
286 data
['osd_max_pgs'] = max(osd_pgs
)
287 data
['osd_min_pgs'] = min(osd_pgs
)
288 data
['osd_avg_pgs'] = avg(osd_pgs
)
293 data
['osd_latency_apply_max'] = max(osd_apply_latency_ns
) / 1000000.0 # ns -> ms
294 data
['osd_latency_apply_min'] = min(osd_apply_latency_ns
) / 1000000.0 # ns -> ms
295 data
['osd_latency_apply_avg'] = avg(osd_apply_latency_ns
) / 1000000.0 # ns -> ms
297 data
['osd_latency_commit_max'] = max(osd_commit_latency_ns
) / 1000000.0 # ns -> ms
298 data
['osd_latency_commit_min'] = min(osd_commit_latency_ns
) / 1000000.0 # ns -> ms
299 data
['osd_latency_commit_avg'] = avg(osd_commit_latency_ns
) / 1000000.0 # ns -> ms
303 data
.update(self
.get_pg_stats())
307 def send(self
, data
):
308 identifier
= self
.config
['identifier']
309 if identifier
is None or len(identifier
) == 0:
310 identifier
= 'ceph-{0}'.format(self
.fsid
)
312 if not self
.config
['zabbix_host'] or not self
._zabbix
_hosts
:
313 self
.log
.error('Zabbix server not set, please configure using: '
314 'ceph zabbix config-set zabbix_host <zabbix_host>')
315 self
.set_health_checks({
316 'MGR_ZABBIX_NO_SERVER': {
317 'severity': 'warning',
318 'summary': 'No Zabbix server configured',
319 'detail': ['Configuration value zabbix_host not configured']
326 for server
in self
._zabbix
_hosts
:
328 'Sending data to Zabbix server %s, port %s as host/identifier %s',
329 server
['zabbix_host'], server
['zabbix_port'], identifier
)
333 zabbix
= ZabbixSender(self
.config
['zabbix_sender'],
334 server
['zabbix_host'],
335 server
['zabbix_port'], self
.log
)
336 zabbix
.send(identifier
, data
)
337 except Exception as exc
:
338 self
.log
.exception('Failed to send.')
339 self
.set_health_checks({
340 'MGR_ZABBIX_SEND_FAILED': {
341 'severity': 'warning',
342 'summary': 'Failed to send data to Zabbix',
348 self
.set_health_checks(dict())
352 osd_map
= self
.get('osd_map')
353 osd_map_crush
= self
.get('osd_map_crush')
355 # Discovering ceph pools
357 pool
['pool_name']: step
['item_name']
358 for pool
in osd_map
['pools']
359 for rule
in osd_map_crush
['rules'] if rule
['rule_id'] == pool
['crush_rule']
360 for step
in rule
['steps'] if step
['op'] == "take"
362 pools_discovery_data
= {"data": [
365 "{#CRUSH_RULE}": rule
367 for pool
, rule
in pool_discovery
.items()
371 # Getting hosts for found crush rules
375 for item
in root_bucket
['items']
377 for rule
in osd_map_crush
['rules']
378 for step
in rule
['steps'] if step
['op'] == "take"
379 for root_bucket
in osd_map_crush
['buckets']
380 if root_bucket
['id'] == step
['item']
382 # Getting osds for hosts with map to crush_rule
384 item
['id']: crush_rule
385 for crush_rule
, roots
in osd_roots
.items()
387 for bucket
in osd_map_crush
['buckets']
388 if bucket
['id'] == root
389 for item
in bucket
['items']
391 osd_discovery_data
= {"data": [
394 "{#CRUSH_RULE}": rule
396 for osd
, rule
in osd_discovery
.items()
398 # Preparing recieved data for sending
400 "zabbix.pool.discovery": json
.dumps(pools_discovery_data
),
401 "zabbix.osd.discovery": json
.dumps(osd_discovery_data
)
403 return bool(self
.send(data
))
405 def handle_command(self
, inbuf
, command
):
406 if command
['prefix'] == 'zabbix config-show':
407 return 0, json
.dumps(self
.config
, indent
=4, sort_keys
=True), ''
408 elif command
['prefix'] == 'zabbix config-set':
410 value
= command
['value']
412 return -errno
.EINVAL
, '', 'Value should not be empty or None'
414 self
.log
.debug('Setting configuration option %s to %s', key
, value
)
415 if self
.set_config_option(key
, value
):
416 self
.set_module_option(key
, value
)
417 if key
== 'zabbix_host' or key
== 'zabbix_port':
418 self
._parse
_zabbix
_hosts
()
419 return 0, 'Configuration option {0} updated'.format(key
), ''
422 'Failed to update configuration option {0}'.format(key
), ''
424 elif command
['prefix'] == 'zabbix send':
425 data
= self
.get_data()
427 return 0, 'Sending data to Zabbix', ''
429 return 1, 'Failed to send data to Zabbix', ''
431 elif command
['prefix'] == 'zabbix discovery':
433 return 0, 'Sending discovery data to Zabbix', ''
435 return 1, 'Failed to send discovery data to Zabbix', ''
438 return (-errno
.EINVAL
, '',
439 "Command not found '{0}'".format(command
['prefix']))
442 self
.log
.info('Stopping zabbix')
447 self
.log
.info('Zabbix module starting up')
450 self
.init_module_config()
452 discovery_interval
= self
.config
['discovery_interval']
453 # We are sending discovery once plugin is loaded
454 discovery_counter
= discovery_interval
456 self
.log
.debug('Waking up for new iteration')
458 if discovery_counter
== discovery_interval
:
461 except Exception as exc
:
462 # Shouldn't happen, but let's log it and retry next interval,
463 # rather than dying completely.
464 self
.log
.exception("Unexpected error during discovery():")
466 discovery_counter
= 0
469 data
= self
.get_data()
471 except Exception as exc
:
472 # Shouldn't happen, but let's log it and retry next interval,
473 # rather than dying completely.
474 self
.log
.exception("Unexpected error during send():")
476 interval
= self
.config
['interval']
477 self
.log
.debug('Sleeping for %d seconds', interval
)
478 discovery_counter
+= 1
479 self
.event
.wait(interval
)
482 data
= self
.get_data()
484 if data
['overall_status'] not in self
.ceph_health_mapping
:
485 raise RuntimeError('No valid overall_status found in data')
487 int(data
['overall_status_int'])
489 if data
['num_mon'] < 1:
490 raise RuntimeError('num_mon is smaller than 1')