# ceph/src/pybind/mgr/prometheus/module.py (as of v12.2.5)
import cherrypy
import json
import errno
import math
import os
import socket
from collections import OrderedDict
from mgr_module import MgrModule, MgrStandbyModule

# Defaults for the Prometheus HTTP server. Both can also be set via
# config-key; see
# https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# for the Prometheus exporter port registry.

DEFAULT_ADDR = '::'
DEFAULT_PORT = 9283
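
# One way to override the defaults from the CLI (a sketch; the
# mgr/<module>/<option> config-key names below follow the usual convention,
# verify them against your release's documentation):
#
#   ceph config-key set mgr/prometheus/server_addr 0.0.0.0
#   ceph config-key set mgr/prometheus/server_port 9283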


# cherrypy likes to sys.exit on error.  Don't let it take us down too!
def os_exit_noop(*args, **kwargs):
    pass


os._exit = os_exit_noop


# Used to access the Module instance from the Root class nested in serve().
# Because it's a dict, writers don't need a 'global' declaration to update it.
_global_instance = {'plugin': None}


def global_instance():
    assert _global_instance['plugin'] is not None
    return _global_instance['plugin']


def health_status_to_number(status):
    if status == 'HEALTH_OK':
        return 0
    elif status == 'HEALTH_WARN':
        return 1
    elif status == 'HEALTH_ERR':
        return 2
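
# e.g. health_status_to_number('HEALTH_WARN') == 1; statuses other than the
# three above fall through and the function returns None.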

PG_STATES = [
    "active",
    "clean",
    "down",
    "recovery_unfound",
    "backfill_unfound",
    "scrubbing",
    "degraded",
    "inconsistent",
    "peering",
    "repair",
    "recovering",
    "forced_recovery",
    "backfill_wait",
    "incomplete",
    "stale",
    "remapped",
    "deep",
    "backfilling",
    "forced_backfill",
    "backfill_toofull",
    "recovery_wait",
    "recovery_toofull",
    "undersized",
    "activating",
    "peered",
    "snaptrim",
    "snaptrim_wait",
    "snaptrim_error",
    "creating",
    "unknown",
]

DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_objects']

DF_POOL = ['max_avail', 'bytes_used', 'raw_bytes_used', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes']

OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

FS_METADATA = ('data_pools', 'id', 'metadata_pool', 'name')

MDS_METADATA = ('id', 'fs', 'hostname', 'public_addr', 'rank', 'ceph_version')

MON_METADATA = ('id', 'hostname', 'public_addr', 'rank', 'ceph_version')

OSD_METADATA = ('cluster_addr', 'device_class', 'id', 'hostname', 'public_addr',
                'ceph_version')

OSD_STATUS = ['weight', 'up', 'in']

OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name')

RGW_METADATA = ('id', 'hostname', 'ceph_version')

DISK_OCCUPATION = ('instance', 'device', 'ceph_daemon')
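
# DISK_OCCUPATION reuses node_exporter's 'instance' and 'device' label names
# (see the comment in _setup_static_metrics) so ceph_disk_occupation can be
# joined against node_exporter disk series.  A hedged PromQL sketch (metric
# names vary across node_exporter versions, and in practice the 'instance'
# labels usually need relabelling to line up):
#
#   node_disk_written_bytes_total
#     * on (device, instance) group_left (ceph_daemon)
#     ceph_disk_occupation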


class Metrics(object):
    def __init__(self):
        self.metrics = self._setup_static_metrics()
        self.pending = {}

    def set(self, key, value, labels=('',)):
        '''
        Set the value of a single metric. This should be used for static
        metrics, e.g. cluster health.
        '''
        self.metrics[key].set(value, labels)

    def append(self, key, value, labels=('',)):
        '''
        Append a metric to the staging area. Use this to aggregate
        daemon-specific metrics that can appear and disappear as daemons are
        added or removed.
        '''
        if key not in self.pending:
            self.pending[key] = []
        self.pending[key].append((labels, value))

    def reset(self):
        '''
        When metrics aggregation is done, call Metrics.reset() to apply the
        aggregated metrics. This replaces all label -> value mappings for a
        metric with the new mappings from the staging area, so daemon-specific
        metrics of daemons that no longer exist are removed.
        '''
        for k, v in self.pending.items():
            self.metrics[k].reset(v)
        self.pending = {}

    def add_metric(self, path, metric):
        if path not in self.metrics:
            self.metrics[path] = metric

    def _setup_static_metrics(self):
        metrics = {}
        metrics['health_status'] = Metric(
            'untyped',
            'health_status',
            'Cluster health status'
        )
        metrics['mon_quorum_status'] = Metric(
            'gauge',
            'mon_quorum_status',
            'Monitors in quorum',
            ('ceph_daemon',)
        )
        metrics['fs_metadata'] = Metric(
            'untyped',
            'fs_metadata',
            'FS Metadata',
            FS_METADATA
        )
        metrics['mds_metadata'] = Metric(
            'untyped',
            'mds_metadata',
            'MDS Metadata',
            MDS_METADATA
        )
        metrics['mon_metadata'] = Metric(
            'untyped',
            'mon_metadata',
            'MON Metadata',
            MON_METADATA
        )
        metrics['osd_metadata'] = Metric(
            'untyped',
            'osd_metadata',
            'OSD Metadata',
            OSD_METADATA
        )

        # The reason for having this separate to OSD_METADATA is
        # so that we can stably use the same tag names that
        # the Prometheus node_exporter does
        metrics['disk_occupation'] = Metric(
            'untyped',
            'disk_occupation',
            'Associate Ceph daemon with disk used',
            DISK_OCCUPATION
        )

        metrics['pool_metadata'] = Metric(
            'untyped',
            'pool_metadata',
            'POOL Metadata',
            POOL_METADATA
        )

        metrics['rgw_metadata'] = Metric(
            'untyped',
            'rgw_metadata',
            'RGW Metadata',
            RGW_METADATA
        )

        metrics['pg_total'] = Metric(
            'gauge',
            'pg_total',
            'PG Total Count'
        )

        for flag in OSD_FLAGS:
            path = 'osd_flag_{}'.format(flag)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD Flag {}'.format(flag)
            )
        for state in OSD_STATUS:
            path = 'osd_{}'.format(state)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD status {}'.format(state),
                ('ceph_daemon',)
            )
        for stat in OSD_STATS:
            path = 'osd_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD stat {}'.format(stat),
                ('ceph_daemon',)
            )
        for state in PG_STATES:
            path = 'pg_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'PG {}'.format(state),
            )
        for state in DF_CLUSTER:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
            )
        for state in DF_POOL:
            path = 'pool_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF pool {}'.format(state),
                ('pool_id',)
            )

        return metrics


class Metric(object):
    def __init__(self, mtype, name, desc, labels=None):
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels    # tuple if present
        self.value = {}             # indexed by label values

    def set(self, value, labelvalues=None):
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] = value

    def reset(self, values):
        self.value = {}
        for labelvalues, value in values:
            self.value[labelvalues] = value

    def str_expfmt(self):

        def promethize(path):
            ''' replace illegal metric name characters '''
            result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')

            # Hyphens usually turn into underscores, unless they are
            # trailing
            if result.endswith("-"):
                result = result[0:-1] + "_minus"
            else:
                result = result.replace("-", "_")

            return "ceph_{0}".format(result)

        def floatstr(value):
            ''' represent as Go-compatible float '''
            if value == float('inf'):
                return '+Inf'
            if value == float('-inf'):
                return '-Inf'
            if math.isnan(value):
                return 'NaN'
            return repr(float(value))

        name = promethize(self.name)
        expfmt = '''
# HELP {name} {desc}
# TYPE {name} {mtype}'''.format(
            name=name,
            desc=self.desc,
            mtype=self.mtype,
        )

        for labelvalues, value in self.value.items():
            if self.labelnames:
                labels = zip(self.labelnames, labelvalues)
                labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
            else:
                labels = ''
            if labels:
                fmtstr = '\n{name}{{{labels}}} {value}'
            else:
                fmtstr = '\n{name} {value}'
            expfmt += fmtstr.format(
                name=name,
                labels=labels,
                value=floatstr(value),
            )
        return expfmt


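# A rough sketch of how these two classes are driven on each scrape by
# Module.collect() below (illustrative values only):
#
#   m = Metrics()
#   m.set('health_status', 1)            # static metric
#   m.append('osd_up', 1, ('osd.0',))    # staged per-daemon metric
#   m.reset()                            # apply everything staged via append()
#   text = ''.join(metric.str_expfmt() for metric in m.metrics.values())
#
# str_expfmt() renders a Metric in the Prometheus exposition format, e.g.:
#
#   # HELP ceph_osd_up OSD status up
#   # TYPE ceph_osd_up untyped
#   ceph_osd_up{ceph_daemon="osd.0"} 1.0
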
class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "prometheus self-test",
            "desc": "Run a self test on the prometheus module",
            "perm": "rw"
        },
    ]
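
    # The command above is exposed on the Ceph CLI (typically invoked as
    # `ceph prometheus self-test`) and is dispatched to handle_command() below.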

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.metrics = Metrics()
        self.schema = OrderedDict()
        _global_instance['plugin'] = self

    def get_health(self):
        health = json.loads(self.get('health')['json'])
        self.metrics.set('health_status',
                         health_status_to_number(health['status']))

    def get_df(self):
        # maybe get the to-be-exported metrics from a config?
        df = self.get('df')
        for stat in DF_CLUSTER:
            self.metrics.set('cluster_{}'.format(stat), df['stats'][stat])

        for pool in df['pools']:
            for stat in DF_POOL:
                self.metrics.append('pool_{}'.format(stat),
                                    pool['stats'][stat],
                                    (pool['id'],))

    def get_fs(self):
        fs_map = self.get('fs_map')
        servers = self.get_service_list()
        active_daemons = []
        for fs in fs_map['filesystems']:
            # collect fs metadata
            data_pools = ",".join([str(pool) for pool in fs['mdsmap']['data_pools']])
            self.metrics.append('fs_metadata', 1,
                                (data_pools,
                                 fs['id'],
                                 fs['mdsmap']['metadata_pool'],
                                 fs['mdsmap']['fs_name']))
            for gid, daemon in fs['mdsmap']['info'].items():
                id_ = daemon['name']
                host_version = servers.get((id_, 'mds'), ('', ''))
                self.metrics.append('mds_metadata', 1,
                                    (id_, fs['id'], host_version[0],
                                     daemon['addr'], daemon['rank'],
                                     host_version[1]))

    def get_quorum_status(self):
        mon_status = json.loads(self.get('mon_status')['json'])
        servers = self.get_service_list()
        for mon in mon_status['monmap']['mons']:
            rank = mon['rank']
            id_ = mon['name']
            host_version = servers.get((id_, 'mon'), ('', ''))
            self.metrics.append('mon_metadata', 1,
                                (id_, host_version[0],
                                 mon['public_addr'].split(':')[0], rank,
                                 host_version[1]))
            in_quorum = int(rank in mon_status['quorum'])
            self.metrics.append('mon_quorum_status', in_quorum,
                                ('mon_{}'.format(id_),))

    def get_pg_status(self):
        # TODO add per-pool status?
        pg_status = self.get('pg_status')

        # Set total count of PGs, first
        self.metrics.set('pg_total', pg_status['num_pgs'])

        reported_states = {}
        for pg in pg_status['pgs_by_state']:
            for state in pg['state_name'].split('+'):
                reported_states[state] = reported_states.get(state, 0) + pg['count']

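        # reported_states now maps each individual state to a PG count, e.g. an
        # entry {'state_name': 'active+clean', 'count': 128} from pgs_by_state
        # contributes 128 to both 'active' and 'clean'.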
        for state in reported_states:
            path = 'pg_{}'.format(state)
            try:
                self.metrics.set(path, reported_states[state])
            except KeyError:
                self.log.warn("skipping pg in unknown state {}".format(state))

        for state in PG_STATES:
            if state not in reported_states:
                try:
                    self.metrics.set('pg_{}'.format(state), 0)
                except KeyError:
                    self.log.warn("skipping pg in unknown state {}".format(state))

    def get_osd_stats(self):
        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            id_ = osd['osd']
            for stat in OSD_STATS:
                val = osd['perf_stat'][stat]
                self.metrics.append('osd_{}'.format(stat), val,
                                    ('osd.{}'.format(id_),))

    def get_service_list(self):
        ret = {}
        for server in self.list_servers():
            version = server.get('ceph_version', '')
            host = server.get('hostname', '')
            for service in server.get('services', []):
                ret.update({(service['id'], service['type']): (host, version)})
        return ret

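    # get_service_list() maps (service_id, service_type) to (hostname,
    # ceph_version); an illustrative entry:
    #   ('0', 'osd') -> ('ceph-node-1', 'ceph version 12.2.5 (...) luminous (stable)')
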
    def get_metadata_and_osd_status(self):
        osd_map = self.get('osd_map')
        osd_flags = osd_map['flags'].split(',')
        for flag in OSD_FLAGS:
            self.metrics.set('osd_flag_{}'.format(flag),
                             int(flag in osd_flags))

        osd_devices = self.get('osd_map_crush')['devices']
        servers = self.get_service_list()
        for osd in osd_map['osds']:
            # id can be used to link osd metrics and metadata
            id_ = osd['osd']
            # collect osd metadata
            p_addr = osd['public_addr'].split(':')[0]
            c_addr = osd['cluster_addr'].split(':')[0]
            if p_addr == "-" or c_addr == "-":
                self.log.info(
                    "Missing address metadata for osd {0}, skipping occupation"
                    " and metadata records for this osd".format(id_)
                )
                continue

            dev_class = None
            for osd_device in osd_devices:
                if osd_device['id'] == id_:
                    dev_class = osd_device.get('class', '')
                    break

            if dev_class is None:
                self.log.info(
                    "OSD {0} is missing from CRUSH map, skipping output".format(
                        id_))
                continue

            host_version = servers.get((str(id_), 'osd'), ('', ''))

            self.metrics.append('osd_metadata', 1, (
                c_addr,
                dev_class,
                id_, host_version[0],
                p_addr, host_version[1]
            ))

            # collect osd status
            for state in OSD_STATUS:
                status = osd[state]
                self.metrics.append('osd_{}'.format(state), status,
                                    ('osd.{}'.format(id_),))

            # collect disk occupation metadata
            osd_metadata = self.get_metadata("osd", str(id_))
            if osd_metadata is None:
                continue
            dev_keys = ("backend_filestore_dev_node", "bluestore_bdev_dev_node")
            osd_dev_node = None
            for dev_key in dev_keys:
                val = osd_metadata.get(dev_key, None)
                if val and val != "unknown":
                    osd_dev_node = val
                    break
            osd_hostname = osd_metadata.get('hostname', None)
            if osd_dev_node and osd_hostname:
                self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                    id_, osd_hostname, osd_dev_node))
                self.metrics.set('disk_occupation', 1, (
                    osd_hostname,
                    osd_dev_node,
                    "osd.{0}".format(id_)
                ))
            else:
                self.log.info("Missing dev node metadata for osd {0}, skipping "
                              "occupation record for this osd".format(id_))

        pool_meta = []
        for pool in osd_map['pools']:
            self.metrics.append('pool_metadata', 1, (pool['pool'], pool['pool_name']))

        # Populate rgw_metadata
        for key, value in servers.items():
            service_id, service_type = key
            if service_type != 'rgw':
                continue
            hostname, version = value
            self.metrics.append(
                'rgw_metadata',
                1,
                (service_id, hostname, version)
            )

    def collect(self):
        self.get_health()
        self.get_df()
        self.get_fs()
        self.get_osd_stats()
        self.get_quorum_status()
        self.get_metadata_and_osd_status()
        self.get_pg_status()

        for daemon, counters in self.get_all_perf_counters().items():
            for path, counter_info in counters.items():
                stattype = self._stattype_to_str(counter_info['type'])
                # XXX simplify first effort: no histograms
                # averages are already collapsed to one value for us
                if not stattype or stattype == 'histogram':
                    self.log.debug('ignoring %s, type %s' % (path, stattype))
                    continue

                self.metrics.add_metric(path, Metric(
                    stattype,
                    path,
                    counter_info['description'],
                    ("ceph_daemon",),
                ))

                self.metrics.append(path, counter_info['value'], (daemon,))
        # It is sufficient to reset the pending metrics once per scrape
        self.metrics.reset()

        return self.metrics.metrics

    def handle_command(self, cmd):
        if cmd['prefix'] == 'prometheus self-test':
            self.collect()
            return 0, '', 'Self-test OK'
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(cmd['prefix']))

    def serve(self):

        class Root(object):

            # collapse everything to '/'
            def _cp_dispatch(self, vpath):
                cherrypy.request.path = ''
                return self

            def format_metrics(self, metrics):
                formatted = ''
                for m in metrics.values():
                    formatted += m.str_expfmt()
                return formatted + '\n'

            @cherrypy.expose
            def index(self):
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='/metrics'>Metrics</a></p>
    </body>
</html>'''

            @cherrypy.expose
            def metrics(self):
                if global_instance().have_mon_connection():
                    metrics = global_instance().collect()
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    if metrics:
                        return self.format_metrics(metrics)
                else:
                    raise cherrypy.HTTPError(503, 'No MON connection')

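        # A sketch of the Prometheus side: scraping this exporter only needs a
        # static target pointing at the mgr in prometheus.yml, for example:
        #
        #   scrape_configs:
        #     - job_name: 'ceph'
        #       static_configs:
        #         - targets: ['<mgr-host>:9283']
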
        server_addr = self.get_localized_config('server_addr', DEFAULT_ADDR)
        server_port = self.get_localized_config('server_port', DEFAULT_PORT)
        self.log.info(
            "server_addr: %s server_port: %s" %
            (server_addr, server_port)
        )

        # Publish the URI that others may use to access the service we're
        # about to start serving
        self.set_uri('http://{0}:{1}/'.format(
            socket.getfqdn() if server_addr == '::' else server_addr,
            server_port
        ))

        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })
        cherrypy.tree.mount(Root(), "/")
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        cherrypy.engine.block()

    def shutdown(self):
        self.log.info('Stopping engine...')
        cherrypy.engine.wait(state=cherrypy.engine.states.STARTED)
        cherrypy.engine.exit()
        self.log.info('Stopped engine')


class StandbyModule(MgrStandbyModule):
    def serve(self):
        server_addr = self.get_localized_config('server_addr', DEFAULT_ADDR)
        server_port = self.get_localized_config('server_port', DEFAULT_PORT)
        self.log.info("server_addr: %s server_port: %s" % (server_addr, server_port))
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })

        module = self

        class Root(object):

            @cherrypy.expose
            def index(self):
                active_uri = module.get_active_uri()
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='{}metrics'>Metrics</a></p>
    </body>
</html>'''.format(active_uri)

            @cherrypy.expose
            def metrics(self):
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return ''

        cherrypy.tree.mount(Root(), '/', {})
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info("Waiting for engine...")
        # Block here until the engine is stopped (i.e. until shutdown() runs)
        cherrypy.engine.wait(state=cherrypy.engine.states.STOPPED)
        self.log.info('Engine stopped.')

    def shutdown(self):
        self.log.info("Stopping engine...")
        cherrypy.engine.wait(state=cherrypy.engine.states.STARTED)
        cherrypy.engine.stop()
        self.log.info("Stopped engine")