]>
Commit | Line | Data |
---|---|---|
3efd9988 FG |
1 | from datetime import datetime |
2 | from threading import Event | |
3 | import json | |
4 | import errno | |
28e407b8 | 5 | import time |
3efd9988 FG |
6 | |
7 | from mgr_module import MgrModule | |
8 | ||
9 | try: | |
10 | from influxdb import InfluxDBClient | |
11 | from influxdb.exceptions import InfluxDBClientError | |
28e407b8 | 12 | from requests.exceptions import ConnectionError |
3efd9988 FG |
13 | except ImportError: |
14 | InfluxDBClient = None | |
15 | ||
28e407b8 | 16 | |
3efd9988 FG |
class Module(MgrModule):
    """Manager module that pushes pool and daemon statistics to InfluxDB.

    ``serve`` loads the module configuration and then, until ``shutdown`` is
    called, sends one batch of statistics every ``interval`` seconds.  The
    ``influx ...`` commands declared in ``COMMANDS`` allow the configuration
    to be inspected/changed and a send to be triggered by hand.
    """

    COMMANDS = [
        {
            "cmd": "influx config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "influx config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "influx send",
            "desc": "Force sending data to Influx",
            "perm": "rw"
        },
        {
            "cmd": "influx self-test",
            "desc": "debug the module",
            "perm": "rw"
        },
    ]

    # Recognised configuration options and their defaults.  The two SSL
    # flags default to strings because init_module_config() lower-cases the
    # stored value and compares it against 'true'.
    config_keys = {
        'hostname': None,
        'port': 8086,
        'database': 'ceph',
        'username': None,
        'password': None,
        'interval': 5,
        'ssl': 'false',
        'verify_ssl': 'true'
    }

    def __init__(self, *args, **kwargs):
        """Set up the wake-up event, the run flag and an empty config dict."""
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()
        self.run = True
        self.config = dict()

    def get_fsid(self):
        """Return the ``fsid`` entry of the ``mon_map`` structure."""
        return self.get('mon_map')['fsid']

    def get_latest(self, daemon_type, daemon_name, stat):
        """Return the newest recorded value of ``stat`` for a daemon.

        Takes the last entry returned by ``get_counter`` for ``stat`` and
        returns its element at index 1 (presumably entries are
        (timestamp, value) pairs -- confirm against ``get_counter``).
        Returns 0 when no entries are available.
        """
        data = self.get_counter(daemon_type, daemon_name, stat)[stat]
        if data:
            return data[-1][1]

        return 0

    def get_df_stats(self):
        """Build one ``ceph_pool_stats`` point per (statistic, pool) pair.

        Returns:
            A list of dicts with ``measurement``, ``tags``, ``time`` and
            ``fields`` keys, ready to be passed to ``write_points``.
        """
        df = self.get("df")
        data = []

        # One timestamp and one fsid lookup for the whole batch; neither
        # depends on the pool or statistic being emitted.
        now = datetime.utcnow().isoformat() + 'Z'
        fsid = self.get_fsid()

        df_types = [
            'bytes_used',
            'kb_used',
            'dirty',
            'rd',
            'rd_bytes',
            'raw_bytes_used',
            'wr',
            'wr_bytes',
            'objects',
            'max_avail',
            'quota_objects',
            'quota_bytes'
        ]

        for df_type in df_types:
            for pool in df['pools']:
                point = {
                    "measurement": "ceph_pool_stats",
                    "tags": {
                        "pool_name": pool['name'],
                        "pool_id": pool['id'],
                        "type_instance": df_type,
                        "fsid": fsid
                    },
                    "time": now,
                    "fields": {
                        "value": pool['stats'][df_type],
                    }
                }
                data.append(point)
        return data

    def get_daemon_stats(self):
        """Build one ``ceph_daemon_stats`` point per daemon perf counter.

        Counters whose type has the ``PERFCOUNTER_HISTOGRAM`` bit set are
        skipped.

        Returns:
            A list of dicts with ``measurement``, ``tags``, ``time`` and
            ``fields`` keys, ready to be passed to ``write_points``.
        """
        data = []

        # One timestamp and one fsid lookup for the whole batch.
        now = datetime.utcnow().isoformat() + 'Z'
        fsid = self.get_fsid()

        # .items() (rather than the Python-2-only .iteritems()) keeps this
        # loop working on both Python 2 and 3 and matches the inner loop.
        for daemon, counters in self.get_all_perf_counters().items():
            # Daemon names look like "<type>.<id>"; the id may itself
            # contain dots, hence the single split.
            svc_type, svc_id = daemon.split(".", 1)
            metadata = self.get_metadata(svc_type, svc_id)

            for path, counter_info in counters.items():
                if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
                    continue

                data.append({
                    "measurement": "ceph_daemon_stats",
                    "tags": {
                        "ceph_daemon": daemon,
                        "type_instance": path,
                        "host": metadata['hostname'],
                        "fsid": fsid
                    },
                    "time": now,
                    "fields": {
                        "value": counter_info['value']
                    }
                })

        return data

    def set_config_option(self, option, value):
        """Validate ``value`` and store it in ``self.config[option]``.

        ``port`` and ``interval`` are converted to ``int``; ``ssl`` and
        ``verify_ssl`` are converted to ``bool`` (true only for the
        case-insensitive string 'true').

        Raises:
            RuntimeError: if ``option`` is not a key of ``config_keys``, if
                ``port``/``interval`` is not a valid integer, or if
                ``interval`` is below 5.
        """
        if option not in self.config_keys:
            raise RuntimeError('{0} is a unknown configuration '
                               'option'.format(option))

        if option in ['port', 'interval']:
            try:
                value = int(value)
            except (ValueError, TypeError):
                raise RuntimeError('invalid {0} configured. Please specify '
                                   'a valid integer'.format(option))

        if option == 'interval' and value < 5:
            raise RuntimeError('interval should be set to at least 5 seconds')

        if option in ['ssl', 'verify_ssl']:
            value = value.lower() == 'true'

        self.config[option] = value

    def init_module_config(self):
        """Populate ``self.config`` from stored settings or the defaults."""
        self.config['hostname'] = \
            self.get_config("hostname", default=self.config_keys['hostname'])
        self.config['port'] = \
            int(self.get_config("port", default=self.config_keys['port']))
        self.config['database'] = \
            self.get_config("database", default=self.config_keys['database'])
        self.config['username'] = \
            self.get_config("username", default=self.config_keys['username'])
        self.config['password'] = \
            self.get_config("password", default=self.config_keys['password'])
        self.config['interval'] = \
            int(self.get_config("interval",
                                default=self.config_keys['interval']))
        ssl = self.get_config("ssl", default=self.config_keys['ssl'])
        self.config['ssl'] = ssl.lower() == 'true'
        verify_ssl = \
            self.get_config("verify_ssl", default=self.config_keys['verify_ssl'])
        self.config['verify_ssl'] = verify_ssl.lower() == 'true'

    def send_to_influx(self):
        """Write the current pool and daemon statistics to InfluxDB.

        Health checks are raised when no hostname is configured or when the
        write fails, and cleared after a successful write.  When the server
        answers 404 the configured database is created; the failed batch is
        not re-sent.

        Raises:
            InfluxDBClientError: re-raised for any client error other
                than 404.
        """
        # This method is also reachable through "influx send", which does
        # not go through the import check done in serve().
        if InfluxDBClient is None:
            self.log.error("Cannot transmit statistics: influxdb python "
                           "module not found. Did you install it?")
            return

        if not self.config['hostname']:
            self.log.error("No Influx server configured, please set one using: "
                           "ceph influx config-set hostname <hostname>")
            self.set_health_checks({
                'MGR_INFLUX_NO_SERVER': {
                    'severity': 'warning',
                    'summary': 'No InfluxDB server configured',
                    'detail': ['Configuration option hostname not set']
                }
            })
            return

        # If influx server has authentication turned off then
        # missing username/password is valid.
        self.log.debug("Sending data to Influx host: %s",
                       self.config['hostname'])
        client = InfluxDBClient(self.config['hostname'], self.config['port'],
                                self.config['username'],
                                self.config['password'],
                                self.config['database'],
                                self.config['ssl'],
                                self.config['verify_ssl'])

        # using influx client get_list_database requires admin privs,
        # instead we'll catch the not found exception and inform the user if
        # db can not be created
        try:
            client.write_points(self.get_df_stats(), 'ms')
            client.write_points(self.get_daemon_stats(), 'ms')
            self.set_health_checks(dict())
        except ConnectionError as e:
            self.log.exception("Failed to connect to Influx host %s:%d",
                               self.config['hostname'], self.config['port'])
            self.set_health_checks({
                'MGR_INFLUX_SEND_FAILED': {
                    'severity': 'warning',
                    'summary': 'Failed to send data to InfluxDB server at %s:%d'
                               ' due to an connection error'
                               % (self.config['hostname'], self.config['port']),
                    'detail': [str(e)]
                }
            })
        except InfluxDBClientError as e:
            if e.code == 404:
                self.log.info("Database '%s' not found, trying to create "
                              "(requires admin privs).  You can also create "
                              "manually and grant write privs to user "
                              "'%s'", self.config['database'],
                              self.config['username'])
                client.create_database(self.config['database'])
            else:
                self.set_health_checks({
                    'MGR_INFLUX_SEND_FAILED': {
                        'severity': 'warning',
                        'summary': 'Failed to send data to InfluxDB',
                        'detail': [str(e)]
                    }
                })
                raise

    def shutdown(self):
        """Stop the serve loop and wake it if it is currently sleeping."""
        self.log.info('Stopping influx module')
        self.run = False
        self.event.set()

    def handle_command(self, cmd):
        """Dispatch one of the ``influx ...`` commands.

        Args:
            cmd: dict holding the command ``prefix`` and, for
                ``influx config-set``, the ``key`` and ``value`` arguments.

        Returns:
            A ``(return code, output, status message)`` tuple; the return
            code is 0 on success and ``-errno.EINVAL`` for an unknown
            command or a rejected configuration value.
        """
        if cmd['prefix'] == 'influx config-show':
            return 0, json.dumps(self.config), ''
        elif cmd['prefix'] == 'influx config-set':
            key = cmd['key']
            value = cmd['value']
            if not value:
                return -errno.EINVAL, '', 'Value should not be empty or None'

            self.log.debug('Setting configuration option %s to %s', key, value)
            # Report validation failures to the caller as EINVAL instead of
            # letting the RuntimeError escape the command handler; nothing
            # is persisted in that case.
            try:
                self.set_config_option(key, value)
            except RuntimeError as e:
                return -errno.EINVAL, '', str(e)
            self.set_config(key, value)
            return 0, 'Configuration option {0} updated'.format(key), ''
        elif cmd['prefix'] == 'influx send':
            self.send_to_influx()
            return 0, 'Sending data to Influx', ''
        elif cmd['prefix'] == 'influx self-test':
            daemon_stats = self.get_daemon_stats()
            assert len(daemon_stats)
            df_stats = self.get_df_stats()

            result = {
                'daemon_stats': daemon_stats,
                'df_stats': df_stats
            }

            return 0, json.dumps(result, indent=2), 'Self-test OK'

        return (-errno.EINVAL, '',
                "Command not found '{0}'".format(cmd['prefix']))

    def serve(self):
        """Run the periodic send loop until ``shutdown`` is called.

        Returns immediately (after logging an error) when the ``influxdb``
        client library could not be imported.
        """
        if InfluxDBClient is None:
            self.log.error("Cannot transmit statistics: influxdb python "
                           "module not found.  Did you install it?")
            return

        self.log.info('Starting influx module')
        self.init_module_config()
        self.run = True

        while self.run:
            start = time.time()
            self.send_to_influx()
            runtime = time.time() - start
            self.log.debug('Finished sending data in Influx in %.3f seconds',
                           runtime)
            self.log.debug("Sleeping for %d seconds", self.config['interval'])
            # Event.wait doubles as an interruptible sleep: shutdown() sets
            # the event so the loop exits without waiting out the interval.
            self.event.wait(self.config['interval'])