git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/prometheus/module.py
update sources to 12.2.10
[ceph.git] / ceph / src / pybind / mgr / prometheus / module.py
index 2c4598a39e254737f8ec17e8a0ff85f6c557b0d9..03dd9962a7ec01a25a8039cb42d346c5f3796af7 100644 (file)
@@ -4,7 +4,8 @@ import errno
 import math
 import os
 import socket
-from collections import OrderedDict
+import threading
+import time
 from mgr_module import MgrModule, MgrStandbyModule
 
 # Defaults for the Prometheus HTTP server.  Can also set in config-key
@@ -106,42 +107,98 @@ DISK_OCCUPATION = ( 'ceph_daemon', 'device','instance')
 NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
 
 
-class Metrics(object):
-    def __init__(self):
-        self.metrics = self._setup_static_metrics()
-        self.pending = {}
-
-    def set(self, key, value, labels=('',)):
-        '''
-        Set the value of a single Metrics. This should be used for static metrics,
-        e.g. cluster health.
-        '''
-        self.metrics[key].set(value, labels)
-
-    def append(self, key, value, labels = ('',)):
-        '''
-        Append a metrics to the staging area. Use this to aggregate daemon specific
-        metrics that can appear and go away as daemons are added or removed.
-        '''
-        if key not in self.pending:
-            self.pending[key] = []
-        self.pending[key].append((labels, value))
-
-    def reset(self):
-        '''
-        When metrics aggregation is done, call Metrics.reset() to apply the
-        aggregated metric. This will remove all label -> value mappings for a
-        metric and set the new mapping (from pending). This means daemon specific
-        metrics os daemons that do no longer exist, are removed.
-        '''
-        for k, v in self.pending.items():
-            self.metrics[k].reset(v)
-        self.pending = {}
-
-    def add_metric(self, path, metric):
-        if path not in self.metrics:
-            self.metrics[path] = metric
+class Metric(object):
+    def __init__(self, mtype, name, desc, labels=None):
+        self.mtype = mtype
+        self.name = name
+        self.desc = desc
+        self.labelnames = labels    # tuple if present
+        self.value = {}             # indexed by label values
+
+    def clear(self):
+        self.value = {}
+
+    def set(self, value, labelvalues=None):
+        # labelvalues must be a tuple
+        labelvalues = labelvalues or ('',)
+        self.value[labelvalues] = value
 
+    def str_expfmt(self):
+
+        def promethize(path):
+            ''' replace illegal metric name characters '''
+            result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')
+
+            # Hyphens usually turn into underscores, unless they are
+            # trailing
+            if result.endswith("-"):
+                result = result[0:-1] + "_minus"
+            else:
+                result = result.replace("-", "_")
+
+            return "ceph_{0}".format(result)
+
+        def floatstr(value):
+            ''' represent as Go-compatible float '''
+            if value == float('inf'):
+                return '+Inf'
+            if value == float('-inf'):
+                return '-Inf'
+            if math.isnan(value):
+                return 'NaN'
+            return repr(float(value))
+
+        name = promethize(self.name)
+        expfmt = '''
+# HELP {name} {desc}
+# TYPE {name} {mtype}'''.format(
+            name=name,
+            desc=self.desc,
+            mtype=self.mtype,
+        )
+
+        for labelvalues, value in self.value.items():
+            if self.labelnames:
+                labels = zip(self.labelnames, labelvalues)
+                labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
+            else:
+                labels = ''
+            if labels:
+                fmtstr = '\n{name}{{{labels}}} {value}'
+            else:
+                fmtstr = '\n{name} {value}'
+            expfmt += fmtstr.format(
+                name=name,
+                labels=labels,
+                value=floatstr(value),
+            )
+        return expfmt
+
+
+class Module(MgrModule):
+    COMMANDS = [
+        {
+            "cmd": "prometheus self-test",
+            "desc": "Run a self test on the prometheus module",
+            "perm": "rw"
+        },
+    ]
+
+    OPTIONS = [
+            {'name': 'server_addr'},
+            {'name': 'server_port'},
+            {'name': 'scrape_interval'},
+    ]
+
+    def __init__(self, *args, **kwargs):
+        super(Module, self).__init__(*args, **kwargs)
+        self.metrics = self._setup_static_metrics()
+        self.shutdown_event = threading.Event()
+        self.collect_lock = threading.RLock()
+        self.collect_time = 0
+        self.collect_timeout = 5.0
+        self.collect_cache = None
+        _global_instance['plugin'] = self
 
     def _setup_static_metrics(self):
         metrics = {}
@@ -266,110 +323,24 @@ class Metrics(object):
 
         return metrics
 
-
-
-class Metric(object):
-    def __init__(self, mtype, name, desc, labels=None):
-        self.mtype = mtype
-        self.name = name
-        self.desc = desc
-        self.labelnames = labels    # tuple if present
-        self.value = {}             # indexed by label values
-
-    def set(self, value, labelvalues=None):
-        # labelvalues must be a tuple
-        labelvalues = labelvalues or ('',)
-        self.value[labelvalues] = value
-
-    def reset(self, values):
-        self.value = {}
-        for labelvalues, value in values:
-            self.value[labelvalues] = value
-
-    def str_expfmt(self):
-
-        def promethize(path):
-            ''' replace illegal metric name characters '''
-            result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')
-
-            # Hyphens usually turn into underscores, unless they are
-            # trailing
-            if result.endswith("-"):
-                result = result[0:-1] + "_minus"
-            else:
-                result = result.replace("-", "_")
-
-            return "ceph_{0}".format(result)
-
-        def floatstr(value):
-            ''' represent as Go-compatible float '''
-            if value == float('inf'):
-                return '+Inf'
-            if value == float('-inf'):
-                return '-Inf'
-            if math.isnan(value):
-                return 'NaN'
-            return repr(float(value))
-
-        name = promethize(self.name)
-        expfmt = '''
-# HELP {name} {desc}
-# TYPE {name} {mtype}'''.format(
-            name=name,
-            desc=self.desc,
-            mtype=self.mtype,
-        )
-
-        for labelvalues, value in self.value.items():
-            if self.labelnames:
-                labels = zip(self.labelnames, labelvalues)
-                labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
-            else:
-                labels = ''
-            if labels:
-                fmtstr = '\n{name}{{{labels}}} {value}'
-            else:
-                fmtstr = '\n{name} {value}'
-            expfmt += fmtstr.format(
-                name=name,
-                labels=labels,
-                value=floatstr(value),
-            )
-        return expfmt
-
-
-class Module(MgrModule):
-    COMMANDS = [
-        {
-            "cmd": "prometheus self-test",
-            "desc": "Run a self test on the prometheus module",
-            "perm": "rw"
-        },
-    ]
-
-    def __init__(self, *args, **kwargs):
-        super(Module, self).__init__(*args, **kwargs)
-        self.metrics = Metrics()
-        self.schema = OrderedDict()
-        _global_instance['plugin'] = self
-
     def get_health(self):
         health = json.loads(self.get('health')['json'])
-        self.metrics.set('health_status',
-                         health_status_to_number(health['status'])
+        self.metrics['health_status'].set(
+            health_status_to_number(health['status'])
         )
 
     def get_df(self):
         # maybe get the to-be-exported metrics from a config?
         df = self.get('df')
         for stat in DF_CLUSTER:
-            self.metrics.set('cluster_{}'.format(stat), df['stats'][stat])
+            self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])
 
         for pool in df['pools']:
             for stat in DF_POOL:
-                self.metrics.append('pool_{}'.format(stat),
-                                    pool['stats'][stat],
-                                    (pool['id'],))
+                self.metrics['pool_{}'.format(stat)].set(
+                    pool['stats'][stat],
+                    (pool['id'],)
+                )
 
     def get_fs(self):
         fs_map = self.get('fs_map')
@@ -378,19 +349,21 @@ class Module(MgrModule):
         for fs in fs_map['filesystems']:
             # collect fs metadata
             data_pools = ",".join([str(pool) for pool in fs['mdsmap']['data_pools']])
-            self.metrics.append('fs_metadata', 1,
-                                (data_pools,
-                                 fs['id'],
-                                 fs['mdsmap']['metadata_pool'],
-                                 fs['mdsmap']['fs_name']))
+            self.metrics['fs_metadata'].set(1, (
+                data_pools,
+                fs['id'],
+                fs['mdsmap']['metadata_pool'],
+                fs['mdsmap']['fs_name']
+            ))
             self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
             for gid, daemon in fs['mdsmap']['info'].items():
                 id_ = daemon['name']
                 host_version = servers.get((id_, 'mds'), ('',''))
-                self.metrics.append('mds_metadata', 1,
-                                    ('mds.{}'.format(id_), fs['id'],
-                                     host_version[0], daemon['addr'],
-                                     daemon['rank'], host_version[1]))
+                self.metrics['mds_metadata'].set(1, (
+                    'mds.{}'.format(id_), fs['id'],
+                    host_version[0], daemon['addr'],
+                    daemon['rank'], host_version[1]
+                ))
 
     def get_quorum_status(self):
         mon_status = json.loads(self.get('mon_status')['json'])
@@ -399,20 +372,22 @@ class Module(MgrModule):
             rank = mon['rank']
             id_ = mon['name']
             host_version = servers.get((id_, 'mon'), ('',''))
-            self.metrics.append('mon_metadata', 1,
-                                ('mon.{}'.format(id_), host_version[0],
-                                 mon['public_addr'].split(':')[0], rank,
-                                 host_version[1]))
+            self.metrics['mon_metadata'].set(1, (
+                'mon.{}'.format(id_), host_version[0],
+                mon['public_addr'].split(':')[0], rank,
+                host_version[1]
+            ))
             in_quorum = int(rank in mon_status['quorum'])
-            self.metrics.append('mon_quorum_status', in_quorum,
-                                ('mon.{}'.format(id_),))
+            self.metrics['mon_quorum_status'].set(in_quorum, (
+                'mon.{}'.format(id_),
+            ))
 
     def get_pg_status(self):
         # TODO add per pool status?
         pg_status = self.get('pg_status')
 
         # Set total count of PGs, first
-        self.metrics.set('pg_total', pg_status['num_pgs'])
+        self.metrics['pg_total'].set(pg_status['num_pgs'])
 
         reported_states = {}
         for pg in pg_status['pgs_by_state']:
@@ -422,14 +397,14 @@ class Module(MgrModule):
         for state in reported_states:
             path = 'pg_{}'.format(state)
             try:
-                self.metrics.set(path, reported_states[state])
+                self.metrics[path].set(reported_states[state])
             except KeyError:
                 self.log.warn("skipping pg in unknown state {}".format(state))
 
         for state in PG_STATES:
             if state not in reported_states:
                 try:
-                    self.metrics.set('pg_{}'.format(state), 0)
+                    self.metrics['pg_{}'.format(state)].set(0)
                 except KeyError:
                     self.log.warn("skipping pg in unknown state {}".format(state))
 
@@ -439,8 +414,9 @@ class Module(MgrModule):
             id_ = osd['osd']
             for stat in OSD_STATS:
                 val = osd['perf_stat'][stat]
-                self.metrics.append('osd_{}'.format(stat), val,
-                                    ('osd.{}'.format(id_),))
+                self.metrics['osd_{}'.format(stat)].set(val, (
+                    'osd.{}'.format(id_),
+                ))
 
     def get_service_list(self):
         ret = {}
@@ -455,8 +431,9 @@ class Module(MgrModule):
         osd_map = self.get('osd_map')
         osd_flags = osd_map['flags'].split(',')
         for flag in OSD_FLAGS:
-            self.metrics.set('osd_flag_{}'.format(flag),
-                int(flag in osd_flags))
+            self.metrics['osd_flag_{}'.format(flag)].set(
+                int(flag in osd_flags)
+            )
 
         osd_devices = self.get('osd_map_crush')['devices']
         servers = self.get_service_list()
@@ -487,7 +464,7 @@ class Module(MgrModule):
 
             host_version = servers.get((str(id_), 'osd'), ('',''))
 
-            self.metrics.append('osd_metadata', 1, (
+            self.metrics['osd_metadata'].set(1, (
                 'osd.{}'.format(id_),
                 c_addr,
                 dev_class,
@@ -498,8 +475,9 @@ class Module(MgrModule):
             # collect osd status
             for state in OSD_STATUS:
                 status = osd[state]
-                self.metrics.append('osd_{}'.format(state), status,
-                                    ('osd.{}'.format(id_),))
+                self.metrics['osd_{}'.format(state)].set(status, (
+                    'osd.{}'.format(id_),
+                ))
 
             # collect disk occupation metadata
             osd_metadata = self.get_metadata("osd", str(id_))
@@ -516,7 +494,7 @@ class Module(MgrModule):
             if osd_dev_node and osd_hostname:
                 self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                     id_, osd_hostname, osd_dev_node))
-                self.metrics.set('disk_occupation', 1, (
+                self.metrics['disk_occupation'].set(1, (
                     "osd.{0}".format(id_),
                     osd_dev_node,
                     osd_hostname
@@ -527,7 +505,7 @@ class Module(MgrModule):
 
         pool_meta = []
         for pool in osd_map['pools']:
-            self.metrics.append('pool_metadata', 1, (pool['pool'], pool['pool_name']))
+            self.metrics['pool_metadata'].set(1, (pool['pool'], pool['pool_name']))
 
         # Populate rgw_metadata
         for key, value in servers.items():
@@ -535,8 +513,7 @@ class Module(MgrModule):
             if service_type != 'rgw':
                 continue
             hostname, version = value
-            self.metrics.append(
-                'rgw_metadata',
+            self.metrics['rgw_metadata'].set(
                 1,
                 ('{}.{}'.format(service_type, service_id), hostname, version)
             )
@@ -545,9 +522,13 @@ class Module(MgrModule):
         pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
         for obj in NUM_OBJECTS:
             stat = 'num_objects_{}'.format(obj)
-            self.metrics.set(stat, pg_sum[stat])
+            self.metrics[stat].set(pg_sum[stat])
 
     def collect(self):
+        # Clear the metrics before scraping
+        for k in self.metrics.keys():
+            self.metrics[k].clear()
+
         self.get_health()
         self.get_df()
         self.get_fs()
@@ -571,35 +552,40 @@ class Module(MgrModule):
                 # Represent the long running avgs as sum/count pairs
                 if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
                     _path = path + '_sum'
-                    self.metrics.add_metric(_path, Metric(
-                        stattype,
-                        _path,
-                        counter_info['description'] + ' Total',
-                        ("ceph_daemon",),
-                    ))
-                    self.metrics.append(_path, value, (daemon,))
+                    if _path not in self.metrics:
+                        self.metrics[_path] = Metric(
+                            stattype,
+                            _path,
+                            counter_info['description'] + ' Total',
+                            ("ceph_daemon",),
+                        )
+                    self.metrics[_path].set(value, (daemon,))
 
                     _path = path + '_count'
-                    self.metrics.add_metric(_path, Metric(
-                        'counter',
-                        _path,
-                        counter_info['description'] + ' Count',
-                        ("ceph_daemon",),
-                    ))
-                    self.metrics.append(_path, counter_info['count'], (daemon,))
+                    if _path not in self.metrics:
+                        self.metrics[_path] = Metric(
+                            'counter',
+                            _path,
+                            counter_info['description'] + ' Count',
+                            ("ceph_daemon",),
+                        )
+                    self.metrics[_path].set(counter_info['count'], (daemon,))
                 else:
-                    self.metrics.add_metric(path, Metric(
-                        stattype,
-                        path,
-                        counter_info['description'],
-                        ("ceph_daemon",),
-                    ))
-                    self.metrics.append(path, value, (daemon,))
-
-        # It is sufficient to reset the pending metrics once per scrape
-        self.metrics.reset()
-
-        return self.metrics.metrics
+                    if path not in self.metrics:
+                        self.metrics[path] = Metric(
+                            stattype,
+                            path,
+                            counter_info['description'],
+                            ("ceph_daemon",),
+                        )
+                    self.metrics[path].set(value, (daemon,))
+
+        # Return formatted metrics and clear no longer used data
+        _metrics = [m.str_expfmt() for m in self.metrics.values()]
+        for k in self.metrics.keys():
+            self.metrics[k].clear()
+
+        return ''.join(_metrics) + '\n'
 
     def handle_command(self, cmd):
         if cmd['prefix'] == 'prometheus self-test':
@@ -618,12 +604,6 @@ class Module(MgrModule):
                 cherrypy.request.path = ''
                 return self
 
-            def format_metrics(self, metrics):
-                formatted = ''
-                for m in metrics.values():
-                    formatted += m.str_expfmt()
-                return formatted + '\n'
-
             @cherrypy.expose
             def index(self):
                 return '''<!DOCTYPE html>
@@ -637,14 +617,32 @@ class Module(MgrModule):
 
             @cherrypy.expose
             def metrics(self):
-                if global_instance().have_mon_connection():
-                    metrics = global_instance().collect()
+                instance = global_instance()
+                # Lock the function execution
+                try:
+                    instance.collect_lock.acquire()
+                    return self._metrics(instance)
+                finally:
+                    instance.collect_lock.release()
+
+            def _metrics(self, instance):
+                # Return cached data if available and collected before the cache times out
+                if instance.collect_cache and time.time() - instance.collect_time  < instance.collect_timeout:
                     cherrypy.response.headers['Content-Type'] = 'text/plain'
-                    if metrics:
-                        return self.format_metrics(metrics)
+                    return instance.collect_cache
+
+                if instance.have_mon_connection():
+                    instance.collect_cache = None
+                    instance.collect_time = time.time()
+                    instance.collect_cache = instance.collect()
+                    cherrypy.response.headers['Content-Type'] = 'text/plain'
+                    return instance.collect_cache
                 else:
                     raise cherrypy.HTTPError(503, 'No MON connection')
 
+        # Make the cache timeout for collecting configurable
+        self.collect_timeout = self.get_localized_config('scrape_interval', 5.0)
+
         server_addr = self.get_localized_config('server_addr', DEFAULT_ADDR)
         server_port = self.get_localized_config('server_port', DEFAULT_PORT)
         self.log.info(
@@ -668,16 +666,22 @@ class Module(MgrModule):
         self.log.info('Starting engine...')
         cherrypy.engine.start()
         self.log.info('Engine started.')
-        cherrypy.engine.block()
+        # wait for the shutdown event
+        self.shutdown_event.wait()
+        self.shutdown_event.clear()
+        cherrypy.engine.stop()
+        self.log.info('Engine stopped.')
 
     def shutdown(self):
         self.log.info('Stopping engine...')
-        cherrypy.engine.wait(state=cherrypy.engine.states.STARTED)
-        cherrypy.engine.exit()
-        self.log.info('Stopped engine')
+        self.shutdown_event.set()
 
 
 class StandbyModule(MgrStandbyModule):
+    def __init__(self, *args, **kwargs):
+        super(StandbyModule, self).__init__(*args, **kwargs)
+        self.shutdown_event = threading.Event()
+
     def serve(self):
         server_addr = self.get_localized_config('server_addr', '::')
         server_port = self.get_localized_config('server_port', DEFAULT_PORT)
@@ -712,12 +716,14 @@ class StandbyModule(MgrStandbyModule):
         cherrypy.tree.mount(Root(), '/', {})
         self.log.info('Starting engine...')
         cherrypy.engine.start()
-        self.log.info("Waiting for engine...")
-        cherrypy.engine.wait(state=cherrypy.engine.states.STOPPED)
         self.log.info('Engine started.')
+        # Wait for shutdown event
+        self.shutdown_event.wait()
+        self.shutdown_event.clear()
+        cherrypy.engine.stop()
+        self.log.info('Engine stopped.')
 
     def shutdown(self):
         self.log.info("Stopping engine...")
-        cherrypy.engine.wait(state=cherrypy.engine.states.STARTED)
-        cherrypy.engine.stop()
+        self.shutdown_event.set()
         self.log.info("Stopped engine")