import threading
import time
from mgr_module import MgrModule, MgrStandbyModule, CommandResult, PG_STATES
-from mgr_util import get_default_addr
+from mgr_util import get_default_addr, profile_method
from rbd import RBD
+try:
+ from typing import Optional, Dict, Any, Set
+except:
+ pass
# Defaults for the Prometheus HTTP server. Can also set in config-key
# see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# to access things in class Module from subclass Root. Because
# it's a dict, the writer doesn't need to declare 'global' for access
-_global_instance = {'plugin': None}
-
-
-def global_instance():
- assert _global_instance['plugin'] is not None
- return _global_instance['plugin']
+_global_instance = None # type: Optional[Module]
def health_status_to_number(status):
for labelvalues, value in self.value.items():
if self.labelnames:
- labels = zip(self.labelnames, labelvalues)
- labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
+ labels_list = zip(self.labelnames, labelvalues)
+ labels = ','.join('%s="%s"' % (k, v) for k, v in labels_list)
else:
labels = ''
if labels:
return expfmt
+class MetricCollectionThread(threading.Thread):
+ def __init__(self, module):
+ # type: (Module) -> None
+ self.mod = module
+ super(MetricCollectionThread, self).__init__(target=self.collect)
+
+ def collect(self):
+ self.mod.log.info('starting metric collection thread')
+ while True:
+ self.mod.log.debug('collecting cache in thread')
+ if self.mod.have_mon_connection():
+ start_time = time.time()
+ data = self.mod.collect()
+ duration = time.time() - start_time
+
+ self.mod.log.debug('collecting cache in thread done')
+
+ sleep_time = self.mod.scrape_interval - duration
+ if sleep_time < 0:
+ self.mod.log.warning(
+ 'Collecting data took more time than configured scrape interval. '
+ 'This possibly results in stale data. Please check the '
+ '`stale_cache_strategy` configuration option. '
+ 'Collecting data took {:.2f} seconds but scrape interval is configured '
+ 'to be {:.0f} seconds.'.format(
+ duration,
+ self.mod.scrape_interval,
+ )
+ )
+ sleep_time = 0
+
+ with self.mod.collect_lock:
+ self.mod.collect_cache = data
+ self.mod.collect_time = duration
+
+ time.sleep(sleep_time)
+ else:
+ self.mod.log.error('No MON connection')
+ time.sleep(self.mod.scrape_interval)
+
+
class Module(MgrModule):
COMMANDS = [
{
{'name': 'server_addr'},
{'name': 'server_port'},
{'name': 'scrape_interval'},
+ {'name': 'stale_cache_strategy'},
{'name': 'rbd_stats_pools'},
{'name': 'rbd_stats_pools_refresh_interval', 'type': 'int', 'default': 300},
]
+ STALE_CACHE_FAIL = 'fail'
+ STALE_CACHE_RETURN = 'return'
+
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
self.metrics = self._setup_static_metrics()
self.shutdown_event = threading.Event()
- self.collect_lock = threading.RLock()
- self.collect_time = 0
- self.collect_timeout = 5.0
+ self.collect_lock = threading.Lock()
+ self.collect_time = 0.0
+ self.scrape_interval = 15.0
+ self.stale_cache_strategy = self.STALE_CACHE_FAIL
self.collect_cache = None
self.rbd_stats = {
'pools': {},
'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
'desc': 'RBD image reads latency (msec)'},
},
- }
- _global_instance['plugin'] = self
+ } # type: Dict[str, Any]
+ global _global_instance
+ _global_instance = self
+ MetricCollectionThread(_global_instance).start()
def _setup_static_metrics(self):
metrics = {}
return metrics
+ @profile_method()
def get_health(self):
health = json.loads(self.get('health')['json'])
self.metrics['health_status'].set(
health_status_to_number(health['status'])
)
+ @profile_method()
def get_pool_stats(self):
# retrieve pool stats to provide per pool recovery metrics
# (osd_pool_stats moved to mgr in Mimic)
(pool['pool_id'],)
)
+ @profile_method()
def get_df(self):
# maybe get the to-be-exported metrics from a config?
df = self.get('df')
(pool['id'],)
)
+ @profile_method()
def get_fs(self):
fs_map = self.get('fs_map')
servers = self.get_service_list()
daemon['rank'], host_version[1]
))
+ @profile_method()
def get_quorum_status(self):
mon_status = json.loads(self.get('mon_status')['json'])
servers = self.get_service_list()
'mon.{}'.format(id_),
))
+ @profile_method()
def get_mgr_status(self):
mgr_map = self.get('mgr_map')
servers = self.get_service_list()
self.metrics['mgr_module_status'].set(_state, (mod_name,))
self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))
+ @profile_method()
def get_pg_status(self):
pg_summary = self.get('pg_summary')
except KeyError:
self.log.warning("skipping pg in unknown state {}".format(state))
+ @profile_method()
def get_osd_stats(self):
osd_stats = self.get('osd_stats')
for osd in osd_stats['osd_stats']:
ret.update({(service['id'], service['type']): (host, version)})
return ret
+ @profile_method()
def get_metadata_and_osd_status(self):
osd_map = self.get('osd_map')
osd_flags = osd_map['flags'].split(',')
for k in RBD_MIRROR_METADATA)
)
+ @profile_method()
def get_num_objects(self):
pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
for obj in NUM_OBJECTS:
stat = 'num_objects_{}'.format(obj)
self.metrics[stat].set(pg_sum[stat])
+ @profile_method()
def get_rbd_stats(self):
# Per RBD image stats is collected by registering a dynamic osd perf
# stats query that tells OSDs to group stats for requests associated
# Parse rbd_stats_pools option, which is a comma or space separated
# list of pool[/namespace] entries. If no namespace is specifed the
- # stats are collected for every namespace in the pool.
+ # stats are collected for every namespace in the pool. The wildcard
+ # '*' can be used to indicate all pools or namespaces
pools_string = self.get_localized_module_option('rbd_stats_pools', '')
- pools = {}
- for p in [x for x in re.split('[\s,]+', pools_string) if x]:
- s = p.split('/', 2)
+ pool_keys = []
+ for x in re.split('[\s,]+', pools_string):
+ if not x:
+ continue
+
+ s = x.split('/', 2)
pool_name = s[0]
- if len(s) == 1:
+ namespace_name = None
+ if len(s) == 2:
+ namespace_name = s[1]
+
+ if pool_name == "*":
+ # collect for all pools
+ osd_map = self.get('osd_map')
+ for pool in osd_map['pools']:
+ if 'rbd' not in pool.get('application_metadata', {}):
+ continue
+ pool_keys.append((pool['pool_name'], namespace_name))
+ else:
+ pool_keys.append((pool_name, namespace_name))
+
+ pools = {} # type: Dict[str, Set[str]]
+ for pool_key in pool_keys:
+ pool_name = pool_key[0]
+ namespace_name = pool_key[1]
+ if not namespace_name or namespace_name == "*":
# empty set means collect for all namespaces
pools[pool_name] = set()
continue
+
if pool_name not in pools:
pools[pool_name] = set()
elif not pools[pool_name]:
continue
- pools[pool_name].add(s[1])
+ pools[pool_name].add(namespace_name)
rbd_stats_pools = {}
- for pool_id in list(self.rbd_stats['pools']):
+ for pool_id in self.rbd_stats['pools'].keys():
name = self.rbd_stats['pools'][pool_id]['name']
if name not in pools:
del self.rbd_stats['pools'][pool_id]
self.metrics.update(new_metrics)
+ @profile_method(True)
def collect(self):
# Clear the metrics before scraping
for k in self.metrics.keys():
# TODO use get_config_prefix or get_config here once
# https://github.com/ceph/ceph/pull/20458 is merged
result = CommandResult("")
- global_instance().send_command(
+ assert isinstance(_global_instance, Module)
+ _global_instance.send_command(
result, "mon", '',
json.dumps({
"prefix": "config-key get",
"")
r, outb, outs = result.wait()
if r != 0:
- global_instance().log.error("Failed to retrieve port for mgr {}: {}".format(id_, outs))
+ _global_instance.log.error("Failed to retrieve port for mgr {}: {}".format(id_, outs))
targets.append('{}:{}'.format(hostname, DEFAULT_PORT))
else:
port = json.loads(outb)
@cherrypy.expose
def metrics(self):
- instance = global_instance()
# Lock the function execution
- try:
- instance.collect_lock.acquire()
- return self._metrics(instance)
- finally:
- instance.collect_lock.release()
+ assert isinstance(_global_instance, Module)
+ with _global_instance.collect_lock:
+ return self._metrics(_global_instance)
@staticmethod
def _metrics(instance):
- # Return cached data if available and collected before the
- # cache times out
- if instance.collect_cache and time.time() - instance.collect_time < instance.collect_timeout:
- cherrypy.response.headers['Content-Type'] = 'text/plain'
- return instance.collect_cache
+ # type: (Module) -> Any
+ # Return cached data if available
+ if not instance.collect_cache:
+ raise cherrypy.HTTPError(503, 'No cached data available yet')
- if instance.have_mon_connection():
- instance.collect_cache = None
- instance.collect_time = time.time()
- instance.collect_cache = instance.collect()
+ def respond():
+ assert isinstance(instance, Module)
cherrypy.response.headers['Content-Type'] = 'text/plain'
return instance.collect_cache
- else:
- raise cherrypy.HTTPError(503, 'No MON connection')
+
+ if instance.collect_time < instance.scrape_interval:
+ # Respond if cache isn't stale
+ return respond()
+
+ if instance.stale_cache_strategy == instance.STALE_CACHE_RETURN:
+ # Respond even if cache is stale
+ instance.log.info(
+ 'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
+ 'returning metrics from stale cache.'.format(
+ instance.collect_time,
+ instance.collect_time - instance.scrape_interval
+ )
+ )
+ return respond()
+
+ if instance.stale_cache_strategy == instance.STALE_CACHE_FAIL:
+ # Fail if cache is stale
+ msg = (
+ 'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
+ 'returning "service unavailable".'.format(
+ instance.collect_time,
+ instance.collect_time - instance.scrape_interval,
+ )
+ )
+ instance.log.error(msg)
+ raise cherrypy.HTTPError(503, msg)
# Make the cache timeout for collecting configurable
- self.collect_timeout = float(self.get_localized_module_option(
- 'scrape_interval', 5.0))
+ self.scrape_interval = float(self.get_localized_module_option('scrape_interval', 15.0))
+
+ self.stale_cache_strategy = self.get_localized_module_option('stale_cache_strategy', 'log')
+ if self.stale_cache_strategy not in [self.STALE_CACHE_FAIL,
+ self.STALE_CACHE_RETURN]:
+ self.stale_cache_strategy = self.STALE_CACHE_FAIL
server_addr = self.get_localized_module_option(
'server_addr', get_default_addr())