]>
Commit | Line | Data |
---|---|---|
224ce89b WB |
1 | """ |
2 | Zabbix module for ceph-mgr | |
3 | ||
4 | Collect statistics from Ceph cluster and every X seconds send data to a Zabbix | |
5 | server using the zabbix_sender executable. | |
6 | """ | |
7 | import json | |
8 | import errno | |
9 | from subprocess import Popen, PIPE | |
10 | from threading import Event | |
11 | from mgr_module import MgrModule | |
12 | ||
13 | ||
def avg(data):
    """Return the arithmetic mean of *data* as a float.

    *data* must support both ``sum()`` and ``len()``. An empty sequence
    raises ZeroDivisionError.
    """
    total = sum(data)
    count = len(data)
    return total / float(count)
16 | ||
17 | ||
class ZabbixSender(object):
    """Push key/value items to a Zabbix server via a sender executable.

    :param sender: path of the executable to run (e.g. zabbix_sender)
    :param host: Zabbix server address, passed with ``-z``
    :param port: Zabbix server port, passed with ``-p``
    :param log: logger-like object; only ``debug()`` is used
    """

    def __init__(self, sender, host, port, log):
        self.sender = sender
        self.host = host
        self.port = port
        self.log = log

    def send(self, hostname, data):
        """Send every item of *data* for *hostname* in one sender run.

        Each item is written as one ``<hostname> ceph.<key> <value>`` line
        on the standard input of the sender process (``-i -``).

        :param hostname: host name the items are reported for (``-s``)
        :param data: mapping of item key to value; nothing is executed
                     when it is empty
        :raises RuntimeError: when the sender exits with a non-zero status
        """
        if not data:
            return

        cmd = [self.sender, '-z', self.host, '-p', str(self.port), '-s',
               hostname, '-vv', '-i', '-']

        payload = ''.join('{0} ceph.{1} {2}\n'.format(hostname, key, value)
                          for key, value in data.items())

        # Text-mode pipes accept and return str on both Python 2 and 3.
        proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE,
                     universal_newlines=True)

        # Let communicate() feed stdin while it drains stdout/stderr.
        # Writing to proc.stdin by hand can block forever once the child
        # has filled its output pipe.
        stdout, stderr = proc.communicate(payload)
        if proc.returncode != 0:
            raise RuntimeError('%s exited non-zero: %s' % (self.sender,
                                                           stderr))

        self.log.debug('Zabbix Sender: %s', stdout.rstrip())
43 | ||
44 | ||
class Module(MgrModule):
    """ceph-mgr module that periodically reports cluster statistics to Zabbix.

    ``serve()`` loads the configuration, then collects statistics with
    ``get_data()`` and hands them to a ``ZabbixSender`` every ``interval``
    seconds until ``shutdown()`` is called.
    """
    # True while the serve() loop should keep running.
    run = False
    # Class-level placeholder only; __init__ gives each instance its own dict.
    config = dict()
    ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}

    # Known configuration options and their defaults. Options whose default
    # is None must be configured before init_module_config() succeeds.
    config_keys = {
        'zabbix_sender': '/usr/bin/zabbix_sender',
        'zabbix_host': None,
        'zabbix_port': 10051,
        'identifier': None, 'interval': 60
    }

    COMMANDS = [
        {
            "cmd": "zabbix config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "zabbix config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "zabbix send",
            "desc": "Force sending data to Zabbix",
            "perm": "rw"
        },
        {
            "cmd": "zabbix self-test",
            "desc": "Run a self-test on the Zabbix module",
            "perm": "r"
        }
    ]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        # Set by shutdown() to interrupt the sleep in serve().
        self.event = Event()
        # Per-instance store, so set_config_option() never mutates the
        # dict shared through the class attribute.
        self.config = dict()

    def init_module_config(self):
        """Load every known option into ``self.config``.

        :raises RuntimeError: when an option has neither a stored value nor
                              a default, or when a value fails validation
        """
        for key, default in self.config_keys.items():
            value = self.get_localized_config(key, default)
            if value is None:
                raise RuntimeError('Configuration key {0} not set; "ceph '
                                   'config-key set mgr/zabbix/{0} '
                                   '<value>"'.format(key))

            self.set_config_option(key, value)

    def set_config_option(self, option, value):
        """Validate *value* and store it in ``self.config[option]``.

        ``zabbix_port`` and ``interval`` are converted to int; ``interval``
        must be at least 10.

        :raises RuntimeError: for an unknown option, a non-integer value for
                              an integer option, or a too small interval
        """
        if option not in self.config_keys:
            raise RuntimeError('{0} is a unknown configuration '
                               'option'.format(option))

        if option in ['zabbix_port', 'interval']:
            try:
                value = int(value)
            except (ValueError, TypeError):
                raise RuntimeError('invalid {0} configured. Please specify '
                                   'a valid integer'.format(option))

        if option == 'interval' and value < 10:
            raise RuntimeError('interval should be set to at least 10 seconds')

        self.config[option] = value

    def get_data(self):
        """Collect cluster statistics into a flat dict of item key -> value.

        The values come from ``self.get()`` lookups of 'health',
        'mon_status', 'df', 'osd_map', 'osd_stats' and 'pg_summary'.
        NOTE(review): the exact shape of those structures is defined
        outside this file; the code assumes the keys it indexes exist.

        Per-OSD fill and latency aggregates are left out when there is no
        OSD data to aggregate.
        """
        data = dict()

        health = json.loads(self.get('health')['json'])
        # 'status' is luminous+, 'overall_status' is legacy mode.
        data['overall_status'] = health.get('status',
                                            health.get('overall_status'))
        data['overall_status_int'] = \
            self.ceph_health_mapping.get(data['overall_status'])

        mon_status = json.loads(self.get('mon_status')['json'])
        data['num_mon'] = len(mon_status['monmap']['mons'])

        df = self.get('df')
        data['num_pools'] = len(df['pools'])
        data['total_objects'] = df['stats']['total_objects']
        data['total_used_bytes'] = df['stats']['total_used_bytes']
        data['total_bytes'] = df['stats']['total_bytes']
        data['total_avail_bytes'] = df['stats']['total_avail_bytes']

        # Cluster-wide I/O counters are the sum over all pools.
        pool_stats = [pool['stats'] for pool in df['pools']]
        data['wr_ops'] = sum(stats['wr'] for stats in pool_stats)
        data['rd_ops'] = sum(stats['rd'] for stats in pool_stats)
        data['wr_bytes'] = sum(stats['wr_bytes'] for stats in pool_stats)
        data['rd_bytes'] = sum(stats['rd_bytes'] for stats in pool_stats)

        osd_map = self.get('osd_map')
        osds = osd_map['osds']
        data['num_osd'] = len(osds)
        data['osd_nearfull_ratio'] = osd_map['nearfull_ratio']
        data['osd_full_ratio'] = osd_map['full_ratio']
        data['osd_backfillfull_ratio'] = osd_map['backfillfull_ratio']

        data['num_pg_temp'] = len(osd_map['pg_temp'])

        data['num_osd_up'] = sum(1 for osd in osds if osd['up'] == 1)
        data['num_osd_in'] = sum(1 for osd in osds if osd['in'] == 1)

        osd_fill = list()
        osd_apply_latency = list()
        osd_commit_latency = list()

        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            perf = osd['perf_stat']
            osd_apply_latency.append(perf['apply_latency_ms'])
            osd_commit_latency.append(perf['commit_latency_ms'])
            try:
                osd_fill.append(
                    (float(osd['kb_used']) / float(osd['kb'])) * 100)
            except ZeroDivisionError:
                # An OSD reporting a size of 0 kb has no meaningful fill
                # percentage; skip it instead of aborting the collection.
                continue

        # max()/min() reject empty sequences, so only aggregate when there
        # is at least one sample.
        if osd_fill:
            data['osd_max_fill'] = max(osd_fill)
            data['osd_min_fill'] = min(osd_fill)
            data['osd_avg_fill'] = avg(osd_fill)

        if osd_apply_latency and osd_commit_latency:
            data['osd_latency_apply_max'] = max(osd_apply_latency)
            data['osd_latency_apply_min'] = min(osd_apply_latency)
            data['osd_latency_apply_avg'] = avg(osd_apply_latency)

            data['osd_latency_commit_max'] = max(osd_commit_latency)
            data['osd_latency_commit_min'] = min(osd_commit_latency)
            data['osd_latency_commit_avg'] = avg(osd_commit_latency)

        pg_summary = self.get('pg_summary')
        data['num_pg'] = sum(pg_summary['all'].values())

        return data

    def send(self):
        """Collect the statistics and send them to the configured server.

        Errors raised while sending are logged, not propagated; errors
        raised by ``get_data()`` do propagate to the caller.
        """
        data = self.get_data()

        self.log.debug('Sending data to Zabbix server %s',
                       self.config['zabbix_host'])
        self.log.debug(data)

        try:
            zabbix = ZabbixSender(self.config['zabbix_sender'],
                                  self.config['zabbix_host'],
                                  self.config['zabbix_port'], self.log)
            zabbix.send(self.config['identifier'], data)
        except Exception as exc:
            self.log.error('Exception when sending: %s', exc)

    def handle_command(self, command):
        """Dispatch one of the commands declared in ``COMMANDS``.

        :param command: dict with at least 'prefix'; 'zabbix config-set'
                        also reads 'key' and 'value'
        :returns: 3-tuple of (return code, output message, error message)
        """
        prefix = command['prefix']

        if prefix == 'zabbix config-show':
            return 0, json.dumps(self.config), ''

        if prefix == 'zabbix config-set':
            key = command['key']
            value = command['value']
            if not value:
                return -errno.EINVAL, '', 'Value should not be empty or None'

            self.log.debug('Setting configuration option %s to %s', key, value)
            try:
                self.set_config_option(key, value)
            except RuntimeError as exc:
                # Report a rejected option/value as a normal command error
                # instead of letting the exception escape.
                return -errno.EINVAL, '', str(exc)
            self.set_localized_config(key, value)
            return 0, 'Configuration option {0} updated'.format(key), ''

        if prefix == 'zabbix send':
            self.send()
            return 0, 'Sending data to Zabbix', ''

        if prefix == 'zabbix self-test':
            self.self_test()
            return 0, 'Self-test succeeded', ''

        return (-errno.EINVAL, '',
                "Command not found '{0}'".format(prefix))

    def shutdown(self):
        """Ask the serve() loop to stop and wake it up if it is sleeping."""
        self.log.info('Stopping zabbix')
        self.run = False
        self.event.set()

    def serve(self):
        """Load the configuration, then send data every ``interval`` seconds.

        The loop runs until ``shutdown()`` clears ``self.run``.

        :raises RuntimeError: when the initial configuration is invalid
        """
        self.log.debug('Zabbix module starting up')
        self.run = True

        self.init_module_config()

        for key, value in self.config.items():
            self.log.debug('%s: %s', key, value)

        while self.run:
            self.log.debug('Waking up for new iteration')

            # Sometimes fetching data fails, should be fixed by PR #16020
            try:
                self.send()
            except Exception as exc:
                self.log.error(exc)

            interval = self.config['interval']
            self.log.debug('Sleeping for %d seconds', interval)
            # Event.wait() returns early once shutdown() sets the event.
            self.event.wait(interval)

    def self_test(self):
        """Sanity-check the collected data.

        :raises RuntimeError: when the health status is not a known value
                              or fewer than one monitor is reported
        """
        data = self.get_data()

        if data['overall_status'] not in self.ceph_health_mapping:
            raise RuntimeError('No valid overall_status found in data')

        # Raises if the mapped status is not convertible to an integer.
        int(data['overall_status_int'])

        if data['num_mon'] < 1:
            raise RuntimeError('num_mon is smaller than 1')