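"""
InfluxDB output module for the Ceph Manager (mgr).

Periodically gathers pool utilization, PG summaries and daemon performance
counters and writes them to an InfluxDB database through a bounded queue
serviced by a pool of worker threads.

Each sample is an InfluxDB point shaped roughly as below (the concrete
values are illustrative only)::

    {
        "measurement": "ceph_pool_stats",
        "tags": {"pool_name": "rbd", "pool_id": 1,
                 "type_instance": "stored", "fsid": "<cluster fsid>"},
        "time": "2019-01-01T00:00:00.000000Z",
        "fields": {"value": 0}
    }
"""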
from contextlib import contextmanager
from datetime import datetime
from threading import Event, Thread
from itertools import chain
from six import next
from six.moves import queue
from six.moves import xrange as range
import json
import errno
import six
import time

from mgr_module import MgrModule

try:
    from influxdb import InfluxDBClient
    from influxdb.exceptions import InfluxDBClientError
    from requests.exceptions import RequestException
except ImportError:
    InfluxDBClient = None


class Module(MgrModule):
    MODULE_OPTIONS = [
        {
            'name': 'hostname',
            'default': None
        },
        {
            'name': 'port',
            'default': 8086
        },
        {
            'name': 'database',
            'default': 'ceph'
        },
        {
            'name': 'username',
            'default': None
        },
        {
            'name': 'password',
            'default': None
        },
        {
            'name': 'interval',
            'default': 30
        },
        {
            'name': 'ssl',
            'default': 'false'
        },
        {
            'name': 'verify_ssl',
            'default': 'true'
        },
        {
            'name': 'threads',
            'default': 5
        },
        {
            'name': 'batch_size',
            'default': 5000
        }
    ]

    @property
    def config_keys(self):
        return dict((o['name'], o.get('default', None))
                    for o in self.MODULE_OPTIONS)

    COMMANDS = [
        {
            "cmd": "influx config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "influx config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "influx send",
            "desc": "Force sending data to Influx",
            "perm": "rw"
        }
    ]
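
    # These commands are invoked through the Ceph CLI, for example (the
    # hostname below is illustrative only):
    #
    #   ceph influx config-set hostname influx.example.com
    #   ceph influx config-show
    #   ceph influx send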

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()
        self.run = True
        self.config = dict()
        self.workers = list()
        self.queue = queue.Queue(maxsize=100)
        self.health_checks = dict()

    def get_fsid(self):
        return self.get('mon_map')['fsid']

    @staticmethod
    def can_run():
        if InfluxDBClient is not None:
            return True, ""
        else:
            return False, "influxdb python module not found"

    @staticmethod
    def get_timestamp():
        return datetime.utcnow().isoformat() + 'Z'

    @staticmethod
    def chunk(l, n):
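        """Yield lists of at most n items taken from the iterator l.

        The final batch may be shorter than n; empty batches are skipped.
        """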
        try:
            while True:
                xs = []
                for _ in range(n):
                    xs.append(next(l))
                yield xs
        except StopIteration:
            if xs:
                yield xs

    def queue_worker(self):
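        """Consume batches of points from the queue and write them to InfluxDB.

        Runs in a daemon thread; a None item on the queue signals shutdown.
        Connection and client errors are reported via the
        MGR_INFLUX_SEND_FAILED health check.
        """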
        while True:
            try:
                points = self.queue.get()
                if points is None:
                    self.log.debug('Worker shutting down')
                    break

                start = time.time()
                with self.get_influx_client() as client:
                    client.write_points(points, time_precision='ms')
                runtime = time.time() - start
                self.log.debug('Writing %d points to Influx took %.3f seconds',
                               len(points), runtime)
            except RequestException as e:
                self.log.exception("Failed to connect to Influx host %s:%d",
                                   self.config['hostname'], self.config['port'])
                self.health_checks.update({
                    'MGR_INFLUX_SEND_FAILED': {
                        'severity': 'warning',
                        'summary': 'Failed to send data to InfluxDB server '
                                   'at %s:%d due to a connection error'
                                   % (self.config['hostname'],
                                      self.config['port']),
                        'detail': [str(e)]
                    }
                })
            except InfluxDBClientError as e:
                self.health_checks.update({
                    'MGR_INFLUX_SEND_FAILED': {
                        'severity': 'warning',
                        'summary': 'Failed to send data to InfluxDB',
                        'detail': [str(e)]
                    }
                })
                self.log.exception('Failed to send data to InfluxDB')
            except queue.Empty:
                continue
            except Exception:
                self.log.exception('Unhandled Exception while sending to Influx')
            finally:
                self.queue.task_done()

    def get_latest(self, daemon_type, daemon_name, stat):
        data = self.get_counter(daemon_type, daemon_name, stat)[stat]
        if data:
            return data[-1][1]

        return 0

    def get_df_stats(self, now):
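        """Build ceph_pool_stats points from the cluster 'df' report.

        Returns a tuple (points, pool_info) where pool_info maps pool id
        (as a string) to pool name for use by the PG summary collectors.
        """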
        df = self.get("df")
        data = []
        pool_info = {}

        df_types = [
            'stored',
            'kb_used',
            'dirty',
            'rd',
            'rd_bytes',
            'stored_raw',
            'wr',
            'wr_bytes',
            'objects',
            'max_avail',
            'quota_objects',
            'quota_bytes'
        ]

        for df_type in df_types:
            for pool in df['pools']:
                point = {
                    "measurement": "ceph_pool_stats",
                    "tags": {
                        "pool_name": pool['name'],
                        "pool_id": pool['id'],
                        "type_instance": df_type,
                        "fsid": self.get_fsid()
                    },
                    "time": now,
                    "fields": {
                        "value": pool['stats'][df_type],
                    }
                }
                data.append(point)
                pool_info.update({str(pool['id']): pool['name']})
        return data, pool_info

    def get_pg_summary_osd(self, pool_info, now):
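        """Yield ceph_pg_summary_osd points, one per OSD and PG summary stat.

        OSDs for which no metadata is available are skipped.
        """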
        pg_sum = self.get('pg_summary')
        osd_sum = pg_sum['by_osd']
        for osd_id, stats in six.iteritems(osd_sum):
            metadata = self.get_metadata('osd', "%s" % osd_id)
            if not metadata:
                continue

            for stat in stats:
                yield {
                    "measurement": "ceph_pg_summary_osd",
                    "tags": {
                        "ceph_daemon": "osd." + str(osd_id),
                        "type_instance": stat,
                        "host": metadata['hostname']
                    },
                    "time": now,
                    "fields": {
                        "value": stats[stat]
                    }
                }

    def get_pg_summary_pool(self, pool_info, now):
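        """Yield ceph_pg_summary_pool points, one per pool and PG summary stat."""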
        pool_sum = self.get('pg_summary')['by_pool']
        for pool_id, stats in six.iteritems(pool_sum):
            for stat in stats:
                yield {
                    "measurement": "ceph_pg_summary_pool",
                    "tags": {
                        "pool_name": pool_info[pool_id],
                        "pool_id": pool_id,
                        "type_instance": stat,
                    },
                    "time": now,
                    "fields": {
                        "value": stats[stat],
                    }
                }

    def get_daemon_stats(self, now):
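        """Yield ceph_daemon_stats points for all daemon perf counters.

        Histogram-type counters are skipped; only plain values are reported.
        """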
        for daemon, counters in six.iteritems(self.get_all_perf_counters()):
            svc_type, svc_id = daemon.split(".", 1)
            metadata = self.get_metadata(svc_type, svc_id)

            for path, counter_info in counters.items():
                if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
                    continue

                value = counter_info['value']

                yield {
                    "measurement": "ceph_daemon_stats",
                    "tags": {
                        "ceph_daemon": daemon,
                        "type_instance": path,
                        "host": metadata['hostname'],
                        "fsid": self.get_fsid()
                    },
                    "time": now,
                    "fields": {
                        "value": value
                    }
                }

    def set_config_option(self, option, value):
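        """Validate a configuration option and cache it in self.config.

        Integer options are converted and bounds-checked, ssl/verify_ssl are
        parsed as booleans; a RuntimeError is raised for unknown options or
        invalid values.
        """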
        if option not in self.config_keys.keys():
            raise RuntimeError('{0} is an unknown configuration '
                               'option'.format(option))

        if option in ['port', 'interval', 'threads', 'batch_size']:
            try:
                value = int(value)
            except (ValueError, TypeError):
                raise RuntimeError('invalid {0} configured. Please specify '
                                   'a valid integer'.format(option))

        if option == 'interval' and value < 5:
            raise RuntimeError('interval should be set to at least 5 seconds')

        if option in ['ssl', 'verify_ssl']:
            value = value.lower() == 'true'

        if option == 'threads':
            if value < 1 or value > 32:
                raise RuntimeError('threads should be in range 1-32')

        self.config[option] = value

    def init_module_config(self):
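        """Load all module options into self.config, applying the declared defaults."""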
        self.config['hostname'] = \
            self.get_module_option("hostname", default=self.config_keys['hostname'])
        self.config['port'] = \
            int(self.get_module_option("port", default=self.config_keys['port']))
        self.config['database'] = \
            self.get_module_option("database", default=self.config_keys['database'])
        self.config['username'] = \
            self.get_module_option("username", default=self.config_keys['username'])
        self.config['password'] = \
            self.get_module_option("password", default=self.config_keys['password'])
        self.config['interval'] = \
            int(self.get_module_option("interval",
                                       default=self.config_keys['interval']))
        self.config['threads'] = \
            int(self.get_module_option("threads",
                                       default=self.config_keys['threads']))
        self.config['batch_size'] = \
            int(self.get_module_option("batch_size",
                                       default=self.config_keys['batch_size']))
        ssl = self.get_module_option("ssl", default=self.config_keys['ssl'])
        self.config['ssl'] = ssl.lower() == 'true'
        verify_ssl = \
            self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl'])
        self.config['verify_ssl'] = verify_ssl.lower() == 'true'

    def gather_statistics(self):
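        """Chain pool, daemon and PG summary points into a single iterator."""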
        now = self.get_timestamp()
        df_stats, pools = self.get_df_stats(now)
        return chain(df_stats, self.get_daemon_stats(now),
                     self.get_pg_summary_osd(pools, now),
                     self.get_pg_summary_pool(pools, now))

    @contextmanager
    def get_influx_client(self):
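        """Context manager yielding an InfluxDBClient built from self.config.

        The client is closed on exit when the installed influxdb package
        supports close() (v5.0.0 and newer).
        """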
        client = InfluxDBClient(self.config['hostname'],
                                self.config['port'],
                                self.config['username'],
                                self.config['password'],
                                self.config['database'],
                                self.config['ssl'],
                                self.config['verify_ssl'])
        try:
            yield client
        finally:
            try:
                client.close()
            except AttributeError:
                # influxdb older than v5.0.0
                pass

    def send_to_influx(self):
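        """Collect all statistics and queue them for the worker threads.

        If the target database is missing it is created together with a
        default '8_weeks' retention policy (requires admin privileges).
        Batches of up to batch_size points are put on the queue; problems
        are reported via MGR_INFLUX_* health checks. Returns False if no
        hostname is configured or the database cannot be listed/created.
        """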
        if not self.config['hostname']:
            self.log.error("No Influx server configured, please set one using: "
                           "ceph influx config-set hostname <hostname>")

            self.set_health_checks({
                'MGR_INFLUX_NO_SERVER': {
                    'severity': 'warning',
                    'summary': 'No InfluxDB server configured',
                    'detail': ['Configuration option hostname not set']
                }
            })
            return False

        self.health_checks = dict()

        self.log.debug("Sending data to Influx host: %s",
                       self.config['hostname'])
        try:
            with self.get_influx_client() as client:
                databases = client.get_list_database()
                if {'name': self.config['database']} not in databases:
                    self.log.info("Database '%s' not found, trying to create "
                                  "(requires admin privs). You can also create "
                                  "manually and grant write privs to user "
                                  "'%s'", self.config['database'],
                                  self.config['username'])
                    client.create_database(self.config['database'])
                    client.create_retention_policy(name='8_weeks',
                                                   duration='8w',
                                                   replication='1',
                                                   default=True,
                                                   database=self.config['database'])

            self.log.debug('Gathering statistics')
            points = self.gather_statistics()
            for chunk in self.chunk(points, self.config['batch_size']):
                self.queue.put(chunk, block=False)

            self.log.debug('Queue currently contains %d items',
                           self.queue.qsize())
        except queue.Full:
            self.health_checks.update({
                'MGR_INFLUX_QUEUE_FULL': {
                    'severity': 'warning',
                    'summary': 'Failed to add chunk to the InfluxDB queue',
                    'detail': ['Queue is full. InfluxDB might be slow with '
                               'processing data']
                }
            })
            self.log.error('Queue is full, failed to add chunk')
        except (RequestException, InfluxDBClientError) as e:
            self.health_checks.update({
                'MGR_INFLUX_DB_LIST_FAILED': {
                    'severity': 'warning',
                    'summary': 'Failed to list/create InfluxDB database',
                    'detail': [str(e)]
                }
            })
            self.log.exception('Failed to list/create InfluxDB database')
            return False
        finally:
            self.set_health_checks(self.health_checks)

    def shutdown(self):
        self.log.info('Stopping influx module')
        self.run = False
        self.event.set()
        self.log.debug('Shutting down queue workers')

        for _ in self.workers:
            self.queue.put(None)

        self.queue.join()

        for worker in self.workers:
            worker.join()

    def self_test(self):
        now = self.get_timestamp()
        daemon_stats = list(self.get_daemon_stats(now))
        assert len(daemon_stats)
        df_stats, pools = self.get_df_stats(now)

        result = {
            'daemon_stats': daemon_stats,
            'df_stats': df_stats
        }

        return json.dumps(result, indent=2)

    def handle_command(self, inbuf, cmd):
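        """Dispatch the 'influx config-show', 'config-set' and 'send' commands."""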
        if cmd['prefix'] == 'influx config-show':
            return 0, json.dumps(self.config), ''
        elif cmd['prefix'] == 'influx config-set':
            key = cmd['key']
            value = cmd['value']
            if not value:
                return -errno.EINVAL, '', 'Value should not be empty or None'

            self.log.debug('Setting configuration option %s to %s', key, value)
            self.set_config_option(key, value)
            self.set_module_option(key, value)
            return 0, 'Configuration option {0} updated'.format(key), ''
        elif cmd['prefix'] == 'influx send':
            self.send_to_influx()
            return 0, 'Sending data to Influx', ''

        return (-errno.EINVAL, '',
                "Command not found '{0}'".format(cmd['prefix']))

    def serve(self):
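        """Module entry point: start the queue workers, then gather and send
        statistics every `interval` seconds until shutdown() is called.
        """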
        if InfluxDBClient is None:
            self.log.error("Cannot transmit statistics: influxdb python "
                           "module not found. Did you install it?")
            return

        self.log.info('Starting influx module')
        self.init_module_config()
        self.run = True

        self.log.debug('Starting %d queue worker threads',
                       self.config['threads'])
        for i in range(self.config['threads']):
            worker = Thread(target=self.queue_worker, args=())
            worker.setDaemon(True)
            worker.start()
            self.workers.append(worker)

        while self.run:
            start = time.time()
            self.send_to_influx()
            runtime = time.time() - start
            self.log.debug('Finished sending data to Influx in %.3f seconds',
                           runtime)
            self.log.debug("Sleeping for %d seconds", self.config['interval'])
            self.event.wait(self.config['interval'])