[ceph.git] / ceph / src / pybind / mgr / prometheus / module.py (blobdiff, commit "import 15.2.5")
index c721a8579d9dba28dfbe26e4241398e150fefd68..2ed5d1714d27ad8e38848278ba75821c938a1e99 100644
@@ -9,8 +9,12 @@ import socket
 import threading
 import time
 from mgr_module import MgrModule, MgrStandbyModule, CommandResult, PG_STATES
-from mgr_util import get_default_addr
+from mgr_util import get_default_addr, profile_method
 from rbd import RBD
+try:
+    from typing import Optional, Dict, Any, Set
+except:
+    pass
 
 # Defaults for the Prometheus HTTP server.  Can also set in config-key
 # see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
@@ -43,12 +47,7 @@ os._exit = os_exit_noop
 # to access things in class Module from subclass Root.  Because
 # it's a dict, the writer doesn't need to declare 'global' for access
 
-_global_instance = {'plugin': None}
-
-
-def global_instance():
-    assert _global_instance['plugin'] is not None
-    return _global_instance['plugin']
+_global_instance = None  # type: Optional[Module]
 
 
 def health_status_to_number(status):
@@ -161,8 +160,8 @@ class Metric(object):
 
         for labelvalues, value in self.value.items():
             if self.labelnames:
-                labels = zip(self.labelnames, labelvalues)
-                labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
+                labels_list = zip(self.labelnames, labelvalues)
+                labels = ','.join('%s="%s"' % (k, v) for k, v in labels_list)
             else:
                 labels = ''
             if labels:
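
For orientation (not part of the patch): `labels_list` feeds Prometheus' text exposition format, where each sample is rendered as `name{label="value",...} value`. A minimal sketch of what the loop above emits for a single sample, using an illustrative metric name and label:

```python
# Illustrative only: metric name, label and value are made up for this example.
labelnames = ('ceph_daemon',)
labelvalues = ('osd.0',)
value = 1.0

labels = ','.join('%s="%s"' % (k, v) for k, v in zip(labelnames, labelvalues))
print('ceph_osd_up{%s} %s' % (labels, value))
# -> ceph_osd_up{ceph_daemon="osd.0"} 1.0
```
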
@@ -177,6 +176,47 @@ class Metric(object):
         return expfmt
 
 
+class MetricCollectionThread(threading.Thread):
+    def __init__(self, module):
+        # type: (Module) -> None
+        self.mod = module
+        super(MetricCollectionThread, self).__init__(target=self.collect)
+
+    def collect(self):
+        self.mod.log.info('starting metric collection thread')
+        while True:
+            self.mod.log.debug('collecting cache in thread')
+            if self.mod.have_mon_connection():
+                start_time = time.time()
+                data = self.mod.collect()
+                duration = time.time() - start_time
+
+                self.mod.log.debug('collecting cache in thread done')
+
+                sleep_time = self.mod.scrape_interval - duration
+                if sleep_time < 0:
+                    self.mod.log.warning(
+                        'Collecting data took more time than configured scrape interval. '
+                        'This possibly results in stale data. Please check the '
+                        '`stale_cache_strategy` configuration option. '
+                        'Collecting data took {:.2f} seconds but scrape interval is configured '
+                        'to be {:.0f} seconds.'.format(
+                            duration,
+                            self.mod.scrape_interval,
+                        )
+                    )
+                    sleep_time = 0
+
+                with self.mod.collect_lock:
+                    self.mod.collect_cache = data
+                    self.mod.collect_time = duration
+
+                time.sleep(sleep_time)
+            else:
+                self.mod.log.error('No MON connection')
+                time.sleep(self.mod.scrape_interval)
+
+
 class Module(MgrModule):
     COMMANDS = [
         {
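
The `MetricCollectionThread` added above moves metric gathering off the HTTP request path: it refreshes a shared cache roughly every `scrape_interval` seconds, and the CherryPy handler (changed further below) only reads that cache under `collect_lock`. A minimal, self-contained sketch of the same producer/consumer pattern, with hypothetical names (`Collector`, `fetch_metrics`) standing in for the module's pieces:

```python
# Stripped-down sketch of the caching pattern introduced by this patch.
# Names are illustrative, not taken from the module.
import threading
import time

class Collector(threading.Thread):
    def __init__(self, fetch_metrics, interval=15.0):
        super(Collector, self).__init__()
        self.daemon = True
        self.fetch_metrics = fetch_metrics   # the expensive call, e.g. Module.collect()
        self.interval = interval             # mirrors scrape_interval
        self.lock = threading.Lock()         # mirrors Module.collect_lock
        self.cache = None                    # mirrors Module.collect_cache
        self.last_duration = 0.0             # mirrors Module.collect_time

    def run(self):
        while True:
            start = time.time()
            data = self.fetch_metrics()
            duration = time.time() - start
            with self.lock:
                self.cache = data
                self.last_duration = duration
            # Sleep whatever is left of the interval; never a negative amount.
            time.sleep(max(self.interval - duration, 0))

    def read(self):
        # What the HTTP handler does: serve the cache, never collect inline.
        with self.lock:
            return self.cache

collector = Collector(lambda: 'ceph_health_status 0\n', interval=1.0)
collector.start()
time.sleep(1.5)
print(collector.read())
```
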
@@ -190,17 +230,22 @@ class Module(MgrModule):
         {'name': 'server_addr'},
         {'name': 'server_port'},
         {'name': 'scrape_interval'},
+        {'name': 'stale_cache_strategy'},
         {'name': 'rbd_stats_pools'},
         {'name': 'rbd_stats_pools_refresh_interval', 'type': 'int', 'default': 300},
     ]
 
+    STALE_CACHE_FAIL = 'fail'
+    STALE_CACHE_RETURN = 'return'
+
     def __init__(self, *args, **kwargs):
         super(Module, self).__init__(*args, **kwargs)
         self.metrics = self._setup_static_metrics()
         self.shutdown_event = threading.Event()
-        self.collect_lock = threading.RLock()
-        self.collect_time = 0
-        self.collect_timeout = 5.0
+        self.collect_lock = threading.Lock()
+        self.collect_time = 0.0
+        self.scrape_interval = 15.0
+        self.stale_cache_strategy = self.STALE_CACHE_FAIL
         self.collect_cache = None
         self.rbd_stats = {
             'pools': {},
@@ -219,8 +264,10 @@ class Module(MgrModule):
                 'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                  'desc': 'RBD image reads latency (msec)'},
             },
-        }
-        _global_instance['plugin'] = self
+        }  # type: Dict[str, Any]
+        global _global_instance
+        _global_instance = self
+        MetricCollectionThread(_global_instance).start()
 
     def _setup_static_metrics(self):
         metrics = {}
@@ -386,12 +433,14 @@ class Module(MgrModule):
 
         return metrics
 
+    @profile_method()
     def get_health(self):
         health = json.loads(self.get('health')['json'])
         self.metrics['health_status'].set(
             health_status_to_number(health['status'])
         )
 
+    @profile_method()
     def get_pool_stats(self):
         # retrieve pool stats to provide per pool recovery metrics
         # (osd_pool_stats moved to mgr in Mimic)
@@ -403,6 +452,7 @@ class Module(MgrModule):
                     (pool['pool_id'],)
                 )
 
+    @profile_method()
     def get_df(self):
         # maybe get the to-be-exported metrics from a config?
         df = self.get('df')
@@ -416,6 +466,7 @@ class Module(MgrModule):
                     (pool['id'],)
                 )
 
+    @profile_method()
     def get_fs(self):
         fs_map = self.get('fs_map')
         servers = self.get_service_list()
@@ -449,6 +500,7 @@ class Module(MgrModule):
                     daemon['rank'], host_version[1]
                 ))
 
+    @profile_method()
     def get_quorum_status(self):
         mon_status = json.loads(self.get('mon_status')['json'])
         servers = self.get_service_list()
@@ -466,6 +518,7 @@ class Module(MgrModule):
                 'mon.{}'.format(id_),
             ))
 
+    @profile_method()
     def get_mgr_status(self):
         mgr_map = self.get('mgr_map')
         servers = self.get_service_list()
@@ -511,6 +564,7 @@ class Module(MgrModule):
             self.metrics['mgr_module_status'].set(_state, (mod_name,))
             self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))
 
+    @profile_method()
     def get_pg_status(self):
 
         pg_summary = self.get('pg_summary')
@@ -530,6 +584,7 @@ class Module(MgrModule):
                 except KeyError:
                     self.log.warning("skipping pg in unknown state {}".format(state))
 
+    @profile_method()
     def get_osd_stats(self):
         osd_stats = self.get('osd_stats')
         for osd in osd_stats['osd_stats']:
@@ -549,6 +604,7 @@ class Module(MgrModule):
                 ret.update({(service['id'], service['type']): (host, version)})
         return ret
 
+    @profile_method()
     def get_metadata_and_osd_status(self):
         osd_map = self.get('osd_map')
         osd_flags = osd_map['flags'].split(',')
@@ -672,12 +728,14 @@ class Module(MgrModule):
                         for k in RBD_MIRROR_METADATA)
                 )
 
+    @profile_method()
     def get_num_objects(self):
         pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
         for obj in NUM_OBJECTS:
             stat = 'num_objects_{}'.format(obj)
             self.metrics[stat].set(pg_sum[stat])
 
+    @profile_method()
     def get_rbd_stats(self):
         # Per RBD image stats is collected by registering a dynamic osd perf
         # stats query that tells OSDs to group stats for requests associated
@@ -694,24 +752,47 @@ class Module(MgrModule):
 
         # Parse rbd_stats_pools option, which is a comma or space separated
         # list of pool[/namespace] entries. If no namespace is specified the
-        # stats are collected for every namespace in the pool.
+        # stats are collected for every namespace in the pool. The wildcard
+        # '*' can be used to indicate all pools or namespaces
         pools_string = self.get_localized_module_option('rbd_stats_pools', '')
-        pools = {}
-        for p in [x for x in re.split('[\s,]+', pools_string) if x]:
-            s = p.split('/', 2)
+        pool_keys = []
+        for x in re.split('[\s,]+', pools_string):
+            if not x:
+                continue
+
+            s = x.split('/', 2)
             pool_name = s[0]
-            if len(s) == 1:
+            namespace_name = None
+            if len(s) == 2:
+                namespace_name = s[1]
+
+            if pool_name == "*":
+                # collect for all pools
+                osd_map = self.get('osd_map')
+                for pool in osd_map['pools']:
+                    if 'rbd' not in pool.get('application_metadata', {}):
+                        continue
+                    pool_keys.append((pool['pool_name'], namespace_name))
+            else:
+                pool_keys.append((pool_name, namespace_name))
+
+        pools = {}  # type: Dict[str, Set[str]]
+        for pool_key in pool_keys:
+            pool_name = pool_key[0]
+            namespace_name = pool_key[1]
+            if not namespace_name or namespace_name == "*":
                 # empty set means collect for all namespaces
                 pools[pool_name] = set()
                 continue
+
             if pool_name not in pools:
                 pools[pool_name] = set()
             elif not pools[pool_name]:
                 continue
-            pools[pool_name].add(s[1])
+            pools[pool_name].add(namespace_name)
 
         rbd_stats_pools = {}
-        for pool_id in list(self.rbd_stats['pools']):
+        for pool_id in self.rbd_stats['pools'].keys():
             name = self.rbd_stats['pools'][pool_id]['name']
             if name not in pools:
                 del self.rbd_stats['pools'][pool_id]
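
To make the new wildcard handling concrete, here is a hedged, standalone re-implementation of just the option parsing above; `all_rbd_pools` is a stand-in for the pools found in the `osd_map` with the `rbd` application enabled, and the sample option strings are made up:

```python
import re

def parse_rbd_stats_pools(pools_string, all_rbd_pools):
    """Standalone sketch of the parsing above; returns {pool: set(namespaces)},
    where an empty set means 'collect for all namespaces in that pool'."""
    pool_keys = []
    for x in re.split(r'[\s,]+', pools_string):
        if not x:
            continue
        s = x.split('/', 2)
        pool_name, namespace_name = s[0], (s[1] if len(s) == 2 else None)
        if pool_name == "*":
            # '*' expands to every pool carrying the rbd application
            pool_keys.extend((p, namespace_name) for p in all_rbd_pools)
        else:
            pool_keys.append((pool_name, namespace_name))

    pools = {}
    for pool_name, namespace_name in pool_keys:
        if not namespace_name or namespace_name == "*":
            pools[pool_name] = set()        # empty set == all namespaces
            continue
        if pool_name not in pools:
            pools[pool_name] = set()
        elif not pools[pool_name]:
            continue
        pools[pool_name].add(namespace_name)
    return pools

# Hypothetical pools and namespaces, purely for illustration:
print(parse_rbd_stats_pools('rbd, images/ns1 images/ns2', ['rbd', 'images']))
# pool 'rbd' -> all namespaces, pool 'images' -> {'ns1', 'ns2'}
print(parse_rbd_stats_pools('*/production', ['rbd', 'images']))
# every rbd pool -> {'production'}
```
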
@@ -927,6 +1008,7 @@ class Module(MgrModule):
 
         self.metrics.update(new_metrics)
 
+    @profile_method(True)
     def collect(self):
         # Clear the metrics before scraping
         for k in self.metrics.keys():
@@ -1012,7 +1094,8 @@ class Module(MgrModule):
                 # TODO use get_config_prefix or get_config here once
                 # https://github.com/ceph/ceph/pull/20458 is merged
                 result = CommandResult("")
-                global_instance().send_command(
+                assert isinstance(_global_instance, Module)
+                _global_instance.send_command(
                     result, "mon", '',
                     json.dumps({
                         "prefix": "config-key get",
@@ -1021,7 +1104,7 @@ class Module(MgrModule):
                     "")
                 r, outb, outs = result.wait()
                 if r != 0:
-                    global_instance().log.error("Failed to retrieve port for mgr {}: {}".format(id_, outs))
+                    _global_instance.log.error("Failed to retrieve port for mgr {}: {}".format(id_, outs))
                     targets.append('{}:{}'.format(hostname, DEFAULT_PORT))
                 else:
                     port = json.loads(outb)
@@ -1068,34 +1151,57 @@ class Module(MgrModule):
 
             @cherrypy.expose
             def metrics(self):
-                instance = global_instance()
                 # Lock the function execution
-                try:
-                    instance.collect_lock.acquire()
-                    return self._metrics(instance)
-                finally:
-                    instance.collect_lock.release()
+                assert isinstance(_global_instance, Module)
+                with _global_instance.collect_lock:
+                    return self._metrics(_global_instance)
 
             @staticmethod
             def _metrics(instance):
-                # Return cached data if available and collected before the
-                # cache times out
-                if instance.collect_cache and time.time() - instance.collect_time < instance.collect_timeout:
-                    cherrypy.response.headers['Content-Type'] = 'text/plain'
-                    return instance.collect_cache
+                # type: (Module) -> Any
+                # Return cached data if available
+                if not instance.collect_cache:
+                    raise cherrypy.HTTPError(503, 'No cached data available yet')
 
-                if instance.have_mon_connection():
-                    instance.collect_cache = None
-                    instance.collect_time = time.time()
-                    instance.collect_cache = instance.collect()
+                def respond():
+                    assert isinstance(instance, Module)
                     cherrypy.response.headers['Content-Type'] = 'text/plain'
                     return instance.collect_cache
-                else:
-                    raise cherrypy.HTTPError(503, 'No MON connection')
+
+                if instance.collect_time < instance.scrape_interval:
+                    # Respond if cache isn't stale
+                    return respond()
+
+                if instance.stale_cache_strategy == instance.STALE_CACHE_RETURN:
+                    # Respond even if cache is stale
+                    instance.log.info(
+                        'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
+                        'returning metrics from stale cache.'.format(
+                            instance.collect_time,
+                            instance.collect_time - instance.scrape_interval
+                        )
+                    )
+                    return respond()
+
+                if instance.stale_cache_strategy == instance.STALE_CACHE_FAIL:
+                    # Fail if cache is stale
+                    msg = (
+                        'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
+                        'returning "service unavailable".'.format(
+                            instance.collect_time,
+                            instance.collect_time - instance.scrape_interval,
+                        )
+                    )
+                    instance.log.error(msg)
+                    raise cherrypy.HTTPError(503, msg)
 
         # Make the cache timeout for collecting configurable
-        self.collect_timeout = float(self.get_localized_module_option(
-            'scrape_interval', 5.0))
+        self.scrape_interval = float(self.get_localized_module_option('scrape_interval', 15.0))
+
+        self.stale_cache_strategy = self.get_localized_module_option('stale_cache_strategy', 'log')
+        if self.stale_cache_strategy not in [self.STALE_CACHE_FAIL,
+                                             self.STALE_CACHE_RETURN]:
+            self.stale_cache_strategy = self.STALE_CACHE_FAIL
 
         server_addr = self.get_localized_module_option(
             'server_addr', get_default_addr())
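
Summing up the two new options: `scrape_interval` sets both the collection period and the staleness threshold, and `stale_cache_strategy` decides what the handler does when the last collection took longer than that. A small sketch of the decision `Root._metrics()` now makes per scrape (names are descriptive, not from the module):

```python
def choose_response(cache, collect_time, scrape_interval, strategy):
    """Sketch of Root._metrics() above: returns (HTTP status, body or reason)."""
    if not cache:
        return 503, 'No cached data available yet'
    if collect_time < scrape_interval:
        return 200, cache            # last collection fit inside one interval
    if strategy == 'return':
        return 200, cache            # stale, but configured to serve anyway
    return 503, 'cache is stale'     # 'fail' (unrecognized values are reset to it)

# With the defaults set above (scrape_interval=15.0, strategy falling back to 'fail'),
# a collection run that took 20s produces 503s until a faster run refreshes the cache:
print(choose_response('ceph_health_status 0\n', 20.0, 15.0, 'fail'))
```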