]>
Commit | Line | Data |
---|---|---|
224ce89b WB |
1 | """ |
2 | Zabbix module for ceph-mgr | |
3 | ||
4 | Collect statistics from Ceph cluster and every X seconds send data to a Zabbix | |
5 | server using the zabbix_sender executable. | |
6 | """ | |
20effc67 | 7 | import logging |
224ce89b WB |
8 | import json |
9 | import errno | |
92f5a8d4 | 10 | import re |
224ce89b WB |
11 | from subprocess import Popen, PIPE |
12 | from threading import Event | |
20effc67 TL |
13 | from mgr_module import CLIReadCommand, CLIWriteCommand, MgrModule, Option, OptionValue |
14 | from typing import cast, Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union | |
224ce89b WB |
15 | |
16 | ||
def avg(data: Sequence[Union[int, float]]) -> float:
    """Return the arithmetic mean of *data*, or 0.0 for an empty sequence.

    Always returns a float, matching the annotated return type (the
    previous implementation returned the int 0 for empty input).
    """
    if data:
        return sum(data) / float(len(data))
    return 0.0
224ce89b WB |
22 | |
23 | ||
class ZabbixSender(object):
    """Thin wrapper around the zabbix_sender CLI utility.

    Items are streamed to zabbix_sender on stdin (enabled by '-i -') in
    the '<hostname> ceph.<key> <value>' format it expects.
    """

    def __init__(self, sender: str, host: str, port: int, log: logging.Logger) -> None:
        self.sender = sender  # path to the zabbix_sender executable
        self.host = host      # Zabbix server address
        self.port = port      # Zabbix server port
        self.log = log

    def send(self, hostname: str, data: Mapping[str, Union[int, float, str]]) -> None:
        """Send every key/value pair in *data* for *hostname*.

        No-op for empty *data*. Raises RuntimeError when zabbix_sender
        exits non-zero (stderr is included in the message).
        """
        if len(data) == 0:
            return

        cmd = [self.sender, '-z', self.host, '-p', str(self.port), '-s',
               hostname, '-vv', '-i', '-']

        self.log.debug('Executing: %s', cmd)

        proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, encoding='utf-8')

        # Hand the whole payload to communicate() instead of writing to
        # proc.stdin item by item: communicate() interleaves feeding stdin
        # with draining stdout/stderr (avoiding a pipe-buffer deadlock on
        # large payloads) and ignores EPIPE if the child exits early,
        # letting the returncode check below report the real failure.
        payload = ''.join('{0} ceph.{1} {2}\n'.format(hostname, key, value)
                          for key, value in data.items())

        stdout, stderr = proc.communicate(input=payload)
        if proc.returncode != 0:
            raise RuntimeError('%s exited non-zero: %s' % (self.sender,
                                                           stderr))

        self.log.debug('Zabbix Sender: %s', stdout.rstrip())
52 | ||
53 | ||
class Module(MgrModule):
    """Collect statistics from the Ceph cluster and periodically ship them
    to one or more Zabbix servers using the zabbix_sender executable."""

    run = False
    # In-memory copy of the module configuration, keyed by option name.
    config: Dict[str, OptionValue] = {}
    # Maps Ceph health strings to the integer item Zabbix templates expect.
    ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}
    # Parsed targets derived from the comma-separated zabbix_host option.
    _zabbix_hosts: List[Dict[str, Union[str, int]]] = list()

    @property
    def config_keys(self) -> Dict[str, OptionValue]:
        """Map every module option name to its declared default value."""
        return dict((o['name'], o.get('default', None))
                    for o in self.MODULE_OPTIONS)

    MODULE_OPTIONS = [
        Option(
            name='zabbix_sender',
            default='/usr/bin/zabbix_sender'),
        Option(
            name='zabbix_host',
            type='str',
            default=None),
        Option(
            name='zabbix_port',
            type='int',
            default=10051),
        Option(
            name='identifier',
            default=""),
        Option(
            name='interval',
            type='secs',
            default=60),
        Option(
            name='discovery_interval',
            type='uint',
            default=100)
    ]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(Module, self).__init__(*args, **kwargs)
        # Set on shutdown to interrupt the serve() sleep immediately.
        self.event = Event()

    def init_module_config(self) -> None:
        """Load persisted options into self.config and parse the Zabbix
        targets. Called once from serve()."""
        self.fsid = self.get('mon_map')['fsid']
        self.log.debug('Found Ceph fsid %s', self.fsid)

        for key, default in self.config_keys.items():
            self.set_config_option(key, self.get_module_option(key, default))

        if self.config['zabbix_host']:
            self._parse_zabbix_hosts()

    def set_config_option(self, option: str, value: OptionValue) -> bool:
        """Validate *value* and store it in the in-memory config.

        Raises RuntimeError for unknown options or invalid values.
        """
        if option not in self.config_keys.keys():
            raise RuntimeError('{0} is a unknown configuration '
                               'option'.format(option))

        if option in ['zabbix_port', 'interval', 'discovery_interval']:
            try:
                int_value = int(value)  # type: ignore
            except (ValueError, TypeError):
                raise RuntimeError('invalid {0} configured. Please specify '
                                   'a valid integer'.format(option))

        # int_value is only bound when option is one of the three integer
        # options above, which covers both checks below.
        if option == 'interval' and int_value < 10:
            raise RuntimeError('interval should be set to at least 10 seconds')

        if option == 'discovery_interval' and int_value < 10:
            raise RuntimeError(
                "discovery_interval should not be more frequent "
                "than once in 10 regular data collection"
            )

        self.log.debug('Setting in-memory config option %s to: %s', option,
                       value)
        self.config[option] = value
        return True

    def _parse_zabbix_hosts(self) -> None:
        """Split the comma-separated zabbix_host option into host/port
        dicts in self._zabbix_hosts, falling back to zabbix_port when a
        server has no explicit ':port' suffix."""
        self._zabbix_hosts = list()
        servers = cast(str, self.config['zabbix_host']).split(",")
        for server in servers:
            # Accepts a hostname, IPv4 or (optionally bracketed) IPv6
            # address, optionally followed by ':port'. Raw string avoids
            # invalid escape sequences for \[ and \. in a plain literal.
            uri = re.match(r"(?:(?:\[?)([a-z0-9-\.]+|[a-f0-9:\.]+)(?:\]?))(?:((?::))([0-9]{1,5}))?$", server)
            if uri:
                zabbix_host, sep, opt_zabbix_port = uri.groups()
                if sep == ':':
                    zabbix_port = int(opt_zabbix_port)
                else:
                    zabbix_port = cast(int, self.config['zabbix_port'])
                self._zabbix_hosts.append({'zabbix_host': zabbix_host, 'zabbix_port': zabbix_port})
            else:
                self.log.error('Zabbix host "%s" is not valid', server)

        # Informational, not an error condition: previously logged at
        # ERROR level by mistake.
        self.log.debug('Parsed Zabbix hosts: %s', self._zabbix_hosts)

    def get_pg_stats(self) -> Dict[str, int]:
        """Return per-state PG counters (num_pg_<state>) plus num_pg.

        A PG in a combined state ('active+clean') is counted once for
        every tracked state it contains.
        """
        stats = dict()

        pg_states = ['active', 'peering', 'clean', 'scrubbing', 'undersized',
                     'backfilling', 'recovering', 'degraded', 'inconsistent',
                     'remapped', 'backfill_toofull', 'backfill_wait',
                     'recovery_wait']

        # Pre-seed all counters so absent states report 0 instead of
        # missing items.
        for state in pg_states:
            stats['num_pg_{0}'.format(state)] = 0

        pg_status = self.get('pg_status')

        stats['num_pg'] = pg_status['num_pgs']

        for state in pg_status['pgs_by_state']:
            states = state['state_name'].split('+')
            for s in pg_states:
                key = 'num_pg_{0}'.format(s)
                if s in states:
                    stats[key] += state['count']

        return stats

    def get_data(self) -> Dict[str, Union[int, float]]:
        """Gather one iteration worth of cluster metrics: health, mon
        count, pool/df totals and per-pool I/O, OSD up/in counts, per-OSD
        fill/pg/latency plus min/max/avg aggregates, and PG state counts."""
        data = dict()

        health = json.loads(self.get('health')['json'])
        # 'status' is luminous+, 'overall_status' is legacy mode.
        data['overall_status'] = health.get('status',
                                            health.get('overall_status'))
        data['overall_status_int'] = \
            self.ceph_health_mapping.get(data['overall_status'])

        mon_status = json.loads(self.get('mon_status')['json'])
        data['num_mon'] = len(mon_status['monmap']['mons'])

        df = self.get('df')
        data['num_pools'] = len(df['pools'])
        data['total_used_bytes'] = df['stats']['total_used_bytes']
        data['total_bytes'] = df['stats']['total_bytes']
        data['total_avail_bytes'] = df['stats']['total_avail_bytes']

        wr_ops = 0
        rd_ops = 0
        wr_bytes = 0
        rd_bytes = 0

        for pool in df['pools']:
            wr_ops += pool['stats']['wr']
            rd_ops += pool['stats']['rd']
            wr_bytes += pool['stats']['wr_bytes']
            rd_bytes += pool['stats']['rd_bytes']
            # Per-pool items use the Zabbix '[<pool>,<metric>]' key format.
            data['[{0},rd_bytes]'.format(pool['name'])] = pool['stats']['rd_bytes']
            data['[{0},wr_bytes]'.format(pool['name'])] = pool['stats']['wr_bytes']
            data['[{0},rd_ops]'.format(pool['name'])] = pool['stats']['rd']
            data['[{0},wr_ops]'.format(pool['name'])] = pool['stats']['wr']
            data['[{0},bytes_used]'.format(pool['name'])] = pool['stats']['bytes_used']
            data['[{0},stored_raw]'.format(pool['name'])] = pool['stats']['stored_raw']
            data['[{0},percent_used]'.format(pool['name'])] = pool['stats']['percent_used'] * 100

        data['wr_ops'] = wr_ops
        data['rd_ops'] = rd_ops
        data['wr_bytes'] = wr_bytes
        data['rd_bytes'] = rd_bytes

        osd_map = self.get('osd_map')
        data['num_osd'] = len(osd_map['osds'])
        data['osd_nearfull_ratio'] = osd_map['nearfull_ratio']
        data['osd_full_ratio'] = osd_map['full_ratio']
        data['osd_backfillfull_ratio'] = osd_map['backfillfull_ratio']

        data['num_pg_temp'] = len(osd_map['pg_temp'])

        num_up = 0
        num_in = 0
        for osd in osd_map['osds']:
            data['[osd.{0},up]'.format(int(osd['osd']))] = osd['up']
            if osd['up'] == 1:
                num_up += 1

            data['[osd.{0},in]'.format(int(osd['osd']))] = osd['in']
            if osd['in'] == 1:
                num_in += 1

        data['num_osd_up'] = num_up
        data['num_osd_in'] = num_in

        osd_fill = list()
        osd_pgs = list()
        osd_apply_latency_ns = list()
        osd_commit_latency_ns = list()

        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            try:
                osd_fill.append((float(osd['kb_used']) / float(osd['kb'])) * 100)
                data['[osd.{0},osd_fill]'.format(osd['osd'])] = (
                    float(osd['kb_used']) / float(osd['kb'])) * 100
            except ZeroDivisionError:
                # An OSD reporting kb == 0 (e.g. not yet active) is
                # skipped entirely, including the stats below.
                continue
            osd_pgs.append(osd['num_pgs'])
            osd_apply_latency_ns.append(osd['perf_stat']['apply_latency_ns'])
            osd_commit_latency_ns.append(osd['perf_stat']['commit_latency_ns'])
            data['[osd.{0},num_pgs]'.format(osd['osd'])] = osd['num_pgs']
            data[
                '[osd.{0},osd_latency_apply]'.format(osd['osd'])
            ] = osd['perf_stat']['apply_latency_ns'] / 1000000.0  # ns -> ms
            data[
                '[osd.{0},osd_latency_commit]'.format(osd['osd'])
            ] = osd['perf_stat']['commit_latency_ns'] / 1000000.0  # ns -> ms

        # min()/max() raise ValueError on empty sequences (no usable
        # OSD stats); simply omit the aggregates in that case.
        try:
            data['osd_max_fill'] = max(osd_fill)
            data['osd_min_fill'] = min(osd_fill)
            data['osd_avg_fill'] = avg(osd_fill)
            data['osd_max_pgs'] = max(osd_pgs)
            data['osd_min_pgs'] = min(osd_pgs)
            data['osd_avg_pgs'] = avg(osd_pgs)
        except ValueError:
            pass

        try:
            data['osd_latency_apply_max'] = max(osd_apply_latency_ns) / 1000000.0  # ns -> ms
            data['osd_latency_apply_min'] = min(osd_apply_latency_ns) / 1000000.0  # ns -> ms
            data['osd_latency_apply_avg'] = avg(osd_apply_latency_ns) / 1000000.0  # ns -> ms

            data['osd_latency_commit_max'] = max(osd_commit_latency_ns) / 1000000.0  # ns -> ms
            data['osd_latency_commit_min'] = min(osd_commit_latency_ns) / 1000000.0  # ns -> ms
            data['osd_latency_commit_avg'] = avg(osd_commit_latency_ns) / 1000000.0  # ns -> ms
        except ValueError:
            pass

        data.update(self.get_pg_stats())

        return data

    def send(self, data: Mapping[str, Union[int, float, str]]) -> bool:
        """Ship *data* to every configured Zabbix server.

        Returns True when all servers accepted the data, False otherwise.
        Failures are reported via mgr health checks rather than raised.
        """
        identifier = cast(Optional[str], self.config['identifier'])
        if identifier is None or len(identifier) == 0:
            identifier = 'ceph-{0}'.format(self.fsid)

        if not self.config['zabbix_host'] or not self._zabbix_hosts:
            self.log.error('Zabbix server not set, please configure using: '
                           'ceph zabbix config-set zabbix_host <zabbix_host>')
            self.set_health_checks({
                'MGR_ZABBIX_NO_SERVER': {
                    'severity': 'warning',
                    'summary': 'No Zabbix server configured',
                    'detail': ['Configuration value zabbix_host not configured']
                }
            })
            return False

        result = True

        for server in self._zabbix_hosts:
            self.log.info(
                'Sending data to Zabbix server %s, port %s as host/identifier %s',
                server['zabbix_host'], server['zabbix_port'], identifier)
            self.log.debug(data)

            try:
                zabbix = ZabbixSender(cast(str, self.config['zabbix_sender']),
                                      cast(str, server['zabbix_host']),
                                      cast(int, server['zabbix_port']), self.log)
                zabbix.send(identifier, data)
            except Exception as exc:
                self.log.exception('Failed to send.')
                self.set_health_checks({
                    'MGR_ZABBIX_SEND_FAILED': {
                        'severity': 'warning',
                        'summary': 'Failed to send data to Zabbix',
                        'detail': [str(exc)]
                    }
                })
                result = False

        # Only clear health checks when every send succeeded; clearing
        # unconditionally would immediately wipe the MGR_ZABBIX_SEND_FAILED
        # warning raised above.
        if result:
            self.set_health_checks(dict())
        return result

    def discovery(self) -> bool:
        """Send pool and OSD low-level discovery (LLD) data to Zabbix.

        Pools and OSDs are mapped to the crush rule root ('take' step)
        they belong to. Returns the result of send().
        """
        osd_map = self.get('osd_map')
        osd_map_crush = self.get('osd_map_crush')

        # Discovering ceph pools
        pool_discovery = {
            pool['pool_name']: step['item_name']
            for pool in osd_map['pools']
            for rule in osd_map_crush['rules'] if rule['rule_id'] == pool['crush_rule']
            for step in rule['steps'] if step['op'] == "take"
        }
        pools_discovery_data = {"data": [
            {
                "{#POOL}": pool,
                "{#CRUSH_RULE}": rule
            }
            for pool, rule in pool_discovery.items()
        ]}

        # Discovering OSDs
        # Getting hosts for found crush rules
        osd_roots = {
            step['item_name']: [
                item['id']
                for item in root_bucket['items']
            ]
            for rule in osd_map_crush['rules']
            for step in rule['steps'] if step['op'] == "take"
            for root_bucket in osd_map_crush['buckets']
            if root_bucket['id'] == step['item']
        }
        # Getting osds for hosts with map to crush_rule
        osd_discovery = {
            item['id']: crush_rule
            for crush_rule, roots in osd_roots.items()
            for root in roots
            for bucket in osd_map_crush['buckets']
            if bucket['id'] == root
            for item in bucket['items']
        }
        osd_discovery_data = {"data": [
            {
                "{#OSD}": osd,
                "{#CRUSH_RULE}": rule
            }
            for osd, rule in osd_discovery.items()
        ]}
        # Preparing received data for sending
        data = {
            "zabbix.pool.discovery": json.dumps(pools_discovery_data),
            "zabbix.osd.discovery": json.dumps(osd_discovery_data)
        }
        return bool(self.send(data))

    @CLIReadCommand('zabbix config-show')
    def config_show(self) -> Tuple[int, str, str]:
        """
        Show current configuration
        """
        return 0, json.dumps(self.config, indent=4, sort_keys=True), ''

    @CLIWriteCommand('zabbix config-set')
    def config_set(self, key: str, value: str) -> Tuple[int, str, str]:
        """
        Set a configuration value
        """
        if not value:
            return -errno.EINVAL, '', 'Value should not be empty or None'

        self.log.debug('Setting configuration option %s to %s', key, value)
        if self.set_config_option(key, value):
            self.set_module_option(key, value)
            # The target list is derived state; rebuild it when either
            # of its inputs changes.
            if key == 'zabbix_host' or key == 'zabbix_port':
                self._parse_zabbix_hosts()
            return 0, 'Configuration option {0} updated'.format(key), ''
        return 1,\
            'Failed to update configuration option {0}'.format(key), ''

    @CLIReadCommand('zabbix send')
    def do_send(self) -> Tuple[int, str, str]:
        """
        Force sending data to Zabbix
        """
        data = self.get_data()
        if self.send(data):
            return 0, 'Sending data to Zabbix', ''

        return 1, 'Failed to send data to Zabbix', ''

    @CLIReadCommand('zabbix discovery')
    def do_discovery(self) -> Tuple[int, str, str]:
        """
        Discovering Zabbix data
        """
        if self.discovery():
            return 0, 'Sending discovery data to Zabbix', ''

        return 1, 'Failed to send discovery data to Zabbix', ''

    def shutdown(self) -> None:
        """Stop the serve() loop and wake it from its sleep."""
        self.log.info('Stopping zabbix')
        self.run = False
        self.event.set()

    def serve(self) -> None:
        """Main loop: send data every 'interval' seconds and discovery
        data every 'discovery_interval' iterations (and once at start)."""
        self.log.info('Zabbix module starting up')
        self.run = True

        self.init_module_config()

        discovery_interval = self.config['discovery_interval']
        # We are sending discovery once plugin is loaded
        discovery_counter = cast(int, discovery_interval)
        while self.run:
            self.log.debug('Waking up for new iteration')

            if discovery_counter == discovery_interval:
                try:
                    self.discovery()
                except Exception:
                    # Shouldn't happen, but let's log it and retry next interval,
                    # rather than dying completely.
                    self.log.exception("Unexpected error during discovery():")
                finally:
                    discovery_counter = 0

            try:
                data = self.get_data()
                self.send(data)
            except Exception:
                # Shouldn't happen, but let's log it and retry next interval,
                # rather than dying completely.
                self.log.exception("Unexpected error during send():")

            interval = cast(float, self.config['interval'])
            self.log.debug('Sleeping for %d seconds', interval)
            discovery_counter += 1
            self.event.wait(interval)

    def self_test(self) -> None:
        """Sanity-check that get_data() produces a plausible payload."""
        data = self.get_data()

        if data['overall_status'] not in self.ceph_health_mapping:
            raise RuntimeError('No valid overall_status found in data')

        int(data['overall_status_int'])

        if data['num_mon'] < 1:
            raise RuntimeError('num_mon is smaller than 1')