]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/zabbix/module.py
"""
Zabbix module for ceph-mgr

Collect statistics from Ceph cluster and every X seconds send data to a Zabbix
server using the zabbix_sender executable.
"""
import errno
import json
import logging
import re
from subprocess import Popen, PIPE
from threading import Event
from typing import cast, Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union

from mgr_module import CLIReadCommand, CLIWriteCommand, MgrModule, Option, OptionValue
def avg(data: Sequence[Union[int, float]]) -> float:
    """Return the arithmetic mean of *data*.

    Raises ZeroDivisionError when *data* is empty, matching the behavior
    callers rely on (they guard or accept the exception).
    """
    total = sum(data)
    return total / len(data)
class ZabbixSender(object):
    """Thin wrapper around the zabbix_sender CLI utility.

    Items are piped to zabbix_sender on stdin, one per line, in the
    '<hostname> <key> <value>' format the tool expects with '-i -'.
    """

    def __init__(self, sender: str, host: str, port: int,
                 log: logging.Logger) -> None:
        # Path to the zabbix_sender executable plus the target server;
        # all four are read by send() below.
        self.sender = sender
        self.host = host
        self.port = port
        self.log = log

    def send(self, hostname: str,
             data: Mapping[str, Union[int, float, str]]) -> None:
        """Send every item in *data* to Zabbix as host *hostname*.

        Raises RuntimeError when zabbix_sender exits non-zero.
        """
        if not data:
            # Nothing to send — do not spawn a subprocess for an empty batch.
            return

        cmd = [self.sender, '-z', self.host, '-p', str(self.port), '-s',
               hostname, '-vv', '-i', '-']

        self.log.debug('Executing: %s', cmd)

        proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE,
                     encoding='utf-8')

        # Keys are namespaced under 'ceph.' so they match the Zabbix template.
        assert proc.stdin is not None
        for key, value in data.items():
            proc.stdin.write('{0} ceph.{1} {2}\n'.format(hostname, key, value))

        stdout, stderr = proc.communicate()
        if proc.returncode != 0:
            # stderr fills the second %s so the failure reason is visible.
            raise RuntimeError('%s exited non-zero: %s' % (self.sender,
                                                           stderr))

        self.log.debug('Zabbix Sender: %s', stdout.rstrip())
class Module(MgrModule):
    # In-memory copy of the module configuration; populated by
    # init_module_config() / set_config_option().
    config: Dict[str, OptionValue] = {}

    # Map Ceph health strings to integers so Zabbix can alert numerically.
    ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}

    # Parsed targets from the 'zabbix_host' option, one dict per server:
    # {'zabbix_host': str, 'zabbix_port': int}.
    _zabbix_hosts: List[Dict[str, Union[str, int]]] = list()
61 def config_keys(self
) -> Dict
[str, OptionValue
]:
62 return dict((o
['name'], o
.get('default', None))
63 for o
in self
.MODULE_OPTIONS
)
68 default
='/usr/bin/zabbix_sender'),
85 name
='discovery_interval',
90 def __init__(self
, *args
: Any
, **kwargs
: Any
) -> None:
91 super(Module
, self
).__init
__(*args
, **kwargs
)
94 def init_module_config(self
) -> None:
95 self
.fsid
= self
.get('mon_map')['fsid']
96 self
.log
.debug('Found Ceph fsid %s', self
.fsid
)
98 for key
, default
in self
.config_keys
.items():
99 self
.set_config_option(key
, self
.get_module_option(key
, default
))
101 if self
.config
['zabbix_host']:
102 self
._parse
_zabbix
_hosts
()
104 def set_config_option(self
, option
: str, value
: OptionValue
) -> bool:
105 if option
not in self
.config_keys
.keys():
106 raise RuntimeError('{0} is a unknown configuration '
107 'option'.format(option
))
109 if option
in ['zabbix_port', 'interval', 'discovery_interval']:
111 int_value
= int(value
) # type: ignore
112 except (ValueError, TypeError):
113 raise RuntimeError('invalid {0} configured. Please specify '
114 'a valid integer'.format(option
))
116 if option
== 'interval' and int_value
< 10:
117 raise RuntimeError('interval should be set to at least 10 seconds')
119 if option
== 'discovery_interval' and int_value
< 10:
121 "discovery_interval should not be more frequent "
122 "than once in 10 regular data collection"
125 self
.log
.debug('Setting in-memory config option %s to: %s', option
,
127 self
.config
[option
] = value
130 def _parse_zabbix_hosts(self
) -> None:
131 self
._zabbix
_hosts
= list()
132 servers
= cast(str, self
.config
['zabbix_host']).split(",")
133 for server
in servers
:
134 uri
= re
.match("(?:(?:\[?)([a-z0-9-\.]+|[a-f0-9:\.]+)(?:\]?))(?:((?::))([0-9]{1,5}))?$", server
)
136 zabbix_host
, sep
, opt_zabbix_port
= uri
.groups()
138 zabbix_port
= int(opt_zabbix_port
)
140 zabbix_port
= cast(int, self
.config
['zabbix_port'])
141 self
._zabbix
_hosts
.append({'zabbix_host': zabbix_host
, 'zabbix_port': zabbix_port
})
143 self
.log
.error('Zabbix host "%s" is not valid', server
)
145 self
.log
.error('Parsed Zabbix hosts: %s', self
._zabbix
_hosts
)
147 def get_pg_stats(self
) -> Dict
[str, int]:
150 pg_states
= ['active', 'peering', 'clean', 'scrubbing', 'undersized',
151 'backfilling', 'recovering', 'degraded', 'inconsistent',
152 'remapped', 'backfill_toofull', 'backfill_wait',
155 for state
in pg_states
:
156 stats
['num_pg_{0}'.format(state
)] = 0
158 pg_status
= self
.get('pg_status')
160 stats
['num_pg'] = pg_status
['num_pgs']
162 for state
in pg_status
['pgs_by_state']:
163 states
= state
['state_name'].split('+')
165 key
= 'num_pg_{0}'.format(s
)
167 stats
[key
] += state
['count']
171 def get_data(self
) -> Dict
[str, Union
[int, float]]:
174 health
= json
.loads(self
.get('health')['json'])
175 # 'status' is luminous+, 'overall_status' is legacy mode.
176 data
['overall_status'] = health
.get('status',
177 health
.get('overall_status'))
178 data
['overall_status_int'] = \
179 self
.ceph_health_mapping
.get(data
['overall_status'])
181 mon_status
= json
.loads(self
.get('mon_status')['json'])
182 data
['num_mon'] = len(mon_status
['monmap']['mons'])
185 data
['num_pools'] = len(df
['pools'])
186 data
['total_used_bytes'] = df
['stats']['total_used_bytes']
187 data
['total_bytes'] = df
['stats']['total_bytes']
188 data
['total_avail_bytes'] = df
['stats']['total_avail_bytes']
195 for pool
in df
['pools']:
196 wr_ops
+= pool
['stats']['wr']
197 rd_ops
+= pool
['stats']['rd']
198 wr_bytes
+= pool
['stats']['wr_bytes']
199 rd_bytes
+= pool
['stats']['rd_bytes']
200 data
['[{0},rd_bytes]'.format(pool
['name'])] = pool
['stats']['rd_bytes']
201 data
['[{0},wr_bytes]'.format(pool
['name'])] = pool
['stats']['wr_bytes']
202 data
['[{0},rd_ops]'.format(pool
['name'])] = pool
['stats']['rd']
203 data
['[{0},wr_ops]'.format(pool
['name'])] = pool
['stats']['wr']
204 data
['[{0},bytes_used]'.format(pool
['name'])] = pool
['stats']['bytes_used']
205 data
['[{0},stored_raw]'.format(pool
['name'])] = pool
['stats']['stored_raw']
206 data
['[{0},percent_used]'.format(pool
['name'])] = pool
['stats']['percent_used'] * 100
208 data
['wr_ops'] = wr_ops
209 data
['rd_ops'] = rd_ops
210 data
['wr_bytes'] = wr_bytes
211 data
['rd_bytes'] = rd_bytes
213 osd_map
= self
.get('osd_map')
214 data
['num_osd'] = len(osd_map
['osds'])
215 data
['osd_nearfull_ratio'] = osd_map
['nearfull_ratio']
216 data
['osd_full_ratio'] = osd_map
['full_ratio']
217 data
['osd_backfillfull_ratio'] = osd_map
['backfillfull_ratio']
219 data
['num_pg_temp'] = len(osd_map
['pg_temp'])
223 for osd
in osd_map
['osds']:
224 data
['[osd.{0},up]'.format(int(osd
['osd']))] = osd
['up']
228 data
['[osd.{0},in]'.format(int(osd
['osd']))] = osd
['in']
232 data
['num_osd_up'] = num_up
233 data
['num_osd_in'] = num_in
237 osd_apply_latency_ns
= list()
238 osd_commit_latency_ns
= list()
240 osd_stats
= self
.get('osd_stats')
241 for osd
in osd_stats
['osd_stats']:
243 osd_fill
.append((float(osd
['kb_used']) / float(osd
['kb'])) * 100)
244 data
['[osd.{0},osd_fill]'.format(osd
['osd'])] = (
245 float(osd
['kb_used']) / float(osd
['kb'])) * 100
246 except ZeroDivisionError:
248 osd_pgs
.append(osd
['num_pgs'])
249 osd_apply_latency_ns
.append(osd
['perf_stat']['apply_latency_ns'])
250 osd_commit_latency_ns
.append(osd
['perf_stat']['commit_latency_ns'])
251 data
['[osd.{0},num_pgs]'.format(osd
['osd'])] = osd
['num_pgs']
253 '[osd.{0},osd_latency_apply]'.format(osd
['osd'])
254 ] = osd
['perf_stat']['apply_latency_ns'] / 1000000.0 # ns -> ms
256 '[osd.{0},osd_latency_commit]'.format(osd
['osd'])
257 ] = osd
['perf_stat']['commit_latency_ns'] / 1000000.0 # ns -> ms
260 data
['osd_max_fill'] = max(osd_fill
)
261 data
['osd_min_fill'] = min(osd_fill
)
262 data
['osd_avg_fill'] = avg(osd_fill
)
263 data
['osd_max_pgs'] = max(osd_pgs
)
264 data
['osd_min_pgs'] = min(osd_pgs
)
265 data
['osd_avg_pgs'] = avg(osd_pgs
)
270 data
['osd_latency_apply_max'] = max(osd_apply_latency_ns
) / 1000000.0 # ns -> ms
271 data
['osd_latency_apply_min'] = min(osd_apply_latency_ns
) / 1000000.0 # ns -> ms
272 data
['osd_latency_apply_avg'] = avg(osd_apply_latency_ns
) / 1000000.0 # ns -> ms
274 data
['osd_latency_commit_max'] = max(osd_commit_latency_ns
) / 1000000.0 # ns -> ms
275 data
['osd_latency_commit_min'] = min(osd_commit_latency_ns
) / 1000000.0 # ns -> ms
276 data
['osd_latency_commit_avg'] = avg(osd_commit_latency_ns
) / 1000000.0 # ns -> ms
280 data
.update(self
.get_pg_stats())
284 def send(self
, data
: Mapping
[str, Union
[int, float, str]]) -> bool:
285 identifier
= cast(Optional
[str], self
.config
['identifier'])
286 if identifier
is None or len(identifier
) == 0:
287 identifier
= 'ceph-{0}'.format(self
.fsid
)
289 if not self
.config
['zabbix_host'] or not self
._zabbix
_hosts
:
290 self
.log
.error('Zabbix server not set, please configure using: '
291 'ceph zabbix config-set zabbix_host <zabbix_host>')
292 self
.set_health_checks({
293 'MGR_ZABBIX_NO_SERVER': {
294 'severity': 'warning',
295 'summary': 'No Zabbix server configured',
296 'detail': ['Configuration value zabbix_host not configured']
303 for server
in self
._zabbix
_hosts
:
305 'Sending data to Zabbix server %s, port %s as host/identifier %s',
306 server
['zabbix_host'], server
['zabbix_port'], identifier
)
310 zabbix
= ZabbixSender(cast(str, self
.config
['zabbix_sender']),
311 cast(str, server
['zabbix_host']),
312 cast(int, server
['zabbix_port']), self
.log
)
313 zabbix
.send(identifier
, data
)
314 except Exception as exc
:
315 self
.log
.exception('Failed to send.')
316 self
.set_health_checks({
317 'MGR_ZABBIX_SEND_FAILED': {
318 'severity': 'warning',
319 'summary': 'Failed to send data to Zabbix',
325 self
.set_health_checks(dict())
328 def discovery(self
) -> bool:
329 osd_map
= self
.get('osd_map')
330 osd_map_crush
= self
.get('osd_map_crush')
332 # Discovering ceph pools
334 pool
['pool_name']: step
['item_name']
335 for pool
in osd_map
['pools']
336 for rule
in osd_map_crush
['rules'] if rule
['rule_id'] == pool
['crush_rule']
337 for step
in rule
['steps'] if step
['op'] == "take"
339 pools_discovery_data
= {"data": [
342 "{#CRUSH_RULE}": rule
344 for pool
, rule
in pool_discovery
.items()
348 # Getting hosts for found crush rules
352 for item
in root_bucket
['items']
354 for rule
in osd_map_crush
['rules']
355 for step
in rule
['steps'] if step
['op'] == "take"
356 for root_bucket
in osd_map_crush
['buckets']
357 if root_bucket
['id'] == step
['item']
359 # Getting osds for hosts with map to crush_rule
361 item
['id']: crush_rule
362 for crush_rule
, roots
in osd_roots
.items()
364 for bucket
in osd_map_crush
['buckets']
365 if bucket
['id'] == root
366 for item
in bucket
['items']
368 osd_discovery_data
= {"data": [
371 "{#CRUSH_RULE}": rule
373 for osd
, rule
in osd_discovery
.items()
375 # Preparing recieved data for sending
377 "zabbix.pool.discovery": json
.dumps(pools_discovery_data
),
378 "zabbix.osd.discovery": json
.dumps(osd_discovery_data
)
380 return bool(self
.send(data
))
382 @CLIReadCommand('zabbix config-show')
383 def config_show(self
) -> Tuple
[int, str, str]:
385 Show current configuration
387 return 0, json
.dumps(self
.config
, indent
=4, sort_keys
=True), ''
389 @CLIWriteCommand('zabbix config-set')
390 def config_set(self
, key
: str, value
: str) -> Tuple
[int, str, str]:
392 Set a configuration value
395 return -errno
.EINVAL
, '', 'Value should not be empty or None'
397 self
.log
.debug('Setting configuration option %s to %s', key
, value
)
398 if self
.set_config_option(key
, value
):
399 self
.set_module_option(key
, value
)
400 if key
== 'zabbix_host' or key
== 'zabbix_port':
401 self
._parse
_zabbix
_hosts
()
402 return 0, 'Configuration option {0} updated'.format(key
), ''
404 'Failed to update configuration option {0}'.format(key
), ''
406 @CLIReadCommand('zabbix send')
407 def do_send(self
) -> Tuple
[int, str, str]:
409 Force sending data to Zabbix
411 data
= self
.get_data()
413 return 0, 'Sending data to Zabbix', ''
415 return 1, 'Failed to send data to Zabbix', ''
417 @CLIReadCommand('zabbix discovery')
418 def do_discovery(self
) -> Tuple
[int, str, str]:
420 Discovering Zabbix data
423 return 0, 'Sending discovery data to Zabbix', ''
425 return 1, 'Failed to send discovery data to Zabbix', ''
427 def shutdown(self
) -> None:
428 self
.log
.info('Stopping zabbix')
432 def serve(self
) -> None:
433 self
.log
.info('Zabbix module starting up')
436 self
.init_module_config()
438 discovery_interval
= self
.config
['discovery_interval']
439 # We are sending discovery once plugin is loaded
440 discovery_counter
= cast(int, discovery_interval
)
442 self
.log
.debug('Waking up for new iteration')
444 if discovery_counter
== discovery_interval
:
448 # Shouldn't happen, but let's log it and retry next interval,
449 # rather than dying completely.
450 self
.log
.exception("Unexpected error during discovery():")
452 discovery_counter
= 0
455 data
= self
.get_data()
458 # Shouldn't happen, but let's log it and retry next interval,
459 # rather than dying completely.
460 self
.log
.exception("Unexpected error during send():")
462 interval
= cast(float, self
.config
['interval'])
463 self
.log
.debug('Sleeping for %d seconds', interval
)
464 discovery_counter
+= 1
465 self
.event
.wait(interval
)
    def self_test(self) -> None:
        """Sanity-check the data-collection path (mgr self-test hook)."""
        data = self.get_data()

        if data['overall_status'] not in self.ceph_health_mapping:
            raise RuntimeError('No valid overall_status found in data')

        # Result discarded on purpose: int() raises if the mapped status
        # value is not numeric.
        int(data['overall_status_int'])

        if data['num_mon'] < 1:
            raise RuntimeError('num_mon is smaller than 1')