# Source: git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/influx/module.py
1 from contextlib
import contextmanager
2 from datetime
import datetime
3 from threading
import Event
, Thread
4 from itertools
import chain
9 from typing
import cast
, Any
, Dict
, Iterator
, List
, Optional
, Tuple
, Union
11 from mgr_module
import CLICommand
, CLIReadCommand
, CLIWriteCommand
, MgrModule
, Option
, OptionValue
14 from influxdb
import InfluxDBClient
15 from influxdb
.exceptions
import InfluxDBClientError
16 from requests
.exceptions
import RequestException
21 class Module(MgrModule
):
23 Option(name
='hostname',
25 desc
='InfluxDB server hostname'),
29 desc
='InfluxDB server port'),
30 Option(name
='database',
32 desc
=('InfluxDB database name. You will need to create this '
33 'database and grant write privileges to the configured '
34 'username or the username must have admin privileges to '
36 Option(name
='username',
38 desc
='username of InfluxDB server user'),
39 Option(name
='password',
41 desc
='password of InfluxDB server user'),
42 Option(name
='interval',
46 desc
='Time between reports to InfluxDB. Default 30 seconds.'),
49 desc
='Use https connection for InfluxDB server. Use "true" or "false".'),
50 Option(name
='verify_ssl',
52 desc
='Verify https cert for InfluxDB server. Use "true" or "false".'),
53 Option(name
='threads',
58 desc
='How many worker threads should be spawned for sending data to InfluxDB.'),
59 Option(name
='batch_size',
62 desc
='How big batches of data points should be when sending to InfluxDB.'),
def config_keys(self) -> Dict[str, OptionValue]:
    """Map each entry of ``self.MODULE_OPTIONS`` to its declared default.

    Keys are the options' ``'name'`` entries; an option without a
    ``'default'`` entry maps to ``None``.
    """
    return {
        option['name']: option.get('default', None)
        for option in self.MODULE_OPTIONS
    }
72 "cmd": "influx config-set name=key,type=CephString "
73 "name=value,type=CephString",
74 "desc": "Set a configuration value",
78 "cmd": "influx config-show",
79 "desc": "Show current configuration",
84 "desc": "Force sending data to Influx",
89 def __init__(self
, *args
: Any
, **kwargs
: Any
) -> None:
90 super(Module
, self
).__init
__(*args
, **kwargs
)
93 self
.config
: Dict
[str, OptionValue
] = dict()
94 self
.workers
: List
[Thread
] = list()
95 self
.queue
: 'queue.Queue[Optional[List[Dict[str, str]]]]' = queue
.Queue(maxsize
=100)
96 self
.health_checks
: Dict
[str, Dict
[str, Any
]] = dict()
def get_fsid(self) -> str:
    """Return the ``'fsid'`` entry of the map returned by ``self.get('mon_map')``."""
    mon_map = self.get('mon_map')
    return mon_map['fsid']
102 def can_run() -> Tuple
[bool, str]:
103 if InfluxDBClient
is not None:
106 return False, "influxdb python module not found"
def get_timestamp() -> str:
    """Return the current UTC time as ISO 8601 text with a trailing ``Z``.

    The output format is the same as ``datetime.utcnow().isoformat() + 'Z'``
    (no numeric UTC offset; microseconds included when non-zero). However, it
    avoids ``datetime.utcnow()``, which is deprecated since Python 3.12.
    """
    # Local import: the module header only brings the ``datetime`` class into scope.
    from datetime import timezone

    # Drop tzinfo so isoformat() does not append '+00:00'; the explicit 'Z'
    # suffix already marks the value as UTC.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return now_utc.isoformat() + 'Z'
113 def chunk(l
: Iterator
[Dict
[str, str]], n
: int) -> Iterator
[List
[Dict
[str, str]]]:
120 except StopIteration:
123 def queue_worker(self
) -> None:
126 points
= self
.queue
.get()
128 self
.log
.debug('Worker shutting down')
132 with self
.get_influx_client() as client
:
133 client
.write_points(points
, time_precision
='ms')
134 runtime
= time
.time() - start
135 self
.log
.debug('Writing points %d to Influx took %.3f seconds',
136 len(points
), runtime
)
137 except RequestException
as e
:
138 hostname
= self
.config
['hostname']
139 port
= self
.config
['port']
140 self
.log
.exception(f
"Failed to connect to Influx host {hostname}:{port}")
141 self
.health_checks
.update({
142 'MGR_INFLUX_SEND_FAILED': {
143 'severity': 'warning',
144 'summary': 'Failed to send data to InfluxDB server '
145 f
'at {hostname}:{port} due to an connection error',
149 except InfluxDBClientError
as e
:
150 self
.health_checks
.update({
151 'MGR_INFLUX_SEND_FAILED': {
152 'severity': 'warning',
153 'summary': 'Failed to send data to InfluxDB',
157 self
.log
.exception('Failed to send data to InfluxDB')
161 self
.log
.exception('Unhandled Exception while sending to Influx')
163 self
.queue
.task_done()
165 def get_latest(self
, daemon_type
: str, daemon_name
: str, stat
: str) -> int:
166 data
= self
.get_counter(daemon_type
, daemon_name
, stat
)[stat
]
172 def get_df_stats(self
, now
) -> Tuple
[List
[Dict
[str, Any
]], Dict
[str, str]]:
192 for df_type
in df_types
:
193 for pool
in df
['pools']:
195 "measurement": "ceph_pool_stats",
197 "pool_name": pool
['name'],
198 "pool_id": pool
['id'],
199 "type_instance": df_type
,
200 "fsid": self
.get_fsid()
204 "value": pool
['stats'][df_type
],
208 pool_info
.update({str(pool
['id']):pool
['name']})
209 return data
, pool_info
211 def get_pg_summary_osd(self
, pool_info
: Dict
[str, str], now
: str) -> Iterator
[Dict
[str, Any
]]:
212 pg_sum
= self
.get('pg_summary')
213 osd_sum
= pg_sum
['by_osd']
214 for osd_id
, stats
in osd_sum
.items():
215 metadata
= self
.get_metadata('osd', "%s" % osd_id
)
221 "measurement": "ceph_pg_summary_osd",
223 "ceph_daemon": "osd." + str(osd_id
),
224 "type_instance": stat
,
225 "host": metadata
['hostname']
233 def get_pg_summary_pool(self
, pool_info
: Dict
[str, str], now
: str) -> Iterator
[Dict
[str, Any
]]:
234 pool_sum
= self
.get('pg_summary')['by_pool']
235 for pool_id
, stats
in pool_sum
.items():
237 pool_name
= pool_info
[pool_id
]
239 self
.log
.error('Unable to find pool name for pool {}'.format(pool_id
))
243 "measurement": "ceph_pg_summary_pool",
245 "pool_name" : pool_name
,
247 "type_instance" : stat
,
251 "value" : stats
[stat
],
255 def get_daemon_stats(self
, now
: str) -> Iterator
[Dict
[str, Any
]]:
256 for daemon
, counters
in self
.get_unlabeled_perf_counters().items():
257 svc_type
, svc_id
= daemon
.split(".", 1)
258 metadata
= self
.get_metadata(svc_type
, svc_id
)
259 if metadata
is not None:
260 hostname
= metadata
['hostname']
264 for path
, counter_info
in counters
.items():
265 if counter_info
['type'] & self
.PERFCOUNTER_HISTOGRAM
:
268 value
= counter_info
['value']
271 "measurement": "ceph_daemon_stats",
273 "ceph_daemon": daemon
,
274 "type_instance": path
,
276 "fsid": self
.get_fsid()
284 def init_module_config(self
) -> None:
285 self
.config
['hostname'] = \
286 self
.get_module_option("hostname", default
=self
.config_keys
['hostname'])
287 self
.config
['port'] = \
288 cast(int, self
.get_module_option("port", default
=self
.config_keys
['port']))
289 self
.config
['database'] = \
290 self
.get_module_option("database", default
=self
.config_keys
['database'])
291 self
.config
['username'] = \
292 self
.get_module_option("username", default
=self
.config_keys
['username'])
293 self
.config
['password'] = \
294 self
.get_module_option("password", default
=self
.config_keys
['password'])
295 self
.config
['interval'] = \
296 cast(int, self
.get_module_option("interval",
297 default
=self
.config_keys
['interval']))
298 self
.config
['threads'] = \
299 cast(int, self
.get_module_option("threads",
300 default
=self
.config_keys
['threads']))
301 self
.config
['batch_size'] = \
302 cast(int, self
.get_module_option("batch_size",
303 default
=self
.config_keys
['batch_size']))
304 ssl
= cast(str, self
.get_module_option("ssl", default
=self
.config_keys
['ssl']))
305 self
.config
['ssl'] = ssl
.lower() == 'true'
307 cast(str, self
.get_module_option("verify_ssl", default
=self
.config_keys
['verify_ssl']))
308 self
.config
['verify_ssl'] = verify_ssl
.lower() == 'true'
def gather_statistics(self) -> Iterator[Dict[str, str]]:
    """Collect every measurement source into one lazily chained iterator.

    A single timestamp from ``self.get_timestamp()`` is shared by all
    sources. ``get_df_stats`` is called first because it also returns the
    pool mapping that the two PG-summary sources take as input. The chained
    order is: df stats, daemon stats, per-OSD PG summary, per-pool PG summary.
    """
    timestamp = self.get_timestamp()
    pool_stats, pool_names = self.get_df_stats(timestamp)
    # Sources are listed (and therefore called) in the same left-to-right
    # order in which chain() will yield from them.
    sources = (
        pool_stats,
        self.get_daemon_stats(timestamp),
        self.get_pg_summary_osd(pool_names, timestamp),
        self.get_pg_summary_pool(pool_names, timestamp),
    )
    return chain(*sources)
318 def get_influx_client(self
) -> Iterator
['InfluxDBClient']:
319 client
= InfluxDBClient(self
.config
['hostname'],
321 self
.config
['username'],
322 self
.config
['password'],
323 self
.config
['database'],
325 self
.config
['verify_ssl'])
331 except AttributeError:
332 # influxdb older than v5.0.0
335 def send_to_influx(self
) -> bool:
336 if not self
.config
['hostname']:
337 self
.log
.error("No Influx server configured, please set one using: "
338 "ceph influx config-set hostname <hostname>")
340 self
.set_health_checks({
341 'MGR_INFLUX_NO_SERVER': {
342 'severity': 'warning',
343 'summary': 'No InfluxDB server configured',
344 'detail': ['Configuration option hostname not set']
349 self
.health_checks
= dict()
351 self
.log
.debug("Sending data to Influx host: %s",
352 self
.config
['hostname'])
354 with self
.get_influx_client() as client
:
355 databases
= client
.get_list_database()
356 if {'name': self
.config
['database']} not in databases
:
357 self
.log
.info("Database '%s' not found, trying to create "
358 "(requires admin privs). You can also create "
359 "manually and grant write privs to user "
360 "'%s'", self
.config
['database'],
361 self
.config
['database'])
362 client
.create_database(self
.config
['database'])
363 client
.create_retention_policy(name
='8_weeks',
367 database
=self
.config
['database'])
369 self
.log
.debug('Gathering statistics')
370 points
= self
.gather_statistics()
371 for chunk
in self
.chunk(points
, cast(int, self
.config
['batch_size'])):
372 self
.queue
.put(chunk
, block
=False)
374 self
.log
.debug('Queue currently contains %d items',
378 self
.health_checks
.update({
379 'MGR_INFLUX_QUEUE_FULL': {
380 'severity': 'warning',
381 'summary': 'Failed to chunk to InfluxDB Queue',
382 'detail': ['Queue is full. InfluxDB might be slow with '
386 self
.log
.error('Queue is full, failed to add chunk')
388 except (RequestException
, InfluxDBClientError
) as e
:
389 self
.health_checks
.update({
390 'MGR_INFLUX_DB_LIST_FAILED': {
391 'severity': 'warning',
392 'summary': 'Failed to list/create InfluxDB database',
396 self
.log
.exception('Failed to list/create InfluxDB database')
399 self
.set_health_checks(self
.health_checks
)
401 def shutdown(self
) -> None:
402 self
.log
.info('Stopping influx module')
405 self
.log
.debug('Shutting down queue workers')
407 for _
in self
.workers
:
412 for worker
in self
.workers
:
415 def self_test(self
) -> Optional
[str]:
416 now
= self
.get_timestamp()
417 daemon_stats
= list(self
.get_daemon_stats(now
))
418 assert len(daemon_stats
)
419 df_stats
, pools
= self
.get_df_stats(now
)
422 'daemon_stats': daemon_stats
,
426 return json
.dumps(result
, indent
=2, sort_keys
=True)
@CLIReadCommand('influx config-show')
def config_show(self) -> Tuple[int, str, str]:
    """
    Show current configuration
    """
    # The docstring text above is kept verbatim: it matches the command's
    # "desc" string, so presumably the CLI decorator uses it as help text —
    # TODO confirm against mgr_module.CLICommand.
    # NOTE(review): self.config holds the 'password' option, so it is echoed
    # here in plain text.
    rendered = json.dumps(self.config, sort_keys=True)
    # (status, output, error) triple; keys sorted for stable output.
    return 0, rendered, ''
435 @CLIWriteCommand('influx config-set')
436 def config_set(self
, key
: str, value
: str) -> Tuple
[int, str, str]:
438 return -errno
.EINVAL
, '', 'Value should not be empty'
440 self
.log
.debug('Setting configuration option %s to %s', key
, value
)
442 self
.set_module_option(key
, value
)
443 self
.config
[key
] = self
.get_module_option(key
)
444 return 0, 'Configuration option {0} updated'.format(key
), ''
445 except ValueError as e
:
446 return -errno
.EINVAL
, '', str(e
)
@CLICommand('influx send')
def send(self) -> Tuple[int, str, str]:
    """
    Force sending data to Influx
    """
    # Docstring kept verbatim: it matches the command's "desc" string and is
    # presumably used by the CLI decorator as help text — TODO confirm.
    # send_to_influx() is invoked directly from the command handler. Its bool
    # result is not inspected, so the reply is the same whether or not the
    # send pass succeeded.
    self.send_to_influx()
    reply = 'Sending data to Influx'
    return 0, reply, ''
456 def serve(self
) -> None:
457 if InfluxDBClient
is None:
458 self
.log
.error("Cannot transmit statistics: influxdb python "
459 "module not found. Did you install it?")
462 self
.log
.info('Starting influx module')
463 self
.init_module_config()
466 self
.log
.debug('Starting %d queue worker threads',
467 self
.config
['threads'])
468 for i
in range(cast(int, self
.config
['threads'])):
469 worker
= Thread(target
=self
.queue_worker
, args
=())
470 worker
.setDaemon(True)
472 self
.workers
.append(worker
)
476 self
.send_to_influx()
477 runtime
= time
.time() - start
478 self
.log
.debug('Finished sending data to Influx in %.3f seconds',
480 self
.log
.debug("Sleeping for %d seconds", self
.config
['interval'])
481 self
.event
.wait(cast(float, self
.config
['interval']))