# ceph/src/pybind/mgr/prometheus/module.py

import cherrypy
import json
import errno
import math
import os
import socket
from collections import OrderedDict
from mgr_module import MgrModule, MgrStandbyModule

# Defaults for the Prometheus HTTP server.  Can also be set in config-key;
# see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# for the Prometheus exporter port registry.
DEFAULT_ADDR = '::'
DEFAULT_PORT = 9283  # the port registered for the Ceph exporter in that registry


# cherrypy likes to sys.exit on error.  don't let it take us down too!
def os_exit_noop(*args, **kwargs):
    pass


os._exit = os_exit_noop
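
# With the no-op stub installed, a fatal CherryPy error can no longer
# terminate the whole ceph-mgr daemon that hosts this module; failures
# surface in the mgr log instead of killing the process.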

# to access things in class Module from subclass Root. Because
# it's a dict, the writer doesn't need to declare 'global' for access
_global_instance = {'plugin': None}


def global_instance():
    assert _global_instance['plugin'] is not None
    return _global_instance['plugin']
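
# The CherryPy handler classes defined further down are instantiated by
# CherryPy itself and hold no reference back to Module, so they reach the
# active plugin via global_instance(), e.g. global_instance().collect()
# in the /metrics handler.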


def health_status_to_number(status):
    # map health states onto an increasing severity scale
    if status == 'HEALTH_OK':
        return 0
    elif status == 'HEALTH_WARN':
        return 1
    elif status == 'HEALTH_ERR':
        return 2
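
# The number feeds the ceph_health_status series, so a PromQL alert can be
# written against it, e.g. (illustrative): ceph_health_status >= 1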


# PG states, used to zero-fill pg_* series that are absent from a scrape.
# NOTE: this list is an approximation of the full upstream set of PG states.
PG_STATES = ['creating', 'active', 'clean', 'down', 'scrubbing', 'degraded',
             'inconsistent', 'peering', 'repair', 'recovering',
             'backfill_wait', 'incomplete', 'stale', 'remapped',
             'undersized', 'peered']

DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_objects']

DF_POOL = ['max_avail', 'bytes_used', 'raw_bytes_used', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes']

OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

FS_METADATA = ('data_pools', 'id', 'metadata_pool', 'name')

MDS_METADATA = ('id', 'fs', 'hostname', 'public_addr', 'rank', 'ceph_version')

MON_METADATA = ('id', 'hostname', 'public_addr', 'rank', 'ceph_version')

OSD_METADATA = ('cluster_addr', 'device_class', 'id', 'hostname', 'public_addr',
                'ceph_version')

OSD_STATUS = ['weight', 'up', 'in']

OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name')

RGW_METADATA = ('id', 'hostname', 'ceph_version')

DISK_OCCUPATION = ('instance', 'device', 'ceph_daemon')


class Metrics(object):
    def __init__(self):
        self.metrics = self._setup_static_metrics()
        self.pending = {}

    def set(self, key, value, labels=('',)):
        '''
        Set the value of a single metric. This should be used for static
        metrics, e.g. cluster health.
        '''
        self.metrics[key].set(value, labels)

    def append(self, key, value, labels=('',)):
        '''
        Append a metric to the staging area. Use this to aggregate daemon
        specific metrics that can appear and go away as daemons are added or
        removed.
        '''
        if key not in self.pending:
            self.pending[key] = []
        self.pending[key].append((labels, value))

    def reset(self):
        '''
        When metrics aggregation is done, call Metrics.reset() to apply the
        aggregated metrics. This removes all label -> value mappings for a
        metric and sets the new mapping (from pending), so daemon specific
        metrics of daemons that no longer exist are removed.
        '''
        for k, v in self.pending.items():
            self.metrics[k].reset(v)
        self.pending = {}

    def add_metric(self, path, metric):
        if path not in self.metrics:
            self.metrics[path] = metric
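
    # Intended flow (a sketch of how the methods above combine): collectors
    # call append() once per daemon during a scrape, then a single reset()
    # promotes the staged values, dropping series of departed daemons:
    #
    #   metrics.append('osd_up', 1, ('osd.0',))
    #   metrics.append('osd_up', 1, ('osd.1',))
    #   metrics.reset()  # 'osd_up' now carries exactly these two series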

    def _setup_static_metrics(self):
        # metric type strings: 'gauge' for numeric values, 'untyped' for
        # metadata/flag series
        metrics = {}
        metrics['health_status'] = Metric(
            'untyped',
            'health_status',
            'Cluster health status'
        )
        metrics['mon_quorum_status'] = Metric(
            'gauge',
            'mon_quorum_status',
            'Monitors in quorum',
            ('ceph_daemon',)
        )
        metrics['fs_metadata'] = Metric(
            'untyped',
            'fs_metadata',
            'FS Metadata',
            FS_METADATA
        )
        metrics['mds_metadata'] = Metric(
            'untyped',
            'mds_metadata',
            'MDS Metadata',
            MDS_METADATA
        )
        metrics['mon_metadata'] = Metric(
            'untyped',
            'mon_metadata',
            'MON Metadata',
            MON_METADATA
        )
        metrics['osd_metadata'] = Metric(
            'untyped',
            'osd_metadata',
            'OSD Metadata',
            OSD_METADATA
        )

        # The reason for having this separate to OSD_METADATA is
        # so that we can stably use the same tag names that
        # the Prometheus node_exporter does
        metrics['disk_occupation'] = Metric(
            'untyped',
            'disk_occupation',
            'Associate Ceph daemon with disk used',
            DISK_OCCUPATION
        )
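
        # Illustrative sample of the resulting series (hypothetical values):
        #   ceph_disk_occupation{instance="node1",device="/dev/sdb",ceph_daemon="osd.0"} 1.0
        # The instance/device label names line up with node_exporter's disk
        # series, so the two can be joined in PromQL.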

        metrics['pool_metadata'] = Metric(
            'untyped',
            'pool_metadata',
            'POOL Metadata',
            POOL_METADATA
        )

        metrics['rgw_metadata'] = Metric(
            'untyped',
            'rgw_metadata',
            'RGW Metadata',
            RGW_METADATA
        )

        metrics['pg_total'] = Metric(
            'gauge',
            'pg_total',
            'PG Total Count'
        )

        for flag in OSD_FLAGS:
            path = 'osd_flag_{}'.format(flag)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD Flag {}'.format(flag)
            )
        for state in OSD_STATUS:
            path = 'osd_{}'.format(state)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD status {}'.format(state),
                ('ceph_daemon',)
            )
        for stat in OSD_STATS:
            path = 'osd_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD stat {}'.format(stat),
                ('ceph_daemon',)
            )
        for state in PG_STATES:
            path = 'pg_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'PG {}'.format(state)
            )
        for state in DF_CLUSTER:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state)
            )
        for state in DF_POOL:
            path = 'pool_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF pool {}'.format(state),
                ('pool_id',)
            )

        return metrics


class Metric(object):
    def __init__(self, mtype, name, desc, labels=None):
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value = {}  # indexed by label values

    def set(self, value, labelvalues=None):
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] = value

    def reset(self, values):
        self.value = {}
        for labelvalues, value in values:
            self.value[labelvalues] = value

    def str_expfmt(self):

        def promethize(path):
            ''' replace illegal metric name characters '''
            result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')

            # Hyphens usually turn into underscores, unless they are
            # trailing
            if result.endswith("-"):
                result = result[0:-1] + "_minus"
            else:
                result = result.replace("-", "_")

            return "ceph_{0}".format(result)
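
        # A few illustrative rewrites (hypothetical input names):
        #   promethize('osd.op_w') -> 'ceph_osd_op_w'     ('.'  -> '_')
        #   promethize('foo::bar') -> 'ceph_foo_bar'      ('::' -> '_')
        #   promethize('cache+')   -> 'ceph_cache_plus'   ('+'  -> '_plus')
        #   promethize('delta-')   -> 'ceph_delta_minus'  (trailing '-' -> '_minus')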

        def floatstr(value):
            ''' represent as Go-compatible float '''
            if value == float('inf'):
                return '+Inf'
            if value == float('-inf'):
                return '-Inf'
            if math.isnan(value):
                return 'NaN'
            return repr(float(value))

        name = promethize(self.name)
        expfmt = '''
# HELP {name} {desc}
# TYPE {name} {mtype}'''.format(
            name=name,
            desc=self.desc,
            mtype=self.mtype,
        )

        for labelvalues, value in self.value.items():
            if self.labelnames:
                labels = zip(self.labelnames, labelvalues)
                labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
            else:
                labels = ''
            if labels:
                fmtstr = '\n{name}{{{labels}}} {value}'
            else:
                fmtstr = '\n{name} {value}'
            expfmt += fmtstr.format(
                name=name,
                labels=labels,
                value=floatstr(value),
            )
        return expfmt
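
    # Example exposition-format output for one labelled gauge (illustrative):
    #   # HELP ceph_pool_rd DF pool rd
    #   # TYPE ceph_pool_rd gauge
    #   ceph_pool_rd{pool_id="1"} 42.0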


class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "prometheus self-test",
            "desc": "Run a self test on the prometheus module",
            "perm": "rw"
        },
    ]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.metrics = Metrics()
        self.schema = OrderedDict()
        _global_instance['plugin'] = self

    def get_health(self):
        health = json.loads(self.get('health')['json'])
        self.metrics.set('health_status',
                         health_status_to_number(health['status']))

    def get_df(self):
        # maybe get the to-be-exported metrics from a config?
        df = self.get('df')
        for stat in DF_CLUSTER:
            self.metrics.set('cluster_{}'.format(stat), df['stats'][stat])

        for pool in df['pools']:
            for stat in DF_POOL:
                self.metrics.append('pool_{}'.format(stat),
                                    pool['stats'][stat],
                                    (pool['id'],))

    def get_fs(self):
        fs_map = self.get('fs_map')
        servers = self.get_service_list()
        for fs in fs_map['filesystems']:
            # collect fs metadata
            data_pools = ",".join([str(pool)
                                   for pool in fs['mdsmap']['data_pools']])
            self.metrics.append('fs_metadata', 1,
                                (data_pools,
                                 fs['id'],
                                 fs['mdsmap']['metadata_pool'],
                                 fs['mdsmap']['fs_name']))
            # collect mds metadata
            for gid, daemon in fs['mdsmap']['info'].items():
                id_ = daemon['name']
                host_version = servers.get((id_, 'mds'), ('', ''))
                self.metrics.append('mds_metadata', 1,
                                    (id_, fs['id'], host_version[0],
                                     daemon['addr'], daemon['rank'],
                                     host_version[1]))

    def get_quorum_status(self):
        mon_status = json.loads(self.get('mon_status')['json'])
        servers = self.get_service_list()
        for mon in mon_status['monmap']['mons']:
            rank = mon['rank']
            id_ = mon['name']
            host_version = servers.get((id_, 'mon'), ('', ''))
            self.metrics.append('mon_metadata', 1,
                                (id_, host_version[0],
                                 mon['public_addr'].split(':')[0], rank,
                                 host_version[1]))
            in_quorum = int(rank in mon_status['quorum'])
            self.metrics.append('mon_quorum_status', in_quorum,
                                ('mon_{}'.format(id_),))

    def get_pg_status(self):
        # TODO add per pool status?
        pg_status = self.get('pg_status')

        # Set total count of PGs, first
        self.metrics.set('pg_total', pg_status['num_pgs'])

        reported_states = {}
        for pg in pg_status['pgs_by_state']:
            for state in pg['state_name'].split('+'):
                reported_states[state] = reported_states.get(
                    state, 0) + pg['count']

        for state in reported_states:
            path = 'pg_{}'.format(state)
            try:
                self.metrics.set(path, reported_states[state])
            except KeyError:
                self.log.warn("skipping pg in unknown state {}".format(state))

        for state in PG_STATES:
            if state not in reported_states:
                try:
                    self.metrics.set('pg_{}'.format(state), 0)
                except KeyError:
                    self.log.warn(
                        "skipping pg in unknown state {}".format(state))

    def get_osd_stats(self):
        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            id_ = osd['osd']
            for stat in OSD_STATS:
                val = osd['perf_stat'][stat]
                self.metrics.append('osd_{}'.format(stat), val,
                                    ('osd.{}'.format(id_),))

    def get_service_list(self):
        ret = {}
        for server in self.list_servers():
            version = server.get('ceph_version', '')
            host = server.get('hostname', '')
            for service in server.get('services', []):
                ret.update({(service['id'], service['type']): (host, version)})
        return ret
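
    # The result maps (service id, service type) -> (hostname, version),
    # e.g. a hypothetical entry:
    #   {('0', 'osd'): ('node1', 'ceph version 12.2.0 (...)')}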

    def get_metadata_and_osd_status(self):
        osd_map = self.get('osd_map')
        osd_flags = osd_map['flags'].split(',')
        for flag in OSD_FLAGS:
            self.metrics.set('osd_flag_{}'.format(flag),
                             int(flag in osd_flags))

        osd_devices = self.get('osd_map_crush')['devices']
        servers = self.get_service_list()
        for osd in osd_map['osds']:
            # id can be used to link osd metrics and metadata
            id_ = osd['osd']
            # collect osd metadata
            p_addr = osd['public_addr'].split(':')[0]
            c_addr = osd['cluster_addr'].split(':')[0]
            if p_addr == "-" or c_addr == "-":
                self.log.info(
                    "Missing address metadata for osd {0}, skipping occupation"
                    " and metadata records for this osd".format(id_)
                )
                continue

            dev_class = None
            for osd_device in osd_devices:
                if osd_device['id'] == id_:
                    dev_class = osd_device.get('class', '')
                    break

            if dev_class is None:
                self.log.info(
                    "OSD {0} is missing from CRUSH map, skipping output".format(
                        id_))
                continue

            host_version = servers.get((str(id_), 'osd'), ('', ''))

            self.metrics.append('osd_metadata', 1, (
                c_addr,
                dev_class,
                id_, host_version[0],
                p_addr, host_version[1]
            ))

            # collect osd status
            for state in OSD_STATUS:
                status = osd[state]
                self.metrics.append('osd_{}'.format(state), status,
                                    ('osd.{}'.format(id_),))

            # collect disk occupation metadata
            osd_metadata = self.get_metadata("osd", str(id_))
            if osd_metadata is None:
                continue
            dev_keys = ("backend_filestore_dev_node", "bluestore_bdev_dev_node")
            osd_dev_node = None
            for dev_key in dev_keys:
                val = osd_metadata.get(dev_key, None)
                if val and val != "unknown":
                    osd_dev_node = val
                    break
            osd_hostname = osd_metadata.get('hostname', None)
            if osd_dev_node and osd_hostname:
                self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                    id_, osd_hostname, osd_dev_node))
                self.metrics.set('disk_occupation', 1, (
                    osd_hostname,
                    osd_dev_node,
                    "osd.{0}".format(id_)
                ))
            else:
                self.log.info("Missing dev node metadata for osd {0}, skipping "
                              "occupation record for this osd".format(id_))

        for pool in osd_map['pools']:
            self.metrics.append('pool_metadata', 1,
                                (pool['pool'], pool['pool_name']))

        # Populate rgw_metadata
        for key, value in servers.items():
            service_id, service_type = key
            if service_type != 'rgw':
                continue
            hostname, version = value
            self.metrics.append('rgw_metadata', 1,
                                (service_id, hostname, version))

    def collect(self):
        self.get_health()
        self.get_df()
        self.get_fs()
        self.get_osd_stats()
        self.get_quorum_status()
        self.get_metadata_and_osd_status()
        self.get_pg_status()

        for daemon, counters in self.get_all_perf_counters().items():
            for path, counter_info in counters.items():
                stattype = self._stattype_to_str(counter_info['type'])
                # XXX simplify first effort: no histograms
                # averages are already collapsed to one value for us
                if not stattype or stattype == 'histogram':
                    self.log.debug('ignoring %s, type %s' % (path, stattype))
                    continue

                self.metrics.add_metric(path, Metric(
                    stattype,
                    path,
                    counter_info['description'],
                    ('ceph_daemon',),
                ))

                self.metrics.append(path, counter_info['value'], (daemon,))
        # It is sufficient to reset the pending metrics once per scrape
        self.metrics.reset()

        return self.metrics.metrics

    def handle_command(self, cmd):
        if cmd['prefix'] == 'prometheus self-test':
            self.collect()
            return 0, '', 'Self-test OK'
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(cmd['prefix']))
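
    # Once the module is enabled, the self-test is reachable from the Ceph
    # CLI, e.g.:
    #   ceph prometheus self-test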

    def serve(self):

        class Root(object):

            # collapse everything to '/'
            def _cp_dispatch(self, vpath):
                cherrypy.request.path = ''
                return self

            def format_metrics(self, metrics):
                formatted = ''
                for m in metrics.values():
                    formatted += m.str_expfmt()
                return formatted + '\n'

            @cherrypy.expose
            def index(self):
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='/metrics'>Metrics</a></p>
    </body>
</html>'''

            @cherrypy.expose
            def metrics(self):
                if global_instance().have_mon_connection():
                    metrics = global_instance().collect()
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    if metrics:
                        return self.format_metrics(metrics)
                else:
                    raise cherrypy.HTTPError(503, 'No MON connection')

        server_addr = self.get_localized_config('server_addr', DEFAULT_ADDR)
        server_port = self.get_localized_config('server_port', DEFAULT_PORT)
        self.log.info(
            "server_addr: %s server_port: %s" %
            (server_addr, server_port)
        )

        # Publish the URI that others may use to access the service we're
        # about to start serving
        self.set_uri('http://{0}:{1}/'.format(
            socket.getfqdn() if server_addr == '::' else server_addr,
            server_port
        ))

        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })
        cherrypy.tree.mount(Root(), "/")
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        cherrypy.engine.block()

    def shutdown(self):
        self.log.info('Stopping engine...')
        cherrypy.engine.wait(state=cherrypy.engine.states.STARTED)
        cherrypy.engine.exit()
        self.log.info('Stopped engine')


class StandbyModule(MgrStandbyModule):
    def serve(self):
        server_addr = self.get_localized_config('server_addr', '::')
        server_port = self.get_localized_config('server_port', DEFAULT_PORT)
        self.log.info("server_addr: %s server_port: %s" %
                      (server_addr, server_port))
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })

        module = self

        class Root(object):
            @cherrypy.expose
            def index(self):
                active_uri = module.get_active_uri()
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='{}metrics'>Metrics</a></p>
    </body>
</html>'''.format(active_uri)

            @cherrypy.expose
            def metrics(self):
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return ''

        cherrypy.tree.mount(Root(), '/', {})
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info("Waiting for engine...")
        cherrypy.engine.wait(state=cherrypy.engine.states.STOPPED)
        self.log.info('Engine started.')

    def shutdown(self):
        self.log.info("Stopping engine...")
        cherrypy.engine.wait(state=cherrypy.engine.states.STARTED)
        cherrypy.engine.stop()
        self.log.info("Stopped engine")