from rbd import RBD
from collections import namedtuple
try:
- from typing import DefaultDict, Optional, Dict, Any, Set
+ from typing import DefaultDict, Optional, Dict, Any, Set, Tuple, Union, List, Callable
+ LabelValues = Tuple[str, ...]
+ Number = Union[int, float]
+ MetricValue = Dict[LabelValues, Number]
except ImportError:
pass
DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']
-DF_POOL = ['max_avail', 'stored', 'stored_raw', 'objects', 'dirty',
+DF_POOL = ['max_avail', 'avail_raw', 'stored', 'stored_raw', 'objects', 'dirty',
'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes',
'compress_bytes_used', 'compress_under_bytes']
)
return expfmt
def group_by(
    self,
    keys: List[str],
    joins: Dict[str, Callable[[List[str]], str]],
    name: Optional[str] = None,
) -> "Metric":
    """
    Group metric data by the given label names.

    Label names not passed are removed from the resulting metric, but
    by providing a join function, labels of metrics can be grouped.

    The purpose of this method is to provide a version of a metric that
    can be used in matching where otherwise multiple results would be
    returned.

    As grouping is possible in Prometheus, the only additional value of
    this method is the possibility to join labels when grouping. For
    that reason, passing joins is required. Please use PromQL
    expressions in all other cases.

    >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
    >>> m.value = {
    ...     ('foo', 'x'): 1,
    ...     ('foo', 'y'): 1,
    ... }
    >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
    {('foo', 'x,y'): 1}

    The functionality of group by could roughly be compared with
    Prometheus'

        group (ceph_disk_occupation) by (device, instance)

    with the exception that not all labels which aren't used as a
    condition to group a metric are discarded, but their values can be
    joined and the label is thereby preserved.

    This function takes the value of the first entry of a found group
    to be used for the resulting value of the grouping operation.

    >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
    >>> m.value = {
    ...     ('foo', 'x'): 555,
    ...     ('foo', 'y'): 10,
    ... }
    >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
    {('foo', 'x,y'): 555}
    """
    assert self.labelnames, "cannot match keys without label names"
    for key in keys:
        assert key in self.labelnames, "unknown key: {}".format(key)
    assert joins, "joins must not be empty"
    assert all(callable(c) for c in joins.values()), "joins must be callable"

    # group entries by the label values of the given keys
    grouped = defaultdict(list)  # type: Dict[LabelValues, List[Tuple[Dict[str, str], Number]]]
    for label_values, metric_value in self.value.items():
        labels = dict(zip(self.labelnames, label_values))
        if not all(k in labels for k in keys):
            continue
        group_key = tuple(labels[k] for k in keys)
        grouped[group_key].append((labels, metric_value))

    # as there is nothing specified on how to join labels that are not equal
    # and Prometheus `group` aggregation functions similarly, we simply drop
    # those labels.
    labelnames = tuple(
        label for label in self.labelnames if label in keys or label in joins
    )
    superfluous_labelnames = [
        label for label in self.labelnames if label not in labelnames
    ]

    # iterate and convert groups with more than one member into a single
    # entry
    values = {}  # type: MetricValue
    for group in grouped.values():
        # the first member of a group supplies the resulting value
        labels, metric_value = group[0]

        for label in superfluous_labelnames:
            del labels[label]

        if len(group) > 1:
            for key, fn in joins.items():
                labels[key] = fn(list(labels[key] for labels, _ in group))

        values[tuple(labels.values())] = metric_value

    new_metric = Metric(self.mtype, name if name else self.name, self.desc, labelnames)
    new_metric.value = values

    return new_metric
+
class MetricCollectionThread(threading.Thread):
def __init__(self, module):
DISK_OCCUPATION
)
+ metrics['disk_occupation_human'] = Metric(
+ 'untyped',
+ 'disk_occupation_human',
+ 'Associate Ceph daemon with disk used for displaying to humans,'
+ ' not for joining tables (vector matching)',
+ DISK_OCCUPATION, # label names are automatically decimated on grouping
+ )
+
metrics['pool_metadata'] = Metric(
'untyped',
'pool_metadata',
self.log.info("Missing dev node metadata for osd {0}, skipping "
"occupation record for this osd".format(id_))
+ if 'disk_occupation' in self.metrics:
+ try:
+ self.metrics['disk_occupation_human'] = \
+ self.metrics['disk_occupation'].group_by(
+ ['device', 'instance'],
+ {'ceph_daemon': lambda daemons: ', '.join(daemons)},
+ name='disk_occupation_human',
+ )
+ except Exception as e:
+ self.log.error(e)
+
for pool in osd_map['pools']:
self.metrics['pool_metadata'].set(
1, (pool['pool'], pool['pool_name']))