]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/telegraf/module.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / pybind / mgr / telegraf / module.py
CommitLineData
11fdf7f2
TL
1import errno
2import json
3import itertools
11fdf7f2
TL
4import socket
5import time
6from threading import Event
7
8from telegraf.basesocket import BaseSocket
9from telegraf.protocol import Line
20effc67 10from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, PG_STATES
11fdf7f2 11
20effc67
TL
12from typing import cast, Any, Dict, Iterable, Optional, Tuple
13from urllib.parse import urlparse
11fdf7f2
TL
14
15
class Module(MgrModule):
    """Manager module that periodically sends Ceph statistics to Telegraf.

    Measurements are collected from pool, daemon and cluster state, rendered
    with ``telegraf.protocol.Line`` and written to the socket described by
    the ``address`` option once every ``interval`` seconds.
    """

    MODULE_OPTIONS = [
        Option(name='address',
               default='unixgram:///tmp/telegraf.sock'),
        Option(name='interval',
               type='secs',
               default=15)]

    # Numeric value reported for each cluster health status string.
    ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}

    @property
    def config_keys(self) -> Dict[str, OptionValue]:
        """Map every declared option name to its default (``None`` if unset)."""
        return {o['name']: o.get('default', None) for o in self.MODULE_OPTIONS}

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the module state; arguments are passed to the base class."""
        super(Module, self).__init__(*args, **kwargs)
        # Set by shutdown() to interrupt the waits in serve().
        self.event = Event()
        self.run = True
        # Filled in lazily by get_fsid().
        self.fsid: Optional[str] = None
        # Effective configuration, populated by init_module_config().
        self.config: Dict[str, OptionValue] = dict()

    def get_fsid(self) -> str:
        """Return the cluster fsid, reading it from the mon map on first use."""
        if not self.fsid:
            self.fsid = self.get('mon_map')['fsid']
        assert self.fsid is not None
        return self.fsid

    def get_pool_stats(self) -> Iterable[Dict[str, Any]]:
        """Yield one ``ceph_pool_stats`` measurement per statistic and pool.

        Iteration is statistic-major: every pool is emitted for the first
        statistic before moving on to the next one.
        """
        df = self.get('df')

        df_types = [
            'bytes_used',
            'kb_used',
            'dirty',
            'rd',
            'rd_bytes',
            'stored_raw',
            'wr',
            'wr_bytes',
            'objects',
            'max_avail',
            'quota_objects',
            'quota_bytes'
        ]

        for df_type in df_types:
            for pool in df['pools']:
                yield {
                    'measurement': 'ceph_pool_stats',
                    'tags': {
                        'pool_name': pool['name'],
                        'pool_id': pool['id'],
                        'type_instance': df_type,
                        'fsid': self.get_fsid()
                    },
                    'value': pool['stats'][df_type],
                }

    def get_daemon_stats(self) -> Iterable[Dict[str, Any]]:
        """Yield one ``ceph_daemon_stats`` measurement per daemon perf counter.

        Daemons without metadata and histogram-type counters are skipped.
        """
        for daemon, counters in self.get_unlabeled_perf_counters().items():
            svc_type, svc_id = daemon.split('.', 1)
            metadata = self.get_metadata(svc_type, svc_id)
            if not metadata:
                continue

            for path, counter_info in counters.items():
                if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
                    continue

                yield {
                    'measurement': 'ceph_daemon_stats',
                    'tags': {
                        'ceph_daemon': daemon,
                        'type_instance': path,
                        'host': metadata['hostname'],
                        'fsid': self.get_fsid()
                    },
                    'value': counter_info['value']
                }

    def get_pg_stats(self) -> Dict[str, int]:
        """Return PG totals plus a ``num_pgs_<state>`` count for every PG state.

        A PG group whose state name is a combination such as ``a+b`` adds its
        count to both ``num_pgs_a`` and ``num_pgs_b``.
        """
        stats: Dict[str, int] = {}

        pg_status = self.get('pg_status')
        for key in ['bytes_total', 'data_bytes', 'bytes_used', 'bytes_avail',
                    'num_pgs', 'num_objects', 'num_pools']:
            stats[key] = pg_status[key]

        # Start every known state at zero so absent states are still reported.
        for state in PG_STATES:
            stats['num_pgs_{0}'.format(state)] = 0

        for state in pg_status['pgs_by_state']:
            states = state['state_name'].split('+')
            for s in PG_STATES:
                if s in states:
                    stats['num_pgs_{0}'.format(s)] += state['count']

        return stats

    def get_cluster_stats(self) -> Iterable[Dict[str, Any]]:
        """Yield one ``ceph_cluster_stats`` measurement per cluster-wide value.

        Values are taken from the health, mon status, OSD map, mgr map and
        fs map, and are combined with the result of ``get_pg_stats()``.
        """
        stats: Dict[str, Any] = {}

        health = json.loads(self.get('health')['json'])
        stats['health'] = self.ceph_health_mapping.get(health['status'])

        mon_status = json.loads(self.get('mon_status')['json'])
        stats['num_mon'] = len(mon_status['monmap']['mons'])

        stats['mon_election_epoch'] = mon_status['election_epoch']
        stats['mon_outside_quorum'] = len(mon_status['outside_quorum'])
        stats['mon_quorum'] = len(mon_status['quorum'])

        osd_map = self.get('osd_map')
        osds = osd_map['osds']
        stats['num_osd'] = len(osds)
        stats['num_pg_temp'] = len(osd_map['pg_temp'])
        stats['osd_epoch'] = osd_map['epoch']

        mgr_map = self.get('mgr_map')
        stats['mgr_available'] = int(mgr_map['available'])
        stats['num_mgr_standby'] = len(mgr_map['standbys'])
        stats['mgr_epoch'] = mgr_map['epoch']

        stats['num_osd_up'] = sum(1 for osd in osds if osd['up'] == 1)
        stats['num_osd_in'] = sum(1 for osd in osds if osd['in'] == 1)

        fs_map = self.get('fs_map')
        num_mds_standby = len(fs_map['standbys'])
        stats['num_mds_standby'] = num_mds_standby
        stats['num_fs'] = len(fs_map['filesystems'])
        stats['mds_epoch'] = fs_map['epoch']

        num_mds_up = sum(len(fs['mdsmap']['up'])
                         for fs in fs_map['filesystems'])
        stats['num_mds_up'] = num_mds_up
        stats['num_mds'] = num_mds_up + num_mds_standby

        stats.update(self.get_pg_stats())

        for key, value in stats.items():
            # 'health' is None when the status is not in ceph_health_mapping.
            assert value is not None
            yield {
                'measurement': 'ceph_cluster_stats',
                'tags': {
                    'type_instance': key,
                    'fsid': self.get_fsid()
                },
                'value': int(value)
            }

    def set_config_option(self, option: str, value: str) -> None:
        """Validate ``value`` and store it in the in-memory configuration.

        :param option: name of the option; must be one of ``config_keys``
        :param value: new value; ``interval`` is converted to ``int``
        :raises RuntimeError: if the option is unknown, or if ``interval`` is
            not an integer or is smaller than 5
        """
        if option not in self.config_keys:
            raise RuntimeError('{0} is a unknown configuration '
                               'option'.format(option))

        if option == 'interval':
            try:
                interval = int(value)
            except (ValueError, TypeError) as err:
                raise RuntimeError('invalid {0} configured. Please specify '
                                   'a valid integer'.format(option)) from err
            if interval < 5:
                raise RuntimeError('interval should be set to at least 5 seconds')
            self.config[option] = interval
        else:
            self.config[option] = value

    def init_module_config(self) -> None:
        """Load ``address`` and ``interval`` from the stored module options."""
        self.config['address'] = \
            self.get_module_option("address", default=self.config_keys['address'])
        interval = self.get_module_option("interval",
                                          default=self.config_keys['interval'])
        assert interval
        self.config['interval'] = int(interval)

    def now(self) -> int:
        """Return the current wall-clock time in nanoseconds since the epoch."""
        return int(round(time.time() * 1000000000))

    def gather_measurements(self) -> Iterable[Dict[str, Any]]:
        """Return a lazy chain of pool, daemon and cluster measurements."""
        return itertools.chain(
            self.get_pool_stats(),
            self.get_daemon_stats(),
            self.get_cluster_stats()
        )

    def send_to_telegraf(self) -> None:
        """Send every gathered measurement to the configured Telegraf socket.

        All lines share one timestamp taken before sending starts. Socket and
        runtime errors raised while sending are logged, not propagated.
        """
        url = urlparse(cast(str, self.config['address']))

        sock = BaseSocket(url)
        self.log.debug('Sending data to Telegraf at %s', sock.address)
        now = self.now()
        try:
            with sock as s:
                for measurement in self.gather_measurements():
                    self.log.debug(measurement)
                    line = Line(measurement['measurement'],
                                measurement['value'],
                                measurement['tags'], now)
                    payload = line.to_line_protocol()
                    self.log.debug(payload)
                    s.send(payload)
        except FileNotFoundError:
            # Must come before the OSError handler below: FileNotFoundError is
            # a subclass of OSError and would otherwise never reach this branch.
            self.log.exception('Failed to open Telegraf at: %s', url.geturl())
        except (socket.error, RuntimeError, IOError, OSError):
            self.log.exception('Failed to send statistics to Telegraf:')

    def shutdown(self) -> None:
        """Ask the serve() loop to stop and wake it if it is waiting."""
        self.log.info('Stopping Telegraf module')
        self.run = False
        self.event.set()

    # NOTE(review): the docstrings of the CLI-decorated methods below are kept
    # verbatim; they presumably double as command help text -- confirm.
    @CLIReadCommand('telegraf config-show')
    def config_show(self) -> Tuple[int, str, str]:
        """
        Show current configuration
        """
        return 0, json.dumps(self.config), ''

    @CLICommand('telegraf config-set')
    def config_set(self, key: str, value: str) -> Tuple[int, str, str]:
        """
        Set a configuration value
        """
        if not value:
            return -errno.EINVAL, '', 'Value should not be empty or None'
        self.log.debug('Setting configuration option %s to %s', key, value)
        try:
            self.set_config_option(key, value)
        except RuntimeError as err:
            # Report validation failures like the empty-value case above
            # instead of letting the exception escape the command handler.
            return -errno.EINVAL, '', str(err)
        self.set_module_option(key, value)
        return 0, 'Configuration option {0} updated'.format(key), ''

    @CLICommand('telegraf send')
    def send(self) -> Tuple[int, str, str]:
        """
        Force sending data to Telegraf
        """
        self.send_to_telegraf()
        return 0, 'Sending data to Telegraf', ''

    def self_test(self) -> None:
        """Raise ``RuntimeError`` if no measurement can be gathered."""
        measurements = list(self.gather_measurements())
        if not measurements:
            raise RuntimeError('No measurements found')

    def serve(self) -> None:
        """Run the send loop until shutdown() clears ``self.run``."""
        self.log.info('Starting Telegraf module')
        self.init_module_config()
        self.run = True

        self.log.debug('Waiting 10 seconds before starting')
        self.event.wait(10)

        while self.run:
            start = self.now()
            self.send_to_telegraf()
            # now() is in nanoseconds; convert the elapsed time to milliseconds.
            runtime = (self.now() - start) / 1000000
            self.log.debug('Sending data to Telegraf took %d ms', runtime)
            self.log.debug("Sleeping for %d seconds", self.config['interval'])
            self.event.wait(cast(int, self.config['interval']))