]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/influx/module.py
update sources to 12.2.8
[ceph.git] / ceph / src / pybind / mgr / influx / module.py
1 from datetime import datetime
2 from threading import Event
3 import json
4 import errno
5 import six
6 import time
7
8 from mgr_module import MgrModule
9
10 try:
11 from influxdb import InfluxDBClient
12 from influxdb.exceptions import InfluxDBClientError
13 from requests.exceptions import ConnectionError
14 except ImportError:
15 InfluxDBClient = None
16
17
class Module(MgrModule):
    """ceph-mgr module that pushes cluster statistics to an InfluxDB server.

    Per-pool statistics from the ``df`` report and every non-histogram daemon
    perf counter are written as InfluxDB points, either periodically from
    :meth:`serve` or on demand via the ``influx send`` command.
    """

    COMMANDS = [
        {
            "cmd": "influx config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "influx config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "influx send",
            "desc": "Force sending data to Influx",
            "perm": "rw"
        },
        {
            "cmd": "influx self-test",
            "desc": "debug the module",
            "perm": "rw"
        },
    ]

    # Supported configuration options and their defaults. The 'ssl' and
    # 'verify_ssl' defaults are strings because init_module_config() parses
    # them with .lower() == 'true'.
    config_keys = {
        'hostname': None,
        'port': 8086,
        'database': 'ceph',
        'username': None,
        'password': None,
        'interval': 5,
        'ssl': 'false',
        'verify_ssl': 'true'
    }

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        # Set by shutdown() to wake serve() out of its inter-send wait.
        self.event = Event()
        # serve() keeps looping while this is True; cleared by shutdown().
        self.run = True
        # Effective (parsed) configuration; filled by init_module_config()
        # and updated by set_config_option().
        self.config = dict()

    def get_fsid(self):
        """Return the cluster fsid as reported in the monitor map."""
        return self.get('mon_map')['fsid']

    def get_latest(self, daemon_type, daemon_name, stat):
        """Return the most recent sample of perf counter *stat* for a daemon.

        :param daemon_type: daemon type passed through to get_counter()
        :param daemon_name: daemon name passed through to get_counter()
        :param stat: perf counter path to look up
        :return: element [1] of the last sample returned by get_counter()
                 (presumably the value of a (timestamp, value) pair --
                 confirm), or 0 when no samples are available.
        """
        data = self.get_counter(daemon_type, daemon_name, stat)[stat]
        if data:
            return data[-1][1]

        return 0

    def get_df_stats(self):
        """Build one ``ceph_pool_stats`` point per (statistic, pool) pair.

        Points are read from the ``df`` report, tagged with pool name/id,
        the statistic name (``type_instance``) and the cluster fsid, and all
        share a single UTC timestamp.

        :return: list of InfluxDB point dicts.
        """
        df = self.get("df")
        data = []

        now = datetime.utcnow().isoformat() + 'Z'
        # The fsid is the same for every point: fetch the mon map once
        # rather than once per point.
        fsid = self.get_fsid()

        df_types = [
            'bytes_used',
            'kb_used',
            'dirty',
            'rd',
            'rd_bytes',
            'raw_bytes_used',
            'wr',
            'wr_bytes',
            'objects',
            'max_avail',
            'quota_objects',
            'quota_bytes'
        ]

        for df_type in df_types:
            for pool in df['pools']:
                point = {
                    "measurement": "ceph_pool_stats",
                    "tags": {
                        "pool_name": pool['name'],
                        "pool_id": pool['id'],
                        "type_instance": df_type,
                        "fsid": fsid
                    },
                    "time": now,
                    "fields": {
                        "value": pool['stats'][df_type],
                    }
                }
                data.append(point)
        return data

    def get_daemon_stats(self):
        """Build one ``ceph_daemon_stats`` point per daemon perf counter.

        Histogram counters are skipped. Each point is tagged with the daemon
        name, the counter path, the daemon's host and the cluster fsid, and
        all points share a single UTC timestamp.

        :return: list of InfluxDB point dicts.
        """
        data = []

        now = datetime.utcnow().isoformat() + 'Z'
        # Same fsid for every point; look it up once.
        fsid = self.get_fsid()

        for daemon, counters in six.iteritems(self.get_all_perf_counters()):
            svc_type, svc_id = daemon.split(".", 1)
            metadata = self.get_metadata(svc_type, svc_id)
            # NOTE(review): get_metadata() may return None for a daemon the
            # mgr has no metadata for (e.g. one that just started) -- confirm
            # against mgr_module. Don't let one such daemon abort the report.
            if metadata is not None:
                hostname = metadata['hostname']
            else:
                hostname = 'N/A'

            for path, counter_info in counters.items():
                if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
                    continue

                value = counter_info['value']

                data.append({
                    "measurement": "ceph_daemon_stats",
                    "tags": {
                        "ceph_daemon": daemon,
                        "type_instance": path,
                        "host": hostname,
                        "fsid": fsid
                    },
                    "time": now,
                    "fields": {
                        "value": value
                    }
                })

        return data

    def set_config_option(self, option, value):
        """Validate *value* and store it in ``self.config[option]``.

        The value is only applied in memory; persisting it is the caller's
        job.

        :param option: one of the keys of ``config_keys``
        :param value: raw (string) value; 'port' and 'interval' are converted
                      to int, 'ssl' and 'verify_ssl' to bool.
        :raises RuntimeError: for an unknown option, a non-integer
                              port/interval, or an interval below 5 seconds.
        """
        if option not in self.config_keys.keys():
            raise RuntimeError('{0} is a unknown configuration '
                               'option'.format(option))

        if option in ['port', 'interval']:
            try:
                value = int(value)
            except (ValueError, TypeError):
                raise RuntimeError('invalid {0} configured. Please specify '
                                   'a valid integer'.format(option))

        if option == 'interval' and value < 5:
            raise RuntimeError('interval should be set to at least 5 seconds')

        if option in ['ssl', 'verify_ssl']:
            value = value.lower() == 'true'

        self.config[option] = value

    def init_module_config(self):
        """Load ``self.config`` from the mgr's stored module configuration.

        Missing options fall back to ``config_keys``. 'port' and 'interval'
        are converted to int; 'ssl' and 'verify_ssl' are True only when the
        stored string is 'true' (case-insensitive).
        """
        self.config['hostname'] = \
            self.get_config("hostname", default=self.config_keys['hostname'])
        self.config['port'] = \
            int(self.get_config("port", default=self.config_keys['port']))
        self.config['database'] = \
            self.get_config("database", default=self.config_keys['database'])
        self.config['username'] = \
            self.get_config("username", default=self.config_keys['username'])
        self.config['password'] = \
            self.get_config("password", default=self.config_keys['password'])
        self.config['interval'] = \
            int(self.get_config("interval",
                                default=self.config_keys['interval']))
        ssl = self.get_config("ssl", default=self.config_keys['ssl'])
        self.config['ssl'] = ssl.lower() == 'true'
        verify_ssl = \
            self.get_config("verify_ssl", default=self.config_keys['verify_ssl'])
        self.config['verify_ssl'] = verify_ssl.lower() == 'true'

    def send_to_influx(self):
        """Write pool and daemon statistics to the configured InfluxDB server.

        Health checks reflect the outcome:
          * MGR_INFLUX_NO_SERVER when no hostname is configured (nothing sent);
          * MGR_INFLUX_SEND_FAILED on a connection error or a client error
            other than HTTP 404;
          * all module health checks are cleared after a successful write.

        On HTTP 404 (database not found) an attempt is made to create the
        database; the points from that attempt are not resent.

        :raises InfluxDBClientError: for client errors other than 404, after
                                     MGR_INFLUX_SEND_FAILED has been set.
        """
        if not self.config['hostname']:
            self.log.error("No Influx server configured, please set one using: "
                           "ceph influx config-set hostname <hostname>")
            self.set_health_checks({
                'MGR_INFLUX_NO_SERVER': {
                    'severity': 'warning',
                    'summary': 'No InfluxDB server configured',
                    'detail': ['Configuration option hostname not set']
                }
            })
            return

        # If influx server has authentication turned off then
        # missing username/password is valid.
        self.log.debug("Sending data to Influx host: %s",
                       self.config['hostname'])
        client = InfluxDBClient(self.config['hostname'], self.config['port'],
                                self.config['username'],
                                self.config['password'],
                                self.config['database'],
                                self.config['ssl'],
                                self.config['verify_ssl'])

        # using influx client get_list_database requires admin privs,
        # instead we'll catch the not found exception and inform the user if
        # db can not be created
        try:
            client.write_points(self.get_df_stats(), 'ms')
            client.write_points(self.get_daemon_stats(), 'ms')
            self.set_health_checks(dict())
        except ConnectionError as e:
            self.log.exception("Failed to connect to Influx host %s:%d",
                               self.config['hostname'], self.config['port'])
            self.set_health_checks({
                'MGR_INFLUX_SEND_FAILED': {
                    'severity': 'warning',
                    'summary': 'Failed to send data to InfluxDB server at %s:%d'
                               ' due to an connection error'
                               % (self.config['hostname'], self.config['port']),
                    'detail': [str(e)]
                }
            })
        except InfluxDBClientError as e:
            if e.code == 404:
                self.log.info("Database '%s' not found, trying to create "
                              "(requires admin privs). You can also create "
                              "manually and grant write privs to user "
                              "'%s'", self.config['database'],
                              self.config['username'])
                client.create_database(self.config['database'])
            else:
                self.set_health_checks({
                    'MGR_INFLUX_SEND_FAILED': {
                        'severity': 'warning',
                        'summary': 'Failed to send data to InfluxDB',
                        'detail': [str(e)]
                    }
                })
                raise

    def shutdown(self):
        """Stop the serve() loop and wake it from its wait immediately."""
        self.log.info('Stopping influx module')
        self.run = False
        self.event.set()

    def handle_command(self, cmd):
        """Dispatch one of the ``influx ...`` commands listed in COMMANDS.

        :param cmd: parsed command dict; ``cmd['prefix']`` selects the
                    command, and config-set also reads ``key``/``value``.
        :return: (retcode, stdout, stderr) tuple; retcode is 0 on success or
                 a negative errno value on failure.
        """
        if cmd['prefix'] == 'influx config-show':
            return 0, json.dumps(self.config), ''
        elif cmd['prefix'] == 'influx config-set':
            key = cmd['key']
            value = cmd['value']
            if not value:
                return -errno.EINVAL, '', 'Value should not be empty or None'

            self.log.debug('Setting configuration option %s to %s', key, value)
            try:
                self.set_config_option(key, value)
            except RuntimeError as e:
                # Unknown option or invalid value: report the validation
                # message to the caller instead of raising out of the handler.
                return -errno.EINVAL, '', str(e)
            self.set_config(key, value)
            return 0, 'Configuration option {0} updated'.format(key), ''
        elif cmd['prefix'] == 'influx send':
            if InfluxDBClient is None:
                # Without the influxdb library serve() returns before loading
                # the configuration, and there is no client to send with.
                return (-errno.EINVAL, '',
                        'influxdb python module not found, cannot send data')
            self.send_to_influx()
            return 0, 'Sending data to Influx', ''
        elif cmd['prefix'] == 'influx self-test':
            daemon_stats = self.get_daemon_stats()
            assert len(daemon_stats)
            df_stats = self.get_df_stats()

            result = {
                'daemon_stats': daemon_stats,
                'df_stats': df_stats
            }

            return 0, json.dumps(result, indent=2), 'Self-test OK'

        return (-errno.EINVAL, '',
                "Command not found '{0}'".format(cmd['prefix']))

    def serve(self):
        """Send statistics every ``interval`` seconds until shutdown().

        Returns immediately, after logging an error, when the influxdb python
        library could not be imported.
        """
        if InfluxDBClient is None:
            self.log.error("Cannot transmit statistics: influxdb python "
                           "module not found. Did you install it?")
            return

        self.log.info('Starting influx module')
        self.init_module_config()
        self.run = True

        while self.run:
            start = time.time()
            try:
                self.send_to_influx()
            except Exception:
                # send_to_influx() deliberately re-raises some InfluxDB errors
                # (after setting a health check), and stats collection can
                # fail too. Letting that escape would end this loop for good,
                # so log it and retry after the next interval instead.
                self.log.exception('Failed to send data to Influx')
            runtime = time.time() - start
            self.log.debug('Finished sending data in Influx in %.3f seconds',
                           runtime)
            self.log.debug("Sleeping for %d seconds", self.config['interval'])
            self.event.wait(self.config['interval'])