# ceph/src/pybind/mgr/influx/module.py
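#
# The influx mgr module periodically collects pool df statistics, PG summaries
# (per OSD and per pool) and daemon perf counters, and writes them to an
# InfluxDB server in batches through a small pool of worker threads.
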
from contextlib import contextmanager
from datetime import datetime
from threading import Event, Thread
from itertools import chain
import queue
import json
import errno
import time
from typing import cast, Any, Dict, Iterator, List, Optional, Tuple, Union

from mgr_module import CLICommand, CLIReadCommand, CLIWriteCommand, MgrModule, Option, OptionValue

try:
    from influxdb import InfluxDBClient
    from influxdb.exceptions import InfluxDBClientError
    from requests.exceptions import RequestException
except ImportError:
    InfluxDBClient = None


class Module(MgrModule):
    MODULE_OPTIONS = [
        Option(name='hostname',
               default=None,
               desc='InfluxDB server hostname'),
        Option(name='port',
               type='int',
               default=8086,
               desc='InfluxDB server port'),
        Option(name='database',
               default='ceph',
               desc=('InfluxDB database name. You will need to create this '
                     'database and grant write privileges to the configured '
                     'username or the username must have admin privileges to '
                     'create it.')),
        Option(name='username',
               default=None,
               desc='username of InfluxDB server user'),
        Option(name='password',
               default=None,
               desc='password of InfluxDB server user'),
        Option(name='interval',
               type='secs',
               min=5,
               default=30,
               desc='Time between reports to InfluxDB. Default 30 seconds.'),
        Option(name='ssl',
               default='false',
               desc='Use https connection for InfluxDB server. Use "true" or "false".'),
        Option(name='verify_ssl',
               default='true',
               desc='Verify https cert for InfluxDB server. Use "true" or "false".'),
        Option(name='threads',
               type='int',
               min=1,
               max=32,
               default=5,
               desc='How many worker threads should be spawned for sending data to InfluxDB.'),
        Option(name='batch_size',
               type='int',
               default=5000,
               desc='How big batches of data points should be when sending to InfluxDB.'),
    ]
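
    # The options above are normally set through the mgr configuration
    # mechanism, for example (hostname value is hypothetical):
    #   ceph config set mgr mgr/influx/hostname influx.example.com
    # or at runtime via this module's own command (see config_set() below):
    #   ceph influx config-set hostname influx.example.com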

    @property
    def config_keys(self) -> Dict[str, OptionValue]:
        return dict((o['name'], o.get('default', None))
                    for o in self.MODULE_OPTIONS)

    COMMANDS = [
        {
            "cmd": "influx config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "influx config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "influx send",
            "desc": "Force sending data to Influx",
            "perm": "rw"
        }
    ]
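
    # Note: the same three commands are also registered below through the
    # CLIReadCommand/CLIWriteCommand/CLICommand decorators on config_show(),
    # config_set() and send().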

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()
        self.run = True
        self.config: Dict[str, OptionValue] = dict()
        self.workers: List[Thread] = list()
        self.queue: 'queue.Queue[Optional[List[Dict[str, str]]]]' = queue.Queue(maxsize=100)
        self.health_checks: Dict[str, Dict[str, Any]] = dict()

    def get_fsid(self) -> str:
        return self.get('mon_map')['fsid']

    @staticmethod
    def can_run() -> Tuple[bool, str]:
        if InfluxDBClient is not None:
            return True, ""
        else:
            return False, "influxdb python module not found"

    @staticmethod
    def get_timestamp() -> str:
        return datetime.utcnow().isoformat() + 'Z'

    @staticmethod
    def chunk(l: Iterator[Dict[str, str]], n: int) -> Iterator[List[Dict[str, str]]]:
        try:
            while True:
                xs = []
                for _ in range(n):
                    xs.append(next(l))
                yield xs
        except StopIteration:
            # Only yield the final, partially filled batch if it is non-empty;
            # an empty list would otherwise be mistaken for the shutdown
            # sentinel used by queue_worker().
            if xs:
                yield xs
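
    # For illustration: chunk(iter([p1, p2, p3]), 2) yields [p1, p2] and then
    # the final, partially filled batch [p3].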

    def queue_worker(self) -> None:
        while True:
            try:
                points = self.queue.get()
                if not points:
                    # An empty list is the shutdown sentinel (see shutdown()).
                    self.log.debug('Worker shutting down')
                    break

                start = time.time()
                with self.get_influx_client() as client:
                    client.write_points(points, time_precision='ms')
                runtime = time.time() - start
                self.log.debug('Writing %d points to Influx took %.3f seconds',
                               len(points), runtime)
            except RequestException as e:
                hostname = self.config['hostname']
                port = self.config['port']
                self.log.exception(f"Failed to connect to Influx host {hostname}:{port}")
                self.health_checks.update({
                    'MGR_INFLUX_SEND_FAILED': {
                        'severity': 'warning',
                        'summary': 'Failed to send data to InfluxDB server '
                                   f'at {hostname}:{port} due to a connection error',
                        'detail': [str(e)]
                    }
                })
            except InfluxDBClientError as e:
                self.health_checks.update({
                    'MGR_INFLUX_SEND_FAILED': {
                        'severity': 'warning',
                        'summary': 'Failed to send data to InfluxDB',
                        'detail': [str(e)]
                    }
                })
                self.log.exception('Failed to send data to InfluxDB')
            except queue.Empty:
                continue
            except Exception:
                self.log.exception('Unhandled Exception while sending to Influx')
            finally:
                self.queue.task_done()
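
    # Design note: failures in queue_worker() only record entries in
    # self.health_checks; they are published to the mgr when send_to_influx()
    # calls set_health_checks() in its finally block.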

    def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
        data = self.get_counter(daemon_type, daemon_name, stat)[stat]
        if data:
            return data[-1][1]

        return 0

    def get_df_stats(self, now: str) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
        df = self.get("df")
        data = []
        pool_info = {}

        df_types = [
            'stored',
            'kb_used',
            'dirty',
            'rd',
            'rd_bytes',
            'stored_raw',
            'wr',
            'wr_bytes',
            'objects',
            'max_avail',
            'quota_objects',
            'quota_bytes'
        ]

        for df_type in df_types:
            for pool in df['pools']:
                point = {
                    "measurement": "ceph_pool_stats",
                    "tags": {
                        "pool_name": pool['name'],
                        "pool_id": pool['id'],
                        "type_instance": df_type,
                        "fsid": self.get_fsid()
                    },
                    "time": now,
                    "fields": {
                        "value": pool['stats'][df_type],
                    }
                }
                data.append(point)
                pool_info[str(pool['id'])] = pool['name']
        return data, pool_info
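
    # Illustrative shape of a single pool data point (values are hypothetical):
    #   {"measurement": "ceph_pool_stats",
    #    "tags": {"pool_name": "rbd", "pool_id": 1,
    #             "type_instance": "stored", "fsid": "<cluster fsid>"},
    #    "time": "2023-01-01T00:00:00.000000Z",
    #    "fields": {"value": 123456}}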

    def get_pg_summary_osd(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]:
        pg_sum = self.get('pg_summary')
        osd_sum = pg_sum['by_osd']
        for osd_id, stats in osd_sum.items():
            metadata = self.get_metadata('osd', "%s" % osd_id)
            if not metadata:
                continue

            for stat in stats:
                yield {
                    "measurement": "ceph_pg_summary_osd",
                    "tags": {
                        "ceph_daemon": "osd." + str(osd_id),
                        "type_instance": stat,
                        "host": metadata['hostname']
                    },
                    "time": now,
                    "fields": {
                        "value": stats[stat]
                    }
                }

    def get_pg_summary_pool(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]:
        pool_sum = self.get('pg_summary')['by_pool']
        for pool_id, stats in pool_sum.items():
            try:
                pool_name = pool_info[pool_id]
            except KeyError:
                self.log.error('Unable to find pool name for pool {}'.format(pool_id))
                continue
            for stat in stats:
                yield {
                    "measurement": "ceph_pg_summary_pool",
                    "tags": {
                        "pool_name": pool_name,
                        "pool_id": pool_id,
                        "type_instance": stat,
                    },
                    "time": now,
                    "fields": {
                        "value": stats[stat],
                    }
                }

    def get_daemon_stats(self, now: str) -> Iterator[Dict[str, Any]]:
        for daemon, counters in self.get_all_perf_counters().items():
            svc_type, svc_id = daemon.split(".", 1)
            metadata = self.get_metadata(svc_type, svc_id)
            if metadata is not None:
                hostname = metadata['hostname']
            else:
                hostname = 'N/A'

            for path, counter_info in counters.items():
                if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
                    continue

                value = counter_info['value']

                yield {
                    "measurement": "ceph_daemon_stats",
                    "tags": {
                        "ceph_daemon": daemon,
                        "type_instance": path,
                        "host": hostname,
                        "fsid": self.get_fsid()
                    },
                    "time": now,
                    "fields": {
                        "value": value
                    }
                }

    def init_module_config(self) -> None:
        self.config['hostname'] = \
            self.get_module_option("hostname", default=self.config_keys['hostname'])
        self.config['port'] = \
            cast(int, self.get_module_option("port", default=self.config_keys['port']))
        self.config['database'] = \
            self.get_module_option("database", default=self.config_keys['database'])
        self.config['username'] = \
            self.get_module_option("username", default=self.config_keys['username'])
        self.config['password'] = \
            self.get_module_option("password", default=self.config_keys['password'])
        self.config['interval'] = \
            cast(int, self.get_module_option("interval",
                                             default=self.config_keys['interval']))
        self.config['threads'] = \
            cast(int, self.get_module_option("threads",
                                             default=self.config_keys['threads']))
        self.config['batch_size'] = \
            cast(int, self.get_module_option("batch_size",
                                             default=self.config_keys['batch_size']))
        ssl = cast(str, self.get_module_option("ssl", default=self.config_keys['ssl']))
        self.config['ssl'] = ssl.lower() == 'true'
        verify_ssl = \
            cast(str, self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl']))
        self.config['verify_ssl'] = verify_ssl.lower() == 'true'

    def gather_statistics(self) -> Iterator[Dict[str, str]]:
        now = self.get_timestamp()
        df_stats, pools = self.get_df_stats(now)
        return chain(df_stats, self.get_daemon_stats(now),
                     self.get_pg_summary_osd(pools, now),
                     self.get_pg_summary_pool(pools, now))

    @contextmanager
    def get_influx_client(self) -> Iterator['InfluxDBClient']:
        client = InfluxDBClient(self.config['hostname'],
                                self.config['port'],
                                self.config['username'],
                                self.config['password'],
                                self.config['database'],
                                self.config['ssl'],
                                self.config['verify_ssl'])
        try:
            yield client
        finally:
            try:
                client.close()
            except AttributeError:
                # influxdb older than v5.0.0
                pass

    def send_to_influx(self) -> bool:
        if not self.config['hostname']:
            self.log.error("No Influx server configured, please set one using: "
                           "ceph influx config-set hostname <hostname>")

            self.set_health_checks({
                'MGR_INFLUX_NO_SERVER': {
                    'severity': 'warning',
                    'summary': 'No InfluxDB server configured',
                    'detail': ['Configuration option hostname not set']
                }
            })
            return False

        self.health_checks = dict()

        self.log.debug("Sending data to Influx host: %s",
                       self.config['hostname'])
        try:
            with self.get_influx_client() as client:
                databases = client.get_list_database()
                if {'name': self.config['database']} not in databases:
                    self.log.info("Database '%s' not found, trying to create "
                                  "(requires admin privs). You can also create "
                                  "manually and grant write privs to user "
                                  "'%s'", self.config['database'],
                                  self.config['username'])
                    client.create_database(self.config['database'])
                    client.create_retention_policy(name='8_weeks',
                                                   duration='8w',
                                                   replication='1',
                                                   default=True,
                                                   database=self.config['database'])

            self.log.debug('Gathering statistics')
            points = self.gather_statistics()
            for chunk in self.chunk(points, cast(int, self.config['batch_size'])):
                self.queue.put(chunk, block=False)

            self.log.debug('Queue currently contains %d items',
                           self.queue.qsize())
            return True
        except queue.Full:
            self.health_checks.update({
                'MGR_INFLUX_QUEUE_FULL': {
                    'severity': 'warning',
                    'summary': 'Failed to add data chunk to InfluxDB queue',
                    'detail': ['Queue is full. InfluxDB might be slow with '
                               'processing data']
                }
            })
            self.log.error('Queue is full, failed to add chunk')
            return False
        except (RequestException, InfluxDBClientError) as e:
            self.health_checks.update({
                'MGR_INFLUX_DB_LIST_FAILED': {
                    'severity': 'warning',
                    'summary': 'Failed to list/create InfluxDB database',
                    'detail': [str(e)]
                }
            })
            self.log.exception('Failed to list/create InfluxDB database')
            return False
        finally:
            self.set_health_checks(self.health_checks)

    def shutdown(self) -> None:
        self.log.info('Stopping influx module')
        self.run = False
        self.event.set()
        self.log.debug('Shutting down queue workers')

        for _ in self.workers:
            self.queue.put([])

        self.queue.join()

        for worker in self.workers:
            worker.join()

    def self_test(self) -> Optional[str]:
        now = self.get_timestamp()
        daemon_stats = list(self.get_daemon_stats(now))
        assert len(daemon_stats)
        df_stats, pools = self.get_df_stats(now)

        result = {
            'daemon_stats': daemon_stats,
            'df_stats': df_stats
        }

        return json.dumps(result, indent=2, sort_keys=True)

    @CLIReadCommand('influx config-show')
    def config_show(self) -> Tuple[int, str, str]:
        """
        Show current configuration
        """
        return 0, json.dumps(self.config, sort_keys=True), ''

    @CLIWriteCommand('influx config-set')
    def config_set(self, key: str, value: str) -> Tuple[int, str, str]:
        """
        Set a configuration value
        """
        if not value:
            return -errno.EINVAL, '', 'Value should not be empty'

        self.log.debug('Setting configuration option %s to %s', key, value)
        try:
            self.set_module_option(key, value)
            self.config[key] = self.get_module_option(key)
            return 0, 'Configuration option {0} updated'.format(key), ''
        except ValueError as e:
            return -errno.EINVAL, '', str(e)

    @CLICommand('influx send')
    def send(self) -> Tuple[int, str, str]:
        """
        Force sending data to Influx
        """
        self.send_to_influx()
        return 0, 'Sending data to Influx', ''

    def serve(self) -> None:
        if InfluxDBClient is None:
            self.log.error("Cannot transmit statistics: influxdb python "
                           "module not found. Did you install it?")
            return

        self.log.info('Starting influx module')
        self.init_module_config()
        self.run = True

        self.log.debug('Starting %d queue worker threads',
                       self.config['threads'])
        for _ in range(cast(int, self.config['threads'])):
            worker = Thread(target=self.queue_worker, args=())
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

        while self.run:
            start = time.time()
            self.send_to_influx()
            runtime = time.time() - start
            self.log.debug('Finished sending data to Influx in %.3f seconds',
                           runtime)
            self.log.debug("Sleeping for %d seconds", self.config['interval'])
            self.event.wait(cast(float, self.config['interval']))
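
# A rough usage sketch (hostname value is hypothetical):
#   ceph mgr module enable influx
#   ceph influx config-set hostname influx.example.com
#   ceph influx send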