import cherrypy
+from collections import defaultdict
from distutils.version import StrictVersion
import json
import errno
import socket
import threading
import time
-from mgr_module import MgrModule, MgrStandbyModule, CommandResult, PG_STATES
+from mgr_module import MgrModule, MgrStandbyModule, PG_STATES
from mgr_util import get_default_addr, profile_method
from rbd import RBD
+from collections import namedtuple
try:
- from typing import Optional, Dict, Any, Set
-except:
+ from typing import DefaultDict, Optional, Dict, Any, Set
+except ImportError:
pass
# Defaults for the Prometheus HTTP server. Can also set in config-key
NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
+alert_metric = namedtuple('alert_metric', 'name description')
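+# Each health check listed here is exported as a healthcheck_<name> gauge metric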
+HEALTH_CHECKS = [
+ alert_metric('SLOW_OPS', 'OSD or Monitor requests taking a long time to process'),
+]
+
class Metric(object):
def __init__(self, mtype, name, desc, labels=None):
def __init__(self, module):
# type: (Module) -> None
self.mod = module
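+ # the active flag and event allow stop() to end the collection loop promptly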
+ self.active = True
+ self.event = threading.Event()
super(MetricCollectionThread, self).__init__(target=self.collect)
def collect(self):
self.mod.log.info('starting metric collection thread')
- while True:
+ while self.active:
self.mod.log.debug('collecting cache in thread')
if self.mod.have_mon_connection():
start_time = time.time()
- data = self.mod.collect()
- duration = time.time() - start_time
+ try:
+ data = self.mod.collect()
+ except Exception:
+ # Log any issues encountered during the data collection and continue
+ self.mod.log.exception("failed to collect metrics:")
+ self.event.wait(self.mod.scrape_interval)
+ continue
+
+ duration = time.time() - start_time
self.mod.log.debug('collecting cache in thread done')
-
+
sleep_time = self.mod.scrape_interval - duration
if sleep_time < 0:
self.mod.log.warning(
self.mod.collect_cache = data
self.mod.collect_time = duration
- time.sleep(sleep_time)
+ self.event.wait(sleep_time)
else:
self.mod.log.error('No MON connection')
- time.sleep(self.mod.scrape_interval)
+ self.event.wait(self.mod.scrape_interval)
+ def stop(self):
+ self.active = False
+ self.event.set()
class Module(MgrModule):
COMMANDS = [
} # type: Dict[str, Any]
global _global_instance
_global_instance = self
- MetricCollectionThread(_global_instance).start()
+ self.metrics_thread = MetricCollectionThread(_global_instance)
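+ # started later in serve(); stopped and joined when the module shuts down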
def _setup_static_metrics(self):
metrics = {}
'Number of {} objects'.format(state),
)
+ for check in HEALTH_CHECKS:
+ path = 'healthcheck_{}'.format(check.name.lower())
+ metrics[path] = Metric(
+ 'gauge',
+ path,
+ check.description,
+ )
+
return metrics
@profile_method()
def get_health(self):
+
+ def _get_value(message, delim=' ', word_pos=0):
+ """Extract value from message (default is 1st field)"""
+ v_str = message.split(delim)[word_pos]
+ if v_str.isdigit():
+ return int(v_str), 0
+ return 0, 1
+
health = json.loads(self.get('health')['json'])
+ # set overall health
self.metrics['health_status'].set(
health_status_to_number(health['status'])
)
+ # Examine the health report to see whether any of the health checks we
+ # track have triggered, and update their metrics accordingly.
+ active_healthchecks = health.get('checks', {})
+ active_names = active_healthchecks.keys()
+
+ for check in HEALTH_CHECKS:
+ path = 'healthcheck_{}'.format(check.name.lower())
+
+ if path in self.metrics:
+
+ if check.name in active_names:
+ check_data = active_healthchecks[check.name]
+ message = check_data['summary'].get('message', '')
+ v, err = 0, 0
+
+ if check.name == "SLOW_OPS":
+ # 42 slow ops, oldest one blocked for 12 sec, daemons [osd.0, osd.3] have slow ops.
+ v, err = _get_value(message)
+
+ if err:
+ self.log.error("healthcheck {} message format is incompatible and has been dropped".format(check.name))
+ # drop the metric, so it's no longer emitted
+ del self.metrics[path]
+ continue
+ else:
+ self.metrics[path].set(v)
+ else:
+ # health check is not active, so give it a default of 0
+ self.metrics[path].set(0)
+
@profile_method()
def get_pool_stats(self):
# retrieve pool stats to provide per pool recovery metrics
all_modules = {module.get('name'):module.get('can_run') for module in mgr_map['available_modules']}
- ceph_release = None
for mgr in all_mgrs:
host_version = servers.get((mgr, 'mgr'), ('', ''))
if mgr == active:
_state = 1
- ceph_release = host_version[1].split()[-2] # e.g. nautilus
else:
_state = 0
self.metrics['mgr_status'].set(_state, (
'mgr.{}'.format(mgr),
))
- always_on_modules = mgr_map['always_on_modules'].get(ceph_release, [])
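+ # always-on modules are keyed by release name (e.g. nautilus)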
+ always_on_modules = mgr_map['always_on_modules'].get(self.release_name, [])
active_modules = list(always_on_modules)
active_modules.extend(mgr_map['modules'])
pg_summary = self.get('pg_summary')
for pool in pg_summary['by_pool']:
- num_by_state = dict((state, 0) for state in PG_STATES)
- num_by_state['total'] = 0
+ num_by_state = defaultdict(int) # type: DefaultDict[str, int]
for state_name, count in pg_summary['by_pool'][pool].items():
for state in state_name.split('+'):
if service['type'] != 'mgr':
continue
id_ = service['id']
- # get port for prometheus module at mgr with id_
- # TODO use get_config_prefix or get_config here once
- # https://github.com/ceph/ceph/pull/20458 is merged
- result = CommandResult("")
- assert isinstance(_global_instance, Module)
- _global_instance.send_command(
- result, "mon", '',
- json.dumps({
- "prefix": "config-key get",
- 'key': "config/mgr/mgr/prometheus/{}/server_port".format(id_),
- }),
- "")
- r, outb, outs = result.wait()
- if r != 0:
- _global_instance.log.error("Failed to retrieve port for mgr {}: {}".format(id_, outs))
- targets.append('{}:{}'.format(hostname, DEFAULT_PORT))
- else:
- port = json.loads(outb)
- targets.append('{}:{}'.format(hostname, port))
-
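+ # look up the server_port configured for this mgr instance, falling back to DEFAULT_PORT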
+ port = self._get_module_option('server_port', DEFAULT_PORT, id_)
+ targets.append(f'{hostname}:{port}')
ret = [
{
"targets": targets,
(server_addr, server_port)
)
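+ # begin background metric collection now that the server address is configured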
+ self.metrics_thread.start()
+
# Publish the URI that others may use to access the service we're
# about to start serving
self.set_uri('http://{0}:{1}/'.format(
# wait for the shutdown event
self.shutdown_event.wait()
self.shutdown_event.clear()
+ # tell metrics collection thread to stop collecting new metrics
+ self.metrics_thread.stop()
cherrypy.engine.stop()
self.log.info('Engine stopped.')
self.shutdown_rbd_stats()
+ # wait for the metrics collection thread to stop
+ self.metrics_thread.join()
def shutdown(self):
self.log.info('Stopping engine...')