git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/influx/module.py
update sources to 12.2.7
[ceph.git] / ceph / src / pybind / mgr / influx / module.py
1 from datetime import datetime
2 from threading import Event
3 import json
4 import errno
5 import time
6
7 from mgr_module import MgrModule
8
9 try:
10 from influxdb import InfluxDBClient
11 from influxdb.exceptions import InfluxDBClientError
12 from requests.exceptions import ConnectionError
13 except ImportError:
14 InfluxDBClient = None
15
16
class Module(MgrModule):
    """Manager module that pushes pool and daemon statistics to InfluxDB.

    The ``serve`` loop loads the module configuration and then, every
    ``interval`` seconds, writes two sets of points to the configured
    InfluxDB server: per-pool usage statistics (``ceph_pool_stats``) and
    per-daemon performance counters (``ceph_daemon_stats``).
    """

    # Commands exposed by this module.
    COMMANDS = [
        {
            "cmd": "influx config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "influx config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "influx send",
            "desc": "Force sending data to Influx",
            "perm": "rw"
        },
        {
            "cmd": "influx self-test",
            "desc": "debug the module",
            "perm": "rw"
        },
    ]

    # Known configuration options and their default values.  The boolean
    # options are kept as strings here and parsed when they are loaded.
    config_keys = {
        'hostname': None,
        'port': 8086,
        'database': 'ceph',
        'username': None,
        'password': None,
        'interval': 5,
        'ssl': 'false',
        'verify_ssl': 'true'
    }

    def __init__(self, *args, **kwargs):
        """Set up the wake-up event, the run flag and an empty config."""
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()
        self.run = True
        self.config = dict()

    def get_fsid(self):
        """Return the ``fsid`` entry of the ``mon_map`` object."""
        return self.get('mon_map')['fsid']

    def get_latest(self, daemon_type, daemon_name, stat):
        """Return the value of the newest sample of a counter.

        The value is element 1 of the last entry returned by
        ``get_counter`` for ``stat``; 0 is returned when there are no
        samples.
        """
        data = self.get_counter(daemon_type, daemon_name, stat)[stat]
        if data:
            return data[-1][1]

        return 0

    def get_df_stats(self):
        """Build one ``ceph_pool_stats`` point per pool and statistic.

        Returns a list of point dicts (measurement, tags, time, fields)
        in the form handed to ``InfluxDBClient.write_points``.
        """
        df = self.get("df")
        data = []

        now = datetime.utcnow().isoformat() + 'Z'
        # The fsid is the same for every point; look it up only once.
        fsid = self.get_fsid()

        df_types = [
            'bytes_used',
            'kb_used',
            'dirty',
            'rd',
            'rd_bytes',
            'raw_bytes_used',
            'wr',
            'wr_bytes',
            'objects',
            'max_avail',
            'quota_objects',
            'quota_bytes'
        ]

        for df_type in df_types:
            for pool in df['pools']:
                point = {
                    "measurement": "ceph_pool_stats",
                    "tags": {
                        "pool_name": pool['name'],
                        "pool_id": pool['id'],
                        "type_instance": df_type,
                        "fsid": fsid
                    },
                    "time": now,
                    "fields": {
                        "value": pool['stats'][df_type],
                    }
                }
                data.append(point)
        return data

    def get_daemon_stats(self):
        """Build one ``ceph_daemon_stats`` point per daemon perf counter.

        Histogram counters are skipped.  Daemons for which no metadata is
        available are skipped as well, because the ``host`` tag cannot be
        filled in for them.

        Returns a list of point dicts in the same form as
        ``get_df_stats``.
        """
        data = []

        now = datetime.utcnow().isoformat() + 'Z'
        # The fsid is the same for every point; look it up only once.
        fsid = self.get_fsid()

        # items() instead of iteritems() so this works on Python 2 and 3.
        for daemon, counters in self.get_all_perf_counters().items():
            svc_type, svc_id = daemon.split(".", 1)
            metadata = self.get_metadata(svc_type, svc_id)
            if not metadata:
                # Presumably happens for daemons that have not reported
                # their metadata (yet); without this guard the hostname
                # lookup below would abort the whole collection run.
                self.log.debug("No metadata for daemon %s, skipping", daemon)
                continue

            for path, counter_info in counters.items():
                if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
                    continue

                value = counter_info['value']

                data.append({
                    "measurement": "ceph_daemon_stats",
                    "tags": {
                        "ceph_daemon": daemon,
                        "type_instance": path,
                        "host": metadata['hostname'],
                        "fsid": fsid
                    },
                    "time": now,
                    "fields": {
                        "value": value
                    }
                })

        return data

    def set_config_option(self, option, value):
        """Validate ``value`` and store it in the in-memory configuration.

        ``port`` and ``interval`` are converted to integers, ``ssl`` and
        ``verify_ssl`` to booleans (true only for the string 'true',
        case-insensitive).

        Raises RuntimeError for an unknown option, a non-integer
        port/interval, or an interval below 5 seconds.
        """
        if option not in self.config_keys:
            raise RuntimeError('{0} is a unknown configuration '
                               'option'.format(option))

        if option in ['port', 'interval']:
            try:
                value = int(value)
            except (ValueError, TypeError):
                raise RuntimeError('invalid {0} configured. Please specify '
                                   'a valid integer'.format(option))

        if option == 'interval' and value < 5:
            raise RuntimeError('interval should be set to at least 5 seconds')

        if option in ['ssl', 'verify_ssl']:
            value = value.lower() == 'true'

        self.config[option] = value

    def init_module_config(self):
        """Load every option via ``get_config`` into ``self.config``.

        Options that are not set fall back to the defaults in
        ``config_keys``; numeric and boolean options are converted to
        their Python types.
        """
        self.config['hostname'] = \
            self.get_config("hostname", default=self.config_keys['hostname'])
        self.config['port'] = \
            int(self.get_config("port", default=self.config_keys['port']))
        self.config['database'] = \
            self.get_config("database", default=self.config_keys['database'])
        self.config['username'] = \
            self.get_config("username", default=self.config_keys['username'])
        self.config['password'] = \
            self.get_config("password", default=self.config_keys['password'])
        self.config['interval'] = \
            int(self.get_config("interval",
                                default=self.config_keys['interval']))
        ssl = self.get_config("ssl", default=self.config_keys['ssl'])
        self.config['ssl'] = ssl.lower() == 'true'
        verify_ssl = \
            self.get_config("verify_ssl", default=self.config_keys['verify_ssl'])
        self.config['verify_ssl'] = verify_ssl.lower() == 'true'

    def send_to_influx(self):
        """Collect the current statistics and write them to InfluxDB.

        Without a configured hostname a health warning is raised and
        nothing is sent.  A connection error is logged and reported as a
        health warning.  If the server answers 404 the module tries to
        create the database.  Any other ``InfluxDBClientError`` is
        reported as a health warning and re-raised.
        """
        if not self.config['hostname']:
            self.log.error("No Influx server configured, please set one using: "
                           "ceph influx config-set hostname <hostname>")
            self.set_health_checks({
                'MGR_INFLUX_NO_SERVER': {
                    'severity': 'warning',
                    'summary': 'No InfluxDB server configured',
                    'detail': ['Configuration option hostname not set']
                }
            })
            return

        # If influx server has authentication turned off then
        # missing username/password is valid.
        self.log.debug("Sending data to Influx host: %s",
                       self.config['hostname'])
        client = InfluxDBClient(self.config['hostname'], self.config['port'],
                                self.config['username'],
                                self.config['password'],
                                self.config['database'],
                                self.config['ssl'],
                                self.config['verify_ssl'])

        # using influx client get_list_database requires admin privs,
        # instead we'll catch the not found exception and inform the user if
        # db can not be created
        try:
            client.write_points(self.get_df_stats(), 'ms')
            client.write_points(self.get_daemon_stats(), 'ms')
            # Both writes succeeded: clear any earlier health warning.
            self.set_health_checks(dict())
        except ConnectionError as e:
            self.log.exception("Failed to connect to Influx host %s:%d",
                               self.config['hostname'], self.config['port'])
            self.set_health_checks({
                'MGR_INFLUX_SEND_FAILED': {
                    'severity': 'warning',
                    'summary': 'Failed to send data to InfluxDB server at %s:%d'
                               ' due to an connection error'
                               % (self.config['hostname'], self.config['port']),
                    'detail': [str(e)]
                }
            })
        except InfluxDBClientError as e:
            if e.code == 404:
                self.log.info("Database '%s' not found, trying to create "
                              "(requires admin privs).  You can also create "
                              "manually and grant write privs to user "
                              "'%s'", self.config['database'],
                              self.config['username'])
                client.create_database(self.config['database'])
            else:
                self.set_health_checks({
                    'MGR_INFLUX_SEND_FAILED': {
                        'severity': 'warning',
                        'summary': 'Failed to send data to InfluxDB',
                        'detail': [str(e)]
                    }
                })
                raise

    def shutdown(self):
        """Stop the ``serve`` loop and wake it up if it is sleeping."""
        self.log.info('Stopping influx module')
        self.run = False
        self.event.set()

    def handle_command(self, cmd):
        """Dispatch one of the ``influx ...`` commands.

        Returns a ``(return code, output, status message)`` tuple; the
        return code is 0 on success and a negative errno value on
        failure.
        """
        if cmd['prefix'] == 'influx config-show':
            return 0, json.dumps(self.config), ''
        elif cmd['prefix'] == 'influx config-set':
            key = cmd['key']
            value = cmd['value']
            if not value:
                return -errno.EINVAL, '', 'Value should not be empty or None'

            self.log.debug('Setting configuration option %s to %s', key, value)
            try:
                self.set_config_option(key, value)
            except RuntimeError as e:
                # Unknown option or invalid value: report it to the caller
                # as a regular command error instead of letting the
                # exception escape the command handler.
                return -errno.EINVAL, '', str(e)
            self.set_config(key, value)
            return 0, 'Configuration option {0} updated'.format(key), ''
        elif cmd['prefix'] == 'influx send':
            if InfluxDBClient is None:
                # The influxdb import failed; calling send_to_influx()
                # would fail with "'NoneType' object is not callable".
                return (-errno.ENOENT, '',
                        'influxdb python module not found')
            self.send_to_influx()
            return 0, 'Sending data to Influx', ''
        elif cmd['prefix'] == 'influx self-test':
            daemon_stats = self.get_daemon_stats()
            assert len(daemon_stats)
            df_stats = self.get_df_stats()

            result = {
                'daemon_stats': daemon_stats,
                'df_stats': df_stats
            }

            return 0, json.dumps(result, indent=2), 'Self-test OK'

        return (-errno.EINVAL, '',
                "Command not found '{0}'".format(cmd['prefix']))

    def serve(self):
        """Main loop: send statistics every ``interval`` seconds.

        Returns immediately when the influxdb python module could not be
        imported; otherwise runs until ``shutdown`` clears the run flag.
        """
        if InfluxDBClient is None:
            self.log.error("Cannot transmit statistics: influxdb python "
                           "module not found.  Did you install it?")
            return

        self.log.info('Starting influx module')
        self.init_module_config()
        self.run = True

        while self.run:
            start = time.time()
            self.send_to_influx()
            runtime = time.time() - start
            self.log.debug('Finished sending data in Influx in %.3f seconds',
                           runtime)
            self.log.debug("Sleeping for %d seconds", self.config['interval'])
            # Wait on the event rather than sleeping so that shutdown()
            # can interrupt the wait.
            self.event.wait(self.config['interval'])