]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/prometheus/module.py
update sources to 12.2.7
[ceph.git] / ceph / src / pybind / mgr / prometheus / module.py
1 import cherrypy
2 import json
3 import errno
4 import math
5 import os
6 import socket
7 from collections import OrderedDict
8 from mgr_module import MgrModule, MgrStandbyModule
9
# Defaults for the Prometheus HTTP server. Can also set in config-key
# see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# for Prometheus exporter port registry

DEFAULT_ADDR = '::'  # bind all interfaces (IPv6 wildcard; also accepts IPv4 on dual-stack hosts)
DEFAULT_PORT = 9283  # port registered for the ceph-mgr exporter in the Prometheus port registry
16
17
# cherrypy likes to sys.exit on error. don't let it take us down too!
def os_exit_noop(*args, **kwargs):
    """Swallow os._exit() calls so cherrypy errors cannot kill the mgr."""


os._exit = os_exit_noop
24
25
# to access things in class Module from subclass Root. Because
# it's a dict, the writer doesn't need to declare 'global' for access

_global_instance = {'plugin': None}


def global_instance():
    """Return the registered Module instance; asserts one has been set."""
    plugin = _global_instance['plugin']
    assert plugin is not None
    return plugin
35
36
def health_status_to_number(status):
    """Translate a Ceph health status string into a numeric gauge value.

    Returns 0 for HEALTH_OK, 1 for HEALTH_WARN, 2 for HEALTH_ERR and
    None for any other input — exactly as the original if/elif chain did.
    """
    return {
        'HEALTH_OK': 0,
        'HEALTH_WARN': 1,
        'HEALTH_ERR': 2,
    }.get(status)
45
# Every PG state Ceph can report; each becomes a ceph_pg_<state> gauge
# counting the PGs currently in that state (see Module.get_pg_status).
PG_STATES = [
    "active",
    "clean",
    "down",
    "recovery_unfound",
    "backfill_unfound",
    "scrubbing",
    "degraded",
    "inconsistent",
    "peering",
    "repair",
    "recovering",
    "forced_recovery",
    "backfill_wait",
    "incomplete",
    "stale",
    "remapped",
    "deep",
    "backfilling",
    "forced_backfill",
    "backfill_toofull",
    "recovery_wait",
    "recovery_toofull",
    "undersized",
    "activating",
    "peered",
    "snaptrim",
    "snaptrim_wait",
    "snaptrim_error",
    "creating",
    "unknown"]

# Cluster-wide 'df' stats exported as ceph_cluster_<stat> gauges.
DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_objects']

# Per-pool 'df' stats exported as ceph_pool_<stat> gauges labelled by pool_id.
DF_POOL = ['max_avail', 'bytes_used', 'raw_bytes_used', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes']

# OSD map flags exported as ceph_osd_flag_<flag> (0/1 each).
OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

# Label names for the ceph_fs_metadata metric.
FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')

# Label names for the ceph_mds_metadata metric.
MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
                'ceph_version')

# Label names for the ceph_mon_metadata metric.
MON_METADATA = ('ceph_daemon', 'hostname', 'public_addr', 'rank', 'ceph_version')

# Label names for the ceph_osd_metadata metric.
OSD_METADATA = ('ceph_daemon', 'cluster_addr', 'device_class', 'hostname',
                'public_addr', 'ceph_version')

# Per-OSD status fields read straight from the OSD map entries.
OSD_STATUS = ['weight', 'up', 'in']

# Per-OSD perf stats from 'osd_stats' exported as ceph_osd_<stat> gauges.
OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

# Label names for the ceph_pool_metadata metric.
POOL_METADATA = ('pool_id', 'name')

# Label names for the ceph_rgw_metadata metric.
RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

# Kept separate from OSD_METADATA so the label names line up with the
# Prometheus node_exporter's disk metrics ('device', 'instance').
DISK_OCCUPATION = ( 'ceph_daemon', 'device','instance')

# Object health counts from the PG summary, as ceph_num_objects_<state>.
NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
107
108
class Metrics(object):
    """
    Container for the exported metrics.

    Holds a dict of Metric objects (static ones built up-front plus
    dynamically registered perf counters) and a 'pending' staging area
    used to aggregate per-daemon samples between scrapes; see append()
    and reset().
    """

    def __init__(self):
        self.metrics = self._setup_static_metrics()
        self.pending = {}

    def set(self, key, value, labels=('',)):
        '''
        Set the value of a single Metric. This should be used for static
        metrics, e.g. cluster health.
        '''
        self.metrics[key].set(value, labels)

    def append(self, key, value, labels=('',)):
        '''
        Append a metric to the staging area. Use this to aggregate daemon
        specific metrics that can appear and go away as daemons are added
        or removed.
        '''
        # setdefault replaces the original membership-test-then-assign.
        self.pending.setdefault(key, []).append((labels, value))

    def reset(self):
        '''
        When metrics aggregation is done, call Metrics.reset() to apply the
        aggregated metrics. This will remove all label -> value mappings for
        a metric and set the new mapping (from pending). This means daemon
        specific metrics of daemons that no longer exist are removed.
        '''
        for key, values in self.pending.items():
            self.metrics[key].reset(values)
        self.pending = {}

    def add_metric(self, path, metric):
        '''
        Register a dynamically discovered metric (e.g. a perf counter),
        keeping any Metric already registered under the same path.
        '''
        if path not in self.metrics:
            self.metrics[path] = metric

    def _setup_static_metrics(self):
        '''Build the dict of statically known metrics, keyed by path.'''
        metrics = {}
        metrics['health_status'] = Metric(
            'untyped',
            'health_status',
            'Cluster health status'
        )
        metrics['mon_quorum_status'] = Metric(
            'gauge',
            'mon_quorum_status',
            'Monitors in quorum',
            ('ceph_daemon',)
        )
        metrics['fs_metadata'] = Metric(
            'untyped',
            'fs_metadata',
            'FS Metadata',
            FS_METADATA
        )
        metrics['mds_metadata'] = Metric(
            'untyped',
            'mds_metadata',
            'MDS Metadata',
            MDS_METADATA
        )
        metrics['mon_metadata'] = Metric(
            'untyped',
            'mon_metadata',
            'MON Metadata',
            MON_METADATA
        )
        metrics['osd_metadata'] = Metric(
            'untyped',
            'osd_metadata',
            'OSD Metadata',
            OSD_METADATA
        )

        # The reason for having this separate to OSD_METADATA is
        # so that we can stably use the same tag names that
        # the Prometheus node_exporter does
        metrics['disk_occupation'] = Metric(
            'untyped',
            'disk_occupation',
            'Associate Ceph daemon with disk used',
            DISK_OCCUPATION
        )

        metrics['pool_metadata'] = Metric(
            'untyped',
            'pool_metadata',
            'POOL Metadata',
            POOL_METADATA
        )

        metrics['rgw_metadata'] = Metric(
            'untyped',
            'rgw_metadata',
            'RGW Metadata',
            RGW_METADATA
        )

        metrics['pg_total'] = Metric(
            'gauge',
            'pg_total',
            'PG Total Count'
        )

        for flag in OSD_FLAGS:
            path = 'osd_flag_{}'.format(flag)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD Flag {}'.format(flag)
            )
        for state in OSD_STATUS:
            path = 'osd_{}'.format(state)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD status {}'.format(state),
                ('ceph_daemon',)
            )
        for stat in OSD_STATS:
            path = 'osd_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD stat {}'.format(stat),
                ('ceph_daemon',)
            )
        for state in PG_STATES:
            path = 'pg_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'PG {}'.format(state),
            )
        for state in DF_CLUSTER:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
            )
        for state in DF_POOL:
            path = 'pool_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF pool {}'.format(state),
                ('pool_id',)
            )
        for state in NUM_OBJECTS:
            path = 'num_objects_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'Number of {} objects'.format(state),
            )

        return metrics
268
269
270
class Metric(object):
    """A single exported metric: type, name, help text and a map of
    label-value tuples to sample values."""

    def __init__(self, mtype, name, desc, labels=None):
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value = {}  # indexed by label values

    def set(self, value, labelvalues=None):
        """Record one sample; labelvalues must be a tuple (defaults to ('',))."""
        if not labelvalues:
            labelvalues = ('',)
        self.value[labelvalues] = value

    def reset(self, values):
        """Replace all samples with the given (labelvalues, value) pairs."""
        self.value = dict(values)

    def str_expfmt(self):
        """Render this metric in the Prometheus text exposition format."""

        def promethize(path):
            ''' replace illegal metric name characters '''
            cleaned = (path.replace('.', '_')
                           .replace('+', '_plus')
                           .replace('::', '_'))

            # Hyphens usually turn into underscores, unless they are
            # trailing
            if cleaned.endswith('-'):
                cleaned = cleaned[:-1] + '_minus'
            else:
                cleaned = cleaned.replace('-', '_')

            return 'ceph_{0}'.format(cleaned)

        def floatstr(value):
            ''' represent as Go-compatible float '''
            if value == float('inf'):
                return '+Inf'
            elif value == float('-inf'):
                return '-Inf'
            elif math.isnan(value):
                return 'NaN'
            return repr(float(value))

        name = promethize(self.name)
        chunks = ['\n# HELP {0} {1}'.format(name, self.desc),
                  '\n# TYPE {0} {1}'.format(name, self.mtype)]

        for labelvalues, value in self.value.items():
            if self.labelnames:
                pairs = ','.join('%s="%s"' % pair
                                 for pair in zip(self.labelnames, labelvalues))
            else:
                pairs = ''
            if pairs:
                chunks.append('\n{0}{{{1}}} {2}'.format(
                    name, pairs, floatstr(value)))
            else:
                chunks.append('\n{0} {1}'.format(name, floatstr(value)))
        return ''.join(chunks)
339
340
class Module(MgrModule):
    """Active ceph-mgr module: gathers cluster state on each scrape and
    serves it over HTTP in the Prometheus text exposition format."""

    COMMANDS = [
        {
            "cmd": "prometheus self-test",
            "desc": "Run a self test on the prometheus module",
            "perm": "rw"
        },
    ]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        # Static metrics plus the per-scrape staging area.
        self.metrics = Metrics()
        self.schema = OrderedDict()
        # Let the cherrypy Root handler (defined inside serve()) reach us.
        _global_instance['plugin'] = self

    def get_health(self):
        """Export overall cluster health as the ceph_health_status gauge."""
        health = json.loads(self.get('health')['json'])
        self.metrics.set('health_status',
                         health_status_to_number(health['status'])
                         )

    def get_df(self):
        """Export cluster-wide and per-pool 'df' statistics."""
        # maybe get the to-be-exported metrics from a config?
        df = self.get('df')
        for stat in DF_CLUSTER:
            self.metrics.set('cluster_{}'.format(stat), df['stats'][stat])

        for pool in df['pools']:
            for stat in DF_POOL:
                self.metrics.append('pool_{}'.format(stat),
                                    pool['stats'][stat],
                                    (pool['id'],))

    def get_fs(self):
        """Export CephFS filesystem metadata and per-MDS metadata."""
        fs_map = self.get('fs_map')
        servers = self.get_service_list()
        # NOTE(review): never appended to; appears to be dead code.
        active_daemons = []
        for fs in fs_map['filesystems']:
            # collect fs metadata
            data_pools = ",".join([str(pool) for pool in fs['mdsmap']['data_pools']])
            self.metrics.append('fs_metadata', 1,
                                (data_pools,
                                 fs['id'],
                                 fs['mdsmap']['metadata_pool'],
                                 fs['mdsmap']['fs_name']))
            self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
            for gid, daemon in fs['mdsmap']['info'].items():
                id_ = daemon['name']
                # (hostname, ceph_version), both '' if the MDS is unknown.
                host_version = servers.get((id_, 'mds'), ('',''))
                self.metrics.append('mds_metadata', 1,
                                    ('mds.{}'.format(id_), fs['id'],
                                     host_version[0], daemon['addr'],
                                     daemon['rank'], host_version[1]))

    def get_quorum_status(self):
        """Export MON metadata and a per-monitor in-quorum (0/1) gauge."""
        mon_status = json.loads(self.get('mon_status')['json'])
        servers = self.get_service_list()
        for mon in mon_status['monmap']['mons']:
            rank = mon['rank']
            id_ = mon['name']
            host_version = servers.get((id_, 'mon'), ('',''))
            self.metrics.append('mon_metadata', 1,
                                ('mon.{}'.format(id_), host_version[0],
                                 # strip the port from "addr:port/nonce"
                                 mon['public_addr'].split(':')[0], rank,
                                 host_version[1]))
            in_quorum = int(rank in mon_status['quorum'])
            self.metrics.append('mon_quorum_status', in_quorum,
                                ('mon.{}'.format(id_),))

    def get_pg_status(self):
        """Export the PG total and a per-state PG count for every PG state."""
        # TODO add per pool status?
        pg_status = self.get('pg_status')

        # Set total count of PGs, first
        self.metrics.set('pg_total', pg_status['num_pgs'])

        reported_states = {}
        for pg in pg_status['pgs_by_state']:
            # A PG reports a compound state like "active+clean"; count the
            # PG once under each component state.
            for state in pg['state_name'].split('+'):
                reported_states[state] = reported_states.get(state, 0) + pg['count']

        for state in reported_states:
            path = 'pg_{}'.format(state)
            try:
                self.metrics.set(path, reported_states[state])
            except KeyError:
                # State has no pre-declared pg_<state> metric (not in PG_STATES).
                self.log.warn("skipping pg in unknown state {}".format(state))

        # Zero the states not reported this scrape so gauges don't go stale.
        for state in PG_STATES:
            if state not in reported_states:
                try:
                    self.metrics.set('pg_{}'.format(state), 0)
                except KeyError:
                    self.log.warn("skipping pg in unknown state {}".format(state))

    def get_osd_stats(self):
        """Export per-OSD apply/commit latency from 'osd_stats'."""
        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            id_ = osd['osd']
            for stat in OSD_STATS:
                val = osd['perf_stat'][stat]
                self.metrics.append('osd_{}'.format(stat), val,
                                    ('osd.{}'.format(id_),))

    def get_service_list(self):
        """Return {(service_id, service_type): (hostname, ceph_version)}
        for every service reported by list_servers()."""
        ret = {}
        for server in self.list_servers():
            version = server.get('ceph_version', '')
            host = server.get('hostname', '')
            for service in server.get('services', []):
                ret.update({(service['id'], service['type']): (host, version)})
        return ret

    def get_metadata_and_osd_status(self):
        """Export OSD flags, per-OSD metadata/status, disk occupation,
        pool metadata and RGW metadata."""
        osd_map = self.get('osd_map')
        osd_flags = osd_map['flags'].split(',')
        for flag in OSD_FLAGS:
            self.metrics.set('osd_flag_{}'.format(flag),
                             int(flag in osd_flags))

        osd_devices = self.get('osd_map_crush')['devices']
        servers = self.get_service_list()
        for osd in osd_map['osds']:
            # id can be used to link osd metrics and metadata
            id_ = osd['osd']
            # collect osd metadata
            p_addr = osd['public_addr'].split(':')[0]
            c_addr = osd['cluster_addr'].split(':')[0]
            # "-" means the OSD has no address (e.g. down/never booted).
            if p_addr == "-" or c_addr == "-":
                self.log.info(
                    "Missing address metadata for osd {0}, skipping occupation"
                    " and metadata records for this osd".format(id_)
                )
                continue

            dev_class = None
            for osd_device in osd_devices:
                if osd_device['id'] == id_:
                    dev_class = osd_device.get('class', '')
                    break

            if dev_class is None:
                self.log.info(
                    "OSD {0} is missing from CRUSH map, skipping output".format(
                        id_))
                continue

            host_version = servers.get((str(id_), 'osd'), ('',''))

            self.metrics.append('osd_metadata', 1, (
                'osd.{}'.format(id_),
                c_addr,
                dev_class,
                host_version[0],
                p_addr, host_version[1]
            ))

            # collect osd status
            for state in OSD_STATUS:
                status = osd[state]
                self.metrics.append('osd_{}'.format(state), status,
                                    ('osd.{}'.format(id_),))

            # collect disk occupation metadata
            osd_metadata = self.get_metadata("osd", str(id_))
            if osd_metadata is None:
                continue
            # filestore and bluestore report their device under different keys
            dev_keys = ("backend_filestore_dev_node", "bluestore_bdev_dev_node")
            osd_dev_node = None
            for dev_key in dev_keys:
                val = osd_metadata.get(dev_key, None)
                if val and val != "unknown":
                    osd_dev_node = val
                    break
            osd_hostname = osd_metadata.get('hostname', None)
            if osd_dev_node and osd_hostname:
                self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                    id_, osd_hostname, osd_dev_node))
                # NOTE(review): uses set() rather than append(), so stale
                # disk_occupation entries are never pruned by Metrics.reset();
                # confirm whether that is intentional.
                self.metrics.set('disk_occupation', 1, (
                    "osd.{0}".format(id_),
                    osd_dev_node,
                    osd_hostname
                ))
            else:
                self.log.info("Missing dev node metadata for osd {0}, skipping "
                              "occupation record for this osd".format(id_))

        # NOTE(review): unused local; appears to be dead code.
        pool_meta = []
        for pool in osd_map['pools']:
            self.metrics.append('pool_metadata', 1, (pool['pool'], pool['pool_name']))

        # Populate rgw_metadata
        for key, value in servers.items():
            service_id, service_type = key
            if service_type != 'rgw':
                continue
            hostname, version = value
            self.metrics.append(
                'rgw_metadata',
                1,
                ('{}.{}'.format(service_type, service_id), hostname, version)
            )

    def get_num_objects(self):
        """Export degraded/misplaced/unfound object counts from the PG summary."""
        pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
        for obj in NUM_OBJECTS:
            stat = 'num_objects_{}'.format(obj)
            self.metrics.set(stat, pg_sum[stat])

    def collect(self):
        """Gather all metrics for one scrape; returns the metrics dict
        (name -> Metric) consumed by the HTTP handler."""
        self.get_health()
        self.get_df()
        self.get_fs()
        self.get_osd_stats()
        self.get_quorum_status()
        self.get_metadata_and_osd_status()
        self.get_pg_status()
        self.get_num_objects()

        for daemon, counters in self.get_all_perf_counters().items():
            for path, counter_info in counters.items():
                # Skip histograms, they are represented by long running avgs
                stattype = self._stattype_to_str(counter_info['type'])
                if not stattype or stattype == 'histogram':
                    self.log.debug('ignoring %s, type %s' % (path, stattype))
                    continue

                # Get the value of the counter
                value = self._perfvalue_to_value(counter_info['type'], counter_info['value'])

                # Represent the long running avgs as sum/count pairs
                if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    _path = path + '_sum'
                    self.metrics.add_metric(_path, Metric(
                        stattype,
                        _path,
                        counter_info['description'] + ' Total',
                        ("ceph_daemon",),
                    ))
                    self.metrics.append(_path, value, (daemon,))

                    _path = path + '_count'
                    self.metrics.add_metric(_path, Metric(
                        'counter',
                        _path,
                        counter_info['description'] + ' Count',
                        ("ceph_daemon",),
                    ))
                    self.metrics.append(_path, counter_info['count'], (daemon,))
                else:
                    self.metrics.add_metric(path, Metric(
                        stattype,
                        path,
                        counter_info['description'],
                        ("ceph_daemon",),
                    ))
                    self.metrics.append(path, value, (daemon,))

        # It is sufficient to reset the pending metrics once per scrape
        self.metrics.reset()

        return self.metrics.metrics

    def handle_command(self, cmd):
        """Handle the 'prometheus self-test' mgr command.

        Returns the (retval, stdout, stderr) tuple expected by the mgr.
        """
        if cmd['prefix'] == 'prometheus self-test':
            self.collect()
            return 0, '', 'Self-test OK'
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(cmd['prefix']))

    def serve(self):
        """Start the cherrypy HTTP server; blocks until shutdown()."""

        class Root(object):
            # Request handler serving '/' (index) and '/metrics'.

            # collapse everything to '/'
            def _cp_dispatch(self, vpath):
                cherrypy.request.path = ''
                return self

            def format_metrics(self, metrics):
                # Concatenate every metric's exposition-format text.
                formatted = ''
                for m in metrics.values():
                    formatted += m.str_expfmt()
                return formatted + '\n'

            @cherrypy.expose
            def index(self):
                return '''<!DOCTYPE html>
<html>
<head><title>Ceph Exporter</title></head>
<body>
<h1>Ceph Exporter</h1>
<p><a href='/metrics'>Metrics</a></p>
</body>
</html>'''

            @cherrypy.expose
            def metrics(self):
                # Only scrape when we have a MON connection; otherwise 503
                # so Prometheus records the target as down.
                if global_instance().have_mon_connection():
                    metrics = global_instance().collect()
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    if metrics:
                        return self.format_metrics(metrics)
                else:
                    raise cherrypy.HTTPError(503, 'No MON connection')

        server_addr = self.get_localized_config('server_addr', DEFAULT_ADDR)
        server_port = self.get_localized_config('server_port', DEFAULT_PORT)
        self.log.info(
            "server_addr: %s server_port: %s" %
            (server_addr, server_port)
        )

        # Publish the URI that others may use to access the service we're
        # about to start serving
        self.set_uri('http://{0}:{1}/'.format(
            socket.getfqdn() if server_addr == '::' else server_addr,
            server_port
        ))

        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })
        cherrypy.tree.mount(Root(), "/")
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # Blocks until the engine is stopped from shutdown().
        cherrypy.engine.block()

    def shutdown(self):
        """Stop the cherrypy engine, unblocking serve()."""
        self.log.info('Stopping engine...')
        # Wait for the engine to fully start before asking it to exit.
        cherrypy.engine.wait(state=cherrypy.engine.states.STARTED)
        cherrypy.engine.exit()
        self.log.info('Stopped engine')
678
679
class StandbyModule(MgrStandbyModule):
    """Served while this mgr is standby: links visitors to the active mgr's
    endpoint and returns an empty metrics payload."""

    def serve(self):
        """Run a minimal cherrypy server; blocks until the engine stops."""
        server_addr = self.get_localized_config('server_addr', '::')
        server_port = self.get_localized_config('server_port', DEFAULT_PORT)
        self.log.info("server_addr: %s server_port: %s" % (server_addr, server_port))
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })

        # Captured by the nested handler so it can query the active mgr URI.
        module = self

        class Root(object):

            @cherrypy.expose
            def index(self):
                # Point visitors at the active mgr's metrics endpoint.
                active_uri = module.get_active_uri()
                return '''<!DOCTYPE html>
<html>
<head><title>Ceph Exporter</title></head>
<body>
<h1>Ceph Exporter</h1>
<p><a href='{}metrics'>Metrics</a></p>
</body>
</html>'''.format(active_uri)

            @cherrypy.expose
            def metrics(self):
                # A standby mgr has no cluster state to export.
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return ''

        cherrypy.tree.mount(Root(), '/', {})
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info("Waiting for engine...")
        # Blocks here until shutdown() stops the engine.
        cherrypy.engine.wait(state=cherrypy.engine.states.STOPPED)
        self.log.info('Engine started.')

    def shutdown(self):
        """Stop the cherrypy engine, unblocking serve()."""
        self.log.info("Stopping engine...")
        cherrypy.engine.wait(state=cherrypy.engine.states.STARTED)
        cherrypy.engine.stop()
        self.log.info("Stopped engine")