]>
Commit | Line | Data |
---|---|---|
224ce89b WB |
1 | """ |
2 | Zabbix module for ceph-mgr | |
3 | ||
4 | Collect statistics from Ceph cluster and every X seconds send data to a Zabbix | |
5 | server using the zabbix_sender executable. | |
6 | """ | |
7 | import json | |
8 | import errno | |
9 | from subprocess import Popen, PIPE | |
10 | from threading import Event | |
11 | from mgr_module import MgrModule | |
12 | ||
13 | ||
def avg(data):
    """Return the arithmetic mean of the numbers in ``data`` as a float.

    ``data`` has to support both ``sum()`` and ``len()``. An empty sequence
    ends in a ZeroDivisionError raised by the division below.
    """
    total = sum(data)
    count = float(len(data))
    return total / count
16 | ||
17 | ||
class ZabbixSender(object):
    """Thin wrapper around the ``zabbix_sender`` command line executable.

    :param sender: path of the executable to run
    :param host: value passed to the executable with ``-z``
    :param port: value passed to the executable with ``-p``
    :param log: logger-like object; only its ``debug()`` method is used
    """

    def __init__(self, sender, host, port, log):
        self.sender = sender
        self.host = host
        self.port = port
        self.log = log

    @staticmethod
    def _to_text(raw):
        """Return process output as text; decode it when it is bytes."""
        # On Python 2 ``bytes`` is ``str``, so the value is returned as-is.
        if isinstance(raw, bytes) and not isinstance(raw, str):
            return raw.decode('utf-8', 'replace')
        return raw

    def send(self, hostname, data):
        """Feed ``data`` to the sender executable on its standard input.

        Every item becomes one ``<hostname> ceph.<key> <value>`` line. The
        executable is not started at all when ``data`` is empty.

        :param hostname: host name the items are reported for (``-s``)
        :param data: dict mapping item key to value
        :raises RuntimeError: if the executable exits with a non-zero code
        """
        if not data:
            return

        cmd = [self.sender, '-z', self.host, '-p', str(self.port), '-s',
               hostname, '-vv', '-i', '-']

        payload = ''.join('{0} ceph.{1} {2}\n'.format(hostname, key, value)
                          for key, value in data.items())
        # The pipes created by Popen carry bytes; a text string has to be
        # encoded first or the write fails on Python 3.
        if not isinstance(payload, bytes):
            payload = payload.encode('utf-8')

        proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)

        # Let communicate() write stdin while it drains stdout/stderr.
        # Writing everything up front could block forever once the child
        # fills its (verbose, '-vv') output pipe before reading all input.
        stdout, stderr = proc.communicate(payload)
        if proc.returncode != 0:
            raise RuntimeError('%s exited non-zero: %s' % (
                self.sender, self._to_text(stderr)))

        self.log.debug('Zabbix Sender: %s', self._to_text(stdout).rstrip())
43 | ||
44 | ||
class Module(MgrModule):
    """ceph-mgr module that periodically pushes cluster statistics to Zabbix.

    ``serve()`` loads the configuration and then, until ``shutdown()`` is
    called, collects data with ``get_data()`` and hands it to a
    ``ZabbixSender`` every ``interval`` seconds.
    """

    run = False
    # Class-level default kept for backward compatibility; __init__ gives
    # every instance its own dict so instances never share configuration.
    config = dict()
    ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}

    # Known configuration keys and their defaults. A default of None means
    # the key must be configured before the module can start.
    config_keys = {
        'zabbix_sender': '/usr/bin/zabbix_sender',
        'zabbix_host': None,
        'zabbix_port': 10051,
        'identifier': None,
        'interval': 60,
    }

    COMMANDS = [
        {
            "cmd": "zabbix config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "zabbix config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "zabbix send",
            "desc": "Force sending data to Zabbix",
            "perm": "rw"
        },
        {
            "cmd": "zabbix self-test",
            "desc": "Run a self-test on the Zabbix module",
            "perm": "r"
        }
    ]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        # Set by shutdown() to wake serve() out of its sleep.
        self.event = Event()
        # Per-instance configuration; avoids mutating the class-level dict.
        self.config = dict()

    def init_module_config(self):
        """Load every key of ``config_keys`` into ``self.config``.

        Values are read with ``get_localized_config()`` (inherited; not
        defined in this file), falling back to the default in
        ``config_keys``.

        :raises RuntimeError: if a key resolves to None, or if
            ``set_config_option()`` rejects the value
        """
        for key, default in self.config_keys.items():
            value = self.get_localized_config(key, default)
            if value is None:
                raise RuntimeError('Configuration key {0} not set; "ceph '
                                   'config-key put mgr/zabbix/{0} '
                                   '<value>"'.format(key))

            self.set_config_option(key, value)

    def set_config_option(self, option, value):
        """Validate ``value`` and store it in ``self.config[option]``.

        ``zabbix_port`` and ``interval`` are converted to int; ``interval``
        must be at least 10.

        :raises RuntimeError: for an unknown option, a non-integer value
            for an integer option, or an interval below 10
        """
        if option not in self.config_keys:
            raise RuntimeError('{0} is a unknown configuration '
                               'option'.format(option))

        if option in ['zabbix_port', 'interval']:
            try:
                value = int(value)
            except (ValueError, TypeError):
                raise RuntimeError('invalid {0} configured. Please specify '
                                   'a valid integer'.format(option))

        if option == 'interval' and value < 10:
            raise RuntimeError('interval should be set to at least 10 seconds')

        self.config[option] = value

    def get_data(self):
        """Collect the cluster statistics that are sent to Zabbix.

        All inputs are fetched through ``self.get()`` (inherited; not
        defined in this file).

        :returns: dict mapping item name to value. The ``osd_*_fill`` and
            ``osd_latency_*`` items are only present when at least one OSD
            with a non-zero capacity reported statistics.
        """
        data = dict()

        health = json.loads(self.get('health')['json'])
        data['overall_status'] = health['overall_status']
        # None when the status is not one of the known HEALTH_* values.
        data['overall_status_int'] = \
            self.ceph_health_mapping.get(data['overall_status'])

        mon_status = json.loads(self.get('mon_status')['json'])
        data['num_mon'] = len(mon_status['monmap']['mons'])

        df = self.get('df')
        pools = df['pools']
        data['num_pools'] = len(pools)
        data['total_objects'] = df['stats']['total_objects']
        data['total_used_bytes'] = df['stats']['total_used_bytes']
        data['total_bytes'] = df['stats']['total_bytes']
        data['total_avail_bytes'] = df['stats']['total_avail_bytes']

        # Client I/O counters summed over all pools.
        data['wr_ops'] = sum(pool['stats']['wr'] for pool in pools)
        data['rd_ops'] = sum(pool['stats']['rd'] for pool in pools)
        data['wr_bytes'] = sum(pool['stats']['wr_bytes'] for pool in pools)
        data['rd_bytes'] = sum(pool['stats']['rd_bytes'] for pool in pools)

        osd_map = self.get('osd_map')
        osds = osd_map['osds']
        data['num_osd'] = len(osds)
        data['osd_nearfull_ratio'] = osd_map['nearfull_ratio']
        data['osd_full_ratio'] = osd_map['full_ratio']
        data['osd_backfillfull_ratio'] = osd_map['backfillfull_ratio']

        data['num_pg_temp'] = len(osd_map['pg_temp'])

        data['num_osd_up'] = sum(1 for osd in osds if osd['up'] == 1)
        data['num_osd_in'] = sum(1 for osd in osds if osd['in'] == 1)

        osd_fill = list()
        osd_apply_latency = list()
        osd_commit_latency = list()

        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            kb = float(osd['kb'])
            if not kb:
                # A zero capacity would raise ZeroDivisionError and abort
                # the whole collection; leave this OSD out of the
                # fill/latency aggregates instead.
                continue

            osd_fill.append((float(osd['kb_used']) / kb) * 100)
            osd_apply_latency.append(osd['perf_stat']['apply_latency_ms'])
            osd_commit_latency.append(osd['perf_stat']['commit_latency_ms'])

        # max()/min() reject empty sequences, so the aggregates are only
        # reported when there is something to aggregate.
        if osd_fill:
            data['osd_max_fill'] = max(osd_fill)
            data['osd_min_fill'] = min(osd_fill)
            data['osd_avg_fill'] = avg(osd_fill)

        if osd_apply_latency:
            data['osd_latency_apply_max'] = max(osd_apply_latency)
            data['osd_latency_apply_min'] = min(osd_apply_latency)
            data['osd_latency_apply_avg'] = avg(osd_apply_latency)

            data['osd_latency_commit_max'] = max(osd_commit_latency)
            data['osd_latency_commit_min'] = min(osd_commit_latency)
            data['osd_latency_commit_avg'] = avg(osd_commit_latency)

        pg_summary = self.get('pg_summary')
        data['num_pg'] = sum(pg_summary['all'].values())

        return data

    def send(self):
        """Collect the current data and pass it to ``ZabbixSender.send()``.

        Failures while sending are logged and not re-raised; errors raised
        by ``get_data()`` propagate to the caller.
        """
        data = self.get_data()

        self.log.debug('Sending data to Zabbix server %s',
                       self.config['zabbix_host'])
        self.log.debug(data)

        try:
            zabbix = ZabbixSender(self.config['zabbix_sender'],
                                  self.config['zabbix_host'],
                                  self.config['zabbix_port'], self.log)
            zabbix.send(self.config['identifier'], data)
        except Exception as exc:
            # Best effort: a failed push must not take the caller down.
            self.log.error('Exception when sending: %s', exc)

    def handle_command(self, command):
        """Dispatch one of the commands listed in ``COMMANDS``.

        :param command: dict with at least a ``prefix`` entry; ``zabbix
            config-set`` additionally reads ``key`` and ``value``
        :returns: tuple of (return code, output string, error string)
        """
        prefix = command['prefix']

        if prefix == 'zabbix config-show':
            return 0, json.dumps(self.config), ''
        elif prefix == 'zabbix config-set':
            key = command['key']
            value = command['value']
            if not value:
                return -errno.EINVAL, '', 'Value should not be empty or None'

            self.log.debug('Setting configuration option %s to %s', key, value)
            try:
                self.set_config_option(key, value)
            except RuntimeError as exc:
                # Report invalid input as a command error rather than
                # letting the exception escape, and do not persist it.
                return -errno.EINVAL, '', str(exc)

            self.set_localized_config(key, value)
            return 0, 'Configuration option {0} updated'.format(key), ''
        elif prefix == 'zabbix send':
            self.send()
            return 0, 'Sending data to Zabbix', ''
        elif prefix == 'zabbix self-test':
            self.self_test()
            return 0, 'Self-test succeeded', ''
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(prefix))

    def shutdown(self):
        """Ask ``serve()`` to stop and wake it if it is sleeping."""
        self.log.info('Stopping zabbix')
        self.run = False
        self.event.set()

    def serve(self):
        """Load the configuration, then send data until ``shutdown()``.

        :raises RuntimeError: from ``init_module_config()`` when the
            configuration is incomplete or invalid
        """
        self.log.debug('Zabbix module starting up')
        self.run = True

        self.init_module_config()

        for key, value in self.config.items():
            self.log.debug('%s: %s', key, value)

        while self.run:
            self.log.debug('Waking up for new iteration')

            # Sometimes fetching data fails, should be fixed by PR #16020
            try:
                self.send()
            except Exception as exc:
                self.log.error(exc)

            # Re-read each iteration so a config-set takes effect at once.
            interval = self.config['interval']
            self.log.debug('Sleeping for %d seconds', interval)
            self.event.wait(interval)

    def self_test(self):
        """Sanity-check the data returned by ``get_data()``.

        :raises RuntimeError: if ``overall_status`` is not a known health
            value or fewer than one monitor is reported
        """
        data = self.get_data()

        if data['overall_status'] not in self.ceph_health_mapping:
            raise RuntimeError('No valid overall_status found in data')

        # Raises if the mapped status is not convertible to an integer.
        int(data['overall_status_int'])

        if data['num_mon'] < 1:
            raise RuntimeError('num_mon is smaller than 1')