]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | import errno |
2 | import json | |
3 | import itertools | |
11fdf7f2 TL |
4 | import socket |
5 | import time | |
6 | from threading import Event | |
7 | ||
8 | from telegraf.basesocket import BaseSocket | |
9 | from telegraf.protocol import Line | |
20effc67 | 10 | from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, PG_STATES |
11fdf7f2 | 11 | |
20effc67 TL |
12 | from typing import cast, Any, Dict, Iterable, Optional, Tuple |
13 | from urllib.parse import urlparse | |
11fdf7f2 TL |
14 | |
15 | ||
class Module(MgrModule):
    """Manager module that periodically sends cluster statistics to Telegraf.

    The module gathers pool, daemon and cluster-wide measurements through the
    ``MgrModule`` accessors and writes them, one line per measurement, to the
    socket described by the ``address`` option every ``interval`` seconds.
    """

    MODULE_OPTIONS = [
        Option(name='address',
               default='unixgram:///tmp/telegraf.sock'),
        Option(name='interval',
               type='secs',
               default=15)]

    # Numeric encoding used for the 'health' cluster statistic.
    ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}

    @property
    def config_keys(self) -> Dict[str, OptionValue]:
        """Return each option name in MODULE_OPTIONS mapped to its default.

        Options without a ``default`` entry map to ``None``.
        """
        return {o['name']: o.get('default', None) for o in self.MODULE_OPTIONS}

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # Set by shutdown() to interrupt the waits in serve().
        self.event = Event()
        self.run = True
        # Cluster fsid, looked up lazily by get_fsid().
        self.fsid: Optional[str] = None
        # Effective configuration, filled by init_module_config() and
        # set_config_option().
        self.config: Dict[str, OptionValue] = dict()

    def get_fsid(self) -> str:
        """Return the cluster fsid, reading it from 'mon_map' on first use."""
        if not self.fsid:
            self.fsid = self.get('mon_map')['fsid']
        assert self.fsid is not None
        return self.fsid

    def get_pool_stats(self) -> Iterable[Dict[str, Any]]:
        """Yield one 'ceph_pool_stats' measurement per (stat type, pool) pair.

        The values are read from the ``stats`` entry of every pool listed in
        the 'df' report.
        """
        df = self.get('df')

        df_types = (
            'bytes_used',
            'kb_used',
            'dirty',
            'rd',
            'rd_bytes',
            'stored_raw',
            'wr',
            'wr_bytes',
            'objects',
            'max_avail',
            'quota_objects',
            'quota_bytes',
        )

        for df_type in df_types:
            for pool in df['pools']:
                yield {
                    'measurement': 'ceph_pool_stats',
                    'tags': {
                        'pool_name': pool['name'],
                        'pool_id': pool['id'],
                        'type_instance': df_type,
                        'fsid': self.get_fsid()
                    },
                    'value': pool['stats'][df_type],
                }

    def get_daemon_stats(self) -> Iterable[Dict[str, Any]]:
        """Yield one 'ceph_daemon_stats' measurement per daemon perf counter.

        Daemons for which no metadata is available and counters flagged as
        histograms are skipped.
        """
        for daemon, counters in self.get_unlabeled_perf_counters().items():
            # Daemon names have the form '<type>.<id>'; the id may itself
            # contain dots, hence the single split.
            svc_type, svc_id = daemon.split('.', 1)
            metadata = self.get_metadata(svc_type, svc_id)
            if not metadata:
                continue

            for path, counter_info in counters.items():
                if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
                    continue

                yield {
                    'measurement': 'ceph_daemon_stats',
                    'tags': {
                        'ceph_daemon': daemon,
                        'type_instance': path,
                        'host': metadata['hostname'],
                        'fsid': self.get_fsid()
                    },
                    'value': counter_info['value']
                }

    def get_pg_stats(self) -> Dict[str, int]:
        """Return placement group totals plus a ``num_pgs_<state>`` count per state.

        Every state in PG_STATES gets an entry (0 when no PG is in that
        state). Compound state names such as ``a+b`` contribute their count
        to each of the individual states they contain.
        """
        stats: Dict[str, int] = {}

        pg_status = self.get('pg_status')
        for key in ['bytes_total', 'data_bytes', 'bytes_used', 'bytes_avail',
                    'num_pgs', 'num_objects', 'num_pools']:
            stats[key] = pg_status[key]

        for state in PG_STATES:
            stats['num_pgs_{0}'.format(state)] = 0

        for state in pg_status['pgs_by_state']:
            active_states = set(state['state_name'].split('+'))
            for s in PG_STATES:
                if s in active_states:
                    stats['num_pgs_{0}'.format(s)] += state['count']

        return stats

    def get_cluster_stats(self) -> Iterable[Dict[str, Any]]:
        """Yield one 'ceph_cluster_stats' measurement per cluster-wide statistic.

        The statistics are derived from the health, monitor, OSD, manager and
        filesystem maps, followed by everything get_pg_stats() returns.
        """
        stats: Dict[str, Optional[int]] = {}

        health = json.loads(self.get('health')['json'])
        # Unknown status strings map to None and trip the assertion below.
        stats['health'] = self.ceph_health_mapping.get(health['status'])

        mon_status = json.loads(self.get('mon_status')['json'])
        stats['num_mon'] = len(mon_status['monmap']['mons'])

        stats['mon_election_epoch'] = mon_status['election_epoch']
        stats['mon_outside_quorum'] = len(mon_status['outside_quorum'])
        stats['mon_quorum'] = len(mon_status['quorum'])

        osd_map = self.get('osd_map')
        osds = osd_map['osds']
        stats['num_osd'] = len(osds)
        stats['num_pg_temp'] = len(osd_map['pg_temp'])
        stats['osd_epoch'] = osd_map['epoch']

        mgr_map = self.get('mgr_map')
        stats['mgr_available'] = int(mgr_map['available'])
        stats['num_mgr_standby'] = len(mgr_map['standbys'])
        stats['mgr_epoch'] = mgr_map['epoch']

        stats['num_osd_up'] = sum(1 for osd in osds if osd['up'] == 1)
        stats['num_osd_in'] = sum(1 for osd in osds if osd['in'] == 1)

        fs_map = self.get('fs_map')
        num_mds_standby = len(fs_map['standbys'])
        stats['num_mds_standby'] = num_mds_standby
        stats['num_fs'] = len(fs_map['filesystems'])
        stats['mds_epoch'] = fs_map['epoch']

        num_mds_up = sum(len(fs['mdsmap']['up'])
                         for fs in fs_map['filesystems'])
        stats['num_mds_up'] = num_mds_up
        stats['num_mds'] = num_mds_up + num_mds_standby

        stats.update(self.get_pg_stats())

        for key, value in stats.items():
            assert value is not None
            yield {
                'measurement': 'ceph_cluster_stats',
                'tags': {
                    'type_instance': key,
                    'fsid': self.get_fsid()
                },
                'value': int(value)
            }

    def set_config_option(self, option: str, value: str) -> None:
        """Validate ``value`` and store it in ``self.config`` under ``option``.

        ``interval`` is converted to an integer and must be at least 5; any
        other known option is stored unchanged.

        Raises:
            RuntimeError: if ``option`` is not declared in MODULE_OPTIONS, or
                if ``interval`` is not an integer or is below 5.
        """
        if option not in self.config_keys:
            raise RuntimeError('{0} is a unknown configuration '
                               'option'.format(option))

        if option == 'interval':
            try:
                interval = int(value)
            except (ValueError, TypeError):
                raise RuntimeError('invalid {0} configured. Please specify '
                                   'a valid integer'.format(option))
            if interval < 5:
                raise RuntimeError('interval should be set to at least 5 seconds')
            self.config[option] = interval
        else:
            self.config[option] = value

    def init_module_config(self) -> None:
        """Load 'address' and 'interval' from the module options into ``self.config``.

        The defaults declared in MODULE_OPTIONS are used as fallbacks.
        """
        self.config['address'] = \
            self.get_module_option("address", default=self.config_keys['address'])
        interval = self.get_module_option("interval",
                                          default=self.config_keys['interval'])
        assert interval
        self.config['interval'] = int(interval)

    def now(self) -> int:
        """Return the current wall-clock time as integer nanoseconds since the epoch."""
        return int(round(time.time() * 1000000000))

    def gather_measurements(self) -> Iterable[Dict[str, Any]]:
        """Return a lazy chain of pool, daemon and cluster measurements, in that order."""
        return itertools.chain(
            self.get_pool_stats(),
            self.get_daemon_stats(),
            self.get_cluster_stats()
        )

    def send_to_telegraf(self) -> None:
        """Gather all measurements and send them to the configured address.

        All measurements of one call share the same timestamp. Socket and
        runtime errors are logged with a traceback and not re-raised.
        """
        url = urlparse(cast(str, self.config['address']))

        sock = BaseSocket(url)
        self.log.debug('Sending data to Telegraf at %s', sock.address)
        now = self.now()
        try:
            with sock as s:
                for measurement in self.gather_measurements():
                    self.log.debug(measurement)
                    line = Line(measurement['measurement'],
                                measurement['value'],
                                measurement['tags'], now)
                    # Format once and reuse for both the log and the socket.
                    payload = line.to_line_protocol()
                    self.log.debug(payload)
                    s.send(payload)
        except FileNotFoundError:
            # Must come before the OSError handler below: FileNotFoundError
            # is a subclass of OSError and would otherwise never reach here.
            self.log.exception('Failed to open Telegraf at: %s', url.geturl())
        except (socket.error, RuntimeError, IOError, OSError):
            self.log.exception('Failed to send statistics to Telegraf:')

    def shutdown(self) -> None:
        """Ask serve() to stop and wake it up if it is currently waiting."""
        self.log.info('Stopping Telegraf module')
        self.run = False
        self.event.set()

    # NOTE(review): the docstrings of the CLI-decorated methods below are left
    # untouched because the decorators may use them as command help text --
    # confirm before rewording them.
    @CLIReadCommand('telegraf config-show')
    def config_show(self) -> Tuple[int, str, str]:
        """
        Show current configuration
        """
        return 0, json.dumps(self.config), ''

    @CLICommand('telegraf config-set')
    def config_set(self, key: str, value: str) -> Tuple[int, str, str]:
        """
        Set a configuration value
        """
        if not value:
            return -errno.EINVAL, '', 'Value should not be empty or None'
        self.log.debug('Setting configuration option %s to %s', key, value)
        try:
            self.set_config_option(key, value)
        except RuntimeError as e:
            # Report invalid keys/values the same way as an empty value
            # instead of letting the exception escape the command handler.
            return -errno.EINVAL, '', str(e)
        self.set_module_option(key, value)
        return 0, 'Configuration option {0} updated'.format(key), ''

    @CLICommand('telegraf send')
    def send(self) -> Tuple[int, str, str]:
        """
        Force sending data to Telegraf
        """
        self.send_to_telegraf()
        return 0, 'Sending data to Telegraf', ''

    def self_test(self) -> None:
        """Gather every measurement once; raise RuntimeError if there are none."""
        # Materialize the chain so that every generator actually runs.
        measurements = list(self.gather_measurements())
        if not measurements:
            raise RuntimeError('No measurements found')

    def serve(self) -> None:
        """Run the send loop until shutdown() is called.

        Loads the configuration, waits 10 seconds, then repeatedly sends the
        statistics and sleeps for the configured interval.
        """
        self.log.info('Starting Telegraf module')
        self.init_module_config()
        self.run = True

        self.log.debug('Waiting 10 seconds before starting')
        self.event.wait(10)

        while self.run:
            # Monotonic clock: the elapsed time must not be affected by
            # wall-clock adjustments.
            start = time.monotonic()
            self.send_to_telegraf()
            runtime = (time.monotonic() - start) * 1000
            self.log.debug('Sending data to Telegraf took %d ms', runtime)
            self.log.debug("Sleeping for %d seconds", self.config['interval'])
            self.event.wait(cast(int, self.config['interval']))