]>
Commit | Line | Data |
---|---|---|
494da23a | 1 | from contextlib import contextmanager |
3efd9988 | 2 | from datetime import datetime |
11fdf7f2 TL |
3 | from threading import Event, Thread |
4 | from itertools import chain | |
f67539c2 | 5 | import queue |
3efd9988 FG |
6 | import json |
7 | import errno | |
28e407b8 | 8 | import time |
20effc67 | 9 | from typing import cast, Any, Dict, Iterator, List, Optional, Tuple, Union |
3efd9988 | 10 | |
20effc67 | 11 | from mgr_module import CLICommand, CLIReadCommand, CLIWriteCommand, MgrModule, Option, OptionValue |
3efd9988 FG |
12 | |
13 | try: | |
14 | from influxdb import InfluxDBClient | |
15 | from influxdb.exceptions import InfluxDBClientError | |
11fdf7f2 | 16 | from requests.exceptions import RequestException |
3efd9988 FG |
17 | except ImportError: |
18 | InfluxDBClient = None | |
19 | ||
28e407b8 | 20 | |
class Module(MgrModule):
    """Mgr module that periodically reports Ceph cluster statistics to InfluxDB."""

    # Runtime-settable options; see `ceph influx config-set` and
    # `ceph config set mgr mgr/influx/<name> <value>`.
    MODULE_OPTIONS = [
        Option(name='hostname',
               default=None,
               desc='InfluxDB server hostname'),
        Option(name='port',
               type='int',
               default=8086,
               desc='InfluxDB server port'),
        Option(name='database',
               default='ceph',
               desc=('InfluxDB database name. You will need to create this '
                     'database and grant write privileges to the configured '
                     'username or the username must have admin privileges to '
                     'create it.')),
        Option(name='username',
               default=None,
               desc='username of InfluxDB server user'),
        Option(name='password',
               default=None,
               desc='password of InfluxDB server user'),
        Option(name='interval',
               type='secs',
               min=5,
               default=30,
               desc='Time between reports to InfluxDB. Default 30 seconds.'),
        Option(name='ssl',
               default='false',
               desc='Use https connection for InfluxDB server. Use "true" or "false".'),
        Option(name='verify_ssl',
               default='true',
               desc='Verify https cert for InfluxDB server. Use "true" or "false".'),
        Option(name='threads',
               type='int',
               min=1,
               max=32,
               default=5,
               desc='How many worker threads should be spawned for sending data to InfluxDB.'),
        Option(name='batch_size',
               type='int',
               default=5000,
               desc='How big batches of data points should be when sending to InfluxDB.'),
    ]

    @property
    def config_keys(self) -> Dict[str, OptionValue]:
        """Map each declared option name to its declared default (None if absent)."""
        return dict((o['name'], o.get('default', None))
                    for o in self.MODULE_OPTIONS)

    # NOTE(review): legacy command table; the @CLI*Command decorators further
    # down look like the active dispatch path -- confirm COMMANDS is still
    # consulted before touching it.
    COMMANDS = [
        {
            "cmd": "influx config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "influx config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "influx send",
            "desc": "Force sending data to Influx",
            "perm": "rw"
        }
    ]
20effc67 | 89 | def __init__(self, *args: Any, **kwargs: Any) -> None: |
3efd9988 FG |
90 | super(Module, self).__init__(*args, **kwargs) |
91 | self.event = Event() | |
28e407b8 | 92 | self.run = True |
20effc67 TL |
93 | self.config: Dict[str, OptionValue] = dict() |
94 | self.workers: List[Thread] = list() | |
95 | self.queue: 'queue.Queue[Optional[List[Dict[str, str]]]]' = queue.Queue(maxsize=100) | |
96 | self.health_checks: Dict[str, Dict[str, Any]] = dict() | |
3efd9988 | 97 | |
20effc67 | 98 | def get_fsid(self) -> str: |
28e407b8 | 99 | return self.get('mon_map')['fsid'] |
3efd9988 | 100 | |
11fdf7f2 | 101 | @staticmethod |
20effc67 | 102 | def can_run() -> Tuple[bool, str]: |
11fdf7f2 TL |
103 | if InfluxDBClient is not None: |
104 | return True, "" | |
105 | else: | |
106 | return False, "influxdb python module not found" | |
107 | ||
108 | @staticmethod | |
20effc67 | 109 | def get_timestamp() -> str: |
11fdf7f2 TL |
110 | return datetime.utcnow().isoformat() + 'Z' |
111 | ||
112 | @staticmethod | |
20effc67 | 113 | def chunk(l: Iterator[Dict[str, str]], n: int) -> Iterator[List[Dict[str, str]]]: |
11fdf7f2 TL |
114 | try: |
115 | while True: | |
116 | xs = [] | |
117 | for _ in range(n): | |
118 | xs.append(next(l)) | |
119 | yield xs | |
120 | except StopIteration: | |
121 | yield xs | |
122 | ||
20effc67 | 123 | def queue_worker(self) -> None: |
11fdf7f2 TL |
124 | while True: |
125 | try: | |
126 | points = self.queue.get() | |
20effc67 | 127 | if not points: |
11fdf7f2 TL |
128 | self.log.debug('Worker shutting down') |
129 | break | |
130 | ||
131 | start = time.time() | |
494da23a TL |
132 | with self.get_influx_client() as client: |
133 | client.write_points(points, time_precision='ms') | |
11fdf7f2 TL |
134 | runtime = time.time() - start |
135 | self.log.debug('Writing points %d to Influx took %.3f seconds', | |
136 | len(points), runtime) | |
137 | except RequestException as e: | |
20effc67 TL |
138 | hostname = self.config['hostname'] |
139 | port = self.config['port'] | |
140 | self.log.exception(f"Failed to connect to Influx host {hostname}:{port}") | |
11fdf7f2 TL |
141 | self.health_checks.update({ |
142 | 'MGR_INFLUX_SEND_FAILED': { | |
143 | 'severity': 'warning', | |
144 | 'summary': 'Failed to send data to InfluxDB server ' | |
20effc67 | 145 | f'at {hostname}:{port} due to an connection error', |
11fdf7f2 TL |
146 | 'detail': [str(e)] |
147 | } | |
148 | }) | |
149 | except InfluxDBClientError as e: | |
150 | self.health_checks.update({ | |
151 | 'MGR_INFLUX_SEND_FAILED': { | |
152 | 'severity': 'warning', | |
153 | 'summary': 'Failed to send data to InfluxDB', | |
154 | 'detail': [str(e)] | |
155 | } | |
156 | }) | |
157 | self.log.exception('Failed to send data to InfluxDB') | |
158 | except queue.Empty: | |
159 | continue | |
160 | except: | |
161 | self.log.exception('Unhandled Exception while sending to Influx') | |
162 | finally: | |
163 | self.queue.task_done() | |
164 | ||
20effc67 | 165 | def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int: |
3efd9988 FG |
166 | data = self.get_counter(daemon_type, daemon_name, stat)[stat] |
167 | if data: | |
168 | return data[-1][1] | |
3efd9988 | 169 | |
28e407b8 | 170 | return 0 |
3efd9988 | 171 | |
    def get_df_stats(self, now: str) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
        """Collect per-pool utilisation points from the `df` dump.

        Returns the list of points plus a {pool_id (str): pool_name} map,
        which the pg-summary collectors use to resolve pool names.
        """
        df = self.get("df")
        data = []
        pool_info = {}

        # One time-series ("type_instance" tag) per pool stat below.
        df_types = [
            'stored',
            'kb_used',
            'dirty',
            'rd',
            'rd_bytes',
            'stored_raw',
            'wr',
            'wr_bytes',
            'objects',
            'max_avail',
            'quota_objects',
            'quota_bytes'
        ]

        for df_type in df_types:
            for pool in df['pools']:
                point = {
                    "measurement": "ceph_pool_stats",
                    "tags": {
                        "pool_name": pool['name'],
                        "pool_id": pool['id'],
                        "type_instance": df_type,
                        "fsid": self.get_fsid()
                    },
                    "time": now,
                    "fields": {
                        "value": pool['stats'][df_type],
                    }
                }
                data.append(point)
                # Keyed by the *stringified* pool id.
                pool_info.update({str(pool['id']):pool['name']})
        return data, pool_info
210 | ||
20effc67 | 211 | def get_pg_summary_osd(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]: |
11fdf7f2 TL |
212 | pg_sum = self.get('pg_summary') |
213 | osd_sum = pg_sum['by_osd'] | |
f67539c2 | 214 | for osd_id, stats in osd_sum.items(): |
11fdf7f2 TL |
215 | metadata = self.get_metadata('osd', "%s" % osd_id) |
216 | if not metadata: | |
217 | continue | |
3efd9988 | 218 | |
11fdf7f2 TL |
219 | for stat in stats: |
220 | yield { | |
221 | "measurement": "ceph_pg_summary_osd", | |
222 | "tags": { | |
223 | "ceph_daemon": "osd." + str(osd_id), | |
224 | "type_instance": stat, | |
225 | "host": metadata['hostname'] | |
226 | }, | |
227 | "time" : now, | |
228 | "fields" : { | |
229 | "value": stats[stat] | |
230 | } | |
231 | } | |
3efd9988 | 232 | |
20effc67 | 233 | def get_pg_summary_pool(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]: |
11fdf7f2 | 234 | pool_sum = self.get('pg_summary')['by_pool'] |
f67539c2 | 235 | for pool_id, stats in pool_sum.items(): |
1e59de90 TL |
236 | try: |
237 | pool_name = pool_info[pool_id] | |
238 | except KeyError: | |
239 | self.log.error('Unable to find pool name for pool {}'.format(pool_id)) | |
240 | continue | |
11fdf7f2 TL |
241 | for stat in stats: |
242 | yield { | |
243 | "measurement": "ceph_pg_summary_pool", | |
244 | "tags": { | |
1e59de90 | 245 | "pool_name" : pool_name, |
11fdf7f2 TL |
246 | "pool_id" : pool_id, |
247 | "type_instance" : stat, | |
248 | }, | |
249 | "time" : now, | |
250 | "fields": { | |
251 | "value" : stats[stat], | |
252 | } | |
253 | } | |
28e407b8 | 254 | |
20effc67 | 255 | def get_daemon_stats(self, now: str) -> Iterator[Dict[str, Any]]: |
f67539c2 | 256 | for daemon, counters in self.get_all_perf_counters().items(): |
28e407b8 | 257 | svc_type, svc_id = daemon.split(".", 1) |
3efd9988 | 258 | metadata = self.get_metadata(svc_type, svc_id) |
a4b75251 TL |
259 | if metadata is not None: |
260 | hostname = metadata['hostname'] | |
261 | else: | |
262 | hostname = 'N/A' | |
3efd9988 FG |
263 | |
264 | for path, counter_info in counters.items(): | |
265 | if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM: | |
266 | continue | |
267 | ||
268 | value = counter_info['value'] | |
269 | ||
11fdf7f2 | 270 | yield { |
3efd9988 FG |
271 | "measurement": "ceph_daemon_stats", |
272 | "tags": { | |
273 | "ceph_daemon": daemon, | |
274 | "type_instance": path, | |
a4b75251 | 275 | "host": hostname, |
28e407b8 | 276 | "fsid": self.get_fsid() |
3efd9988 | 277 | }, |
28e407b8 | 278 | "time": now, |
3efd9988 FG |
279 | "fields": { |
280 | "value": value | |
281 | } | |
11fdf7f2 | 282 | } |
3efd9988 | 283 | |
20effc67 | 284 | def init_module_config(self) -> None: |
28e407b8 | 285 | self.config['hostname'] = \ |
11fdf7f2 | 286 | self.get_module_option("hostname", default=self.config_keys['hostname']) |
28e407b8 | 287 | self.config['port'] = \ |
20effc67 | 288 | cast(int, self.get_module_option("port", default=self.config_keys['port'])) |
28e407b8 | 289 | self.config['database'] = \ |
11fdf7f2 | 290 | self.get_module_option("database", default=self.config_keys['database']) |
28e407b8 | 291 | self.config['username'] = \ |
11fdf7f2 | 292 | self.get_module_option("username", default=self.config_keys['username']) |
28e407b8 | 293 | self.config['password'] = \ |
11fdf7f2 | 294 | self.get_module_option("password", default=self.config_keys['password']) |
28e407b8 | 295 | self.config['interval'] = \ |
20effc67 TL |
296 | cast(int, self.get_module_option("interval", |
297 | default=self.config_keys['interval'])) | |
11fdf7f2 | 298 | self.config['threads'] = \ |
20effc67 TL |
299 | cast(int, self.get_module_option("threads", |
300 | default=self.config_keys['threads'])) | |
11fdf7f2 | 301 | self.config['batch_size'] = \ |
20effc67 TL |
302 | cast(int, self.get_module_option("batch_size", |
303 | default=self.config_keys['batch_size'])) | |
304 | ssl = cast(str, self.get_module_option("ssl", default=self.config_keys['ssl'])) | |
28e407b8 AA |
305 | self.config['ssl'] = ssl.lower() == 'true' |
306 | verify_ssl = \ | |
20effc67 | 307 | cast(str, self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl'])) |
28e407b8 AA |
308 | self.config['verify_ssl'] = verify_ssl.lower() == 'true' |
309 | ||
20effc67 | 310 | def gather_statistics(self) -> Iterator[Dict[str, str]]: |
11fdf7f2 TL |
311 | now = self.get_timestamp() |
312 | df_stats, pools = self.get_df_stats(now) | |
313 | return chain(df_stats, self.get_daemon_stats(now), | |
314 | self.get_pg_summary_osd(pools, now), | |
315 | self.get_pg_summary_pool(pools, now)) | |
316 | ||
494da23a | 317 | @contextmanager |
20effc67 | 318 | def get_influx_client(self) -> Iterator['InfluxDBClient']: |
494da23a TL |
319 | client = InfluxDBClient(self.config['hostname'], |
320 | self.config['port'], | |
321 | self.config['username'], | |
322 | self.config['password'], | |
323 | self.config['database'], | |
324 | self.config['ssl'], | |
325 | self.config['verify_ssl']) | |
326 | try: | |
327 | yield client | |
328 | finally: | |
329 | try: | |
330 | client.close() | |
331 | except AttributeError: | |
332 | # influxdb older than v5.0.0 | |
333 | pass | |
11fdf7f2 | 334 | |
    def send_to_influx(self) -> bool:
        """Gather one round of statistics and queue them for the worker threads.

        Ensures the target database exists (creating it plus an 8-week
        retention policy when the configured user has admin privileges),
        then enqueues the points in batch_size chunks.

        Returns True on success.  On any failure a health check is recorded
        and False is returned; the finally-clause always publishes the
        accumulated health checks (including ones set by queue workers).
        """
        if not self.config['hostname']:
            self.log.error("No Influx server configured, please set one using: "
                           "ceph influx config-set hostname <hostname>")

            self.set_health_checks({
                'MGR_INFLUX_NO_SERVER': {
                    'severity': 'warning',
                    'summary': 'No InfluxDB server configured',
                    'detail': ['Configuration option hostname not set']
                }
            })
            return False

        # Start the round from a clean slate; workers may add entries before
        # the finally below publishes them.
        self.health_checks = dict()

        self.log.debug("Sending data to Influx host: %s",
                       self.config['hostname'])
        try:
            with self.get_influx_client() as client:
                databases = client.get_list_database()
                if {'name': self.config['database']} not in databases:
                    self.log.info("Database '%s' not found, trying to create "
                                  "(requires admin privs). You can also create "
                                  "manually and grant write privs to user "
                                  "'%s'", self.config['database'],
                                  self.config['database'])
                    client.create_database(self.config['database'])
                    client.create_retention_policy(name='8_weeks',
                                                   duration='8w',
                                                   replication='1',
                                                   default=True,
                                                   database=self.config['database'])

            self.log.debug('Gathering statistics')
            points = self.gather_statistics()
            # block=False: if workers cannot keep up we fail fast with a
            # health check instead of stalling the serve() loop.
            for chunk in self.chunk(points, cast(int, self.config['batch_size'])):
                self.queue.put(chunk, block=False)

            self.log.debug('Queue currently contains %d items',
                           self.queue.qsize())
            return True
        except queue.Full:
            self.health_checks.update({
                'MGR_INFLUX_QUEUE_FULL': {
                    'severity': 'warning',
                    'summary': 'Failed to chunk to InfluxDB Queue',
                    'detail': ['Queue is full. InfluxDB might be slow with '
                               'processing data']
                }
            })
            self.log.error('Queue is full, failed to add chunk')
            return False
        except (RequestException, InfluxDBClientError) as e:
            self.health_checks.update({
                'MGR_INFLUX_DB_LIST_FAILED': {
                    'severity': 'warning',
                    'summary': 'Failed to list/create InfluxDB database',
                    'detail': [str(e)]
                }
            })
            self.log.exception('Failed to list/create InfluxDB database')
            return False
        finally:
            # Publish whatever this round accumulated (possibly empty,
            # which clears previous warnings).
            self.set_health_checks(self.health_checks)
3efd9988 | 400 | |
20effc67 | 401 | def shutdown(self) -> None: |
3efd9988 FG |
402 | self.log.info('Stopping influx module') |
403 | self.run = False | |
404 | self.event.set() | |
11fdf7f2 | 405 | self.log.debug('Shutting down queue workers') |
3efd9988 | 406 | |
11fdf7f2 | 407 | for _ in self.workers: |
20effc67 | 408 | self.queue.put([]) |
11fdf7f2 TL |
409 | |
410 | self.queue.join() | |
411 | ||
412 | for worker in self.workers: | |
413 | worker.join() | |
414 | ||
20effc67 | 415 | def self_test(self) -> Optional[str]: |
11fdf7f2 TL |
416 | now = self.get_timestamp() |
417 | daemon_stats = list(self.get_daemon_stats(now)) | |
418 | assert len(daemon_stats) | |
419 | df_stats, pools = self.get_df_stats(now) | |
420 | ||
421 | result = { | |
422 | 'daemon_stats': daemon_stats, | |
423 | 'df_stats': df_stats | |
424 | } | |
425 | ||
9f95a23c | 426 | return json.dumps(result, indent=2, sort_keys=True) |
11fdf7f2 | 427 | |
20effc67 TL |
428 | @CLIReadCommand('influx config-show') |
429 | def config_show(self) -> Tuple[int, str, str]: | |
430 | """ | |
431 | Show current configuration | |
432 | """ | |
433 | return 0, json.dumps(self.config, sort_keys=True), '' | |
28e407b8 | 434 | |
20effc67 TL |
435 | @CLIWriteCommand('influx config-set') |
436 | def config_set(self, key: str, value: str) -> Tuple[int, str, str]: | |
437 | if not value: | |
438 | return -errno.EINVAL, '', 'Value should not be empty' | |
3efd9988 | 439 | |
20effc67 TL |
440 | self.log.debug('Setting configuration option %s to %s', key, value) |
441 | try: | |
442 | self.set_module_option(key, value) | |
443 | self.config[key] = self.get_module_option(key) | |
444 | return 0, 'Configuration option {0} updated'.format(key), '' | |
445 | except ValueError as e: | |
446 | return -errno.EINVAL, '', str(e) | |
447 | ||
448 | @CLICommand('influx send') | |
449 | def send(self) -> Tuple[int, str, str]: | |
450 | """ | |
451 | Force sending data to Influx | |
452 | """ | |
453 | self.send_to_influx() | |
454 | return 0, 'Sending data to Influx', '' | |
455 | ||
456 | def serve(self) -> None: | |
3efd9988 FG |
457 | if InfluxDBClient is None: |
458 | self.log.error("Cannot transmit statistics: influxdb python " | |
459 | "module not found. Did you install it?") | |
460 | return | |
461 | ||
462 | self.log.info('Starting influx module') | |
28e407b8 | 463 | self.init_module_config() |
3efd9988 | 464 | self.run = True |
28e407b8 | 465 | |
11fdf7f2 TL |
466 | self.log.debug('Starting %d queue worker threads', |
467 | self.config['threads']) | |
20effc67 | 468 | for i in range(cast(int, self.config['threads'])): |
11fdf7f2 TL |
469 | worker = Thread(target=self.queue_worker, args=()) |
470 | worker.setDaemon(True) | |
471 | worker.start() | |
472 | self.workers.append(worker) | |
473 | ||
3efd9988 | 474 | while self.run: |
28e407b8 | 475 | start = time.time() |
3efd9988 | 476 | self.send_to_influx() |
28e407b8 | 477 | runtime = time.time() - start |
11fdf7f2 | 478 | self.log.debug('Finished sending data to Influx in %.3f seconds', |
28e407b8 AA |
479 | runtime) |
480 | self.log.debug("Sleeping for %d seconds", self.config['interval']) | |
20effc67 | 481 | self.event.wait(cast(float, self.config['interval'])) |