# ceph/src/pybind/mgr/influx/module.py (ceph 16.2.7)
from contextlib import contextmanager
from datetime import datetime
from threading import Event, Thread
from itertools import chain
import queue
import json
import errno
import time

from mgr_module import MgrModule

try:
    from influxdb import InfluxDBClient
    from influxdb.exceptions import InfluxDBClientError
    from requests.exceptions import RequestException
except ImportError:
    InfluxDBClient = None


class Module(MgrModule):
    MODULE_OPTIONS = [
        {
            'name': 'hostname',
            'default': None
        },
        {
            'name': 'port',
            'default': 8086
        },
        {
            'name': 'database',
            'default': 'ceph'
        },
        {
            'name': 'username',
            'default': None
        },
        {
            'name': 'password',
            'default': None
        },
        {
            'name': 'interval',
            'default': 30
        },
        {
            'name': 'ssl',
            'default': 'false'
        },
        {
            'name': 'verify_ssl',
            'default': 'true'
        },
        {
            'name': 'threads',
            'default': 5
        },
        {
            'name': 'batch_size',
            'default': 5000
        }
    ]

    @property
    def config_keys(self):
        return dict((o['name'], o.get('default', None))
                    for o in self.MODULE_OPTIONS)

    COMMANDS = [
        {
            "cmd": "influx config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "influx config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "influx send",
            "desc": "Force sending data to Influx",
            "perm": "rw"
        }
    ]
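
    # Usage sketch (added for clarity): the COMMANDS above are surfaced as
    # "ceph influx ..." subcommands and dispatched by handle_command() below.
    # The hostname/username values here are placeholders, not taken from this
    # file:
    #
    #   ceph influx config-set hostname influx.example.com
    #   ceph influx config-set username admin
    #   ceph influx config-show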

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()
        self.run = True
        self.config = dict()
        self.workers = list()
        self.queue = queue.Queue(maxsize=100)
        self.health_checks = dict()

    def get_fsid(self):
        return self.get('mon_map')['fsid']

    @staticmethod
    def can_run():
        if InfluxDBClient is not None:
            return True, ""
        else:
            return False, "influxdb python module not found"

    @staticmethod
    def get_timestamp():
        return datetime.utcnow().isoformat() + 'Z'

    @staticmethod
    def chunk(l, n):
        try:
            while True:
                xs = []
                for _ in range(n):
                    xs.append(next(l))
                yield xs
        except StopIteration:
            yield xs

    def queue_worker(self):
        while True:
            try:
                points = self.queue.get()
                if points is None:
                    self.log.debug('Worker shutting down')
                    break

                start = time.time()
                with self.get_influx_client() as client:
                    client.write_points(points, time_precision='ms')
                runtime = time.time() - start
                self.log.debug('Writing %d points to Influx took %.3f seconds',
                               len(points), runtime)
            except RequestException as e:
                self.log.exception("Failed to connect to Influx host %s:%d",
                                   self.config['hostname'],
                                   self.config['port'])
                self.health_checks.update({
                    'MGR_INFLUX_SEND_FAILED': {
                        'severity': 'warning',
                        'summary': 'Failed to send data to InfluxDB server '
                                   'at %s:%d due to a connection error'
                                   % (self.config['hostname'],
                                      self.config['port']),
                        'detail': [str(e)]
                    }
                })
            except InfluxDBClientError as e:
                self.health_checks.update({
                    'MGR_INFLUX_SEND_FAILED': {
                        'severity': 'warning',
                        'summary': 'Failed to send data to InfluxDB',
                        'detail': [str(e)]
                    }
                })
                self.log.exception('Failed to send data to InfluxDB')
            except queue.Empty:
                continue
            except Exception:
                self.log.exception('Unhandled Exception while sending to Influx')
            finally:
                self.queue.task_done()

    def get_latest(self, daemon_type, daemon_name, stat):
        data = self.get_counter(daemon_type, daemon_name, stat)[stat]
        if data:
            return data[-1][1]

        return 0

    def get_df_stats(self, now):
        df = self.get("df")
        data = []
        pool_info = {}

        df_types = [
            'stored',
            'kb_used',
            'dirty',
            'rd',
            'rd_bytes',
            'stored_raw',
            'wr',
            'wr_bytes',
            'objects',
            'max_avail',
            'quota_objects',
            'quota_bytes'
        ]

        for df_type in df_types:
            for pool in df['pools']:
                point = {
                    "measurement": "ceph_pool_stats",
                    "tags": {
                        "pool_name": pool['name'],
                        "pool_id": pool['id'],
                        "type_instance": df_type,
                        "fsid": self.get_fsid()
                    },
                    "time": now,
                    "fields": {
                        "value": pool['stats'][df_type],
                    }
                }
                data.append(point)
                pool_info.update({str(pool['id']): pool['name']})
        return data, pool_info

    def get_pg_summary_osd(self, pool_info, now):
        pg_sum = self.get('pg_summary')
        osd_sum = pg_sum['by_osd']
        for osd_id, stats in osd_sum.items():
            metadata = self.get_metadata('osd', "%s" % osd_id)
            if not metadata:
                continue

            for stat in stats:
                yield {
                    "measurement": "ceph_pg_summary_osd",
                    "tags": {
                        "ceph_daemon": "osd." + str(osd_id),
                        "type_instance": stat,
                        "host": metadata['hostname']
                    },
                    "time": now,
                    "fields": {
                        "value": stats[stat]
                    }
                }

    def get_pg_summary_pool(self, pool_info, now):
        pool_sum = self.get('pg_summary')['by_pool']
        for pool_id, stats in pool_sum.items():
            for stat in stats:
                yield {
                    "measurement": "ceph_pg_summary_pool",
                    "tags": {
                        "pool_name": pool_info[pool_id],
                        "pool_id": pool_id,
                        "type_instance": stat,
                    },
                    "time": now,
                    "fields": {
                        "value": stats[stat],
                    }
                }

    def get_daemon_stats(self, now):
        for daemon, counters in self.get_all_perf_counters().items():
            svc_type, svc_id = daemon.split(".", 1)
            metadata = self.get_metadata(svc_type, svc_id)
            if metadata is not None:
                hostname = metadata['hostname']
            else:
                hostname = 'N/A'

            for path, counter_info in counters.items():
                if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
                    continue

                value = counter_info['value']

                yield {
                    "measurement": "ceph_daemon_stats",
                    "tags": {
                        "ceph_daemon": daemon,
                        "type_instance": path,
                        "host": hostname,
                        "fsid": self.get_fsid()
                    },
                    "time": now,
                    "fields": {
                        "value": value
                    }
                }

    def set_config_option(self, option, value):
        if option not in self.config_keys.keys():
            raise RuntimeError('{0} is an unknown configuration '
                               'option'.format(option))

        if option in ['port', 'interval', 'threads', 'batch_size']:
            try:
                value = int(value)
            except (ValueError, TypeError):
                raise RuntimeError('invalid {0} configured. Please specify '
                                   'a valid integer'.format(option))

        if option == 'interval' and value < 5:
            raise RuntimeError('interval should be set to at least 5 seconds')

        if option in ['ssl', 'verify_ssl']:
            value = value.lower() == 'true'

        if option == 'threads':
            if value < 1 or value > 32:
                raise RuntimeError('threads should be in range 1-32')

        self.config[option] = value

    def init_module_config(self):
        self.config['hostname'] = \
            self.get_module_option("hostname", default=self.config_keys['hostname'])
        self.config['port'] = \
            int(self.get_module_option("port", default=self.config_keys['port']))
        self.config['database'] = \
            self.get_module_option("database", default=self.config_keys['database'])
        self.config['username'] = \
            self.get_module_option("username", default=self.config_keys['username'])
        self.config['password'] = \
            self.get_module_option("password", default=self.config_keys['password'])
        self.config['interval'] = \
            int(self.get_module_option("interval",
                                       default=self.config_keys['interval']))
        self.config['threads'] = \
            int(self.get_module_option("threads",
                                       default=self.config_keys['threads']))
        self.config['batch_size'] = \
            int(self.get_module_option("batch_size",
                                       default=self.config_keys['batch_size']))
        ssl = self.get_module_option("ssl", default=self.config_keys['ssl'])
        self.config['ssl'] = ssl.lower() == 'true'
        verify_ssl = \
            self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl'])
        self.config['verify_ssl'] = verify_ssl.lower() == 'true'

    def gather_statistics(self):
        now = self.get_timestamp()
        df_stats, pools = self.get_df_stats(now)
        return chain(df_stats, self.get_daemon_stats(now),
                     self.get_pg_summary_osd(pools, now),
                     self.get_pg_summary_pool(pools, now))

    @contextmanager
    def get_influx_client(self):
        client = InfluxDBClient(self.config['hostname'],
                                self.config['port'],
                                self.config['username'],
                                self.config['password'],
                                self.config['database'],
                                self.config['ssl'],
                                self.config['verify_ssl'])
        try:
            yield client
        finally:
            try:
                client.close()
            except AttributeError:
                # influxdb older than v5.0.0
                pass
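
    # Note (added for clarity, mirroring queue_worker() above): the context
    # manager is consumed as
    #   with self.get_influx_client() as client:
    #       client.write_points(points, time_precision='ms')
    # so the client is closed even if the write raises.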

    def send_to_influx(self):
        if not self.config['hostname']:
            self.log.error("No Influx server configured, please set one using: "
                           "ceph influx config-set hostname <hostname>")

            self.set_health_checks({
                'MGR_INFLUX_NO_SERVER': {
                    'severity': 'warning',
                    'summary': 'No InfluxDB server configured',
                    'detail': ['Configuration option hostname not set']
                }
            })
            return False

        self.health_checks = dict()

        self.log.debug("Sending data to Influx host: %s",
                       self.config['hostname'])
        try:
            with self.get_influx_client() as client:
                databases = client.get_list_database()
                if {'name': self.config['database']} not in databases:
                    self.log.info("Database '%s' not found, trying to create "
                                  "(requires admin privs). You can also create "
                                  "manually and grant write privs to user "
                                  "'%s'", self.config['database'],
                                  self.config['username'])
                    client.create_database(self.config['database'])
                    client.create_retention_policy(name='8_weeks',
                                                   duration='8w',
                                                   replication='1',
                                                   default=True,
                                                   database=self.config['database'])

            self.log.debug('Gathering statistics')
            points = self.gather_statistics()
            for chunk in self.chunk(points, self.config['batch_size']):
                self.queue.put(chunk, block=False)

            self.log.debug('Queue currently contains %d items',
                           self.queue.qsize())
        except queue.Full:
            self.health_checks.update({
                'MGR_INFLUX_QUEUE_FULL': {
                    'severity': 'warning',
                    'summary': 'Failed to add chunk to InfluxDB queue',
                    'detail': ['Queue is full. InfluxDB might be slow with '
                               'processing data']
                }
            })
            self.log.error('Queue is full, failed to add chunk')
        except (RequestException, InfluxDBClientError) as e:
            self.health_checks.update({
                'MGR_INFLUX_DB_LIST_FAILED': {
                    'severity': 'warning',
                    'summary': 'Failed to list/create InfluxDB database',
                    'detail': [str(e)]
                }
            })
            self.log.exception('Failed to list/create InfluxDB database')
            return False
        finally:
            self.set_health_checks(self.health_checks)

    def shutdown(self):
        self.log.info('Stopping influx module')
        self.run = False
        self.event.set()
        self.log.debug('Shutting down queue workers')

        for _ in self.workers:
            self.queue.put(None)

        self.queue.join()

        for worker in self.workers:
            worker.join()

    def self_test(self):
        now = self.get_timestamp()
        daemon_stats = list(self.get_daemon_stats(now))
        assert len(daemon_stats)
        df_stats, pools = self.get_df_stats(now)

        result = {
            'daemon_stats': daemon_stats,
            'df_stats': df_stats
        }

        return json.dumps(result, indent=2, sort_keys=True)

    def handle_command(self, inbuf, cmd):
        if cmd['prefix'] == 'influx config-show':
            return 0, json.dumps(self.config, sort_keys=True), ''
        elif cmd['prefix'] == 'influx config-set':
            key = cmd['key']
            value = cmd['value']
            if not value:
                return -errno.EINVAL, '', 'Value should not be empty or None'

            self.log.debug('Setting configuration option %s to %s', key, value)
            self.set_config_option(key, value)
            self.set_module_option(key, value)
            return 0, 'Configuration option {0} updated'.format(key), ''
        elif cmd['prefix'] == 'influx send':
            self.send_to_influx()
            return 0, 'Sending data to Influx', ''

        return (-errno.EINVAL, '',
                "Command not found '{0}'".format(cmd['prefix']))

    def serve(self):
        if InfluxDBClient is None:
            self.log.error("Cannot transmit statistics: influxdb python "
                           "module not found. Did you install it?")
            return

        self.log.info('Starting influx module')
        self.init_module_config()
        self.run = True

        self.log.debug('Starting %d queue worker threads',
                       self.config['threads'])
        for _ in range(self.config['threads']):
            worker = Thread(target=self.queue_worker, args=())
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

        while self.run:
            start = time.time()
            self.send_to_influx()
            runtime = time.time() - start
            self.log.debug('Finished sending data to Influx in %.3f seconds',
                           runtime)
            self.log.debug("Sleeping for %d seconds", self.config['interval'])
            self.event.wait(self.config['interval'])
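

# A minimal deployment sketch (assumes the standard Ceph CLI; the hostname is
# a placeholder): enable the module, point it at an InfluxDB host, then push
# an initial batch without waiting for the interval:
#
#   ceph mgr module enable influx
#   ceph influx config-set hostname influx.example.com
#   ceph influx send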