]>
Commit | Line | Data |
---|---|---|
224ce89b WB |
1 | """ |
2 | Zabbix module for ceph-mgr | |
3 | ||
4 | Collect statistics from Ceph cluster and every X seconds send data to a Zabbix | |
5 | server using the zabbix_sender executable. | |
6 | """ | |
7 | import json | |
8 | import errno | |
92f5a8d4 | 9 | import re |
224ce89b WB |
10 | from subprocess import Popen, PIPE |
11 | from threading import Event | |
12 | from mgr_module import MgrModule | |
13 | ||
14 | ||
def avg(data):
    """Return the arithmetic mean of *data*, or 0 for an empty sequence."""
    return sum(data) / float(len(data)) if data else 0
224ce89b WB |
20 | |
21 | ||
class ZabbixSender(object):
    """Thin wrapper around the ``zabbix_sender`` CLI utility."""

    def __init__(self, sender, host, port, log):
        # Path to the zabbix_sender executable.
        self.sender = sender
        self.host = host
        self.port = port
        self.log = log

    def send(self, hostname, data):
        """Send *data* (mapping of key -> value) for host *hostname*.

        Each key is reported to Zabbix as ``ceph.<key>``.  Does nothing
        for an empty mapping.  Raises RuntimeError when zabbix_sender
        exits non-zero.
        """
        if len(data) == 0:
            return

        cmd = [self.sender, '-z', self.host, '-p', str(self.port), '-s',
               hostname, '-vv', '-i', '-']

        self.log.debug('Executing: %s', cmd)

        proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)

        # Build the whole payload up front and hand it to communicate():
        # writing to proc.stdin line-by-line and only then calling
        # communicate() can deadlock (or raise BrokenPipeError) if the
        # child blocks on its full output pipes or exits early.
        payload = ''.join('{0} ceph.{1} {2}\n'.format(hostname, key, value)
                          for key, value in data.items()).encode('utf-8')

        stdout, stderr = proc.communicate(input=payload)
        if proc.returncode != 0:
            raise RuntimeError('%s exited non-zero: %s' % (self.sender,
                                                           stderr))

        self.log.debug('Zabbix Sender: %s', stdout.rstrip())
49 | ||
50 | ||
class Module(MgrModule):
    """Collect Ceph cluster statistics and ship them to Zabbix."""

    # NOTE: class-level attributes; only one Module instance exists per
    # mgr daemon, so sharing them is harmless in practice.
    run = False
    config = dict()
    # Translate Ceph health states to the integers the Zabbix template uses.
    ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}
    _zabbix_hosts = list()

    @property
    def config_keys(self):
        """Map every module option name to its declared default (or None)."""
        return {opt['name']: opt.get('default', None)
                for opt in self.MODULE_OPTIONS}

    MODULE_OPTIONS = [
        {
            'name': 'zabbix_sender',
            'default': '/usr/bin/zabbix_sender'
        },
        {
            'name': 'zabbix_host',
            'default': None
        },
        {
            'name': 'zabbix_port',
            'type': 'int',
            'default': 10051
        },
        {
            'name': 'identifier',
            'default': ""
        },
        {
            'name': 'interval',
            'type': 'secs',
            'default': 60
        },
        {
            'name': 'discovery_interval',
            'type': 'count',
            'default': 100
        }
    ]

    COMMANDS = [
        {
            "cmd": "zabbix config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "zabbix config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "zabbix send",
            "desc": "Force sending data to Zabbix",
            "perm": "rw"
        },
        {
            "cmd": "zabbix discovery",
            "desc": "Discovering Zabbix data",
            "perm": "r"
        },
    ]
115 | ||
116 | def __init__(self, *args, **kwargs): | |
117 | super(Module, self).__init__(*args, **kwargs) | |
118 | self.event = Event() | |
119 | ||
120 | def init_module_config(self): | |
94b18763 FG |
121 | self.fsid = self.get('mon_map')['fsid'] |
122 | self.log.debug('Found Ceph fsid %s', self.fsid) | |
224ce89b | 123 | |
94b18763 | 124 | for key, default in self.config_keys.items(): |
11fdf7f2 | 125 | self.set_config_option(key, self.get_module_option(key, default)) |
224ce89b | 126 | |
92f5a8d4 TL |
127 | if self.config['zabbix_host']: |
128 | self._parse_zabbix_hosts() | |
129 | ||
224ce89b WB |
130 | def set_config_option(self, option, value): |
131 | if option not in self.config_keys.keys(): | |
132 | raise RuntimeError('{0} is a unknown configuration ' | |
133 | 'option'.format(option)) | |
134 | ||
9f95a23c | 135 | if option in ['zabbix_port', 'interval', 'discovery_interval']: |
224ce89b WB |
136 | try: |
137 | value = int(value) | |
138 | except (ValueError, TypeError): | |
139 | raise RuntimeError('invalid {0} configured. Please specify ' | |
140 | 'a valid integer'.format(option)) | |
141 | ||
142 | if option == 'interval' and value < 10: | |
143 | raise RuntimeError('interval should be set to at least 10 seconds') | |
144 | ||
9f95a23c TL |
145 | if option == 'discovery_interval' and value < 10: |
146 | raise RuntimeError( | |
147 | "discovery_interval should not be more frequent " | |
148 | "than once in 10 regular data collection" | |
149 | ) | |
150 | ||
94b18763 FG |
151 | self.log.debug('Setting in-memory config option %s to: %s', option, |
152 | value) | |
224ce89b | 153 | self.config[option] = value |
94b18763 | 154 | return True |
224ce89b | 155 | |
92f5a8d4 TL |
156 | def _parse_zabbix_hosts(self): |
157 | self._zabbix_hosts = list() | |
158 | servers = self.config['zabbix_host'].split(",") | |
159 | for server in servers: | |
160 | uri = re.match("(?:(?:\[?)([a-z0-9-\.]+|[a-f0-9:\.]+)(?:\]?))(?:((?::))([0-9]{1,5}))?$", server) | |
161 | if uri: | |
162 | zabbix_host, sep, zabbix_port = uri.groups() | |
163 | zabbix_port = zabbix_port if sep == ':' else self.config['zabbix_port'] | |
164 | self._zabbix_hosts.append({'zabbix_host': zabbix_host, 'zabbix_port': zabbix_port}) | |
165 | else: | |
166 | self.log.error('Zabbix host "%s" is not valid', server) | |
167 | ||
168 | self.log.error('Parsed Zabbix hosts: %s', self._zabbix_hosts) | |
169 | ||
11fdf7f2 TL |
170 | def get_pg_stats(self): |
171 | stats = dict() | |
172 | ||
173 | pg_states = ['active', 'peering', 'clean', 'scrubbing', 'undersized', | |
174 | 'backfilling', 'recovering', 'degraded', 'inconsistent', | |
eafe8130 | 175 | 'remapped', 'backfill_toofull', 'backfill_wait', |
11fdf7f2 TL |
176 | 'recovery_wait'] |
177 | ||
178 | for state in pg_states: | |
179 | stats['num_pg_{0}'.format(state)] = 0 | |
180 | ||
181 | pg_status = self.get('pg_status') | |
182 | ||
183 | stats['num_pg'] = pg_status['num_pgs'] | |
184 | ||
185 | for state in pg_status['pgs_by_state']: | |
186 | states = state['state_name'].split('+') | |
187 | for s in pg_states: | |
188 | key = 'num_pg_{0}'.format(s) | |
189 | if s in states: | |
190 | stats[key] += state['count'] | |
191 | ||
192 | return stats | |
193 | ||
224ce89b WB |
194 | def get_data(self): |
195 | data = dict() | |
196 | ||
197 | health = json.loads(self.get('health')['json']) | |
c07f9fc5 FG |
198 | # 'status' is luminous+, 'overall_status' is legacy mode. |
199 | data['overall_status'] = health.get('status', | |
200 | health.get('overall_status')) | |
224ce89b WB |
201 | data['overall_status_int'] = \ |
202 | self.ceph_health_mapping.get(data['overall_status']) | |
203 | ||
204 | mon_status = json.loads(self.get('mon_status')['json']) | |
205 | data['num_mon'] = len(mon_status['monmap']['mons']) | |
206 | ||
207 | df = self.get('df') | |
208 | data['num_pools'] = len(df['pools']) | |
224ce89b WB |
209 | data['total_used_bytes'] = df['stats']['total_used_bytes'] |
210 | data['total_bytes'] = df['stats']['total_bytes'] | |
211 | data['total_avail_bytes'] = df['stats']['total_avail_bytes'] | |
212 | ||
213 | wr_ops = 0 | |
214 | rd_ops = 0 | |
215 | wr_bytes = 0 | |
216 | rd_bytes = 0 | |
217 | ||
218 | for pool in df['pools']: | |
219 | wr_ops += pool['stats']['wr'] | |
220 | rd_ops += pool['stats']['rd'] | |
221 | wr_bytes += pool['stats']['wr_bytes'] | |
222 | rd_bytes += pool['stats']['rd_bytes'] | |
9f95a23c TL |
223 | data['[{0},rd_bytes]'.format(pool['name'])] = pool['stats']['rd_bytes'] |
224 | data['[{0},wr_bytes]'.format(pool['name'])] = pool['stats']['wr_bytes'] | |
225 | data['[{0},rd_ops]'.format(pool['name'])] = pool['stats']['rd'] | |
226 | data['[{0},wr_ops]'.format(pool['name'])] = pool['stats']['wr'] | |
227 | data['[{0},bytes_used]'.format(pool['name'])] = pool['stats']['bytes_used'] | |
228 | data['[{0},stored_raw]'.format(pool['name'])] = pool['stats']['stored_raw'] | |
229 | data['[{0},percent_used]'.format(pool['name'])] = pool['stats']['percent_used'] | |
224ce89b WB |
230 | |
231 | data['wr_ops'] = wr_ops | |
232 | data['rd_ops'] = rd_ops | |
233 | data['wr_bytes'] = wr_bytes | |
234 | data['rd_bytes'] = rd_bytes | |
235 | ||
236 | osd_map = self.get('osd_map') | |
237 | data['num_osd'] = len(osd_map['osds']) | |
238 | data['osd_nearfull_ratio'] = osd_map['nearfull_ratio'] | |
239 | data['osd_full_ratio'] = osd_map['full_ratio'] | |
240 | data['osd_backfillfull_ratio'] = osd_map['backfillfull_ratio'] | |
241 | ||
242 | data['num_pg_temp'] = len(osd_map['pg_temp']) | |
243 | ||
244 | num_up = 0 | |
245 | num_in = 0 | |
246 | for osd in osd_map['osds']: | |
9f95a23c | 247 | data['[osd.{0},up]'.format(int(osd['osd']))] = osd['up'] |
224ce89b WB |
248 | if osd['up'] == 1: |
249 | num_up += 1 | |
250 | ||
9f95a23c | 251 | data['[osd.{0},in]'.format(int(osd['osd']))] = osd['in'] |
224ce89b WB |
252 | if osd['in'] == 1: |
253 | num_in += 1 | |
254 | ||
255 | data['num_osd_up'] = num_up | |
256 | data['num_osd_in'] = num_in | |
257 | ||
258 | osd_fill = list() | |
11fdf7f2 TL |
259 | osd_pgs = list() |
260 | osd_apply_latency_ns = list() | |
261 | osd_commit_latency_ns = list() | |
224ce89b WB |
262 | |
263 | osd_stats = self.get('osd_stats') | |
264 | for osd in osd_stats['osd_stats']: | |
9f95a23c TL |
265 | try: |
266 | osd_fill.append((float(osd['kb_used']) / float(osd['kb'])) * 100) | |
267 | data['[osd.{0},osd_fill]'.format(osd['osd'])] = ( | |
268 | float(osd['kb_used']) / float(osd['kb'])) * 100 | |
269 | except ZeroDivisionError: | |
b32b8144 | 270 | continue |
11fdf7f2 TL |
271 | osd_pgs.append(osd['num_pgs']) |
272 | osd_apply_latency_ns.append(osd['perf_stat']['apply_latency_ns']) | |
273 | osd_commit_latency_ns.append(osd['perf_stat']['commit_latency_ns']) | |
9f95a23c TL |
274 | data['[osd.{0},num_pgs]'.format(osd['osd'])] = osd['num_pgs'] |
275 | data[ | |
276 | '[osd.{0},osd_latency_apply]'.format(osd['osd']) | |
277 | ] = osd['perf_stat']['apply_latency_ns'] / 1000000.0 # ns -> ms | |
278 | data[ | |
279 | '[osd.{0},osd_latency_commit]'.format(osd['osd']) | |
280 | ] = osd['perf_stat']['commit_latency_ns'] / 1000000.0 # ns -> ms | |
224ce89b WB |
281 | |
282 | try: | |
283 | data['osd_max_fill'] = max(osd_fill) | |
284 | data['osd_min_fill'] = min(osd_fill) | |
285 | data['osd_avg_fill'] = avg(osd_fill) | |
11fdf7f2 TL |
286 | data['osd_max_pgs'] = max(osd_pgs) |
287 | data['osd_min_pgs'] = min(osd_pgs) | |
288 | data['osd_avg_pgs'] = avg(osd_pgs) | |
224ce89b WB |
289 | except ValueError: |
290 | pass | |
291 | ||
292 | try: | |
11fdf7f2 TL |
293 | data['osd_latency_apply_max'] = max(osd_apply_latency_ns) / 1000000.0 # ns -> ms |
294 | data['osd_latency_apply_min'] = min(osd_apply_latency_ns) / 1000000.0 # ns -> ms | |
295 | data['osd_latency_apply_avg'] = avg(osd_apply_latency_ns) / 1000000.0 # ns -> ms | |
224ce89b | 296 | |
11fdf7f2 TL |
297 | data['osd_latency_commit_max'] = max(osd_commit_latency_ns) / 1000000.0 # ns -> ms |
298 | data['osd_latency_commit_min'] = min(osd_commit_latency_ns) / 1000000.0 # ns -> ms | |
299 | data['osd_latency_commit_avg'] = avg(osd_commit_latency_ns) / 1000000.0 # ns -> ms | |
224ce89b WB |
300 | except ValueError: |
301 | pass | |
302 | ||
11fdf7f2 | 303 | data.update(self.get_pg_stats()) |
224ce89b WB |
304 | |
305 | return data | |
306 | ||
9f95a23c | 307 | def send(self, data): |
94b18763 FG |
308 | identifier = self.config['identifier'] |
309 | if identifier is None or len(identifier) == 0: | |
310 | identifier = 'ceph-{0}'.format(self.fsid) | |
311 | ||
92f5a8d4 | 312 | if not self.config['zabbix_host'] or not self._zabbix_hosts: |
94b18763 FG |
313 | self.log.error('Zabbix server not set, please configure using: ' |
314 | 'ceph zabbix config-set zabbix_host <zabbix_host>') | |
11fdf7f2 TL |
315 | self.set_health_checks({ |
316 | 'MGR_ZABBIX_NO_SERVER': { | |
317 | 'severity': 'warning', | |
318 | 'summary': 'No Zabbix server configured', | |
319 | 'detail': ['Configuration value zabbix_host not configured'] | |
320 | } | |
321 | }) | |
94b18763 | 322 | return |
224ce89b | 323 | |
92f5a8d4 TL |
324 | result = True |
325 | ||
326 | for server in self._zabbix_hosts: | |
94b18763 | 327 | self.log.info( |
92f5a8d4 TL |
328 | 'Sending data to Zabbix server %s, port %s as host/identifier %s', |
329 | server['zabbix_host'], server['zabbix_port'], identifier) | |
94b18763 FG |
330 | self.log.debug(data) |
331 | ||
92f5a8d4 TL |
332 | try: |
333 | zabbix = ZabbixSender(self.config['zabbix_sender'], | |
334 | server['zabbix_host'], | |
335 | server['zabbix_port'], self.log) | |
336 | zabbix.send(identifier, data) | |
337 | except Exception as exc: | |
338 | self.log.exception('Failed to send.') | |
339 | self.set_health_checks({ | |
340 | 'MGR_ZABBIX_SEND_FAILED': { | |
341 | 'severity': 'warning', | |
342 | 'summary': 'Failed to send data to Zabbix', | |
343 | 'detail': [str(exc)] | |
344 | } | |
345 | }) | |
346 | result = False | |
347 | ||
348 | self.set_health_checks(dict()) | |
349 | return result | |
94b18763 | 350 | |
9f95a23c TL |
351 | def discovery(self): |
352 | osd_map = self.get('osd_map') | |
353 | osd_map_crush = self.get('osd_map_crush') | |
354 | ||
355 | # Discovering ceph pools | |
356 | pool_discovery = { | |
357 | pool['pool_name']: step['item_name'] | |
358 | for pool in osd_map['pools'] | |
359 | for rule in osd_map_crush['rules'] if rule['rule_id'] == pool['crush_rule'] | |
360 | for step in rule['steps'] if step['op'] == "take" | |
361 | } | |
362 | pools_discovery_data = {"data": [ | |
363 | { | |
364 | "{#POOL}": pool, | |
365 | "{#CRUSH_RULE}": rule | |
366 | } | |
367 | for pool, rule in pool_discovery.items() | |
368 | ]} | |
369 | ||
370 | # Discovering OSDs | |
371 | # Getting hosts for found crush rules | |
372 | osd_roots = { | |
373 | step['item_name']: [ | |
374 | item['id'] | |
375 | for item in root_bucket['items'] | |
376 | ] | |
377 | for rule in osd_map_crush['rules'] | |
378 | for step in rule['steps'] if step['op'] == "take" | |
379 | for root_bucket in osd_map_crush['buckets'] | |
380 | if root_bucket['id'] == step['item'] | |
381 | } | |
382 | # Getting osds for hosts with map to crush_rule | |
383 | osd_discovery = { | |
384 | item['id']: crush_rule | |
385 | for crush_rule, roots in osd_roots.items() | |
386 | for root in roots | |
387 | for bucket in osd_map_crush['buckets'] | |
388 | if bucket['id'] == root | |
389 | for item in bucket['items'] | |
390 | } | |
391 | osd_discovery_data = {"data": [ | |
392 | { | |
393 | "{#OSD}": osd, | |
394 | "{#CRUSH_RULE}": rule | |
395 | } | |
396 | for osd, rule in osd_discovery.items() | |
397 | ]} | |
398 | # Preparing recieved data for sending | |
399 | data = { | |
400 | "zabbix.pool.discovery": json.dumps(pools_discovery_data), | |
401 | "zabbix.osd.discovery": json.dumps(osd_discovery_data) | |
402 | } | |
403 | return bool(self.send(data)) | |
404 | ||
11fdf7f2 | 405 | def handle_command(self, inbuf, command): |
224ce89b | 406 | if command['prefix'] == 'zabbix config-show': |
9f95a23c | 407 | return 0, json.dumps(self.config, index=4, sort_keys=True), '' |
224ce89b WB |
408 | elif command['prefix'] == 'zabbix config-set': |
409 | key = command['key'] | |
410 | value = command['value'] | |
411 | if not value: | |
412 | return -errno.EINVAL, '', 'Value should not be empty or None' | |
413 | ||
414 | self.log.debug('Setting configuration option %s to %s', key, value) | |
94b18763 | 415 | if self.set_config_option(key, value): |
11fdf7f2 | 416 | self.set_module_option(key, value) |
92f5a8d4 TL |
417 | if key == 'zabbix_host' or key == 'zabbix_port': |
418 | self._parse_zabbix_hosts() | |
94b18763 FG |
419 | return 0, 'Configuration option {0} updated'.format(key), '' |
420 | ||
421 | return 1,\ | |
422 | 'Failed to update configuration option {0}'.format(key), '' | |
423 | ||
224ce89b | 424 | elif command['prefix'] == 'zabbix send': |
9f95a23c TL |
425 | data = self.get_data() |
426 | if self.send(data): | |
94b18763 FG |
427 | return 0, 'Sending data to Zabbix', '' |
428 | ||
429 | return 1, 'Failed to send data to Zabbix', '' | |
9f95a23c TL |
430 | |
431 | elif command['prefix'] == 'zabbix discovery': | |
432 | if self.discovery(): | |
433 | return 0, 'Sending discovery data to Zabbix', '' | |
434 | ||
435 | return 1, 'Failed to send discovery data to Zabbix', '' | |
436 | ||
224ce89b WB |
437 | else: |
438 | return (-errno.EINVAL, '', | |
439 | "Command not found '{0}'".format(command['prefix'])) | |
440 | ||
441 | def shutdown(self): | |
442 | self.log.info('Stopping zabbix') | |
443 | self.run = False | |
444 | self.event.set() | |
445 | ||
446 | def serve(self): | |
94b18763 | 447 | self.log.info('Zabbix module starting up') |
224ce89b WB |
448 | self.run = True |
449 | ||
450 | self.init_module_config() | |
451 | ||
9f95a23c TL |
452 | discovery_interval = self.config['discovery_interval'] |
453 | # We are sending discovery once plugin is loaded | |
454 | discovery_counter = discovery_interval | |
224ce89b WB |
455 | while self.run: |
456 | self.log.debug('Waking up for new iteration') | |
457 | ||
9f95a23c TL |
458 | if discovery_counter == discovery_interval: |
459 | try: | |
460 | self.discovery() | |
461 | except Exception as exc: | |
462 | # Shouldn't happen, but let's log it and retry next interval, | |
463 | # rather than dying completely. | |
464 | self.log.exception("Unexpected error during discovery():") | |
465 | finally: | |
466 | discovery_counter = 0 | |
467 | ||
224ce89b | 468 | try: |
9f95a23c TL |
469 | data = self.get_data() |
470 | self.send(data) | |
224ce89b | 471 | except Exception as exc: |
3efd9988 FG |
472 | # Shouldn't happen, but let's log it and retry next interval, |
473 | # rather than dying completely. | |
474 | self.log.exception("Unexpected error during send():") | |
224ce89b WB |
475 | |
476 | interval = self.config['interval'] | |
477 | self.log.debug('Sleeping for %d seconds', interval) | |
9f95a23c | 478 | discovery_counter += 1 |
224ce89b WB |
479 | self.event.wait(interval) |
480 | ||
481 | def self_test(self): | |
482 | data = self.get_data() | |
483 | ||
484 | if data['overall_status'] not in self.ceph_health_mapping: | |
485 | raise RuntimeError('No valid overall_status found in data') | |
486 | ||
487 | int(data['overall_status_int']) | |
488 | ||
489 | if data['num_mon'] < 1: | |
490 | raise RuntimeError('num_mon is smaller than 1') |