diff --git a/ceph/src/pybind/mgr/prometheus/module.py b/ceph/src/pybind/mgr/prometheus/module.py
index 390828b07bb31f3adc76fb4f151f60bacf7e4aa6..f3068250fd9fd023e33fdb2b214feff98817d916 100644
--- a/ceph/src/pybind/mgr/prometheus/module.py
+++ b/ceph/src/pybind/mgr/prometheus/module.py
@@ -1,4 +1,5 @@
 import cherrypy
+from collections import defaultdict
 from distutils.version import StrictVersion
 import json
 import errno
@@ -8,12 +9,13 @@ import re
 import socket
 import threading
 import time
-from mgr_module import MgrModule, MgrStandbyModule, CommandResult, PG_STATES
+from mgr_module import MgrModule, MgrStandbyModule, PG_STATES
 from mgr_util import get_default_addr, profile_method
 from rbd import RBD
+from collections import namedtuple
 try:
-    from typing import Optional, Dict, Any, Set
-except:
+    from typing import DefaultDict, Optional, Dict, Any, Set
+except ImportError:
     pass
 
 # Defaults for the Prometheus HTTP server.  Can also set in config-key
@@ -108,6 +110,11 @@ DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
 
 NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
 
+alert_metric = namedtuple('alert_metric', 'name description')
+HEALTH_CHECKS = [
+    alert_metric('SLOW_OPS', 'OSD or Monitor requests taking a long time to process'),
+]
+
 
 class Metric(object):
     def __init__(self, mtype, name, desc, labels=None):
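Note: each HEALTH_CHECKS entry becomes a gauge whose path is the lower-cased
check name, as _setup_static_metrics and get_health do further down. A
standalone sketch of just the naming scheme (print output is illustrative):

    from collections import namedtuple

    alert_metric = namedtuple('alert_metric', 'name description')
    HEALTH_CHECKS = [
        alert_metric('SLOW_OPS',
                     'OSD or Monitor requests taking a long time to process'),
    ]

    for check in HEALTH_CHECKS:
        path = 'healthcheck_{}'.format(check.name.lower())
        print(path, '->', check.description)
    # healthcheck_slow_ops -> OSD or Monitor requests taking a long time to process
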
@@ -181,19 +188,28 @@ class MetricCollectionThread(threading.Thread):
     def __init__(self, module):
         # type: (Module) -> None
         self.mod = module
+        self.active = True
+        self.event = threading.Event()
         super(MetricCollectionThread, self).__init__(target=self.collect)
 
     def collect(self):
         self.mod.log.info('starting metric collection thread')
-        while True:
+        while self.active:
             self.mod.log.debug('collecting cache in thread')
             if self.mod.have_mon_connection():
                 start_time = time.time()
-                data = self.mod.collect()
-                duration = time.time() - start_time
 
+                try:
+                    data = self.mod.collect()
+                except Exception:
+                    # Log any issues encountered during data collection and continue
+                    self.mod.log.exception("failed to collect metrics:")
+                    self.event.wait(self.mod.scrape_interval)
+                    continue
+
+                duration = time.time() - start_time
                 self.mod.log.debug('collecting cache in thread done')
-                
+
                 sleep_time = self.mod.scrape_interval - duration
                 if sleep_time < 0:
                     self.mod.log.warning(
@@ -212,11 +228,14 @@ class MetricCollectionThread(threading.Thread):
                     self.mod.collect_cache = data
                     self.mod.collect_time = duration
 
-                time.sleep(sleep_time)
+                self.event.wait(sleep_time)
             else:
                 self.mod.log.error('No MON connection')
-                time.sleep(self.mod.scrape_interval)
+                self.event.wait(self.mod.scrape_interval)
 
+    def stop(self):
+        self.active = False
+        self.event.set()
 
 class Module(MgrModule):
     COMMANDS = [
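Note: the switch from time.sleep() to threading.Event.wait() is what makes the
new stop() method effective; wait() sleeps like sleep() but returns as soon as
the event is set, so shutdown is not delayed by up to a full scrape interval.
A minimal standalone sketch of the pattern (class and names are illustrative,
not part of the module):

    import threading

    class StoppableWorker(threading.Thread):
        """Minimal version of the loop/stop pattern used above."""
        def __init__(self, interval):
            self.interval = interval
            self.active = True
            self.event = threading.Event()
            super(StoppableWorker, self).__init__(target=self.work)

        def work(self):
            while self.active:
                # ... one collection pass would go here ...
                # returns immediately once stop() sets the event
                self.event.wait(self.interval)

        def stop(self):
            self.active = False
            self.event.set()

    w = StoppableWorker(interval=15)
    w.start()
    w.stop()
    w.join()   # returns promptly; a time.sleep() loop would not
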
@@ -268,7 +287,7 @@ class Module(MgrModule):
         }  # type: Dict[str, Any]
         global _global_instance
         _global_instance = self
-        MetricCollectionThread(_global_instance).start()
+        self.metrics_thread = MetricCollectionThread(_global_instance)
 
     def _setup_static_metrics(self):
         metrics = {}
@@ -432,15 +451,62 @@ class Module(MgrModule):
                 'Number of {} objects'.format(state),
             )
 
+        for check in HEALTH_CHECKS:
+            path = 'healthcheck_{}'.format(check.name.lower())
+            metrics[path] = Metric(
+                'gauge',
+                path,
+                check.description,
+            )
+
         return metrics
 
     @profile_method()
     def get_health(self):
+
+        def _get_value(message, delim=' ', word_pos=0):
+            """Extract value from message (default is 1st field)"""
+            v_str = message.split(delim)[word_pos]
+            if v_str.isdigit():
+                return int(v_str), 0
+            return 0, 1
+
         health = json.loads(self.get('health')['json'])
+        # set overall health
         self.metrics['health_status'].set(
             health_status_to_number(health['status'])
         )
 
+        # Examine the health report to see whether any triggered health
+        # checks need to become metrics.
+        active_healthchecks = health.get('checks', {})
+        active_names = active_healthchecks.keys()
+
+        for check in HEALTH_CHECKS:
+            path = 'healthcheck_{}'.format(check.name.lower())
+
+            if path in self.metrics:
+
+                if check.name in active_names:
+                    check_data = active_healthchecks[check.name]
+                    message = check_data['summary'].get('message', '')
+                    v, err = 0, 0
+
+                    if check.name == "SLOW_OPS":
+                        # 42 slow ops, oldest one blocked for 12 sec, daemons [osd.0, osd.3] have slow ops.
+                        v, err = _get_value(message)
+
+                    if err:
+                        self.log.error("healthcheck {} message format is incompatible and its metric has been dropped".format(check.name))
+                        # drop the metric, so it's no longer emitted
+                        del self.metrics[path]
+                        continue
+                    else:
+                        self.metrics[path].set(v)
+                else:
+                    # health check is not active, so give it a default of 0
+                    self.metrics[path].set(0)
+
     @profile_method()
     def get_pool_stats(self):
         # retrieve pool stats to provide per pool recovery metrics
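Note: the parsing contract is that _get_value() returns (value, 0) when the
requested field of the summary message is an integer, and (0, 1) otherwise,
which the caller treats as a format error. A copy of the helper exercised
against the SLOW_OPS message format quoted in the comment above:

    def _get_value(message, delim=' ', word_pos=0):
        """Extract value from message (default is 1st field)"""
        v_str = message.split(delim)[word_pos]
        if v_str.isdigit():
            return int(v_str), 0
        return 0, 1

    msg = ('42 slow ops, oldest one blocked for 12 sec, '
           'daemons [osd.0,osd.3] have slow ops.')
    assert _get_value(msg) == (42, 0)
    # A reworded summary without a leading integer trips the error path,
    # which get_health() uses to drop the metric instead of emitting junk:
    assert _get_value('slow requests detected') == (0, 1)
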
@@ -532,12 +598,10 @@ class Module(MgrModule):
 
         all_modules = {module.get('name'):module.get('can_run') for module in mgr_map['available_modules']}
 
-        ceph_release = None
         for mgr in all_mgrs:
             host_version = servers.get((mgr, 'mgr'), ('', ''))
             if mgr == active:
                 _state = 1
-                ceph_release = host_version[1].split()[-2] # e.g. nautilus
             else:
                 _state = 0
 
@@ -548,7 +612,7 @@ class Module(MgrModule):
             self.metrics['mgr_status'].set(_state, (
                 'mgr.{}'.format(mgr),
             ))
-        always_on_modules = mgr_map['always_on_modules'].get(ceph_release, [])
+        always_on_modules = mgr_map['always_on_modules'].get(self.release_name, [])
         active_modules = list(always_on_modules)
         active_modules.extend(mgr_map['modules'])
 
@@ -571,8 +635,7 @@ class Module(MgrModule):
         pg_summary = self.get('pg_summary')
 
         for pool in pg_summary['by_pool']:
-            num_by_state = dict((state, 0) for state in PG_STATES)
-            num_by_state['total'] = 0
+            num_by_state = defaultdict(int)  # type: DefaultDict[str, int]
 
             for state_name, count in pg_summary['by_pool'][pool].items():
                 for state in state_name.split('+'):
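Note: the defaultdict swap removes the need to pre-seed a zero for every entry
in PG_STATES; missing states simply read as 0. A quick sketch of the per-pool
counting on made-up pg_summary data:

    from collections import defaultdict

    by_pool = {'1': {'active+clean': 30, 'active+remapped+backfilling': 2}}
    for pool, state_counts in by_pool.items():
        num_by_state = defaultdict(int)
        for state_name, count in state_counts.items():
            for state in state_name.split('+'):
                num_by_state[state] += count
        print(pool, dict(num_by_state))
    # 1 {'active': 32, 'clean': 30, 'remapped': 2, 'backfilling': 2}
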
@@ -1091,26 +1154,8 @@ class Module(MgrModule):
                 if service['type'] != 'mgr':
                     continue
                 id_ = service['id']
-                # get port for prometheus module at mgr with id_
-                # TODO use get_config_prefix or get_config here once
-                # https://github.com/ceph/ceph/pull/20458 is merged
-                result = CommandResult("")
-                assert isinstance(_global_instance, Module)
-                _global_instance.send_command(
-                    result, "mon", '',
-                    json.dumps({
-                        "prefix": "config-key get",
-                        'key': "config/mgr/mgr/prometheus/{}/server_port".format(id_),
-                    }),
-                    "")
-                r, outb, outs = result.wait()
-                if r != 0:
-                    _global_instance.log.error("Failed to retrieve port for mgr {}: {}".format(id_, outs))
-                    targets.append('{}:{}'.format(hostname, DEFAULT_PORT))
-                else:
-                    port = json.loads(outb)
-                    targets.append('{}:{}'.format(hostname, port))
-
+                port = self._get_module_option('server_port', DEFAULT_PORT, id_)
+                targets.append(f'{hostname}:{port}')
         ret = [
             {
                 "targets": targets,
@@ -1213,6 +1258,8 @@ class Module(MgrModule):
             (server_addr, server_port)
         )
 
+        self.metrics_thread.start()
+
         # Publish the URI that others may use to access the service we're
         # about to start serving
         self.set_uri('http://{0}:{1}/'.format(
@@ -1232,9 +1279,13 @@ class Module(MgrModule):
         # wait for the shutdown event
         self.shutdown_event.wait()
         self.shutdown_event.clear()
+        # tell metrics collection thread to stop collecting new metrics
+        self.metrics_thread.stop()
         cherrypy.engine.stop()
         self.log.info('Engine stopped.')
         self.shutdown_rbd_stats()
+        # wait for the metrics collection thread to stop
+        self.metrics_thread.join()
 
     def shutdown(self):
         self.log.info('Stopping engine...')
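
Note: the ordering in this last hunk matters: stop() flags the collector and
wakes it before cherrypy shuts down, and join() comes last so shutdown blocks
only as long as an in-flight collect() takes, not a full scrape interval. A
small demo of why the Event makes join() prompt (illustrative only):

    import threading
    import time

    ev = threading.Event()
    t = threading.Thread(target=lambda: ev.wait(60))  # "sleep" up to 60s
    t.start()
    start = time.time()
    ev.set()   # what stop() does: wake the wait immediately
    t.join()
    print('joined after {:.3f}s'.format(time.time() - start))  # well under 60s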