# ceph/src/pybind/mgr/influx/module.py -- mgr 'influx' module
# (source: ceph.git, version 18.2.2-pve1)
1 from contextlib import contextmanager
2 from datetime import datetime
3 from threading import Event, Thread
4 from itertools import chain
5 import queue
6 import json
7 import errno
8 import time
9 from typing import cast, Any, Dict, Iterator, List, Optional, Tuple, Union
10
11 from mgr_module import CLICommand, CLIReadCommand, CLIWriteCommand, MgrModule, Option, OptionValue
12
13 try:
14 from influxdb import InfluxDBClient
15 from influxdb.exceptions import InfluxDBClientError
16 from requests.exceptions import RequestException
17 except ImportError:
18 InfluxDBClient = None
19
20
class Module(MgrModule):
    """Send Ceph usage and performance statistics to an InfluxDB server.

    Statistics are gathered periodically (see the ``interval`` option) by
    serve() and handed to a pool of worker threads that write them to
    InfluxDB in batches.
    """

    # Runtime-configurable options; read into self.config by init_module_config().
    MODULE_OPTIONS = [
        Option(name='hostname',
               default=None,
               desc='InfluxDB server hostname'),
        Option(name='port',
               type='int',
               default=8086,
               desc='InfluxDB server port'),
        Option(name='database',
               default='ceph',
               desc=('InfluxDB database name. You will need to create this '
                     'database and grant write privileges to the configured '
                     'username or the username must have admin privileges to '
                     'create it.')),
        Option(name='username',
               default=None,
               desc='username of InfluxDB server user'),
        Option(name='password',
               default=None,
               desc='password of InfluxDB server user'),
        Option(name='interval',
               type='secs',
               min=5,
               default=30,
               desc='Time between reports to InfluxDB. Default 30 seconds.'),
        Option(name='ssl',
               default='false',
               desc='Use https connection for InfluxDB server. Use "true" or "false".'),
        Option(name='verify_ssl',
               default='true',
               desc='Verify https cert for InfluxDB server. Use "true" or "false".'),
        Option(name='threads',
               type='int',
               min=1,
               max=32,
               default=5,
               desc='How many worker threads should be spawned for sending data to InfluxDB.'),
        Option(name='batch_size',
               type='int',
               default=5000,
               desc='How big batches of data points should be when sending to InfluxDB.'),
    ]
64
65 @property
66 def config_keys(self) -> Dict[str, OptionValue]:
67 return dict((o['name'], o.get('default', None))
68 for o in self.MODULE_OPTIONS)
69
    # Legacy command descriptors; the same commands are also registered
    # below via the CLICommand/CLIReadCommand/CLIWriteCommand decorators.
    COMMANDS = [
        {
            "cmd": "influx config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "influx config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "influx send",
            "desc": "Force sending data to Influx",
            "perm": "rw"
        }
    ]
88
89 def __init__(self, *args: Any, **kwargs: Any) -> None:
90 super(Module, self).__init__(*args, **kwargs)
91 self.event = Event()
92 self.run = True
93 self.config: Dict[str, OptionValue] = dict()
94 self.workers: List[Thread] = list()
95 self.queue: 'queue.Queue[Optional[List[Dict[str, str]]]]' = queue.Queue(maxsize=100)
96 self.health_checks: Dict[str, Dict[str, Any]] = dict()
97
98 def get_fsid(self) -> str:
99 return self.get('mon_map')['fsid']
100
101 @staticmethod
102 def can_run() -> Tuple[bool, str]:
103 if InfluxDBClient is not None:
104 return True, ""
105 else:
106 return False, "influxdb python module not found"
107
108 @staticmethod
109 def get_timestamp() -> str:
110 return datetime.utcnow().isoformat() + 'Z'
111
112 @staticmethod
113 def chunk(l: Iterator[Dict[str, str]], n: int) -> Iterator[List[Dict[str, str]]]:
114 try:
115 while True:
116 xs = []
117 for _ in range(n):
118 xs.append(next(l))
119 yield xs
120 except StopIteration:
121 yield xs
122
123 def queue_worker(self) -> None:
124 while True:
125 try:
126 points = self.queue.get()
127 if not points:
128 self.log.debug('Worker shutting down')
129 break
130
131 start = time.time()
132 with self.get_influx_client() as client:
133 client.write_points(points, time_precision='ms')
134 runtime = time.time() - start
135 self.log.debug('Writing points %d to Influx took %.3f seconds',
136 len(points), runtime)
137 except RequestException as e:
138 hostname = self.config['hostname']
139 port = self.config['port']
140 self.log.exception(f"Failed to connect to Influx host {hostname}:{port}")
141 self.health_checks.update({
142 'MGR_INFLUX_SEND_FAILED': {
143 'severity': 'warning',
144 'summary': 'Failed to send data to InfluxDB server '
145 f'at {hostname}:{port} due to an connection error',
146 'detail': [str(e)]
147 }
148 })
149 except InfluxDBClientError as e:
150 self.health_checks.update({
151 'MGR_INFLUX_SEND_FAILED': {
152 'severity': 'warning',
153 'summary': 'Failed to send data to InfluxDB',
154 'detail': [str(e)]
155 }
156 })
157 self.log.exception('Failed to send data to InfluxDB')
158 except queue.Empty:
159 continue
160 except:
161 self.log.exception('Unhandled Exception while sending to Influx')
162 finally:
163 self.queue.task_done()
164
165 def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
166 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
167 if data:
168 return data[-1][1]
169
170 return 0
171
172 def get_df_stats(self, now) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
173 df = self.get("df")
174 data = []
175 pool_info = {}
176
177 df_types = [
178 'stored',
179 'kb_used',
180 'dirty',
181 'rd',
182 'rd_bytes',
183 'stored_raw',
184 'wr',
185 'wr_bytes',
186 'objects',
187 'max_avail',
188 'quota_objects',
189 'quota_bytes'
190 ]
191
192 for df_type in df_types:
193 for pool in df['pools']:
194 point = {
195 "measurement": "ceph_pool_stats",
196 "tags": {
197 "pool_name": pool['name'],
198 "pool_id": pool['id'],
199 "type_instance": df_type,
200 "fsid": self.get_fsid()
201 },
202 "time": now,
203 "fields": {
204 "value": pool['stats'][df_type],
205 }
206 }
207 data.append(point)
208 pool_info.update({str(pool['id']):pool['name']})
209 return data, pool_info
210
211 def get_pg_summary_osd(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]:
212 pg_sum = self.get('pg_summary')
213 osd_sum = pg_sum['by_osd']
214 for osd_id, stats in osd_sum.items():
215 metadata = self.get_metadata('osd', "%s" % osd_id)
216 if not metadata:
217 continue
218
219 for stat in stats:
220 yield {
221 "measurement": "ceph_pg_summary_osd",
222 "tags": {
223 "ceph_daemon": "osd." + str(osd_id),
224 "type_instance": stat,
225 "host": metadata['hostname']
226 },
227 "time" : now,
228 "fields" : {
229 "value": stats[stat]
230 }
231 }
232
233 def get_pg_summary_pool(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]:
234 pool_sum = self.get('pg_summary')['by_pool']
235 for pool_id, stats in pool_sum.items():
236 try:
237 pool_name = pool_info[pool_id]
238 except KeyError:
239 self.log.error('Unable to find pool name for pool {}'.format(pool_id))
240 continue
241 for stat in stats:
242 yield {
243 "measurement": "ceph_pg_summary_pool",
244 "tags": {
245 "pool_name" : pool_name,
246 "pool_id" : pool_id,
247 "type_instance" : stat,
248 },
249 "time" : now,
250 "fields": {
251 "value" : stats[stat],
252 }
253 }
254
255 def get_daemon_stats(self, now: str) -> Iterator[Dict[str, Any]]:
256 for daemon, counters in self.get_unlabeled_perf_counters().items():
257 svc_type, svc_id = daemon.split(".", 1)
258 metadata = self.get_metadata(svc_type, svc_id)
259 if metadata is not None:
260 hostname = metadata['hostname']
261 else:
262 hostname = 'N/A'
263
264 for path, counter_info in counters.items():
265 if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
266 continue
267
268 value = counter_info['value']
269
270 yield {
271 "measurement": "ceph_daemon_stats",
272 "tags": {
273 "ceph_daemon": daemon,
274 "type_instance": path,
275 "host": hostname,
276 "fsid": self.get_fsid()
277 },
278 "time": now,
279 "fields": {
280 "value": value
281 }
282 }
283
284 def init_module_config(self) -> None:
285 self.config['hostname'] = \
286 self.get_module_option("hostname", default=self.config_keys['hostname'])
287 self.config['port'] = \
288 cast(int, self.get_module_option("port", default=self.config_keys['port']))
289 self.config['database'] = \
290 self.get_module_option("database", default=self.config_keys['database'])
291 self.config['username'] = \
292 self.get_module_option("username", default=self.config_keys['username'])
293 self.config['password'] = \
294 self.get_module_option("password", default=self.config_keys['password'])
295 self.config['interval'] = \
296 cast(int, self.get_module_option("interval",
297 default=self.config_keys['interval']))
298 self.config['threads'] = \
299 cast(int, self.get_module_option("threads",
300 default=self.config_keys['threads']))
301 self.config['batch_size'] = \
302 cast(int, self.get_module_option("batch_size",
303 default=self.config_keys['batch_size']))
304 ssl = cast(str, self.get_module_option("ssl", default=self.config_keys['ssl']))
305 self.config['ssl'] = ssl.lower() == 'true'
306 verify_ssl = \
307 cast(str, self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl']))
308 self.config['verify_ssl'] = verify_ssl.lower() == 'true'
309
310 def gather_statistics(self) -> Iterator[Dict[str, str]]:
311 now = self.get_timestamp()
312 df_stats, pools = self.get_df_stats(now)
313 return chain(df_stats, self.get_daemon_stats(now),
314 self.get_pg_summary_osd(pools, now),
315 self.get_pg_summary_pool(pools, now))
316
317 @contextmanager
318 def get_influx_client(self) -> Iterator['InfluxDBClient']:
319 client = InfluxDBClient(self.config['hostname'],
320 self.config['port'],
321 self.config['username'],
322 self.config['password'],
323 self.config['database'],
324 self.config['ssl'],
325 self.config['verify_ssl'])
326 try:
327 yield client
328 finally:
329 try:
330 client.close()
331 except AttributeError:
332 # influxdb older than v5.0.0
333 pass
334
    def send_to_influx(self) -> bool:
        """Gather statistics and enqueue them for the worker threads.

        Ensures the target database exists (creating it together with an
        8-week retention policy when possible) before queueing.  Returns
        True on success; False when no server is configured, the queue is
        full, or listing/creating the database fails.  A health-check dict
        reflecting the outcome is always published via set_health_checks().
        """
        if not self.config['hostname']:
            self.log.error("No Influx server configured, please set one using: "
                           "ceph influx config-set hostname <hostname>")

            self.set_health_checks({
                'MGR_INFLUX_NO_SERVER': {
                    'severity': 'warning',
                    'summary': 'No InfluxDB server configured',
                    'detail': ['Configuration option hostname not set']
                }
            })
            return False

        # Start from a clean slate; the except-blocks below repopulate this
        # dict and the finally-block publishes whatever it ends up holding.
        self.health_checks = dict()

        self.log.debug("Sending data to Influx host: %s",
                       self.config['hostname'])
        try:
            with self.get_influx_client() as client:
                databases = client.get_list_database()
                if {'name': self.config['database']} not in databases:
                    self.log.info("Database '%s' not found, trying to create "
                                  "(requires admin privs). You can also create "
                                  "manually and grant write privs to user "
                                  "'%s'", self.config['database'],
                                  self.config['database'])
                    client.create_database(self.config['database'])
                    client.create_retention_policy(name='8_weeks',
                                                   duration='8w',
                                                   replication='1',
                                                   default=True,
                                                   database=self.config['database'])

            self.log.debug('Gathering statistics')
            points = self.gather_statistics()
            # Non-blocking put: if the workers cannot keep up, drop this
            # round (queue.Full below) rather than stall the serve() loop.
            for chunk in self.chunk(points, cast(int, self.config['batch_size'])):
                self.queue.put(chunk, block=False)

            self.log.debug('Queue currently contains %d items',
                           self.queue.qsize())
            return True
        except queue.Full:
            self.health_checks.update({
                'MGR_INFLUX_QUEUE_FULL': {
                    'severity': 'warning',
                    'summary': 'Failed to chunk to InfluxDB Queue',
                    'detail': ['Queue is full. InfluxDB might be slow with '
                               'processing data']
                }
            })
            self.log.error('Queue is full, failed to add chunk')
            return False
        except (RequestException, InfluxDBClientError) as e:
            self.health_checks.update({
                'MGR_INFLUX_DB_LIST_FAILED': {
                    'severity': 'warning',
                    'summary': 'Failed to list/create InfluxDB database',
                    'detail': [str(e)]
                }
            })
            self.log.exception('Failed to list/create InfluxDB database')
            return False
        finally:
            self.set_health_checks(self.health_checks)
400
401 def shutdown(self) -> None:
402 self.log.info('Stopping influx module')
403 self.run = False
404 self.event.set()
405 self.log.debug('Shutting down queue workers')
406
407 for _ in self.workers:
408 self.queue.put([])
409
410 self.queue.join()
411
412 for worker in self.workers:
413 worker.join()
414
415 def self_test(self) -> Optional[str]:
416 now = self.get_timestamp()
417 daemon_stats = list(self.get_daemon_stats(now))
418 assert len(daemon_stats)
419 df_stats, pools = self.get_df_stats(now)
420
421 result = {
422 'daemon_stats': daemon_stats,
423 'df_stats': df_stats
424 }
425
426 return json.dumps(result, indent=2, sort_keys=True)
427
428 @CLIReadCommand('influx config-show')
429 def config_show(self) -> Tuple[int, str, str]:
430 """
431 Show current configuration
432 """
433 return 0, json.dumps(self.config, sort_keys=True), ''
434
435 @CLIWriteCommand('influx config-set')
436 def config_set(self, key: str, value: str) -> Tuple[int, str, str]:
437 if not value:
438 return -errno.EINVAL, '', 'Value should not be empty'
439
440 self.log.debug('Setting configuration option %s to %s', key, value)
441 try:
442 self.set_module_option(key, value)
443 self.config[key] = self.get_module_option(key)
444 return 0, 'Configuration option {0} updated'.format(key), ''
445 except ValueError as e:
446 return -errno.EINVAL, '', str(e)
447
448 @CLICommand('influx send')
449 def send(self) -> Tuple[int, str, str]:
450 """
451 Force sending data to Influx
452 """
453 self.send_to_influx()
454 return 0, 'Sending data to Influx', ''
455
456 def serve(self) -> None:
457 if InfluxDBClient is None:
458 self.log.error("Cannot transmit statistics: influxdb python "
459 "module not found. Did you install it?")
460 return
461
462 self.log.info('Starting influx module')
463 self.init_module_config()
464 self.run = True
465
466 self.log.debug('Starting %d queue worker threads',
467 self.config['threads'])
468 for i in range(cast(int, self.config['threads'])):
469 worker = Thread(target=self.queue_worker, args=())
470 worker.setDaemon(True)
471 worker.start()
472 self.workers.append(worker)
473
474 while self.run:
475 start = time.time()
476 self.send_to_influx()
477 runtime = time.time() - start
478 self.log.debug('Finished sending data to Influx in %.3f seconds',
479 runtime)
480 self.log.debug("Sleeping for %d seconds", self.config['interval'])
481 self.event.wait(cast(float, self.config['interval']))