]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/zabbix/module.py
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / pybind / mgr / zabbix / module.py
1 """
2 Zabbix module for ceph-mgr
3
4 Collect statistics from Ceph cluster and every X seconds send data to a Zabbix
5 server using the zabbix_sender executable.
6 """
7 import logging
8 import json
9 import errno
10 import re
11 from subprocess import Popen, PIPE
12 from threading import Event
13 from mgr_module import CLIReadCommand, CLIWriteCommand, MgrModule, Option, OptionValue
14 from typing import cast, Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union
15
16
def avg(data: Sequence[Union[int, float]]) -> float:
    """Return the arithmetic mean of *data*, or 0 for an empty sequence."""
    if not data:
        return 0
    return sum(data) / float(len(data))
22
23
class ZabbixSender(object):
    """Thin wrapper around the ``zabbix_sender`` command-line tool.

    Builds the command line and feeds measurements to zabbix_sender on
    stdin in the ``<host> <key> <value>`` format it expects.
    """

    def __init__(self, sender: str, host: str, port: int, log: logging.Logger) -> None:
        self.sender = sender  # path to the zabbix_sender executable
        self.host = host      # Zabbix server address
        self.port = port      # Zabbix server trapper port
        self.log = log

    def send(self, hostname: str, data: Mapping[str, Union[int, float, str]]) -> None:
        """Send *data* for *hostname* to the Zabbix server.

        Every key is prefixed with 'ceph.' to match the Zabbix template.
        A no-op for empty *data*.

        Raises RuntimeError when zabbix_sender exits non-zero.
        """
        if not data:
            return

        cmd = [self.sender, '-z', self.host, '-p', str(self.port), '-s',
               hostname, '-vv', '-i', '-']

        self.log.debug('Executing: %s', cmd)

        proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, encoding='utf-8')

        # Hand the whole payload to communicate() instead of writing to
        # proc.stdin item by item: looping writes before communicate() can
        # deadlock if the child fills its stdout/stderr pipe buffer while
        # we are still writing stdin (see subprocess docs).
        payload = ''.join('{0} ceph.{1} {2}\n'.format(hostname, key, value)
                          for key, value in data.items())

        stdout, stderr = proc.communicate(input=payload)
        if proc.returncode != 0:
            raise RuntimeError('%s exited non-zero: %s' % (self.sender,
                                                           stderr))

        self.log.debug('Zabbix Sender: %s', stdout.rstrip())
52
53
class Module(MgrModule):
    """ceph-mgr module that periodically pushes cluster stats to Zabbix."""
    # Set by serve()/shutdown(); controls the main collection loop.
    run = False
    # In-memory copy of the module configuration (option name -> value).
    config: Dict[str, OptionValue] = {}
    # Maps Ceph health status strings to the integers the Zabbix template expects.
    ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}
    # Targets parsed from 'zabbix_host'; each entry carries 'zabbix_host'/'zabbix_port'.
    _zabbix_hosts: List[Dict[str, Union[str, int]]] = list()
59
60 @property
61 def config_keys(self) -> Dict[str, OptionValue]:
62 return dict((o['name'], o.get('default', None))
63 for o in self.MODULE_OPTIONS)
64
    MODULE_OPTIONS = [
        # Path to the zabbix_sender executable used to push data.
        Option(
            name='zabbix_sender',
            default='/usr/bin/zabbix_sender'),
        # Comma-separated list of Zabbix servers; each entry may carry ':port'.
        Option(
            name='zabbix_host',
            type='str',
            default=None),
        # Default trapper port, used when a host entry has no explicit port.
        Option(
            name='zabbix_port',
            type='int',
            default=10051),
        # Host name/identifier reported to Zabbix; empty means 'ceph-<fsid>'.
        Option(
            name='identifier',
            default=""),
        # Seconds between two data collections (minimum 10, see set_config_option).
        Option(
            name='interval',
            type='secs',
            default=60),
        # Number of regular iterations between discovery runs (minimum 10).
        Option(
            name='discovery_interval',
            type='uint',
            default=100)
    ]
89
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the module and the event used to interrupt serve()."""
        super(Module, self).__init__(*args, **kwargs)
        # Set by shutdown() to wake serve() out of its sleep immediately.
        self.event = Event()
93
94 def init_module_config(self) -> None:
95 self.fsid = self.get('mon_map')['fsid']
96 self.log.debug('Found Ceph fsid %s', self.fsid)
97
98 for key, default in self.config_keys.items():
99 self.set_config_option(key, self.get_module_option(key, default))
100
101 if self.config['zabbix_host']:
102 self._parse_zabbix_hosts()
103
104 def set_config_option(self, option: str, value: OptionValue) -> bool:
105 if option not in self.config_keys.keys():
106 raise RuntimeError('{0} is a unknown configuration '
107 'option'.format(option))
108
109 if option in ['zabbix_port', 'interval', 'discovery_interval']:
110 try:
111 int_value = int(value) # type: ignore
112 except (ValueError, TypeError):
113 raise RuntimeError('invalid {0} configured. Please specify '
114 'a valid integer'.format(option))
115
116 if option == 'interval' and int_value < 10:
117 raise RuntimeError('interval should be set to at least 10 seconds')
118
119 if option == 'discovery_interval' and int_value < 10:
120 raise RuntimeError(
121 "discovery_interval should not be more frequent "
122 "than once in 10 regular data collection"
123 )
124
125 self.log.debug('Setting in-memory config option %s to: %s', option,
126 value)
127 self.config[option] = value
128 return True
129
130 def _parse_zabbix_hosts(self) -> None:
131 self._zabbix_hosts = list()
132 servers = cast(str, self.config['zabbix_host']).split(",")
133 for server in servers:
134 uri = re.match("(?:(?:\[?)([a-z0-9-\.]+|[a-f0-9:\.]+)(?:\]?))(?:((?::))([0-9]{1,5}))?$", server)
135 if uri:
136 zabbix_host, sep, opt_zabbix_port = uri.groups()
137 if sep == ':':
138 zabbix_port = int(opt_zabbix_port)
139 else:
140 zabbix_port = cast(int, self.config['zabbix_port'])
141 self._zabbix_hosts.append({'zabbix_host': zabbix_host, 'zabbix_port': zabbix_port})
142 else:
143 self.log.error('Zabbix host "%s" is not valid', server)
144
145 self.log.error('Parsed Zabbix hosts: %s', self._zabbix_hosts)
146
147 def get_pg_stats(self) -> Dict[str, int]:
148 stats = dict()
149
150 pg_states = ['active', 'peering', 'clean', 'scrubbing', 'undersized',
151 'backfilling', 'recovering', 'degraded', 'inconsistent',
152 'remapped', 'backfill_toofull', 'backfill_wait',
153 'recovery_wait']
154
155 for state in pg_states:
156 stats['num_pg_{0}'.format(state)] = 0
157
158 pg_status = self.get('pg_status')
159
160 stats['num_pg'] = pg_status['num_pgs']
161
162 for state in pg_status['pgs_by_state']:
163 states = state['state_name'].split('+')
164 for s in pg_states:
165 key = 'num_pg_{0}'.format(s)
166 if s in states:
167 stats[key] += state['count']
168
169 return stats
170
    def get_data(self) -> Dict[str, Union[int, float]]:
        """Collect one snapshot of cluster metrics for the Zabbix template.

        Returns a flat dict; per-pool and per-OSD values use the
        '[name,item]' key form consumed by Zabbix low-level discovery.
        """
        data = dict()

        health = json.loads(self.get('health')['json'])
        # 'status' is luminous+, 'overall_status' is legacy mode.
        data['overall_status'] = health.get('status',
                                            health.get('overall_status'))
        # NOTE(review): .get() yields None for statuses not in
        # ceph_health_mapping -- confirm downstream tolerates None here.
        data['overall_status_int'] = \
            self.ceph_health_mapping.get(data['overall_status'])

        mon_status = json.loads(self.get('mon_status')['json'])
        data['num_mon'] = len(mon_status['monmap']['mons'])

        df = self.get('df')
        data['num_pools'] = len(df['pools'])
        data['total_used_bytes'] = df['stats']['total_used_bytes']
        data['total_bytes'] = df['stats']['total_bytes']
        data['total_avail_bytes'] = df['stats']['total_avail_bytes']

        # Cluster-wide I/O totals, accumulated over all pools below.
        wr_ops = 0
        rd_ops = 0
        wr_bytes = 0
        rd_bytes = 0

        for pool in df['pools']:
            wr_ops += pool['stats']['wr']
            rd_ops += pool['stats']['rd']
            wr_bytes += pool['stats']['wr_bytes']
            rd_bytes += pool['stats']['rd_bytes']
            data['[{0},rd_bytes]'.format(pool['name'])] = pool['stats']['rd_bytes']
            data['[{0},wr_bytes]'.format(pool['name'])] = pool['stats']['wr_bytes']
            data['[{0},rd_ops]'.format(pool['name'])] = pool['stats']['rd']
            data['[{0},wr_ops]'.format(pool['name'])] = pool['stats']['wr']
            data['[{0},bytes_used]'.format(pool['name'])] = pool['stats']['bytes_used']
            data['[{0},stored_raw]'.format(pool['name'])] = pool['stats']['stored_raw']
            # percent_used is a 0..1 fraction; Zabbix template expects 0..100.
            data['[{0},percent_used]'.format(pool['name'])] = pool['stats']['percent_used'] * 100

        data['wr_ops'] = wr_ops
        data['rd_ops'] = rd_ops
        data['wr_bytes'] = wr_bytes
        data['rd_bytes'] = rd_bytes

        osd_map = self.get('osd_map')
        data['num_osd'] = len(osd_map['osds'])
        data['osd_nearfull_ratio'] = osd_map['nearfull_ratio']
        data['osd_full_ratio'] = osd_map['full_ratio']
        data['osd_backfillfull_ratio'] = osd_map['backfillfull_ratio']

        data['num_pg_temp'] = len(osd_map['pg_temp'])

        # Per-OSD up/in flags plus cluster-wide up/in counts.
        num_up = 0
        num_in = 0
        for osd in osd_map['osds']:
            data['[osd.{0},up]'.format(int(osd['osd']))] = osd['up']
            if osd['up'] == 1:
                num_up += 1

            data['[osd.{0},in]'.format(int(osd['osd']))] = osd['in']
            if osd['in'] == 1:
                num_in += 1

        data['num_osd_up'] = num_up
        data['num_osd_in'] = num_in

        osd_fill = list()
        osd_pgs = list()
        osd_apply_latency_ns = list()
        osd_commit_latency_ns = list()

        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            try:
                osd_fill.append((float(osd['kb_used']) / float(osd['kb'])) * 100)
                data['[osd.{0},osd_fill]'.format(osd['osd'])] = (
                    float(osd['kb_used']) / float(osd['kb'])) * 100
            except ZeroDivisionError:
                # An OSD reporting kb == 0 contributes no metrics at all.
                continue
            osd_pgs.append(osd['num_pgs'])
            osd_apply_latency_ns.append(osd['perf_stat']['apply_latency_ns'])
            osd_commit_latency_ns.append(osd['perf_stat']['commit_latency_ns'])
            data['[osd.{0},num_pgs]'.format(osd['osd'])] = osd['num_pgs']
            data[
                '[osd.{0},osd_latency_apply]'.format(osd['osd'])
            ] = osd['perf_stat']['apply_latency_ns'] / 1000000.0  # ns -> ms
            data[
                '[osd.{0},osd_latency_commit]'.format(osd['osd'])
            ] = osd['perf_stat']['commit_latency_ns'] / 1000000.0  # ns -> ms

        # min()/max() raise ValueError on empty sequences (no OSD stats
        # yet); skip the aggregates in that case.
        try:
            data['osd_max_fill'] = max(osd_fill)
            data['osd_min_fill'] = min(osd_fill)
            data['osd_avg_fill'] = avg(osd_fill)
            data['osd_max_pgs'] = max(osd_pgs)
            data['osd_min_pgs'] = min(osd_pgs)
            data['osd_avg_pgs'] = avg(osd_pgs)
        except ValueError:
            pass

        try:
            data['osd_latency_apply_max'] = max(osd_apply_latency_ns) / 1000000.0  # ns -> ms
            data['osd_latency_apply_min'] = min(osd_apply_latency_ns) / 1000000.0  # ns -> ms
            data['osd_latency_apply_avg'] = avg(osd_apply_latency_ns) / 1000000.0  # ns -> ms

            data['osd_latency_commit_max'] = max(osd_commit_latency_ns) / 1000000.0  # ns -> ms
            data['osd_latency_commit_min'] = min(osd_commit_latency_ns) / 1000000.0  # ns -> ms
            data['osd_latency_commit_avg'] = avg(osd_commit_latency_ns) / 1000000.0  # ns -> ms
        except ValueError:
            pass

        data.update(self.get_pg_stats())

        return data
283
284 def send(self, data: Mapping[str, Union[int, float, str]]) -> bool:
285 identifier = cast(Optional[str], self.config['identifier'])
286 if identifier is None or len(identifier) == 0:
287 identifier = 'ceph-{0}'.format(self.fsid)
288
289 if not self.config['zabbix_host'] or not self._zabbix_hosts:
290 self.log.error('Zabbix server not set, please configure using: '
291 'ceph zabbix config-set zabbix_host <zabbix_host>')
292 self.set_health_checks({
293 'MGR_ZABBIX_NO_SERVER': {
294 'severity': 'warning',
295 'summary': 'No Zabbix server configured',
296 'detail': ['Configuration value zabbix_host not configured']
297 }
298 })
299 return False
300
301 result = True
302
303 for server in self._zabbix_hosts:
304 self.log.info(
305 'Sending data to Zabbix server %s, port %s as host/identifier %s',
306 server['zabbix_host'], server['zabbix_port'], identifier)
307 self.log.debug(data)
308
309 try:
310 zabbix = ZabbixSender(cast(str, self.config['zabbix_sender']),
311 cast(str, server['zabbix_host']),
312 cast(int, server['zabbix_port']), self.log)
313 zabbix.send(identifier, data)
314 except Exception as exc:
315 self.log.exception('Failed to send.')
316 self.set_health_checks({
317 'MGR_ZABBIX_SEND_FAILED': {
318 'severity': 'warning',
319 'summary': 'Failed to send data to Zabbix',
320 'detail': [str(exc)]
321 }
322 })
323 result = False
324
325 self.set_health_checks(dict())
326 return result
327
    def discovery(self) -> bool:
        """Send Zabbix low-level discovery data for pools and OSDs.

        Builds the '{#POOL}'/'{#OSD}'/'{#CRUSH_RULE}' macro lists the
        Zabbix template uses to create items dynamically, then sends them
        via send().  Returns True on success.
        """
        osd_map = self.get('osd_map')
        osd_map_crush = self.get('osd_map_crush')

        # Discovering ceph pools: map each pool to the CRUSH root its rule
        # starts from (the 'take' step's item name).
        pool_discovery = {
            pool['pool_name']: step['item_name']
            for pool in osd_map['pools']
            for rule in osd_map_crush['rules'] if rule['rule_id'] == pool['crush_rule']
            for step in rule['steps'] if step['op'] == "take"
        }
        pools_discovery_data = {"data": [
            {
                "{#POOL}": pool,
                "{#CRUSH_RULE}": rule
            }
            for pool, rule in pool_discovery.items()
        ]}

        # Discovering OSDs
        # Getting hosts for found crush rules
        # (maps each rule's root name to the child bucket ids of that root)
        osd_roots = {
            step['item_name']: [
                item['id']
                for item in root_bucket['items']
            ]
            for rule in osd_map_crush['rules']
            for step in rule['steps'] if step['op'] == "take"
            for root_bucket in osd_map_crush['buckets']
            if root_bucket['id'] == step['item']
        }
        # Getting osds for hosts with map to crush_rule
        # (an OSD listed under several roots keeps the last rule seen)
        osd_discovery = {
            item['id']: crush_rule
            for crush_rule, roots in osd_roots.items()
            for root in roots
            for bucket in osd_map_crush['buckets']
            if bucket['id'] == root
            for item in bucket['items']
        }
        osd_discovery_data = {"data": [
            {
                "{#OSD}": osd,
                "{#CRUSH_RULE}": rule
            }
            for osd, rule in osd_discovery.items()
        ]}
        # Preparing received data for sending
        data = {
            "zabbix.pool.discovery": json.dumps(pools_discovery_data),
            "zabbix.osd.discovery": json.dumps(osd_discovery_data)
        }
        return bool(self.send(data))
381
382 @CLIReadCommand('zabbix config-show')
383 def config_show(self) -> Tuple[int, str, str]:
384 """
385 Show current configuration
386 """
387 return 0, json.dumps(self.config, indent=4, sort_keys=True), ''
388
389 @CLIWriteCommand('zabbix config-set')
390 def config_set(self, key: str, value: str) -> Tuple[int, str, str]:
391 """
392 Set a configuration value
393 """
394 if not value:
395 return -errno.EINVAL, '', 'Value should not be empty or None'
396
397 self.log.debug('Setting configuration option %s to %s', key, value)
398 if self.set_config_option(key, value):
399 self.set_module_option(key, value)
400 if key == 'zabbix_host' or key == 'zabbix_port':
401 self._parse_zabbix_hosts()
402 return 0, 'Configuration option {0} updated'.format(key), ''
403 return 1,\
404 'Failed to update configuration option {0}'.format(key), ''
405
406 @CLIReadCommand('zabbix send')
407 def do_send(self) -> Tuple[int, str, str]:
408 """
409 Force sending data to Zabbix
410 """
411 data = self.get_data()
412 if self.send(data):
413 return 0, 'Sending data to Zabbix', ''
414
415 return 1, 'Failed to send data to Zabbix', ''
416
417 @CLIReadCommand('zabbix discovery')
418 def do_discovery(self) -> Tuple[int, str, str]:
419 """
420 Discovering Zabbix data
421 """
422 if self.discovery():
423 return 0, 'Sending discovery data to Zabbix', ''
424
425 return 1, 'Failed to send discovery data to Zabbix', ''
426
    def shutdown(self) -> None:
        """Stop the serve() loop: clear the run flag and wake the sleeper."""
        self.log.info('Stopping zabbix')
        self.run = False
        # Interrupts the event.wait() in serve() so shutdown is immediate.
        self.event.set()
431
    def serve(self) -> None:
        """Main loop: collect and send data every 'interval' seconds.

        Discovery data is sent on startup and then once every
        'discovery_interval' regular iterations.  Runs until shutdown()
        clears self.run.
        """
        self.log.info('Zabbix module starting up')
        self.run = True

        self.init_module_config()

        discovery_interval = self.config['discovery_interval']
        # We are sending discovery once plugin is loaded
        discovery_counter = cast(int, discovery_interval)
        while self.run:
            self.log.debug('Waking up for new iteration')

            if discovery_counter == discovery_interval:
                try:
                    self.discovery()
                except Exception:
                    # Shouldn't happen, but let's log it and retry next interval,
                    # rather than dying completely.
                    self.log.exception("Unexpected error during discovery():")
                finally:
                    # Reset even when discovery failed, so we retry after a
                    # full discovery_interval rather than every iteration.
                    discovery_counter = 0

            try:
                data = self.get_data()
                self.send(data)
            except Exception:
                # Shouldn't happen, but let's log it and retry next interval,
                # rather than dying completely.
                self.log.exception("Unexpected error during send():")

            interval = cast(float, self.config['interval'])
            self.log.debug('Sleeping for %d seconds', interval)
            discovery_counter += 1
            # Returns early when shutdown() sets the event.
            self.event.wait(interval)
466
467 def self_test(self) -> None:
468 data = self.get_data()
469
470 if data['overall_status'] not in self.ceph_health_mapping:
471 raise RuntimeError('No valid overall_status found in data')
472
473 int(data['overall_status_int'])
474
475 if data['num_mon'] < 1:
476 raise RuntimeError('num_mon is smaller than 1')