]>
Commit | Line | Data |
---|---|---|
494da23a | 1 | from contextlib import contextmanager |
3efd9988 | 2 | from datetime import datetime |
11fdf7f2 TL |
3 | from threading import Event, Thread |
4 | from itertools import chain | |
5 | from six import next | |
6 | from six.moves import queue | |
7 | from six.moves import xrange as range | |
3efd9988 FG |
8 | import json |
9 | import errno | |
1adf2230 | 10 | import six |
28e407b8 | 11 | import time |
3efd9988 FG |
12 | |
13 | from mgr_module import MgrModule | |
14 | ||
15 | try: | |
16 | from influxdb import InfluxDBClient | |
17 | from influxdb.exceptions import InfluxDBClientError | |
11fdf7f2 | 18 | from requests.exceptions import RequestException |
3efd9988 FG |
19 | except ImportError: |
20 | InfluxDBClient = None | |
21 | ||
28e407b8 | 22 | |
class Module(MgrModule):
    """Send Ceph performance and usage statistics to an InfluxDB server."""

    # Options exposed through the mgr config store; string booleans
    # ('true'/'false') are converted when read.
    MODULE_OPTIONS = [
        {'name': 'hostname', 'default': None},
        {'name': 'port', 'default': 8086},
        {'name': 'database', 'default': 'ceph'},
        {'name': 'username', 'default': None},
        {'name': 'password', 'default': None},
        {'name': 'interval', 'default': 30},
        {'name': 'ssl', 'default': 'false'},
        {'name': 'verify_ssl', 'default': 'true'},
        {'name': 'threads', 'default': 5},
        {'name': 'batch_size', 'default': 5000},
    ]

    @property
    def config_keys(self):
        """Map of option name -> default value, derived from MODULE_OPTIONS."""
        return {opt['name']: opt.get('default', None)
                for opt in self.MODULE_OPTIONS}

    COMMANDS = [
        {
            "cmd": "influx config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "influx config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "influx send",
            "desc": "Force sending data to Influx",
            "perm": "rw"
        }
    ]
90 | ||
3efd9988 FG |
91 | def __init__(self, *args, **kwargs): |
92 | super(Module, self).__init__(*args, **kwargs) | |
93 | self.event = Event() | |
28e407b8 AA |
94 | self.run = True |
95 | self.config = dict() | |
11fdf7f2 TL |
96 | self.workers = list() |
97 | self.queue = queue.Queue(maxsize=100) | |
98 | self.health_checks = dict() | |
3efd9988 | 99 | |
28e407b8 AA |
100 | def get_fsid(self): |
101 | return self.get('mon_map')['fsid'] | |
3efd9988 | 102 | |
11fdf7f2 TL |
103 | @staticmethod |
104 | def can_run(): | |
105 | if InfluxDBClient is not None: | |
106 | return True, "" | |
107 | else: | |
108 | return False, "influxdb python module not found" | |
109 | ||
110 | @staticmethod | |
111 | def get_timestamp(): | |
112 | return datetime.utcnow().isoformat() + 'Z' | |
113 | ||
114 | @staticmethod | |
115 | def chunk(l, n): | |
116 | try: | |
117 | while True: | |
118 | xs = [] | |
119 | for _ in range(n): | |
120 | xs.append(next(l)) | |
121 | yield xs | |
122 | except StopIteration: | |
123 | yield xs | |
124 | ||
125 | def queue_worker(self): | |
126 | while True: | |
127 | try: | |
128 | points = self.queue.get() | |
129 | if points is None: | |
130 | self.log.debug('Worker shutting down') | |
131 | break | |
132 | ||
133 | start = time.time() | |
494da23a TL |
134 | with self.get_influx_client() as client: |
135 | client.write_points(points, time_precision='ms') | |
11fdf7f2 TL |
136 | runtime = time.time() - start |
137 | self.log.debug('Writing points %d to Influx took %.3f seconds', | |
138 | len(points), runtime) | |
139 | except RequestException as e: | |
140 | self.log.exception("Failed to connect to Influx host %s:%d", | |
141 | self.config['hostname'], self.config['port']) | |
142 | self.health_checks.update({ | |
143 | 'MGR_INFLUX_SEND_FAILED': { | |
144 | 'severity': 'warning', | |
145 | 'summary': 'Failed to send data to InfluxDB server ' | |
146 | 'at %s:%d due to an connection error' | |
147 | % (self.config['hostname'], | |
148 | self.config['port']), | |
149 | 'detail': [str(e)] | |
150 | } | |
151 | }) | |
152 | except InfluxDBClientError as e: | |
153 | self.health_checks.update({ | |
154 | 'MGR_INFLUX_SEND_FAILED': { | |
155 | 'severity': 'warning', | |
156 | 'summary': 'Failed to send data to InfluxDB', | |
157 | 'detail': [str(e)] | |
158 | } | |
159 | }) | |
160 | self.log.exception('Failed to send data to InfluxDB') | |
161 | except queue.Empty: | |
162 | continue | |
163 | except: | |
164 | self.log.exception('Unhandled Exception while sending to Influx') | |
165 | finally: | |
166 | self.queue.task_done() | |
167 | ||
3efd9988 FG |
168 | def get_latest(self, daemon_type, daemon_name, stat): |
169 | data = self.get_counter(daemon_type, daemon_name, stat)[stat] | |
170 | if data: | |
171 | return data[-1][1] | |
3efd9988 | 172 | |
28e407b8 | 173 | return 0 |
3efd9988 | 174 | |
11fdf7f2 | 175 | def get_df_stats(self, now): |
3efd9988 FG |
176 | df = self.get("df") |
177 | data = [] | |
11fdf7f2 | 178 | pool_info = {} |
28e407b8 | 179 | |
3efd9988 | 180 | df_types = [ |
11fdf7f2 | 181 | 'stored', |
28e407b8 | 182 | 'kb_used', |
3efd9988 | 183 | 'dirty', |
28e407b8 | 184 | 'rd', |
3efd9988 | 185 | 'rd_bytes', |
11fdf7f2 | 186 | 'stored_raw', |
28e407b8 | 187 | 'wr', |
3efd9988 FG |
188 | 'wr_bytes', |
189 | 'objects', | |
28e407b8 AA |
190 | 'max_avail', |
191 | 'quota_objects', | |
192 | 'quota_bytes' | |
3efd9988 | 193 | ] |
11fdf7f2 | 194 | |
3efd9988 FG |
195 | for df_type in df_types: |
196 | for pool in df['pools']: | |
197 | point = { | |
198 | "measurement": "ceph_pool_stats", | |
199 | "tags": { | |
28e407b8 AA |
200 | "pool_name": pool['name'], |
201 | "pool_id": pool['id'], | |
202 | "type_instance": df_type, | |
203 | "fsid": self.get_fsid() | |
3efd9988 | 204 | }, |
28e407b8 AA |
205 | "time": now, |
206 | "fields": { | |
207 | "value": pool['stats'][df_type], | |
208 | } | |
3efd9988 FG |
209 | } |
210 | data.append(point) | |
11fdf7f2 TL |
211 | pool_info.update({str(pool['id']):pool['name']}) |
212 | return data, pool_info | |
213 | ||
214 | def get_pg_summary_osd(self, pool_info, now): | |
215 | pg_sum = self.get('pg_summary') | |
216 | osd_sum = pg_sum['by_osd'] | |
217 | for osd_id, stats in six.iteritems(osd_sum): | |
218 | metadata = self.get_metadata('osd', "%s" % osd_id) | |
219 | if not metadata: | |
220 | continue | |
3efd9988 | 221 | |
11fdf7f2 TL |
222 | for stat in stats: |
223 | yield { | |
224 | "measurement": "ceph_pg_summary_osd", | |
225 | "tags": { | |
226 | "ceph_daemon": "osd." + str(osd_id), | |
227 | "type_instance": stat, | |
228 | "host": metadata['hostname'] | |
229 | }, | |
230 | "time" : now, | |
231 | "fields" : { | |
232 | "value": stats[stat] | |
233 | } | |
234 | } | |
3efd9988 | 235 | |
11fdf7f2 TL |
236 | def get_pg_summary_pool(self, pool_info, now): |
237 | pool_sum = self.get('pg_summary')['by_pool'] | |
238 | for pool_id, stats in six.iteritems(pool_sum): | |
239 | for stat in stats: | |
240 | yield { | |
241 | "measurement": "ceph_pg_summary_pool", | |
242 | "tags": { | |
243 | "pool_name" : pool_info[pool_id], | |
244 | "pool_id" : pool_id, | |
245 | "type_instance" : stat, | |
246 | }, | |
247 | "time" : now, | |
248 | "fields": { | |
249 | "value" : stats[stat], | |
250 | } | |
251 | } | |
28e407b8 | 252 | |
11fdf7f2 | 253 | def get_daemon_stats(self, now): |
1adf2230 | 254 | for daemon, counters in six.iteritems(self.get_all_perf_counters()): |
28e407b8 | 255 | svc_type, svc_id = daemon.split(".", 1) |
3efd9988 FG |
256 | metadata = self.get_metadata(svc_type, svc_id) |
257 | ||
258 | for path, counter_info in counters.items(): | |
259 | if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM: | |
260 | continue | |
261 | ||
262 | value = counter_info['value'] | |
263 | ||
11fdf7f2 | 264 | yield { |
3efd9988 FG |
265 | "measurement": "ceph_daemon_stats", |
266 | "tags": { | |
267 | "ceph_daemon": daemon, | |
268 | "type_instance": path, | |
28e407b8 AA |
269 | "host": metadata['hostname'], |
270 | "fsid": self.get_fsid() | |
3efd9988 | 271 | }, |
28e407b8 | 272 | "time": now, |
3efd9988 FG |
273 | "fields": { |
274 | "value": value | |
275 | } | |
11fdf7f2 | 276 | } |
3efd9988 | 277 | |
28e407b8 AA |
278 | def set_config_option(self, option, value): |
279 | if option not in self.config_keys.keys(): | |
280 | raise RuntimeError('{0} is a unknown configuration ' | |
281 | 'option'.format(option)) | |
282 | ||
11fdf7f2 | 283 | if option in ['port', 'interval', 'threads', 'batch_size']: |
28e407b8 AA |
284 | try: |
285 | value = int(value) | |
286 | except (ValueError, TypeError): | |
287 | raise RuntimeError('invalid {0} configured. Please specify ' | |
288 | 'a valid integer'.format(option)) | |
289 | ||
290 | if option == 'interval' and value < 5: | |
291 | raise RuntimeError('interval should be set to at least 5 seconds') | |
292 | ||
293 | if option in ['ssl', 'verify_ssl']: | |
294 | value = value.lower() == 'true' | |
295 | ||
11fdf7f2 TL |
296 | if option == 'threads': |
297 | if 1 > value > 32: | |
298 | raise RuntimeError('threads should be in range 1-32') | |
299 | ||
28e407b8 AA |
300 | self.config[option] = value |
301 | ||
302 | def init_module_config(self): | |
303 | self.config['hostname'] = \ | |
11fdf7f2 | 304 | self.get_module_option("hostname", default=self.config_keys['hostname']) |
28e407b8 | 305 | self.config['port'] = \ |
11fdf7f2 | 306 | int(self.get_module_option("port", default=self.config_keys['port'])) |
28e407b8 | 307 | self.config['database'] = \ |
11fdf7f2 | 308 | self.get_module_option("database", default=self.config_keys['database']) |
28e407b8 | 309 | self.config['username'] = \ |
11fdf7f2 | 310 | self.get_module_option("username", default=self.config_keys['username']) |
28e407b8 | 311 | self.config['password'] = \ |
11fdf7f2 | 312 | self.get_module_option("password", default=self.config_keys['password']) |
28e407b8 | 313 | self.config['interval'] = \ |
11fdf7f2 | 314 | int(self.get_module_option("interval", |
28e407b8 | 315 | default=self.config_keys['interval'])) |
11fdf7f2 TL |
316 | self.config['threads'] = \ |
317 | int(self.get_module_option("threads", | |
318 | default=self.config_keys['threads'])) | |
319 | self.config['batch_size'] = \ | |
320 | int(self.get_module_option("batch_size", | |
321 | default=self.config_keys['batch_size'])) | |
322 | ssl = self.get_module_option("ssl", default=self.config_keys['ssl']) | |
28e407b8 AA |
323 | self.config['ssl'] = ssl.lower() == 'true' |
324 | verify_ssl = \ | |
11fdf7f2 | 325 | self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl']) |
28e407b8 AA |
326 | self.config['verify_ssl'] = verify_ssl.lower() == 'true' |
327 | ||
11fdf7f2 TL |
328 | def gather_statistics(self): |
329 | now = self.get_timestamp() | |
330 | df_stats, pools = self.get_df_stats(now) | |
331 | return chain(df_stats, self.get_daemon_stats(now), | |
332 | self.get_pg_summary_osd(pools, now), | |
333 | self.get_pg_summary_pool(pools, now)) | |
334 | ||
494da23a | 335 | @contextmanager |
11fdf7f2 | 336 | def get_influx_client(self): |
494da23a TL |
337 | client = InfluxDBClient(self.config['hostname'], |
338 | self.config['port'], | |
339 | self.config['username'], | |
340 | self.config['password'], | |
341 | self.config['database'], | |
342 | self.config['ssl'], | |
343 | self.config['verify_ssl']) | |
344 | try: | |
345 | yield client | |
346 | finally: | |
347 | try: | |
348 | client.close() | |
349 | except AttributeError: | |
350 | # influxdb older than v5.0.0 | |
351 | pass | |
11fdf7f2 | 352 | |
3efd9988 | 353 | def send_to_influx(self): |
28e407b8 AA |
354 | if not self.config['hostname']: |
355 | self.log.error("No Influx server configured, please set one using: " | |
356 | "ceph influx config-set hostname <hostname>") | |
11fdf7f2 | 357 | |
28e407b8 AA |
358 | self.set_health_checks({ |
359 | 'MGR_INFLUX_NO_SERVER': { | |
360 | 'severity': 'warning', | |
361 | 'summary': 'No InfluxDB server configured', | |
362 | 'detail': ['Configuration option hostname not set'] | |
363 | } | |
364 | }) | |
11fdf7f2 TL |
365 | return False |
366 | ||
367 | self.health_checks = dict() | |
3efd9988 | 368 | |
28e407b8 AA |
369 | self.log.debug("Sending data to Influx host: %s", |
370 | self.config['hostname']) | |
3efd9988 | 371 | try: |
494da23a TL |
372 | with self.get_influx_client() as client: |
373 | databases = client.get_list_database() | |
374 | if {'name': self.config['database']} not in databases: | |
375 | self.log.info("Database '%s' not found, trying to create " | |
376 | "(requires admin privs). You can also create " | |
377 | "manually and grant write privs to user " | |
378 | "'%s'", self.config['database'], | |
379 | self.config['database']) | |
380 | client.create_database(self.config['database']) | |
381 | client.create_retention_policy(name='8_weeks', | |
382 | duration='8w', | |
383 | replication='1', | |
384 | default=True, | |
385 | database=self.config['database']) | |
11fdf7f2 TL |
386 | |
387 | self.log.debug('Gathering statistics') | |
388 | points = self.gather_statistics() | |
389 | for chunk in self.chunk(points, self.config['batch_size']): | |
390 | self.queue.put(chunk, block=False) | |
391 | ||
392 | self.log.debug('Queue currently contains %d items', | |
393 | self.queue.qsize()) | |
394 | except queue.Full: | |
395 | self.health_checks.update({ | |
396 | 'MGR_INFLUX_QUEUE_FULL': { | |
397 | 'severity': 'warning', | |
398 | 'summary': 'Failed to chunk to InfluxDB Queue', | |
399 | 'detail': ['Queue is full. InfluxDB might be slow with ' | |
400 | 'processing data'] | |
401 | } | |
402 | }) | |
403 | self.log.error('Queue is full, failed to add chunk') | |
404 | except (RequestException, InfluxDBClientError) as e: | |
405 | self.health_checks.update({ | |
406 | 'MGR_INFLUX_DB_LIST_FAILED': { | |
407 | 'severity': 'warning', | |
408 | 'summary': 'Failed to list/create InfluxDB database', | |
409 | 'detail': [str(e)] | |
410 | } | |
411 | }) | |
412 | self.log.exception('Failed to list/create InfluxDB database') | |
413 | return False | |
414 | finally: | |
415 | self.set_health_checks(self.health_checks) | |
3efd9988 FG |
416 | |
417 | def shutdown(self): | |
418 | self.log.info('Stopping influx module') | |
419 | self.run = False | |
420 | self.event.set() | |
11fdf7f2 | 421 | self.log.debug('Shutting down queue workers') |
3efd9988 | 422 | |
11fdf7f2 TL |
423 | for _ in self.workers: |
424 | self.queue.put(None) | |
425 | ||
426 | self.queue.join() | |
427 | ||
428 | for worker in self.workers: | |
429 | worker.join() | |
430 | ||
431 | def self_test(self): | |
432 | now = self.get_timestamp() | |
433 | daemon_stats = list(self.get_daemon_stats(now)) | |
434 | assert len(daemon_stats) | |
435 | df_stats, pools = self.get_df_stats(now) | |
436 | ||
437 | result = { | |
438 | 'daemon_stats': daemon_stats, | |
439 | 'df_stats': df_stats | |
440 | } | |
441 | ||
442 | return json.dumps(result, indent=2) | |
443 | ||
444 | def handle_command(self, inbuf, cmd): | |
28e407b8 AA |
445 | if cmd['prefix'] == 'influx config-show': |
446 | return 0, json.dumps(self.config), '' | |
447 | elif cmd['prefix'] == 'influx config-set': | |
448 | key = cmd['key'] | |
449 | value = cmd['value'] | |
450 | if not value: | |
451 | return -errno.EINVAL, '', 'Value should not be empty or None' | |
452 | ||
453 | self.log.debug('Setting configuration option %s to %s', key, value) | |
454 | self.set_config_option(key, value) | |
11fdf7f2 | 455 | self.set_module_option(key, value) |
28e407b8 AA |
456 | return 0, 'Configuration option {0} updated'.format(key), '' |
457 | elif cmd['prefix'] == 'influx send': | |
458 | self.send_to_influx() | |
459 | return 0, 'Sending data to Influx', '' | |
28e407b8 AA |
460 | |
461 | return (-errno.EINVAL, '', | |
462 | "Command not found '{0}'".format(cmd['prefix'])) | |
3efd9988 FG |
463 | |
464 | def serve(self): | |
465 | if InfluxDBClient is None: | |
466 | self.log.error("Cannot transmit statistics: influxdb python " | |
467 | "module not found. Did you install it?") | |
468 | return | |
469 | ||
470 | self.log.info('Starting influx module') | |
28e407b8 | 471 | self.init_module_config() |
3efd9988 | 472 | self.run = True |
28e407b8 | 473 | |
11fdf7f2 TL |
474 | self.log.debug('Starting %d queue worker threads', |
475 | self.config['threads']) | |
476 | for i in range(self.config['threads']): | |
477 | worker = Thread(target=self.queue_worker, args=()) | |
478 | worker.setDaemon(True) | |
479 | worker.start() | |
480 | self.workers.append(worker) | |
481 | ||
3efd9988 | 482 | while self.run: |
28e407b8 | 483 | start = time.time() |
3efd9988 | 484 | self.send_to_influx() |
28e407b8 | 485 | runtime = time.time() - start |
11fdf7f2 | 486 | self.log.debug('Finished sending data to Influx in %.3f seconds', |
28e407b8 AA |
487 | runtime) |
488 | self.log.debug("Sleeping for %d seconds", self.config['interval']) | |
489 | self.event.wait(self.config['interval']) |