# ceph/src/pybind/mgr/prometheus/module.py -- prometheus exporter mgr module
import errno
import json
import math
import os
import socket
from collections import OrderedDict

import cherrypy

from mgr_module import MgrModule, MgrStandbyModule
10 # Defaults for the Prometheus HTTP server. Can also set in config-key
11 # see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
12 # for Prometheus exporter port registry
18 # cherrypy likes to sys.exit on error. don't let it take us down too!
def os_exit_noop(*args, **kwargs):
    """No-op replacement for os._exit.

    cherrypy calls os._exit on fatal errors; swallowing the call keeps a
    cherrypy failure from taking down the whole ceph-mgr process.
    """
    pass


os._exit = os_exit_noop
26 # to access things in class Module from subclass Root. Because
27 # it's a dict, the writer doesn't need to declare 'global' for access
29 _global_instance
= {'plugin': None}
32 def global_instance():
33 assert _global_instance
['plugin'] is not None
34 return _global_instance
['plugin']
def health_status_to_number(status):
    """Map a ceph health status string to a number for metric export.

    HEALTH_OK -> 0, HEALTH_WARN -> 1, HEALTH_ERR -> 2.  Any other value
    falls through and returns None (not expected from the mon).
    """
    if status == 'HEALTH_OK':
        return 0
    elif status == 'HEALTH_WARN':
        return 1
    elif status == 'HEALTH_ERR':
        return 2
# Stat names and metric label tuples shared by _setup_static_metrics()
# and the collection methods below.
DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_objects']

DF_POOL = ['max_avail', 'bytes_used', 'raw_bytes_used', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes']

OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')

# NOTE(review): the extraction truncated this tuple after 'rank'; the
# trailing 'ceph_version' is restored to match the sixth label appended
# in get_fs() (host_version[1]).
MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
                'ceph_version')

MON_METADATA = ('ceph_daemon', 'hostname', 'public_addr', 'rank',
                'ceph_version')

OSD_METADATA = ('ceph_daemon', 'cluster_addr', 'device_class', 'hostname',
                'public_addr', 'ceph_version')

OSD_STATUS = ['weight', 'up', 'in']

OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name')

RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

DISK_OCCUPATION = ('ceph_daemon', 'device', 'instance')

NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
class Metrics(object):
    """Holds the static Metric objects plus a staging area ('pending') for
    per-daemon metrics that are rebuilt on every scrape."""

    def __init__(self):
        # static metrics registered once, keyed by metric path
        self.metrics = self._setup_static_metrics()
        # staging area consumed by reset(); NOTE(review): this line was
        # dropped by the extraction, but append()/reset() visibly read
        # self.pending, so it must be initialized here
        self.pending = {}
114 def set(self
, key
, value
, labels
=('',)):
116 Set the value of a single Metrics. This should be used for static metrics,
119 self
.metrics
[key
].set(value
, labels
)
121 def append(self
, key
, value
, labels
= ('',)):
123 Append a metrics to the staging area. Use this to aggregate daemon specific
124 metrics that can appear and go away as daemons are added or removed.
126 if key
not in self
.pending
:
127 self
.pending
[key
] = []
128 self
.pending
[key
].append((labels
, value
))
132 When metrics aggregation is done, call Metrics.reset() to apply the
133 aggregated metric. This will remove all label -> value mappings for a
134 metric and set the new mapping (from pending). This means daemon specific
135 metrics os daemons that do no longer exist, are removed.
137 for k
, v
in self
.pending
.items():
138 self
.metrics
[k
].reset(v
)
141 def add_metric(self
, path
, metric
):
142 if path
not in self
.metrics
:
143 self
.metrics
[path
] = metric
146 def _setup_static_metrics(self
):
148 metrics
['health_status'] = Metric(
151 'Cluster health status'
153 metrics
['mon_quorum_status'] = Metric(
156 'Monitors in quorum',
159 metrics
['fs_metadata'] = Metric(
165 metrics
['mds_metadata'] = Metric(
171 metrics
['mon_metadata'] = Metric(
177 metrics
['osd_metadata'] = Metric(
184 # The reason for having this separate to OSD_METADATA is
185 # so that we can stably use the same tag names that
186 # the Prometheus node_exporter does
187 metrics
['disk_occupation'] = Metric(
190 'Associate Ceph daemon with disk used',
194 metrics
['pool_metadata'] = Metric(
201 metrics
['rgw_metadata'] = Metric(
208 metrics
['pg_total'] = Metric(
214 for flag
in OSD_FLAGS
:
215 path
= 'osd_flag_{}'.format(flag
)
216 metrics
[path
] = Metric(
219 'OSD Flag {}'.format(flag
)
221 for state
in OSD_STATUS
:
222 path
= 'osd_{}'.format(state
)
223 metrics
[path
] = Metric(
226 'OSD status {}'.format(state
),
229 for stat
in OSD_STATS
:
230 path
= 'osd_{}'.format(stat
)
231 metrics
[path
] = Metric(
234 'OSD stat {}'.format(stat
),
237 for state
in PG_STATES
:
238 path
= 'pg_{}'.format(state
)
239 metrics
[path
] = Metric(
242 'PG {}'.format(state
),
244 for state
in DF_CLUSTER
:
245 path
= 'cluster_{}'.format(state
)
246 metrics
[path
] = Metric(
249 'DF {}'.format(state
),
251 for state
in DF_POOL
:
252 path
= 'pool_{}'.format(state
)
253 metrics
[path
] = Metric(
256 'DF pool {}'.format(state
),
259 for state
in NUM_OBJECTS
:
260 path
= 'num_objects_{}'.format(state
)
261 metrics
[path
] = Metric(
264 'Number of {} objects'.format(state
),
class Metric(object):
    def __init__(self, mtype, name, desc, labels=None):
        """A single exported metric: type, name, help text and label names.

        NOTE(review): the mtype/name/desc assignments were dropped by the
        extraction; str_expfmt() visibly reads self.name (and by the
        exposition format needs mtype/desc), so they are restored here.
        """
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value = {}  # indexed by label values
279 def set(self
, value
, labelvalues
=None):
280 # labelvalues must be a tuple
281 labelvalues
= labelvalues
or ('',)
282 self
.value
[labelvalues
] = value
284 def reset(self
, values
):
286 for labelvalues
, value
in values
:
287 self
.value
[labelvalues
] = value
289 def str_expfmt(self
):
291 def promethize(path
):
292 ''' replace illegal metric name characters '''
293 result
= path
.replace('.', '_').replace('+', '_plus').replace('::', '_')
295 # Hyphens usually turn into underscores, unless they are
297 if result
.endswith("-"):
298 result
= result
[0:-1] + "_minus"
300 result
= result
.replace("-", "_")
302 return "ceph_{0}".format(result
)
305 ''' represent as Go-compatible float '''
306 if value
== float('inf'):
308 if value
== float('-inf'):
310 if math
.isnan(value
):
312 return repr(float(value
))
314 name
= promethize(self
.name
)
317 # TYPE {name} {mtype}'''.format(
323 for labelvalues
, value
in self
.value
.items():
325 labels
= zip(self
.labelnames
, labelvalues
)
326 labels
= ','.join('%s="%s"' % (k
, v
) for k
, v
in labels
)
330 fmtstr
= '\n{name}{{{labels}}} {value}'
332 fmtstr
= '\n{name} {value}'
333 expfmt
+= fmtstr
.format(
336 value
=floatstr(value
),
341 class Module(MgrModule
):
344 "cmd": "prometheus self-test",
345 "desc": "Run a self test on the prometheus module",
    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        # metric container shared by all the get_* collection methods
        self.metrics = Metrics()
        self.schema = OrderedDict()
        # register this instance so the cherrypy Root handler can reach it
        # through global_instance()
        _global_instance['plugin'] = self
356 def get_health(self
):
357 health
= json
.loads(self
.get('health')['json'])
358 self
.metrics
.set('health_status',
359 health_status_to_number(health
['status'])
363 # maybe get the to-be-exported metrics from a config?
365 for stat
in DF_CLUSTER
:
366 self
.metrics
.set('cluster_{}'.format(stat
), df
['stats'][stat
])
368 for pool
in df
['pools']:
370 self
.metrics
.append('pool_{}'.format(stat
),
375 fs_map
= self
.get('fs_map')
376 servers
= self
.get_service_list()
378 for fs
in fs_map
['filesystems']:
379 # collect fs metadata
380 data_pools
= ",".join([str(pool
) for pool
in fs
['mdsmap']['data_pools']])
381 self
.metrics
.append('fs_metadata', 1,
384 fs
['mdsmap']['metadata_pool'],
385 fs
['mdsmap']['fs_name']))
386 self
.log
.debug('mdsmap: {}'.format(fs
['mdsmap']))
387 for gid
, daemon
in fs
['mdsmap']['info'].items():
389 host_version
= servers
.get((id_
, 'mds'), ('',''))
390 self
.metrics
.append('mds_metadata', 1,
391 ('mds.{}'.format(id_
), fs
['id'],
392 host_version
[0], daemon
['addr'],
393 daemon
['rank'], host_version
[1]))
395 def get_quorum_status(self
):
396 mon_status
= json
.loads(self
.get('mon_status')['json'])
397 servers
= self
.get_service_list()
398 for mon
in mon_status
['monmap']['mons']:
401 host_version
= servers
.get((id_
, 'mon'), ('',''))
402 self
.metrics
.append('mon_metadata', 1,
403 ('mon.{}'.format(id_
), host_version
[0],
404 mon
['public_addr'].split(':')[0], rank
,
406 in_quorum
= int(rank
in mon_status
['quorum'])
407 self
.metrics
.append('mon_quorum_status', in_quorum
,
408 ('mon.{}'.format(id_
),))
410 def get_pg_status(self
):
411 # TODO add per pool status?
412 pg_status
= self
.get('pg_status')
414 # Set total count of PGs, first
415 self
.metrics
.set('pg_total', pg_status
['num_pgs'])
418 for pg
in pg_status
['pgs_by_state']:
419 for state
in pg
['state_name'].split('+'):
420 reported_states
[state
] = reported_states
.get(state
, 0) + pg
['count']
422 for state
in reported_states
:
423 path
= 'pg_{}'.format(state
)
425 self
.metrics
.set(path
, reported_states
[state
])
427 self
.log
.warn("skipping pg in unknown state {}".format(state
))
429 for state
in PG_STATES
:
430 if state
not in reported_states
:
432 self
.metrics
.set('pg_{}'.format(state
), 0)
434 self
.log
.warn("skipping pg in unknown state {}".format(state
))
436 def get_osd_stats(self
):
437 osd_stats
= self
.get('osd_stats')
438 for osd
in osd_stats
['osd_stats']:
440 for stat
in OSD_STATS
:
441 val
= osd
['perf_stat'][stat
]
442 self
.metrics
.append('osd_{}'.format(stat
), val
,
443 ('osd.{}'.format(id_
),))
445 def get_service_list(self
):
447 for server
in self
.list_servers():
448 version
= server
.get('ceph_version', '')
449 host
= server
.get('hostname', '')
450 for service
in server
.get('services', []):
451 ret
.update({(service
['id'], service
['type']): (host
, version
)})
454 def get_metadata_and_osd_status(self
):
455 osd_map
= self
.get('osd_map')
456 osd_flags
= osd_map
['flags'].split(',')
457 for flag
in OSD_FLAGS
:
458 self
.metrics
.set('osd_flag_{}'.format(flag
),
459 int(flag
in osd_flags
))
461 osd_devices
= self
.get('osd_map_crush')['devices']
462 servers
= self
.get_service_list()
463 for osd
in osd_map
['osds']:
464 # id can be used to link osd metrics and metadata
466 # collect osd metadata
467 p_addr
= osd
['public_addr'].split(':')[0]
468 c_addr
= osd
['cluster_addr'].split(':')[0]
469 if p_addr
== "-" or c_addr
== "-":
471 "Missing address metadata for osd {0}, skipping occupation"
472 " and metadata records for this osd".format(id_
)
477 for osd_device
in osd_devices
:
478 if osd_device
['id'] == id_
:
479 dev_class
= osd_device
.get('class', '')
482 if dev_class
is None:
484 "OSD {0} is missing from CRUSH map, skipping output".format(
488 host_version
= servers
.get((str(id_
), 'osd'), ('',''))
490 self
.metrics
.append('osd_metadata', 1, (
491 'osd.{}'.format(id_
),
495 p_addr
, host_version
[1]
499 for state
in OSD_STATUS
:
501 self
.metrics
.append('osd_{}'.format(state
), status
,
502 ('osd.{}'.format(id_
),))
504 # collect disk occupation metadata
505 osd_metadata
= self
.get_metadata("osd", str(id_
))
506 if osd_metadata
is None:
508 dev_keys
= ("backend_filestore_dev_node", "bluestore_bdev_dev_node")
510 for dev_key
in dev_keys
:
511 val
= osd_metadata
.get(dev_key
, None)
512 if val
and val
!= "unknown":
515 osd_hostname
= osd_metadata
.get('hostname', None)
516 if osd_dev_node
and osd_hostname
:
517 self
.log
.debug("Got dev for osd {0}: {1}/{2}".format(
518 id_
, osd_hostname
, osd_dev_node
))
519 self
.metrics
.set('disk_occupation', 1, (
520 "osd.{0}".format(id_
),
525 self
.log
.info("Missing dev node metadata for osd {0}, skipping "
526 "occupation record for this osd".format(id_
))
529 for pool
in osd_map
['pools']:
530 self
.metrics
.append('pool_metadata', 1, (pool
['pool'], pool
['pool_name']))
532 # Populate rgw_metadata
533 for key
, value
in servers
.items():
534 service_id
, service_type
= key
535 if service_type
!= 'rgw':
537 hostname
, version
= value
541 ('{}.{}'.format(service_type
, service_id
), hostname
, version
)
544 def get_num_objects(self
):
545 pg_sum
= self
.get('pg_summary')['pg_stats_sum']['stat_sum']
546 for obj
in NUM_OBJECTS
:
547 stat
= 'num_objects_{}'.format(obj
)
548 self
.metrics
.set(stat
, pg_sum
[stat
])
555 self
.get_quorum_status()
556 self
.get_metadata_and_osd_status()
558 self
.get_num_objects()
560 for daemon
, counters
in self
.get_all_perf_counters().items():
561 for path
, counter_info
in counters
.items():
562 # Skip histograms, they are represented by long running avgs
563 stattype
= self
._stattype
_to
_str
(counter_info
['type'])
564 if not stattype
or stattype
== 'histogram':
565 self
.log
.debug('ignoring %s, type %s' % (path
, stattype
))
568 # Get the value of the counter
569 value
= self
._perfvalue
_to
_value
(counter_info
['type'], counter_info
['value'])
571 # Represent the long running avgs as sum/count pairs
572 if counter_info
['type'] & self
.PERFCOUNTER_LONGRUNAVG
:
573 _path
= path
+ '_sum'
574 self
.metrics
.add_metric(_path
, Metric(
577 counter_info
['description'] + ' Total',
580 self
.metrics
.append(_path
, value
, (daemon
,))
582 _path
= path
+ '_count'
583 self
.metrics
.add_metric(_path
, Metric(
586 counter_info
['description'] + ' Count',
589 self
.metrics
.append(_path
, counter_info
['count'], (daemon
,))
591 self
.metrics
.add_metric(path
, Metric(
594 counter_info
['description'],
597 self
.metrics
.append(path
, value
, (daemon
,))
599 # It is sufficient to reset the pending metrics once per scrape
602 return self
.metrics
.metrics
604 def handle_command(self
, cmd
):
605 if cmd
['prefix'] == 'prometheus self-test':
607 return 0, '', 'Self-test OK'
609 return (-errno
.EINVAL
, '',
610 "Command not found '{0}'".format(cmd
['prefix']))
616 # collapse everything to '/'
617 def _cp_dispatch(self
, vpath
):
618 cherrypy
.request
.path
= ''
621 def format_metrics(self
, metrics
):
623 for m
in metrics
.values():
624 formatted
+= m
.str_expfmt()
625 return formatted
+ '\n'
629 return '''<!DOCTYPE html>
631 <head><title>Ceph Exporter</title></head>
633 <h1>Ceph Exporter</h1>
634 <p><a href='/metrics'>Metrics</a></p>
640 if global_instance().have_mon_connection():
641 metrics
= global_instance().collect()
642 cherrypy
.response
.headers
['Content-Type'] = 'text/plain'
644 return self
.format_metrics(metrics
)
646 raise cherrypy
.HTTPError(503, 'No MON connection')
648 server_addr
= self
.get_localized_config('server_addr', DEFAULT_ADDR
)
649 server_port
= self
.get_localized_config('server_port', DEFAULT_PORT
)
651 "server_addr: %s server_port: %s" %
652 (server_addr
, server_port
)
655 # Publish the URI that others may use to access the service we're
656 # about to start serving
657 self
.set_uri('http://{0}:{1}/'.format(
658 socket
.getfqdn() if server_addr
== '::' else server_addr
,
662 cherrypy
.config
.update({
663 'server.socket_host': server_addr
,
664 'server.socket_port': int(server_port
),
665 'engine.autoreload.on': False
667 cherrypy
.tree
.mount(Root(), "/")
668 self
.log
.info('Starting engine...')
669 cherrypy
.engine
.start()
670 self
.log
.info('Engine started.')
671 cherrypy
.engine
.block()
674 self
.log
.info('Stopping engine...')
675 cherrypy
.engine
.wait(state
=cherrypy
.engine
.states
.STARTED
)
676 cherrypy
.engine
.exit()
677 self
.log
.info('Stopped engine')
680 class StandbyModule(MgrStandbyModule
):
682 server_addr
= self
.get_localized_config('server_addr', '::')
683 server_port
= self
.get_localized_config('server_port', DEFAULT_PORT
)
684 self
.log
.info("server_addr: %s server_port: %s" % (server_addr
, server_port
))
685 cherrypy
.config
.update({
686 'server.socket_host': server_addr
,
687 'server.socket_port': int(server_port
),
688 'engine.autoreload.on': False
697 active_uri
= module
.get_active_uri()
698 return '''<!DOCTYPE html>
700 <head><title>Ceph Exporter</title></head>
702 <h1>Ceph Exporter</h1>
703 <p><a href='{}metrics'>Metrics</a></p>
705 </html>'''.format(active_uri
)
709 cherrypy
.response
.headers
['Content-Type'] = 'text/plain'
712 cherrypy
.tree
.mount(Root(), '/', {})
713 self
.log
.info('Starting engine...')
714 cherrypy
.engine
.start()
715 self
.log
.info("Waiting for engine...")
716 cherrypy
.engine
.wait(state
=cherrypy
.engine
.states
.STOPPED
)
717 self
.log
.info('Engine started.')
720 self
.log
.info("Stopping engine...")
721 cherrypy
.engine
.wait(state
=cherrypy
.engine
.states
.STARTED
)
722 cherrypy
.engine
.stop()
723 self
.log
.info("Stopped engine")