]>
Commit | Line | Data |
---|---|---|
494da23a | 1 | from contextlib import contextmanager |
3efd9988 | 2 | from datetime import datetime |
11fdf7f2 TL |
3 | from threading import Event, Thread |
4 | from itertools import chain | |
f67539c2 | 5 | import queue |
3efd9988 FG |
6 | import json |
7 | import errno | |
28e407b8 | 8 | import time |
3efd9988 FG |
9 | |
10 | from mgr_module import MgrModule | |
11 | ||
12 | try: | |
13 | from influxdb import InfluxDBClient | |
14 | from influxdb.exceptions import InfluxDBClientError | |
11fdf7f2 | 15 | from requests.exceptions import RequestException |
3efd9988 FG |
16 | except ImportError: |
17 | InfluxDBClient = None | |
18 | ||
28e407b8 | 19 | |
class Module(MgrModule):
    """Send Ceph performance statistics to an InfluxDB time-series database."""

    # Tunable options exposed via `ceph config set mgr mgr/influx/<name>`.
    # 'ssl'/'verify_ssl' are stored as the strings 'true'/'false'.
    MODULE_OPTIONS = [
        {'name': 'hostname', 'default': None},
        {'name': 'port', 'default': 8086},
        {'name': 'database', 'default': 'ceph'},
        {'name': 'username', 'default': None},
        {'name': 'password', 'default': None},
        {'name': 'interval', 'default': 30},
        {'name': 'ssl', 'default': 'false'},
        {'name': 'verify_ssl', 'default': 'true'},
        {'name': 'threads', 'default': 5},
        {'name': 'batch_size', 'default': 5000},
    ]
63 | ||
64 | @property | |
65 | def config_keys(self): | |
66 | return dict((o['name'], o.get('default', None)) | |
67 | for o in self.MODULE_OPTIONS) | |
68 | ||
3efd9988 | 69 | COMMANDS = [ |
28e407b8 AA |
70 | { |
71 | "cmd": "influx config-set name=key,type=CephString " | |
72 | "name=value,type=CephString", | |
73 | "desc": "Set a configuration value", | |
74 | "perm": "rw" | |
75 | }, | |
76 | { | |
77 | "cmd": "influx config-show", | |
78 | "desc": "Show current configuration", | |
79 | "perm": "r" | |
80 | }, | |
81 | { | |
82 | "cmd": "influx send", | |
83 | "desc": "Force sending data to Influx", | |
84 | "perm": "rw" | |
11fdf7f2 | 85 | } |
3efd9988 FG |
86 | ] |
87 | ||
3efd9988 FG |
88 | def __init__(self, *args, **kwargs): |
89 | super(Module, self).__init__(*args, **kwargs) | |
90 | self.event = Event() | |
28e407b8 AA |
91 | self.run = True |
92 | self.config = dict() | |
11fdf7f2 TL |
93 | self.workers = list() |
94 | self.queue = queue.Queue(maxsize=100) | |
95 | self.health_checks = dict() | |
3efd9988 | 96 | |
28e407b8 AA |
97 | def get_fsid(self): |
98 | return self.get('mon_map')['fsid'] | |
3efd9988 | 99 | |
11fdf7f2 TL |
100 | @staticmethod |
101 | def can_run(): | |
102 | if InfluxDBClient is not None: | |
103 | return True, "" | |
104 | else: | |
105 | return False, "influxdb python module not found" | |
106 | ||
107 | @staticmethod | |
108 | def get_timestamp(): | |
109 | return datetime.utcnow().isoformat() + 'Z' | |
110 | ||
111 | @staticmethod | |
112 | def chunk(l, n): | |
113 | try: | |
114 | while True: | |
115 | xs = [] | |
116 | for _ in range(n): | |
117 | xs.append(next(l)) | |
118 | yield xs | |
119 | except StopIteration: | |
120 | yield xs | |
121 | ||
122 | def queue_worker(self): | |
123 | while True: | |
124 | try: | |
125 | points = self.queue.get() | |
126 | if points is None: | |
127 | self.log.debug('Worker shutting down') | |
128 | break | |
129 | ||
130 | start = time.time() | |
494da23a TL |
131 | with self.get_influx_client() as client: |
132 | client.write_points(points, time_precision='ms') | |
11fdf7f2 TL |
133 | runtime = time.time() - start |
134 | self.log.debug('Writing points %d to Influx took %.3f seconds', | |
135 | len(points), runtime) | |
136 | except RequestException as e: | |
137 | self.log.exception("Failed to connect to Influx host %s:%d", | |
138 | self.config['hostname'], self.config['port']) | |
139 | self.health_checks.update({ | |
140 | 'MGR_INFLUX_SEND_FAILED': { | |
141 | 'severity': 'warning', | |
142 | 'summary': 'Failed to send data to InfluxDB server ' | |
143 | 'at %s:%d due to an connection error' | |
144 | % (self.config['hostname'], | |
145 | self.config['port']), | |
146 | 'detail': [str(e)] | |
147 | } | |
148 | }) | |
149 | except InfluxDBClientError as e: | |
150 | self.health_checks.update({ | |
151 | 'MGR_INFLUX_SEND_FAILED': { | |
152 | 'severity': 'warning', | |
153 | 'summary': 'Failed to send data to InfluxDB', | |
154 | 'detail': [str(e)] | |
155 | } | |
156 | }) | |
157 | self.log.exception('Failed to send data to InfluxDB') | |
158 | except queue.Empty: | |
159 | continue | |
160 | except: | |
161 | self.log.exception('Unhandled Exception while sending to Influx') | |
162 | finally: | |
163 | self.queue.task_done() | |
164 | ||
3efd9988 FG |
165 | def get_latest(self, daemon_type, daemon_name, stat): |
166 | data = self.get_counter(daemon_type, daemon_name, stat)[stat] | |
167 | if data: | |
168 | return data[-1][1] | |
3efd9988 | 169 | |
28e407b8 | 170 | return 0 |
3efd9988 | 171 | |
11fdf7f2 | 172 | def get_df_stats(self, now): |
3efd9988 FG |
173 | df = self.get("df") |
174 | data = [] | |
11fdf7f2 | 175 | pool_info = {} |
28e407b8 | 176 | |
3efd9988 | 177 | df_types = [ |
11fdf7f2 | 178 | 'stored', |
28e407b8 | 179 | 'kb_used', |
3efd9988 | 180 | 'dirty', |
28e407b8 | 181 | 'rd', |
3efd9988 | 182 | 'rd_bytes', |
11fdf7f2 | 183 | 'stored_raw', |
28e407b8 | 184 | 'wr', |
3efd9988 FG |
185 | 'wr_bytes', |
186 | 'objects', | |
28e407b8 AA |
187 | 'max_avail', |
188 | 'quota_objects', | |
189 | 'quota_bytes' | |
3efd9988 | 190 | ] |
11fdf7f2 | 191 | |
3efd9988 FG |
192 | for df_type in df_types: |
193 | for pool in df['pools']: | |
194 | point = { | |
195 | "measurement": "ceph_pool_stats", | |
196 | "tags": { | |
28e407b8 AA |
197 | "pool_name": pool['name'], |
198 | "pool_id": pool['id'], | |
199 | "type_instance": df_type, | |
200 | "fsid": self.get_fsid() | |
3efd9988 | 201 | }, |
28e407b8 AA |
202 | "time": now, |
203 | "fields": { | |
204 | "value": pool['stats'][df_type], | |
205 | } | |
3efd9988 FG |
206 | } |
207 | data.append(point) | |
11fdf7f2 TL |
208 | pool_info.update({str(pool['id']):pool['name']}) |
209 | return data, pool_info | |
210 | ||
211 | def get_pg_summary_osd(self, pool_info, now): | |
212 | pg_sum = self.get('pg_summary') | |
213 | osd_sum = pg_sum['by_osd'] | |
f67539c2 | 214 | for osd_id, stats in osd_sum.items(): |
11fdf7f2 TL |
215 | metadata = self.get_metadata('osd', "%s" % osd_id) |
216 | if not metadata: | |
217 | continue | |
3efd9988 | 218 | |
11fdf7f2 TL |
219 | for stat in stats: |
220 | yield { | |
221 | "measurement": "ceph_pg_summary_osd", | |
222 | "tags": { | |
223 | "ceph_daemon": "osd." + str(osd_id), | |
224 | "type_instance": stat, | |
225 | "host": metadata['hostname'] | |
226 | }, | |
227 | "time" : now, | |
228 | "fields" : { | |
229 | "value": stats[stat] | |
230 | } | |
231 | } | |
3efd9988 | 232 | |
11fdf7f2 TL |
233 | def get_pg_summary_pool(self, pool_info, now): |
234 | pool_sum = self.get('pg_summary')['by_pool'] | |
f67539c2 | 235 | for pool_id, stats in pool_sum.items(): |
11fdf7f2 TL |
236 | for stat in stats: |
237 | yield { | |
238 | "measurement": "ceph_pg_summary_pool", | |
239 | "tags": { | |
240 | "pool_name" : pool_info[pool_id], | |
241 | "pool_id" : pool_id, | |
242 | "type_instance" : stat, | |
243 | }, | |
244 | "time" : now, | |
245 | "fields": { | |
246 | "value" : stats[stat], | |
247 | } | |
248 | } | |
28e407b8 | 249 | |
11fdf7f2 | 250 | def get_daemon_stats(self, now): |
f67539c2 | 251 | for daemon, counters in self.get_all_perf_counters().items(): |
28e407b8 | 252 | svc_type, svc_id = daemon.split(".", 1) |
3efd9988 | 253 | metadata = self.get_metadata(svc_type, svc_id) |
a4b75251 TL |
254 | if metadata is not None: |
255 | hostname = metadata['hostname'] | |
256 | else: | |
257 | hostname = 'N/A' | |
3efd9988 FG |
258 | |
259 | for path, counter_info in counters.items(): | |
260 | if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM: | |
261 | continue | |
262 | ||
263 | value = counter_info['value'] | |
264 | ||
11fdf7f2 | 265 | yield { |
3efd9988 FG |
266 | "measurement": "ceph_daemon_stats", |
267 | "tags": { | |
268 | "ceph_daemon": daemon, | |
269 | "type_instance": path, | |
a4b75251 | 270 | "host": hostname, |
28e407b8 | 271 | "fsid": self.get_fsid() |
3efd9988 | 272 | }, |
28e407b8 | 273 | "time": now, |
3efd9988 FG |
274 | "fields": { |
275 | "value": value | |
276 | } | |
11fdf7f2 | 277 | } |
3efd9988 | 278 | |
28e407b8 AA |
279 | def set_config_option(self, option, value): |
280 | if option not in self.config_keys.keys(): | |
281 | raise RuntimeError('{0} is a unknown configuration ' | |
282 | 'option'.format(option)) | |
283 | ||
11fdf7f2 | 284 | if option in ['port', 'interval', 'threads', 'batch_size']: |
28e407b8 AA |
285 | try: |
286 | value = int(value) | |
287 | except (ValueError, TypeError): | |
288 | raise RuntimeError('invalid {0} configured. Please specify ' | |
289 | 'a valid integer'.format(option)) | |
290 | ||
291 | if option == 'interval' and value < 5: | |
292 | raise RuntimeError('interval should be set to at least 5 seconds') | |
293 | ||
294 | if option in ['ssl', 'verify_ssl']: | |
295 | value = value.lower() == 'true' | |
296 | ||
11fdf7f2 TL |
297 | if option == 'threads': |
298 | if 1 > value > 32: | |
299 | raise RuntimeError('threads should be in range 1-32') | |
300 | ||
28e407b8 AA |
301 | self.config[option] = value |
302 | ||
303 | def init_module_config(self): | |
304 | self.config['hostname'] = \ | |
11fdf7f2 | 305 | self.get_module_option("hostname", default=self.config_keys['hostname']) |
28e407b8 | 306 | self.config['port'] = \ |
11fdf7f2 | 307 | int(self.get_module_option("port", default=self.config_keys['port'])) |
28e407b8 | 308 | self.config['database'] = \ |
11fdf7f2 | 309 | self.get_module_option("database", default=self.config_keys['database']) |
28e407b8 | 310 | self.config['username'] = \ |
11fdf7f2 | 311 | self.get_module_option("username", default=self.config_keys['username']) |
28e407b8 | 312 | self.config['password'] = \ |
11fdf7f2 | 313 | self.get_module_option("password", default=self.config_keys['password']) |
28e407b8 | 314 | self.config['interval'] = \ |
11fdf7f2 | 315 | int(self.get_module_option("interval", |
28e407b8 | 316 | default=self.config_keys['interval'])) |
11fdf7f2 TL |
317 | self.config['threads'] = \ |
318 | int(self.get_module_option("threads", | |
319 | default=self.config_keys['threads'])) | |
320 | self.config['batch_size'] = \ | |
321 | int(self.get_module_option("batch_size", | |
322 | default=self.config_keys['batch_size'])) | |
323 | ssl = self.get_module_option("ssl", default=self.config_keys['ssl']) | |
28e407b8 AA |
324 | self.config['ssl'] = ssl.lower() == 'true' |
325 | verify_ssl = \ | |
11fdf7f2 | 326 | self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl']) |
28e407b8 AA |
327 | self.config['verify_ssl'] = verify_ssl.lower() == 'true' |
328 | ||
11fdf7f2 TL |
329 | def gather_statistics(self): |
330 | now = self.get_timestamp() | |
331 | df_stats, pools = self.get_df_stats(now) | |
332 | return chain(df_stats, self.get_daemon_stats(now), | |
333 | self.get_pg_summary_osd(pools, now), | |
334 | self.get_pg_summary_pool(pools, now)) | |
335 | ||
494da23a | 336 | @contextmanager |
11fdf7f2 | 337 | def get_influx_client(self): |
494da23a TL |
338 | client = InfluxDBClient(self.config['hostname'], |
339 | self.config['port'], | |
340 | self.config['username'], | |
341 | self.config['password'], | |
342 | self.config['database'], | |
343 | self.config['ssl'], | |
344 | self.config['verify_ssl']) | |
345 | try: | |
346 | yield client | |
347 | finally: | |
348 | try: | |
349 | client.close() | |
350 | except AttributeError: | |
351 | # influxdb older than v5.0.0 | |
352 | pass | |
11fdf7f2 | 353 | |
3efd9988 | 354 | def send_to_influx(self): |
28e407b8 AA |
355 | if not self.config['hostname']: |
356 | self.log.error("No Influx server configured, please set one using: " | |
357 | "ceph influx config-set hostname <hostname>") | |
11fdf7f2 | 358 | |
28e407b8 AA |
359 | self.set_health_checks({ |
360 | 'MGR_INFLUX_NO_SERVER': { | |
361 | 'severity': 'warning', | |
362 | 'summary': 'No InfluxDB server configured', | |
363 | 'detail': ['Configuration option hostname not set'] | |
364 | } | |
365 | }) | |
11fdf7f2 TL |
366 | return False |
367 | ||
368 | self.health_checks = dict() | |
3efd9988 | 369 | |
28e407b8 AA |
370 | self.log.debug("Sending data to Influx host: %s", |
371 | self.config['hostname']) | |
3efd9988 | 372 | try: |
494da23a TL |
373 | with self.get_influx_client() as client: |
374 | databases = client.get_list_database() | |
375 | if {'name': self.config['database']} not in databases: | |
376 | self.log.info("Database '%s' not found, trying to create " | |
377 | "(requires admin privs). You can also create " | |
378 | "manually and grant write privs to user " | |
379 | "'%s'", self.config['database'], | |
380 | self.config['database']) | |
381 | client.create_database(self.config['database']) | |
382 | client.create_retention_policy(name='8_weeks', | |
383 | duration='8w', | |
384 | replication='1', | |
385 | default=True, | |
386 | database=self.config['database']) | |
11fdf7f2 TL |
387 | |
388 | self.log.debug('Gathering statistics') | |
389 | points = self.gather_statistics() | |
390 | for chunk in self.chunk(points, self.config['batch_size']): | |
391 | self.queue.put(chunk, block=False) | |
392 | ||
393 | self.log.debug('Queue currently contains %d items', | |
394 | self.queue.qsize()) | |
395 | except queue.Full: | |
396 | self.health_checks.update({ | |
397 | 'MGR_INFLUX_QUEUE_FULL': { | |
398 | 'severity': 'warning', | |
399 | 'summary': 'Failed to chunk to InfluxDB Queue', | |
400 | 'detail': ['Queue is full. InfluxDB might be slow with ' | |
401 | 'processing data'] | |
402 | } | |
403 | }) | |
404 | self.log.error('Queue is full, failed to add chunk') | |
405 | except (RequestException, InfluxDBClientError) as e: | |
406 | self.health_checks.update({ | |
407 | 'MGR_INFLUX_DB_LIST_FAILED': { | |
408 | 'severity': 'warning', | |
409 | 'summary': 'Failed to list/create InfluxDB database', | |
410 | 'detail': [str(e)] | |
411 | } | |
412 | }) | |
413 | self.log.exception('Failed to list/create InfluxDB database') | |
414 | return False | |
415 | finally: | |
416 | self.set_health_checks(self.health_checks) | |
3efd9988 FG |
417 | |
418 | def shutdown(self): | |
419 | self.log.info('Stopping influx module') | |
420 | self.run = False | |
421 | self.event.set() | |
11fdf7f2 | 422 | self.log.debug('Shutting down queue workers') |
3efd9988 | 423 | |
11fdf7f2 TL |
424 | for _ in self.workers: |
425 | self.queue.put(None) | |
426 | ||
427 | self.queue.join() | |
428 | ||
429 | for worker in self.workers: | |
430 | worker.join() | |
431 | ||
432 | def self_test(self): | |
433 | now = self.get_timestamp() | |
434 | daemon_stats = list(self.get_daemon_stats(now)) | |
435 | assert len(daemon_stats) | |
436 | df_stats, pools = self.get_df_stats(now) | |
437 | ||
438 | result = { | |
439 | 'daemon_stats': daemon_stats, | |
440 | 'df_stats': df_stats | |
441 | } | |
442 | ||
9f95a23c | 443 | return json.dumps(result, indent=2, sort_keys=True) |
11fdf7f2 TL |
444 | |
445 | def handle_command(self, inbuf, cmd): | |
28e407b8 | 446 | if cmd['prefix'] == 'influx config-show': |
9f95a23c | 447 | return 0, json.dumps(self.config, sort_keys=True), '' |
28e407b8 AA |
448 | elif cmd['prefix'] == 'influx config-set': |
449 | key = cmd['key'] | |
450 | value = cmd['value'] | |
451 | if not value: | |
452 | return -errno.EINVAL, '', 'Value should not be empty or None' | |
453 | ||
454 | self.log.debug('Setting configuration option %s to %s', key, value) | |
455 | self.set_config_option(key, value) | |
11fdf7f2 | 456 | self.set_module_option(key, value) |
28e407b8 AA |
457 | return 0, 'Configuration option {0} updated'.format(key), '' |
458 | elif cmd['prefix'] == 'influx send': | |
459 | self.send_to_influx() | |
460 | return 0, 'Sending data to Influx', '' | |
28e407b8 AA |
461 | |
462 | return (-errno.EINVAL, '', | |
463 | "Command not found '{0}'".format(cmd['prefix'])) | |
3efd9988 FG |
464 | |
465 | def serve(self): | |
466 | if InfluxDBClient is None: | |
467 | self.log.error("Cannot transmit statistics: influxdb python " | |
468 | "module not found. Did you install it?") | |
469 | return | |
470 | ||
471 | self.log.info('Starting influx module') | |
28e407b8 | 472 | self.init_module_config() |
3efd9988 | 473 | self.run = True |
28e407b8 | 474 | |
11fdf7f2 TL |
475 | self.log.debug('Starting %d queue worker threads', |
476 | self.config['threads']) | |
477 | for i in range(self.config['threads']): | |
478 | worker = Thread(target=self.queue_worker, args=()) | |
479 | worker.setDaemon(True) | |
480 | worker.start() | |
481 | self.workers.append(worker) | |
482 | ||
3efd9988 | 483 | while self.run: |
28e407b8 | 484 | start = time.time() |
3efd9988 | 485 | self.send_to_influx() |
28e407b8 | 486 | runtime = time.time() - start |
11fdf7f2 | 487 | self.log.debug('Finished sending data to Influx in %.3f seconds', |
28e407b8 AA |
488 | runtime) |
489 | self.log.debug("Sleeping for %d seconds", self.config['interval']) | |
490 | self.event.wait(self.config['interval']) |