import math
import os
import socket
-from collections import OrderedDict
+import threading
+import time
from mgr_module import MgrModule, MgrStandbyModule
# Defaults for the Prometheus HTTP server. Can also set in config-key
NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
-class Metrics(object):
- def __init__(self):
- self.metrics = self._setup_static_metrics()
- self.pending = {}
-
- def set(self, key, value, labels=('',)):
- '''
- Set the value of a single Metrics. This should be used for static metrics,
- e.g. cluster health.
- '''
- self.metrics[key].set(value, labels)
-
- def append(self, key, value, labels = ('',)):
- '''
- Append a metrics to the staging area. Use this to aggregate daemon specific
- metrics that can appear and go away as daemons are added or removed.
- '''
- if key not in self.pending:
- self.pending[key] = []
- self.pending[key].append((labels, value))
-
- def reset(self):
- '''
- When metrics aggregation is done, call Metrics.reset() to apply the
- aggregated metric. This will remove all label -> value mappings for a
- metric and set the new mapping (from pending). This means daemon specific
- metrics os daemons that do no longer exist, are removed.
- '''
- for k, v in self.pending.items():
- self.metrics[k].reset(v)
- self.pending = {}
-
- def add_metric(self, path, metric):
- if path not in self.metrics:
- self.metrics[path] = metric
+class Metric(object):
+ def __init__(self, mtype, name, desc, labels=None):
+ self.mtype = mtype
+ self.name = name
+ self.desc = desc
+ self.labelnames = labels # tuple if present
+ self.value = {} # indexed by label values
+
+ def clear(self):
+ self.value = {}
+
+ def set(self, value, labelvalues=None):
+ # labelvalues must be a tuple
+ labelvalues = labelvalues or ('',)
+ self.value[labelvalues] = value
+ def str_expfmt(self):
+
+ def promethize(path):
+ ''' replace illegal metric name characters '''
+ result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')
+
+ # Hyphens usually turn into underscores, unless they are
+ # trailing
+ if result.endswith("-"):
+ result = result[0:-1] + "_minus"
+ else:
+ result = result.replace("-", "_")
+
+ return "ceph_{0}".format(result)
+
+ def floatstr(value):
+ ''' represent as Go-compatible float '''
+ if value == float('inf'):
+ return '+Inf'
+ if value == float('-inf'):
+ return '-Inf'
+ if math.isnan(value):
+ return 'NaN'
+ return repr(float(value))
+
+ name = promethize(self.name)
+ expfmt = '''
+# HELP {name} {desc}
+# TYPE {name} {mtype}'''.format(
+ name=name,
+ desc=self.desc,
+ mtype=self.mtype,
+ )
+
+ for labelvalues, value in self.value.items():
+ if self.labelnames:
+ labels = zip(self.labelnames, labelvalues)
+ labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
+ else:
+ labels = ''
+ if labels:
+ fmtstr = '\n{name}{{{labels}}} {value}'
+ else:
+ fmtstr = '\n{name} {value}'
+ expfmt += fmtstr.format(
+ name=name,
+ labels=labels,
+ value=floatstr(value),
+ )
+ return expfmt
+
+
+class Module(MgrModule):
+ COMMANDS = [
+ {
+ "cmd": "prometheus self-test",
+ "desc": "Run a self test on the prometheus module",
+ "perm": "rw"
+ },
+ ]
+
+ OPTIONS = [
+ {'name': 'server_addr'},
+ {'name': 'server_port'},
+ {'name': 'scrape_interval'},
+ ]
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+ self.metrics = self._setup_static_metrics()
+ self.shutdown_event = threading.Event()
+ self.collect_lock = threading.RLock()
+ self.collect_time = 0
+ self.collect_timeout = 5.0
+ self.collect_cache = None
+ _global_instance['plugin'] = self
def _setup_static_metrics(self):
metrics = {}
return metrics
-
-
-class Metric(object):
- def __init__(self, mtype, name, desc, labels=None):
- self.mtype = mtype
- self.name = name
- self.desc = desc
- self.labelnames = labels # tuple if present
- self.value = {} # indexed by label values
-
- def set(self, value, labelvalues=None):
- # labelvalues must be a tuple
- labelvalues = labelvalues or ('',)
- self.value[labelvalues] = value
-
- def reset(self, values):
- self.value = {}
- for labelvalues, value in values:
- self.value[labelvalues] = value
-
- def str_expfmt(self):
-
- def promethize(path):
- ''' replace illegal metric name characters '''
- result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')
-
- # Hyphens usually turn into underscores, unless they are
- # trailing
- if result.endswith("-"):
- result = result[0:-1] + "_minus"
- else:
- result = result.replace("-", "_")
-
- return "ceph_{0}".format(result)
-
- def floatstr(value):
- ''' represent as Go-compatible float '''
- if value == float('inf'):
- return '+Inf'
- if value == float('-inf'):
- return '-Inf'
- if math.isnan(value):
- return 'NaN'
- return repr(float(value))
-
- name = promethize(self.name)
- expfmt = '''
-# HELP {name} {desc}
-# TYPE {name} {mtype}'''.format(
- name=name,
- desc=self.desc,
- mtype=self.mtype,
- )
-
- for labelvalues, value in self.value.items():
- if self.labelnames:
- labels = zip(self.labelnames, labelvalues)
- labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
- else:
- labels = ''
- if labels:
- fmtstr = '\n{name}{{{labels}}} {value}'
- else:
- fmtstr = '\n{name} {value}'
- expfmt += fmtstr.format(
- name=name,
- labels=labels,
- value=floatstr(value),
- )
- return expfmt
-
-
-class Module(MgrModule):
- COMMANDS = [
- {
- "cmd": "prometheus self-test",
- "desc": "Run a self test on the prometheus module",
- "perm": "rw"
- },
- ]
-
- def __init__(self, *args, **kwargs):
- super(Module, self).__init__(*args, **kwargs)
- self.metrics = Metrics()
- self.schema = OrderedDict()
- _global_instance['plugin'] = self
-
def get_health(self):
health = json.loads(self.get('health')['json'])
- self.metrics.set('health_status',
- health_status_to_number(health['status'])
+ self.metrics['health_status'].set(
+ health_status_to_number(health['status'])
)
def get_df(self):
# maybe get the to-be-exported metrics from a config?
df = self.get('df')
for stat in DF_CLUSTER:
- self.metrics.set('cluster_{}'.format(stat), df['stats'][stat])
+ self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])
for pool in df['pools']:
for stat in DF_POOL:
- self.metrics.append('pool_{}'.format(stat),
- pool['stats'][stat],
- (pool['id'],))
+ self.metrics['pool_{}'.format(stat)].set(
+ pool['stats'][stat],
+ (pool['id'],)
+ )
def get_fs(self):
fs_map = self.get('fs_map')
for fs in fs_map['filesystems']:
# collect fs metadata
data_pools = ",".join([str(pool) for pool in fs['mdsmap']['data_pools']])
- self.metrics.append('fs_metadata', 1,
- (data_pools,
- fs['id'],
- fs['mdsmap']['metadata_pool'],
- fs['mdsmap']['fs_name']))
+ self.metrics['fs_metadata'].set(1, (
+ data_pools,
+ fs['id'],
+ fs['mdsmap']['metadata_pool'],
+ fs['mdsmap']['fs_name']
+ ))
self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
for gid, daemon in fs['mdsmap']['info'].items():
id_ = daemon['name']
host_version = servers.get((id_, 'mds'), ('',''))
- self.metrics.append('mds_metadata', 1,
- ('mds.{}'.format(id_), fs['id'],
- host_version[0], daemon['addr'],
- daemon['rank'], host_version[1]))
+ self.metrics['mds_metadata'].set(1, (
+ 'mds.{}'.format(id_), fs['id'],
+ host_version[0], daemon['addr'],
+ daemon['rank'], host_version[1]
+ ))
def get_quorum_status(self):
mon_status = json.loads(self.get('mon_status')['json'])
rank = mon['rank']
id_ = mon['name']
host_version = servers.get((id_, 'mon'), ('',''))
- self.metrics.append('mon_metadata', 1,
- ('mon.{}'.format(id_), host_version[0],
- mon['public_addr'].split(':')[0], rank,
- host_version[1]))
+ self.metrics['mon_metadata'].set(1, (
+ 'mon.{}'.format(id_), host_version[0],
+ mon['public_addr'].split(':')[0], rank,
+ host_version[1]
+ ))
in_quorum = int(rank in mon_status['quorum'])
- self.metrics.append('mon_quorum_status', in_quorum,
- ('mon.{}'.format(id_),))
+ self.metrics['mon_quorum_status'].set(in_quorum, (
+ 'mon.{}'.format(id_),
+ ))
def get_pg_status(self):
# TODO add per pool status?
pg_status = self.get('pg_status')
# Set total count of PGs, first
- self.metrics.set('pg_total', pg_status['num_pgs'])
+ self.metrics['pg_total'].set(pg_status['num_pgs'])
reported_states = {}
for pg in pg_status['pgs_by_state']:
for state in reported_states:
path = 'pg_{}'.format(state)
try:
- self.metrics.set(path, reported_states[state])
+ self.metrics[path].set(reported_states[state])
except KeyError:
self.log.warn("skipping pg in unknown state {}".format(state))
for state in PG_STATES:
if state not in reported_states:
try:
- self.metrics.set('pg_{}'.format(state), 0)
+ self.metrics['pg_{}'.format(state)].set(0)
except KeyError:
self.log.warn("skipping pg in unknown state {}".format(state))
id_ = osd['osd']
for stat in OSD_STATS:
val = osd['perf_stat'][stat]
- self.metrics.append('osd_{}'.format(stat), val,
- ('osd.{}'.format(id_),))
+ self.metrics['osd_{}'.format(stat)].set(val, (
+ 'osd.{}'.format(id_),
+ ))
def get_service_list(self):
ret = {}
osd_map = self.get('osd_map')
osd_flags = osd_map['flags'].split(',')
for flag in OSD_FLAGS:
- self.metrics.set('osd_flag_{}'.format(flag),
- int(flag in osd_flags))
+ self.metrics['osd_flag_{}'.format(flag)].set(
+ int(flag in osd_flags)
+ )
osd_devices = self.get('osd_map_crush')['devices']
servers = self.get_service_list()
host_version = servers.get((str(id_), 'osd'), ('',''))
- self.metrics.append('osd_metadata', 1, (
+ self.metrics['osd_metadata'].set(1, (
'osd.{}'.format(id_),
c_addr,
dev_class,
# collect osd status
for state in OSD_STATUS:
status = osd[state]
- self.metrics.append('osd_{}'.format(state), status,
- ('osd.{}'.format(id_),))
+ self.metrics['osd_{}'.format(state)].set(status, (
+ 'osd.{}'.format(id_),
+ ))
# collect disk occupation metadata
osd_metadata = self.get_metadata("osd", str(id_))
if osd_dev_node and osd_hostname:
self.log.debug("Got dev for osd {0}: {1}/{2}".format(
id_, osd_hostname, osd_dev_node))
- self.metrics.set('disk_occupation', 1, (
+ self.metrics['disk_occupation'].set(1, (
"osd.{0}".format(id_),
osd_dev_node,
osd_hostname
pool_meta = []
for pool in osd_map['pools']:
- self.metrics.append('pool_metadata', 1, (pool['pool'], pool['pool_name']))
+ self.metrics['pool_metadata'].set(1, (pool['pool'], pool['pool_name']))
# Populate rgw_metadata
for key, value in servers.items():
if service_type != 'rgw':
continue
hostname, version = value
- self.metrics.append(
- 'rgw_metadata',
+ self.metrics['rgw_metadata'].set(
1,
('{}.{}'.format(service_type, service_id), hostname, version)
)
pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
for obj in NUM_OBJECTS:
stat = 'num_objects_{}'.format(obj)
- self.metrics.set(stat, pg_sum[stat])
+ self.metrics[stat].set(pg_sum[stat])
def collect(self):
+ # Clear the metrics before scraping
+ for k in self.metrics.keys():
+ self.metrics[k].clear()
+
self.get_health()
self.get_df()
self.get_fs()
# Represent the long running avgs as sum/count pairs
if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
_path = path + '_sum'
- self.metrics.add_metric(_path, Metric(
- stattype,
- _path,
- counter_info['description'] + ' Total',
- ("ceph_daemon",),
- ))
- self.metrics.append(_path, value, (daemon,))
+ if _path not in self.metrics:
+ self.metrics[_path] = Metric(
+ stattype,
+ _path,
+ counter_info['description'] + ' Total',
+ ("ceph_daemon",),
+ )
+ self.metrics[_path].set(value, (daemon,))
_path = path + '_count'
- self.metrics.add_metric(_path, Metric(
- 'counter',
- _path,
- counter_info['description'] + ' Count',
- ("ceph_daemon",),
- ))
- self.metrics.append(_path, counter_info['count'], (daemon,))
+ if _path not in self.metrics:
+ self.metrics[_path] = Metric(
+ 'counter',
+ _path,
+ counter_info['description'] + ' Count',
+ ("ceph_daemon",),
+ )
+ self.metrics[_path].set(counter_info['count'], (daemon,))
else:
- self.metrics.add_metric(path, Metric(
- stattype,
- path,
- counter_info['description'],
- ("ceph_daemon",),
- ))
- self.metrics.append(path, value, (daemon,))
-
- # It is sufficient to reset the pending metrics once per scrape
- self.metrics.reset()
-
- return self.metrics.metrics
+ if path not in self.metrics:
+ self.metrics[path] = Metric(
+ stattype,
+ path,
+ counter_info['description'],
+ ("ceph_daemon",),
+ )
+ self.metrics[path].set(value, (daemon,))
+
+    # Return the formatted metrics and clear data that is no longer needed
+ _metrics = [m.str_expfmt() for m in self.metrics.values()]
+ for k in self.metrics.keys():
+ self.metrics[k].clear()
+
+ return ''.join(_metrics) + '\n'
def handle_command(self, cmd):
if cmd['prefix'] == 'prometheus self-test':
cherrypy.request.path = ''
return self
- def format_metrics(self, metrics):
- formatted = ''
- for m in metrics.values():
- formatted += m.str_expfmt()
- return formatted + '\n'
-
@cherrypy.expose
def index(self):
return '''<!DOCTYPE html>
@cherrypy.expose
def metrics(self):
- if global_instance().have_mon_connection():
- metrics = global_instance().collect()
+ instance = global_instance()
+        # Serialize scrapes: only one thread may run the collect path at a time
+ try:
+ instance.collect_lock.acquire()
+ return self._metrics(instance)
+ finally:
+ instance.collect_lock.release()
+
+ def _metrics(self, instance):
+ # Return cached data if available and collected before the cache times out
+ if instance.collect_cache and time.time() - instance.collect_time < instance.collect_timeout:
cherrypy.response.headers['Content-Type'] = 'text/plain'
- if metrics:
- return self.format_metrics(metrics)
+ return instance.collect_cache
+
+ if instance.have_mon_connection():
+ instance.collect_cache = None
+ instance.collect_time = time.time()
+ instance.collect_cache = instance.collect()
+ cherrypy.response.headers['Content-Type'] = 'text/plain'
+ return instance.collect_cache
else:
raise cherrypy.HTTPError(503, 'No MON connection')
+ # Make the cache timeout for collecting configurable
+ self.collect_timeout = self.get_localized_config('scrape_interval', 5.0)
+
server_addr = self.get_localized_config('server_addr', DEFAULT_ADDR)
server_port = self.get_localized_config('server_port', DEFAULT_PORT)
self.log.info(
self.log.info('Starting engine...')
cherrypy.engine.start()
self.log.info('Engine started.')
- cherrypy.engine.block()
+ # wait for the shutdown event
+ self.shutdown_event.wait()
+ self.shutdown_event.clear()
+ cherrypy.engine.stop()
+ self.log.info('Engine stopped.')
def shutdown(self):
self.log.info('Stopping engine...')
- cherrypy.engine.wait(state=cherrypy.engine.states.STARTED)
- cherrypy.engine.exit()
- self.log.info('Stopped engine')
+ self.shutdown_event.set()
class StandbyModule(MgrStandbyModule):
+ def __init__(self, *args, **kwargs):
+ super(StandbyModule, self).__init__(*args, **kwargs)
+ self.shutdown_event = threading.Event()
+
def serve(self):
server_addr = self.get_localized_config('server_addr', '::')
server_port = self.get_localized_config('server_port', DEFAULT_PORT)
cherrypy.tree.mount(Root(), '/', {})
self.log.info('Starting engine...')
cherrypy.engine.start()
- self.log.info("Waiting for engine...")
- cherrypy.engine.wait(state=cherrypy.engine.states.STOPPED)
self.log.info('Engine started.')
+ # Wait for shutdown event
+ self.shutdown_event.wait()
+ self.shutdown_event.clear()
+ cherrypy.engine.stop()
+ self.log.info('Engine stopped.')
def shutdown(self):
self.log.info("Stopping engine...")
- cherrypy.engine.wait(state=cherrypy.engine.states.STARTED)
- cherrypy.engine.stop()
+ self.shutdown_event.set()
self.log.info("Stopped engine")