3 from collections
import defaultdict
4 from pkg_resources
import packaging
# type: ignore
12 from collections
import namedtuple
14 from mgr_module
import CLIReadCommand
, MgrModule
, MgrStandbyModule
, PG_STATES
, Option
, ServiceInfoT
, HandleCommandResult
, CLIWriteCommand
15 from mgr_util
import get_default_addr
, profile_method
, build_url
18 from typing
import DefaultDict
, Optional
, Dict
, Any
, Set
, cast
, Tuple
, Union
, List
, Callable
# Type aliases shared by the metric-handling code below:
# a metric sample is keyed by its tuple of label values.
LabelValues = Tuple[str, ...]
Number = Union[int, float]
MetricValue = Dict[LabelValues, Number]
# Defaults for the Prometheus HTTP server. Can also set in config-key
# see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# for Prometheus exporter port registry
# When the CherryPy server in 3.2.2 (and later) starts it attempts to verify
# that the ports its listening on are in fact bound. When using the any address
# "::" it tries both ipv4 and ipv6, and in some environments (e.g. kubernetes)
# ipv6 isn't yet configured / supported and CherryPy throws an uncaught
# exception.
if cherrypy is not None:
    Version = packaging.version.Version
    v = Version(cherrypy.__version__)
    # the issue was fixed in 3.2.3. it's present in 3.2.2 (current version on
    # centos:7) and back to at least 3.0.0.
    if Version("3.1.2") <= v < Version("3.2.3"):
        # https://github.com/cherrypy/cherrypy/issues/1100
        from cherrypy.process import servers
        servers.wait_for_occupied_port = lambda host, port: None

    # cherrypy likes to sys.exit on error. don't let it take us down too!
    def os_exit_noop(status: int) -> None:
        pass

    os._exit = os_exit_noop  # type: ignore
# to access things in class Module from subclass Root. Because
# it's a dict, the writer doesn't need to declare 'global' for access
_global_instance = None  # type: Optional[Module]
cherrypy.config.update({
    'response.headers.server': 'Ceph-Prometheus'
})
def health_status_to_number(status: str) -> int:
    """Map a Ceph health status string to its numeric metric value.

    Args:
        status: one of 'HEALTH_OK', 'HEALTH_WARN' or 'HEALTH_ERR'.

    Returns:
        0 for OK, 1 for WARN, 2 for ERR.

    Raises:
        ValueError: if the status string is not one of the known values.
    """
    if status == 'HEALTH_OK':
        return 0
    elif status == 'HEALTH_WARN':
        return 1
    elif status == 'HEALTH_ERR':
        return 2
    raise ValueError(f'unknown status "{status}"')
# Cluster-wide 'df' fields exported as ceph_cluster_* gauges.
DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']

OSD_BLOCKLIST = ['osd_blocklist_count']

# Per-pool 'df' fields exported as ceph_pool_* metrics.
DF_POOL = ['max_avail', 'avail_raw', 'stored', 'stored_raw', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes',
           'compress_bytes_used', 'compress_under_bytes', 'bytes_used', 'percent_used']

# Recovery-rate fields taken from 'osd pool stats'. NOTE: the original tuple
# listed 'num_bytes_recovered' twice; the last entry should be
# 'num_keys_recovered' so that key-recovery progress is exported as well.
OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
                  'recovering_keys_per_sec', 'num_objects_recovered',
                  'num_bytes_recovered', 'num_keys_recovered')

OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')
89 MDS_METADATA
= ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
# Label names for the mon/mgr metadata and status metrics.
MON_METADATA = ('ceph_daemon', 'hostname',
                'public_addr', 'rank', 'ceph_version')

MGR_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

MGR_STATUS = ('ceph_daemon',)

MGR_MODULE_STATUS = ('name',)

MGR_MODULE_CAN_RUN = ('name',)
103 OSD_METADATA
= ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
104 'front_iface', 'hostname', 'objectstore', 'public_addr',
# Per-OSD status/perf fields and pool/rgw metadata label names.
OSD_STATUS = ['weight', 'up', 'in']

OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name', 'type', 'description', 'compression_mode')

RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version', 'instance_id')
115 RBD_MIRROR_METADATA
= ('ceph_daemon', 'id', 'instance_id', 'hostname',
# Label names matching what the Prometheus node_exporter uses, so that
# disk_occupation can be joined against node metrics.
DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
                   'wal_device', 'instance', 'devices', 'device_ids')

NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
# Health checks that are promoted to dedicated metrics with a value parsed
# out of the health-detail message (see Module.get_health).
alert_metric = namedtuple('alert_metric', 'name description')
HEALTH_CHECKS = [
    alert_metric('SLOW_OPS', 'OSD or Monitor requests taking a long time to process'),
]

HEALTHCHECK_DETAIL = ('name', 'severity')
131 class Severity(enum
.Enum
):
137 class Format(enum
.Enum
):
140 json_pretty
= 'json-pretty'
144 class HealthCheckEvent
:
146 def __init__(self
, name
: str, severity
: Severity
, first_seen
: float, last_seen
: float, count
: int, active
: bool = True):
148 self
.severity
= severity
149 self
.first_seen
= first_seen
150 self
.last_seen
= last_seen
154 def as_dict(self
) -> Dict
[str, Any
]:
155 """Return the instance as a dictionary."""
160 kv_name
= 'health_history'
161 titles
= "{healthcheck_name:<24} {first_seen:<20} {last_seen:<20} {count:>5} {active:^6}"
162 date_format
= "%Y/%m/%d %H:%M:%S"
164 def __init__(self
, mgr
: MgrModule
):
166 self
.lock
= threading
.Lock()
167 self
.healthcheck
: Dict
[str, HealthCheckEvent
] = {}
170 def _load(self
) -> None:
171 """Load the current state from the mons KV store."""
172 data
= self
.mgr
.get_store(self
.kv_name
)
175 healthcheck_data
= json
.loads(data
)
176 except json
.JSONDecodeError
:
178 f
"INVALID data read from mgr/prometheus/{self.kv_name}. Resetting")
182 for k
, v
in healthcheck_data
.items():
183 self
.healthcheck
[k
] = HealthCheckEvent(
185 severity
=v
.get('severity'),
186 first_seen
=v
.get('first_seen', 0),
187 last_seen
=v
.get('last_seen', 0),
188 count
=v
.get('count', 1),
189 active
=v
.get('active', True))
193 def reset(self
) -> None:
194 """Reset the healthcheck history."""
196 self
.mgr
.set_store(self
.kv_name
, "{}")
197 self
.healthcheck
= {}
199 def save(self
) -> None:
200 """Save the current in-memory healthcheck history to the KV store."""
202 self
.mgr
.set_store(self
.kv_name
, self
.as_json())
204 def check(self
, health_checks
: Dict
[str, Any
]) -> None:
205 """Look at the current health checks and compare existing the history.
208 health_checks (Dict[str, Any]): current health check data
211 current_checks
= health_checks
.get('checks', {})
214 # first turn off any active states we're tracking
215 for seen_check
in self
.healthcheck
:
216 check
= self
.healthcheck
[seen_check
]
217 if check
.active
and seen_check
not in current_checks
:
221 # now look for any additions to track
223 for name
, info
in current_checks
.items():
224 if name
not in self
.healthcheck
:
225 # this healthcheck is new, so start tracking it
227 self
.healthcheck
[name
] = HealthCheckEvent(
229 severity
=info
.get('severity'),
236 # seen it before, so update its metadata
237 check
= self
.healthcheck
[name
]
239 # check has been registered as active already, so skip
242 check
.last_seen
= now
250 def __str__(self
) -> str:
251 """Print the healthcheck history.
254 str: Human readable representation of the healthcheck history
258 if len(self
.healthcheck
.keys()) == 0:
259 out
.append("No healthchecks have been recorded")
261 out
.append(self
.titles
.format(
262 healthcheck_name
="Healthcheck Name",
263 first_seen
="First Seen (UTC)",
264 last_seen
="Last seen (UTC)",
268 for k
in sorted(self
.healthcheck
.keys()):
269 check
= self
.healthcheck
[k
]
270 out
.append(self
.titles
.format(
271 healthcheck_name
=check
.name
,
272 first_seen
=time
.strftime(self
.date_format
, time
.localtime(check
.first_seen
)),
273 last_seen
=time
.strftime(self
.date_format
, time
.localtime(check
.last_seen
)),
275 active
="Yes" if check
.active
else "No")
277 out
.extend([f
"{len(self.healthcheck)} health check(s) listed", ""])
279 return "\n".join(out
)
281 def as_dict(self
) -> Dict
[str, Any
]:
282 """Return the history in a dictionary.
285 Dict[str, Any]: dictionary indexed by the healthcheck name
287 return {name
: self
.healthcheck
[name
].as_dict() for name
in self
.healthcheck
}
289 def as_json(self
, pretty
: bool = False) -> str:
290 """Return the healthcheck history object as a dict (JSON).
293 pretty (bool, optional): whether to json pretty print the history. Defaults to False.
296 str: str representation of the healthcheck in JSON format
299 return json
.dumps(self
.as_dict(), indent
=2)
301 return json
.dumps(self
.as_dict())
303 def as_yaml(self
) -> str:
304 """Return the healthcheck history in yaml format.
307 str: YAML representation of the healthcheck history
309 return yaml
.safe_dump(self
.as_dict(), explicit_start
=True, default_flow_style
=False)
312 class Metric(object):
313 def __init__(self
, mtype
: str, name
: str, desc
: str, labels
: Optional
[LabelValues
] = None) -> None:
317 self
.labelnames
= labels
# tuple if present
318 self
.value
: Dict
[LabelValues
, Number
] = {}
320 def clear(self
) -> None:
323 def set(self
, value
: Number
, labelvalues
: Optional
[LabelValues
] = None) -> None:
324 # labelvalues must be a tuple
325 labelvalues
= labelvalues
or ('',)
326 self
.value
[labelvalues
] = value
328 def str_expfmt(self
) -> str:
330 # Must be kept in sync with promethize() in src/exporter/util.cc
331 def promethize(path
: str) -> str:
332 ''' replace illegal metric name characters '''
333 result
= re
.sub(r
'[./\s]|::', '_', path
).replace('+', '_plus')
335 # Hyphens usually turn into underscores, unless they are
337 if result
.endswith("-"):
338 result
= result
[0:-1] + "_minus"
340 result
= result
.replace("-", "_")
342 return "ceph_{0}".format(result
)
344 def floatstr(value
: float) -> str:
345 ''' represent as Go-compatible float '''
346 if value
== float('inf'):
348 if value
== float('-inf'):
350 if math
.isnan(value
):
352 return repr(float(value
))
354 name
= promethize(self
.name
)
357 # TYPE {name} {mtype}'''.format(
363 for labelvalues
, value
in self
.value
.items():
365 labels_list
= zip(self
.labelnames
, labelvalues
)
366 labels
= ','.join('%s="%s"' % (k
, v
) for k
, v
in labels_list
)
370 fmtstr
= '\n{name}{{{labels}}} {value}'
372 fmtstr
= '\n{name} {value}'
373 expfmt
+= fmtstr
.format(
376 value
=floatstr(value
),
383 joins
: Dict
[str, Callable
[[List
[str]], str]],
384 name
: Optional
[str] = None,
387 Groups data by label names.
389 Label names not passed are being removed from the resulting metric but
390 by providing a join function, labels of metrics can be grouped.
392 The purpose of this method is to provide a version of a metric that can
393 be used in matching where otherwise multiple results would be returned.
395 As grouping is possible in Prometheus, the only additional value of this
396 method is the possibility to join labels when grouping. For that reason,
397 passing joins is required. Please use PromQL expressions in all other
400 >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
405 >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
408 The functionality of group by could roughly be compared with Prometheus'
410 group (ceph_disk_occupation) by (device, instance)
412 with the exception that not all labels which aren't used as a condition
413 to group a metric are discarded, but their values can are joined and the
414 label is thereby preserved.
416 This function takes the value of the first entry of a found group to be
417 used for the resulting value of the grouping operation.
419 >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
421 ... ('foo', 'x'): 555,
422 ... ('foo', 'y'): 10,
424 >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
425 {('foo', 'x,y'): 555}
427 assert self
.labelnames
, "cannot match keys without label names"
429 assert key
in self
.labelnames
, "unknown key: {}".format(key
)
430 assert joins
, "joins must not be empty"
431 assert all(callable(c
) for c
in joins
.values()), "joins must be callable"
434 grouped
: Dict
[LabelValues
, List
[Tuple
[Dict
[str, str], Number
]]] = defaultdict(list)
435 for label_values
, metric_value
in self
.value
.items():
436 labels
= dict(zip(self
.labelnames
, label_values
))
437 if not all(k
in labels
for k
in keys
):
439 group_key
= tuple(labels
[k
] for k
in keys
)
440 grouped
[group_key
].append((labels
, metric_value
))
442 # as there is nothing specified on how to join labels that are not equal
443 # and Prometheus `group` aggregation functions similarly, we simply drop
446 label
for label
in self
.labelnames
if label
in keys
or label
in joins
448 superfluous_labelnames
= [
449 label
for label
in self
.labelnames
if label
not in labelnames
452 # iterate and convert groups with more than one member into a single
454 values
: MetricValue
= {}
455 for group
in grouped
.values():
456 labels
, metric_value
= group
[0]
458 for label
in superfluous_labelnames
:
462 for key
, fn
in joins
.items():
463 labels
[key
] = fn(list(labels
[key
] for labels
, _
in group
))
465 values
[tuple(labels
.values())] = metric_value
467 new_metric
= Metric(self
.mtype
, name
if name
else self
.name
, self
.desc
, labelnames
)
468 new_metric
.value
= values
473 class MetricCounter(Metric
):
477 labels
: Optional
[LabelValues
] = None) -> None:
478 super(MetricCounter
, self
).__init
__('counter', name
, desc
, labels
)
479 self
.value
= defaultdict(lambda: 0)
481 def clear(self
) -> None:
482 pass # Skip calls to clear as we want to keep the counters here.
486 labelvalues
: Optional
[LabelValues
] = None) -> None:
487 msg
= 'This method must not be used for instances of MetricCounter class'
488 raise NotImplementedError(msg
)
492 labelvalues
: Optional
[LabelValues
] = None) -> None:
493 # labelvalues must be a tuple
494 labelvalues
= labelvalues
or ('',)
495 self
.value
[labelvalues
] += value
498 class MetricCollectionThread(threading
.Thread
):
499 def __init__(self
, module
: 'Module') -> None:
502 self
.event
= threading
.Event()
503 super(MetricCollectionThread
, self
).__init
__(target
=self
.collect
)
505 def collect(self
) -> None:
506 self
.mod
.log
.info('starting metric collection thread')
508 self
.mod
.log
.debug('collecting cache in thread')
509 if self
.mod
.have_mon_connection():
510 start_time
= time
.time()
513 data
= self
.mod
.collect()
515 # Log any issues encountered during the data collection and continue
516 self
.mod
.log
.exception("failed to collect metrics:")
517 self
.event
.wait(self
.mod
.scrape_interval
)
520 duration
= time
.time() - start_time
521 self
.mod
.log
.debug('collecting cache in thread done')
523 sleep_time
= self
.mod
.scrape_interval
- duration
525 self
.mod
.log
.warning(
526 'Collecting data took more time than configured scrape interval. '
527 'This possibly results in stale data. Please check the '
528 '`stale_cache_strategy` configuration option. '
529 'Collecting data took {:.2f} seconds but scrape interval is configured '
530 'to be {:.0f} seconds.'.format(
532 self
.mod
.scrape_interval
,
537 with self
.mod
.collect_lock
:
538 self
.mod
.collect_cache
= data
539 self
.mod
.collect_time
= duration
541 self
.event
.wait(sleep_time
)
543 self
.mod
.log
.error('No MON connection')
544 self
.event
.wait(self
.mod
.scrape_interval
)
546 def stop(self
) -> None:
551 class Module(MgrModule
):
555 default
=get_default_addr(),
556 desc
='the IPv4 or IPv6 address on which the module listens for HTTP requests',
561 default
=DEFAULT_PORT
,
562 desc
='the port on which the module listens for HTTP requests',
571 'stale_cache_strategy',
584 name
='rbd_stats_pools_refresh_interval',
589 name
='standby_behaviour',
592 enum_allowed
=['default', 'error'],
596 name
='standby_error_status_code',
605 STALE_CACHE_FAIL
= 'fail'
606 STALE_CACHE_RETURN
= 'return'
608 def __init__(self
, *args
: Any
, **kwargs
: Any
) -> None:
609 super(Module
, self
).__init
__(*args
, **kwargs
)
610 self
.metrics
= self
._setup
_static
_metrics
()
611 self
.shutdown_event
= threading
.Event()
612 self
.collect_lock
= threading
.Lock()
613 self
.collect_time
= 0.0
614 self
.scrape_interval
: float = 15.0
616 self
.stale_cache_strategy
: str = self
.STALE_CACHE_FAIL
617 self
.collect_cache
: Optional
[str] = None
620 'pools_refresh_time': 0,
622 'write_ops': {'type': self
.PERFCOUNTER_COUNTER
,
623 'desc': 'RBD image writes count'},
624 'read_ops': {'type': self
.PERFCOUNTER_COUNTER
,
625 'desc': 'RBD image reads count'},
626 'write_bytes': {'type': self
.PERFCOUNTER_COUNTER
,
627 'desc': 'RBD image bytes written'},
628 'read_bytes': {'type': self
.PERFCOUNTER_COUNTER
,
629 'desc': 'RBD image bytes read'},
630 'write_latency': {'type': self
.PERFCOUNTER_LONGRUNAVG
,
631 'desc': 'RBD image writes latency (msec)'},
632 'read_latency': {'type': self
.PERFCOUNTER_LONGRUNAVG
,
633 'desc': 'RBD image reads latency (msec)'},
635 } # type: Dict[str, Any]
636 global _global_instance
637 _global_instance
= self
638 self
.metrics_thread
= MetricCollectionThread(_global_instance
)
639 self
.health_history
= HealthHistory(self
)
641 def _setup_static_metrics(self
) -> Dict
[str, Metric
]:
643 metrics
['health_status'] = Metric(
646 'Cluster health status'
648 metrics
['mon_quorum_status'] = Metric(
651 'Monitors in quorum',
654 metrics
['fs_metadata'] = Metric(
660 metrics
['mds_metadata'] = Metric(
666 metrics
['mon_metadata'] = Metric(
672 metrics
['mgr_metadata'] = Metric(
678 metrics
['mgr_status'] = Metric(
681 'MGR status (0=standby, 1=active)',
684 metrics
['mgr_module_status'] = Metric(
687 'MGR module status (0=disabled, 1=enabled, 2=auto-enabled)',
690 metrics
['mgr_module_can_run'] = Metric(
692 'mgr_module_can_run',
693 'MGR module runnable state i.e. can it run (0=no, 1=yes)',
696 metrics
['osd_metadata'] = Metric(
703 # The reason for having this separate to OSD_METADATA is
704 # so that we can stably use the same tag names that
705 # the Prometheus node_exporter does
706 metrics
['disk_occupation'] = Metric(
709 'Associate Ceph daemon with disk used',
713 metrics
['disk_occupation_human'] = Metric(
715 'disk_occupation_human',
716 'Associate Ceph daemon with disk used for displaying to humans,'
717 ' not for joining tables (vector matching)',
718 DISK_OCCUPATION
, # label names are automatically decimated on grouping
721 metrics
['pool_metadata'] = Metric(
728 metrics
['rgw_metadata'] = Metric(
735 metrics
['rbd_mirror_metadata'] = Metric(
737 'rbd_mirror_metadata',
738 'RBD Mirror Metadata',
742 metrics
['pg_total'] = Metric(
745 'PG Total Count per Pool',
749 metrics
['health_detail'] = Metric(
752 'healthcheck status by type (0=inactive, 1=active)',
756 metrics
['pool_objects_repaired'] = Metric(
758 'pool_objects_repaired',
759 'Number of objects repaired in a pool',
763 metrics
['daemon_health_metrics'] = Metric(
765 'daemon_health_metrics',
766 'Health metrics for Ceph daemons',
767 ('type', 'ceph_daemon',)
770 for flag
in OSD_FLAGS
:
771 path
= 'osd_flag_{}'.format(flag
)
772 metrics
[path
] = Metric(
775 'OSD Flag {}'.format(flag
)
777 for state
in OSD_STATUS
:
778 path
= 'osd_{}'.format(state
)
779 metrics
[path
] = Metric(
782 'OSD status {}'.format(state
),
785 for stat
in OSD_STATS
:
786 path
= 'osd_{}'.format(stat
)
787 metrics
[path
] = Metric(
790 'OSD stat {}'.format(stat
),
793 for stat
in OSD_POOL_STATS
:
794 path
= 'pool_{}'.format(stat
)
795 metrics
[path
] = Metric(
798 "OSD pool stats: {}".format(stat
),
801 for state
in PG_STATES
:
802 path
= 'pg_{}'.format(state
)
803 metrics
[path
] = Metric(
806 'PG {} per pool'.format(state
),
809 for state
in DF_CLUSTER
:
810 path
= 'cluster_{}'.format(state
)
811 metrics
[path
] = Metric(
814 'DF {}'.format(state
),
816 path
= 'cluster_by_class_{}'.format(state
)
817 metrics
[path
] = Metric(
820 'DF {}'.format(state
),
823 for state
in DF_POOL
:
824 path
= 'pool_{}'.format(state
)
825 metrics
[path
] = Metric(
826 'counter' if state
in ('rd', 'rd_bytes', 'wr', 'wr_bytes') else 'gauge',
828 'DF pool {}'.format(state
),
831 for state
in OSD_BLOCKLIST
:
832 path
= 'cluster_{}'.format(state
)
833 metrics
[path
] = Metric(
836 'OSD Blocklist Count {}'.format(state
),
838 for state
in NUM_OBJECTS
:
839 path
= 'num_objects_{}'.format(state
)
840 metrics
[path
] = Metric(
843 'Number of {} objects'.format(state
),
846 for check
in HEALTH_CHECKS
:
847 path
= 'healthcheck_{}'.format(check
.name
.lower())
848 metrics
[path
] = Metric(
856 def get_server_addr(self
) -> str:
858 Return the current mgr server IP.
860 server_addr
= cast(str, self
.get_localized_module_option('server_addr', get_default_addr()))
861 if server_addr
in ['::', '0.0.0.0']:
862 return self
.get_mgr_ip()
865 def config_notify(self
) -> None:
867 This method is called whenever one of our config options is changed.
869 # https://stackoverflow.com/questions/7254845/change-cherrypy-port-and-restart-web-server
870 # if we omit the line: cherrypy.server.httpserver = None
871 # then the cherrypy server is not restarted correctly
872 self
.log
.info('Restarting engine...')
873 cherrypy
.engine
.stop()
874 cherrypy
.server
.httpserver
= None
875 server_addr
= cast(str, self
.get_localized_module_option('server_addr', get_default_addr()))
876 server_port
= cast(int, self
.get_localized_module_option('server_port', DEFAULT_PORT
))
877 self
.configure(server_addr
, server_port
)
878 cherrypy
.engine
.start()
879 self
.log
.info('Engine started.')
882 def get_health(self
) -> None:
884 def _get_value(message
: str, delim
: str = ' ', word_pos
: int = 0) -> Tuple
[int, int]:
885 """Extract value from message (default is 1st field)"""
886 v_str
= message
.split(delim
)[word_pos
]
891 health
= json
.loads(self
.get('health')['json'])
893 self
.metrics
['health_status'].set(
894 health_status_to_number(health
['status'])
897 # Examine the health to see if any health checks triggered need to
898 # become a specific metric with a value from the health detail
899 active_healthchecks
= health
.get('checks', {})
900 active_names
= active_healthchecks
.keys()
902 for check
in HEALTH_CHECKS
:
903 path
= 'healthcheck_{}'.format(check
.name
.lower())
905 if path
in self
.metrics
:
907 if check
.name
in active_names
:
908 check_data
= active_healthchecks
[check
.name
]
909 message
= check_data
['summary'].get('message', '')
912 if check
.name
== "SLOW_OPS":
913 # 42 slow ops, oldest one blocked for 12 sec, daemons [osd.0, osd.3] have
915 v
, err
= _get_value(message
)
919 "healthcheck %s message format is incompatible and has been dropped",
921 # drop the metric, so it's no longer emitted
922 del self
.metrics
[path
]
925 self
.metrics
[path
].set(v
)
927 # health check is not active, so give it a default of 0
928 self
.metrics
[path
].set(0)
930 self
.health_history
.check(health
)
931 for name
, info
in self
.health_history
.healthcheck
.items():
932 v
= 1 if info
.active
else 0
933 self
.metrics
['health_detail'].set(
940 def get_pool_stats(self
) -> None:
941 # retrieve pool stats to provide per pool recovery metrics
942 # (osd_pool_stats moved to mgr in Mimic)
943 pstats
= self
.get('osd_pool_stats')
944 for pool
in pstats
['pool_stats']:
945 for stat
in OSD_POOL_STATS
:
946 self
.metrics
['pool_{}'.format(stat
)].set(
947 pool
['recovery_rate'].get(stat
, 0),
952 def get_df(self
) -> None:
953 # maybe get the to-be-exported metrics from a config?
955 for stat
in DF_CLUSTER
:
956 self
.metrics
['cluster_{}'.format(stat
)].set(df
['stats'][stat
])
957 for device_class
in df
['stats_by_class']:
958 self
.metrics
['cluster_by_class_{}'.format(stat
)].set(
959 df
['stats_by_class'][device_class
][stat
], (device_class
,))
961 for pool
in df
['pools']:
963 self
.metrics
['pool_{}'.format(stat
)].set(
969 def get_osd_blocklisted_entries(self
) -> None:
970 r
= self
.mon_command({
971 'prefix': 'osd blocklist ls',
974 blocklist_entries
= r
[2].split(' ')
975 blocklist_count
= blocklist_entries
[1]
976 for stat
in OSD_BLOCKLIST
:
977 self
.metrics
['cluster_{}'.format(stat
)].set(int(blocklist_count
))
980 def get_fs(self
) -> None:
981 fs_map
= self
.get('fs_map')
982 servers
= self
.get_service_list()
983 self
.log
.debug('standbys: {}'.format(fs_map
['standbys']))
984 # export standby mds metadata, default standby fs_id is '-1'
985 for standby
in fs_map
['standbys']:
986 id_
= standby
['name']
987 host
, version
, _
= servers
.get((id_
, 'mds'), ('', '', ''))
988 addr
, rank
= standby
['addr'], standby
['rank']
989 self
.metrics
['mds_metadata'].set(1, (
990 'mds.{}'.format(id_
), '-1',
996 for fs
in fs_map
['filesystems']:
997 # collect fs metadata
998 data_pools
= ",".join([str(pool
)
999 for pool
in fs
['mdsmap']['data_pools']])
1000 self
.metrics
['fs_metadata'].set(1, (
1003 fs
['mdsmap']['metadata_pool'],
1004 fs
['mdsmap']['fs_name']
1006 self
.log
.debug('mdsmap: {}'.format(fs
['mdsmap']))
1007 for gid
, daemon
in fs
['mdsmap']['info'].items():
1008 id_
= daemon
['name']
1009 host
, version
, _
= servers
.get((id_
, 'mds'), ('', '', ''))
1010 self
.metrics
['mds_metadata'].set(1, (
1011 'mds.{}'.format(id_
), fs
['id'],
1012 host
, daemon
['addr'],
1013 daemon
['rank'], version
1017 def get_quorum_status(self
) -> None:
1018 mon_status
= json
.loads(self
.get('mon_status')['json'])
1019 servers
= self
.get_service_list()
1020 for mon
in mon_status
['monmap']['mons']:
1023 mon_version
= servers
.get((id_
, 'mon'), ('', '', ''))
1024 self
.metrics
['mon_metadata'].set(1, (
1025 'mon.{}'.format(id_
), mon_version
[0],
1026 mon
['public_addr'].rsplit(':', 1)[0], rank
,
1029 in_quorum
= int(rank
in mon_status
['quorum'])
1030 self
.metrics
['mon_quorum_status'].set(in_quorum
, (
1031 'mon.{}'.format(id_
),
1035 def get_mgr_status(self
) -> None:
1036 mgr_map
= self
.get('mgr_map')
1037 servers
= self
.get_service_list()
1039 active
= mgr_map
['active_name']
1040 standbys
= [s
.get('name') for s
in mgr_map
['standbys']]
1042 all_mgrs
= list(standbys
)
1043 all_mgrs
.append(active
)
1045 all_modules
= {module
.get('name'): module
.get('can_run')
1046 for module
in mgr_map
['available_modules']}
1048 for mgr
in all_mgrs
:
1049 host
, version
, _
= servers
.get((mgr
, 'mgr'), ('', '', ''))
1055 self
.metrics
['mgr_metadata'].set(1, (
1056 f
'mgr.{mgr}', host
, version
1058 self
.metrics
['mgr_status'].set(_state
, (
1060 always_on_modules
= mgr_map
['always_on_modules'].get(self
.release_name
, [])
1061 active_modules
= list(always_on_modules
)
1062 active_modules
.extend(mgr_map
['modules'])
1064 for mod_name
in all_modules
.keys():
1066 if mod_name
in always_on_modules
:
1068 elif mod_name
in active_modules
:
1073 _can_run
= 1 if all_modules
[mod_name
] else 0
1074 self
.metrics
['mgr_module_status'].set(_state
, (mod_name
,))
1075 self
.metrics
['mgr_module_can_run'].set(_can_run
, (mod_name
,))
1078 def get_pg_status(self
) -> None:
1080 pg_summary
= self
.get('pg_summary')
1082 for pool
in pg_summary
['by_pool']:
1083 num_by_state
: DefaultDict
[str, int] = defaultdict(int)
1084 for state
in PG_STATES
:
1085 num_by_state
[state
] = 0
1087 for state_name
, count
in pg_summary
['by_pool'][pool
].items():
1088 for state
in state_name
.split('+'):
1089 num_by_state
[state
] += count
1090 num_by_state
['total'] += count
1092 for state
, num
in num_by_state
.items():
1094 self
.metrics
["pg_{}".format(state
)].set(num
, (pool
,))
1096 self
.log
.warning("skipping pg in unknown state {}".format(state
))
1099 def get_osd_stats(self
) -> None:
1100 osd_stats
= self
.get('osd_stats')
1101 for osd
in osd_stats
['osd_stats']:
1103 for stat
in OSD_STATS
:
1104 val
= osd
['perf_stat'][stat
]
1105 self
.metrics
['osd_{}'.format(stat
)].set(val
, (
1106 'osd.{}'.format(id_
),
1109 def get_service_list(self
) -> Dict
[Tuple
[str, str], Tuple
[str, str, str]]:
1111 for server
in self
.list_servers():
1112 host
= cast(str, server
.get('hostname', ''))
1113 for service
in cast(List
[ServiceInfoT
], server
.get('services', [])):
1114 ret
.update({(service
['id'], service
['type']): (host
,
1115 service
.get('ceph_version', 'unknown'),
1116 service
.get('name', ''))})
1120 def get_metadata_and_osd_status(self
) -> None:
1121 osd_map
= self
.get('osd_map')
1122 osd_flags
= osd_map
['flags'].split(',')
1123 for flag
in OSD_FLAGS
:
1124 self
.metrics
['osd_flag_{}'.format(flag
)].set(
1125 int(flag
in osd_flags
)
1128 osd_devices
= self
.get('osd_map_crush')['devices']
1129 servers
= self
.get_service_list()
1130 for osd
in osd_map
['osds']:
1131 # id can be used to link osd metrics and metadata
1133 # collect osd metadata
1134 p_addr
= osd
['public_addr'].rsplit(':', 1)[0]
1135 c_addr
= osd
['cluster_addr'].rsplit(':', 1)[0]
1136 if p_addr
== "-" or c_addr
== "-":
1138 "Missing address metadata for osd {0}, skipping occupation"
1139 " and metadata records for this osd".format(id_
)
1144 for osd_device
in osd_devices
:
1145 if osd_device
['id'] == id_
:
1146 dev_class
= osd_device
.get('class', '')
1149 if dev_class
is None:
1150 self
.log
.info("OSD {0} is missing from CRUSH map, "
1151 "skipping output".format(id_
))
1154 osd_version
= servers
.get((str(id_
), 'osd'), ('', '', ''))
1156 # collect disk occupation metadata
1157 osd_metadata
= self
.get_metadata("osd", str(id_
))
1158 if osd_metadata
is None:
1161 obj_store
= osd_metadata
.get('osd_objectstore', '')
1162 f_iface
= osd_metadata
.get('front_iface', '')
1163 b_iface
= osd_metadata
.get('back_iface', '')
1165 self
.metrics
['osd_metadata'].set(1, (
1167 'osd.{}'.format(id_
),
1177 # collect osd status
1178 for state
in OSD_STATUS
:
1180 self
.metrics
['osd_{}'.format(state
)].set(status
, (
1181 'osd.{}'.format(id_
),
1185 osd_wal_dev_node
= ''
1186 osd_db_dev_node
= ''
1187 if obj_store
== "filestore":
1188 # collect filestore backend device
1189 osd_dev_node
= osd_metadata
.get(
1190 'backend_filestore_dev_node', None)
1191 # collect filestore journal device
1192 osd_wal_dev_node
= osd_metadata
.get('osd_journal', '')
1193 osd_db_dev_node
= ''
1194 elif obj_store
== "bluestore":
1195 # collect bluestore backend device
1196 osd_dev_node
= osd_metadata
.get(
1197 'bluestore_bdev_dev_node', None)
1198 # collect bluestore wal backend
1199 osd_wal_dev_node
= osd_metadata
.get('bluefs_wal_dev_node', '')
1200 # collect bluestore db backend
1201 osd_db_dev_node
= osd_metadata
.get('bluefs_db_dev_node', '')
1202 if osd_dev_node
and osd_dev_node
== "unknown":
1205 # fetch the devices and ids (vendor, model, serial) from the
1207 osd_devs
= osd_metadata
.get('devices', '') or 'N/A'
1208 osd_dev_ids
= osd_metadata
.get('device_ids', '') or 'N/A'
1210 osd_hostname
= osd_metadata
.get('hostname', None)
1211 if osd_dev_node
and osd_hostname
:
1212 self
.log
.debug("Got dev for osd {0}: {1}/{2}".format(
1213 id_
, osd_hostname
, osd_dev_node
))
1214 self
.metrics
['disk_occupation'].set(1, (
1215 "osd.{0}".format(id_
),
1224 self
.log
.info("Missing dev node metadata for osd {0}, skipping "
1225 "occupation record for this osd".format(id_
))
1227 if 'disk_occupation' in self
.metrics
:
1229 self
.metrics
['disk_occupation_human'] = \
1230 self
.metrics
['disk_occupation'].group_by(
1231 ['device', 'instance'],
1232 {'ceph_daemon': lambda daemons
: ', '.join(daemons
)},
1233 name
='disk_occupation_human',
1235 except Exception as e
:
1238 ec_profiles
= osd_map
.get('erasure_code_profiles', {})
1240 def _get_pool_info(pool
: Dict
[str, Any
]) -> Tuple
[str, str]:
1241 pool_type
= 'unknown'
1242 description
= 'unknown'
1244 if pool
['type'] == 1:
1245 pool_type
= "replicated"
1246 description
= f
"replica:{pool['size']}"
1247 elif pool
['type'] == 3:
1248 pool_type
= "erasure"
1249 name
= pool
.get('erasure_code_profile', '')
1250 profile
= ec_profiles
.get(name
, {})
1252 description
= f
"ec:{profile['k']}+{profile['m']}"
1254 description
= "ec:unknown"
1256 return pool_type
, description
1258 for pool
in osd_map
['pools']:
1260 compression_mode
= 'none'
1261 pool_type
, pool_description
= _get_pool_info(pool
)
1263 if 'options' in pool
:
1264 compression_mode
= pool
['options'].get('compression_mode', 'none')
1266 self
.metrics
['pool_metadata'].set(
1275 # Populate other servers metadata
1276 for key
, value
in servers
.items():
1277 service_id
, service_type
= key
1278 if service_type
== 'rgw':
1279 hostname
, version
, name
= value
1280 self
.metrics
['rgw_metadata'].set(
1282 ('{}.{}'.format(service_type
, name
),
1283 hostname
, version
, service_id
)
1285 elif service_type
== 'rbd-mirror':
1286 mirror_metadata
= self
.get_metadata('rbd-mirror', service_id
)
1287 if mirror_metadata
is None:
1289 mirror_metadata
['ceph_daemon'] = '{}.{}'.format(service_type
,
1291 rbd_mirror_metadata
= cast(LabelValues
,
1292 (mirror_metadata
.get(k
, '')
1293 for k
in RBD_MIRROR_METADATA
))
1294 self
.metrics
['rbd_mirror_metadata'].set(
1295 1, rbd_mirror_metadata
def get_num_objects(self) -> None:
    """Export per-type object counts taken from the cluster-wide PG summary.

    Reads the aggregated stat sum from the 'pg_summary' and publishes one
    metric per object type listed in NUM_OBJECTS.
    """
    totals = self.get('pg_summary')['pg_stats_sum']['stat_sum']
    for obj_type in NUM_OBJECTS:
        key = 'num_objects_{}'.format(obj_type)
        self.metrics[key].set(totals[key])
1306 def get_rbd_stats(self
) -> None:
1307 # Per RBD image stats is collected by registering a dynamic osd perf
1308 # stats query that tells OSDs to group stats for requests associated
1309 # with RBD objects by pool, namespace, and image id, which are
1310 # extracted from the request object names or other attributes.
1311 # The RBD object names have the following prefixes:
1312 # - rbd_data.{image_id}. (data stored in the same pool as metadata)
1313 # - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
1314 # - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
1315 # The pool_id in the object name is the id of the pool with the image
1316 # metdata, and should be used in the image spec. If there is no pool_id
1317 # in the object name, the image pool is the pool where the object is
1320 # Parse rbd_stats_pools option, which is a comma or space separated
1321 # list of pool[/namespace] entries. If no namespace is specifed the
1322 # stats are collected for every namespace in the pool. The wildcard
1323 # '*' can be used to indicate all pools or namespaces
1324 pools_string
= cast(str, self
.get_localized_module_option('rbd_stats_pools'))
1326 osd_map
= self
.get('osd_map')
1327 rbd_pools
= [pool
['pool_name'] for pool
in osd_map
['pools']
1328 if 'rbd' in pool
.get('application_metadata', {})]
1329 for x
in re
.split(r
'[\s,]+', pools_string
):
1335 namespace_name
= None
1337 namespace_name
= s
[1]
1339 if pool_name
== "*":
1340 # collect for all pools
1341 for pool
in rbd_pools
:
1342 pool_keys
.add((pool
, namespace_name
))
1344 if pool_name
in rbd_pools
:
1345 pool_keys
.add((pool_name
, namespace_name
)) # avoids adding deleted pool
1347 pools
= {} # type: Dict[str, Set[str]]
1348 for pool_key
in pool_keys
:
1349 pool_name
= pool_key
[0]
1350 namespace_name
= pool_key
[1]
1351 if not namespace_name
or namespace_name
== "*":
1352 # empty set means collect for all namespaces
1353 pools
[pool_name
] = set()
1356 if pool_name
not in pools
:
1357 pools
[pool_name
] = set()
1358 elif not pools
[pool_name
]:
1360 pools
[pool_name
].add(namespace_name
)
1362 rbd_stats_pools
= {}
1363 for pool_id
in self
.rbd_stats
['pools'].keys():
1364 name
= self
.rbd_stats
['pools'][pool_id
]['name']
1365 if name
not in pools
:
1366 del self
.rbd_stats
['pools'][pool_id
]
1368 rbd_stats_pools
[name
] = \
1369 self
.rbd_stats
['pools'][pool_id
]['ns_names']
1371 pools_refreshed
= False
1373 next_refresh
= self
.rbd_stats
['pools_refresh_time'] + \
1374 self
.get_localized_module_option(
1375 'rbd_stats_pools_refresh_interval', 300)
1376 if rbd_stats_pools
!= pools
or time
.time() >= next_refresh
:
1377 self
.refresh_rbd_stats_pools(pools
)
1378 pools_refreshed
= True
1380 pool_ids
= list(self
.rbd_stats
['pools'])
1382 pool_id_regex
= '^(' + '|'.join([str(x
) for x
in pool_ids
]) + ')$'
1385 for pool_id
, pool
in self
.rbd_stats
['pools'].items():
1386 if pool
['ns_names']:
1387 nspace_names
.extend(pool
['ns_names'])
1392 namespace_regex
= '^(' + \
1393 "|".join([re
.escape(x
)
1394 for x
in set(nspace_names
)]) + ')$'
1396 namespace_regex
= '^(.*)$'
1398 if ('query' in self
.rbd_stats
1399 and (pool_id_regex
!= self
.rbd_stats
['query']['key_descriptor'][0]['regex']
1400 or namespace_regex
!= self
.rbd_stats
['query']['key_descriptor'][1]['regex'])):
1401 self
.remove_osd_perf_query(self
.rbd_stats
['query_id'])
1402 del self
.rbd_stats
['query_id']
1403 del self
.rbd_stats
['query']
1405 if not self
.rbd_stats
['pools']:
1408 counters_info
= self
.rbd_stats
['counters_info']
1410 if 'query_id' not in self
.rbd_stats
:
1413 {'type': 'pool_id', 'regex': pool_id_regex
},
1414 {'type': 'namespace', 'regex': namespace_regex
},
1415 {'type': 'object_name',
1416 'regex': r
'^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
1418 'performance_counter_descriptors': list(counters_info
),
1420 query_id
= self
.add_osd_perf_query(query
)
1421 if query_id
is None:
1422 self
.log
.error('failed to add query %s' % query
)
1424 self
.rbd_stats
['query'] = query
1425 self
.rbd_stats
['query_id'] = query_id
1427 res
= self
.get_osd_perf_counters(self
.rbd_stats
['query_id'])
1429 for c
in res
['counters']:
1430 # if the pool id is not found in the object name use id of the
1431 # pool where the object is located
1433 pool_id
= int(c
['k'][2][0])
1435 pool_id
= int(c
['k'][0][0])
1436 if pool_id
not in self
.rbd_stats
['pools'] and not pools_refreshed
:
1437 self
.refresh_rbd_stats_pools(pools
)
1438 pools_refreshed
= True
1439 if pool_id
not in self
.rbd_stats
['pools']:
1441 pool
= self
.rbd_stats
['pools'][pool_id
]
1442 nspace_name
= c
['k'][1][0]
1443 if nspace_name
not in pool
['images']:
1445 image_id
= c
['k'][2][1]
1446 if image_id
not in pool
['images'][nspace_name
] and \
1447 not pools_refreshed
:
1448 self
.refresh_rbd_stats_pools(pools
)
1449 pool
= self
.rbd_stats
['pools'][pool_id
]
1450 pools_refreshed
= True
1451 if image_id
not in pool
['images'][nspace_name
]:
1453 counters
= pool
['images'][nspace_name
][image_id
]['c']
1454 for i
in range(len(c
['c'])):
1455 counters
[i
][0] += c
['c'][i
][0]
1456 counters
[i
][1] += c
['c'][i
][1]
1458 label_names
= ("pool", "namespace", "image")
1459 for pool_id
, pool
in self
.rbd_stats
['pools'].items():
1460 pool_name
= pool
['name']
1461 for nspace_name
, images
in pool
['images'].items():
1462 for image_id
in images
:
1463 image_name
= images
[image_id
]['n']
1464 counters
= images
[image_id
]['c']
1466 for key
in counters_info
:
1467 counter_info
= counters_info
[key
]
1468 stattype
= self
._stattype
_to
_str
(counter_info
['type'])
1469 labels
= (pool_name
, nspace_name
, image_name
)
1470 if counter_info
['type'] == self
.PERFCOUNTER_COUNTER
:
1472 if path
not in self
.metrics
:
1473 self
.metrics
[path
] = Metric(
1476 counter_info
['desc'],
1479 self
.metrics
[path
].set(counters
[i
][0], labels
)
1480 elif counter_info
['type'] == self
.PERFCOUNTER_LONGRUNAVG
:
1481 path
= 'rbd_' + key
+ '_sum'
1482 if path
not in self
.metrics
:
1483 self
.metrics
[path
] = Metric(
1486 counter_info
['desc'] + ' Total',
1489 self
.metrics
[path
].set(counters
[i
][0], labels
)
1490 path
= 'rbd_' + key
+ '_count'
1491 if path
not in self
.metrics
:
1492 self
.metrics
[path
] = Metric(
1495 counter_info
['desc'] + ' Count',
1498 self
.metrics
[path
].set(counters
[i
][1], labels
)
1501 def refresh_rbd_stats_pools(self
, pools
: Dict
[str, Set
[str]]) -> None:
1502 self
.log
.debug('refreshing rbd pools %s' % (pools
))
1505 counters_info
= self
.rbd_stats
['counters_info']
1506 for pool_name
, cfg_ns_names
in pools
.items():
1508 pool_id
= self
.rados
.pool_lookup(pool_name
)
1509 with self
.rados
.open_ioctx(pool_name
) as ioctx
:
1510 if pool_id
not in self
.rbd_stats
['pools']:
1511 self
.rbd_stats
['pools'][pool_id
] = {'images': {}}
1512 pool
= self
.rbd_stats
['pools'][pool_id
]
1513 pool
['name'] = pool_name
1514 pool
['ns_names'] = cfg_ns_names
1516 nspace_names
= list(cfg_ns_names
)
1518 nspace_names
= [''] + rbd
.namespace_list(ioctx
)
1519 for nspace_name
in pool
['images']:
1520 if nspace_name
not in nspace_names
:
1521 del pool
['images'][nspace_name
]
1522 for nspace_name
in nspace_names
:
1524 not rbd
.namespace_exists(ioctx
, nspace_name
):
1525 self
.log
.debug('unknown namespace %s for pool %s' %
1526 (nspace_name
, pool_name
))
1528 ioctx
.set_namespace(nspace_name
)
1529 if nspace_name
not in pool
['images']:
1530 pool
['images'][nspace_name
] = {}
1531 namespace
= pool
['images'][nspace_name
]
1533 for image_meta
in RBD().list2(ioctx
):
1534 image
= {'n': image_meta
['name']}
1535 image_id
= image_meta
['id']
1536 if image_id
in namespace
:
1537 image
['c'] = namespace
[image_id
]['c']
1539 image
['c'] = [[0, 0] for x
in counters_info
]
1540 images
[image_id
] = image
1541 pool
['images'][nspace_name
] = images
1542 except Exception as e
:
1543 self
.log
.error('failed listing pool %s: %s' % (pool_name
, e
))
1544 self
.rbd_stats
['pools_refresh_time'] = time
.time()
def shutdown_rbd_stats(self) -> None:
    """Tear down the dynamic OSD perf query (if registered) and drop cached pool state."""
    stats = self.rbd_stats
    if 'query_id' in stats:
        # Unregister the server-side perf query before forgetting its handle.
        self.remove_osd_perf_query(stats['query_id'])
        stats.pop('query_id')
        stats.pop('query')
    stats['pools'].clear()
1553 def add_fixed_name_metrics(self
) -> None:
1555 Add fixed name metrics from existing ones that have details in their names
1556 that should be in labels (not in name).
1557 For backward compatibility, a new fixed name metric is created (instead of replacing)
1558 and details are put in new labels.
1559 Intended for RGW sync perf. counters but extendable as required.
1560 See: https://tracker.ceph.com/issues/45311
1563 for metric_path
, metrics
in self
.metrics
.items():
1564 # Address RGW sync perf. counters.
1565 match
= re
.search(r
'^data-sync-from-(.*)\.', metric_path
)
1567 new_path
= re
.sub('from-([^.]*)', 'from-zone', metric_path
)
1568 if new_path
not in new_metrics
:
1569 new_metrics
[new_path
] = Metric(
1573 cast(LabelValues
, metrics
.labelnames
) + ('source_zone',)
1575 for label_values
, value
in metrics
.value
.items():
1576 new_metrics
[new_path
].set(value
, label_values
+ (match
.group(1),))
1578 self
.metrics
.update(new_metrics
)
1580 def get_collect_time_metrics(self
) -> None:
1581 sum_metric
= self
.metrics
.get('prometheus_collect_duration_seconds_sum')
1582 count_metric
= self
.metrics
.get('prometheus_collect_duration_seconds_count')
1583 if sum_metric
is None:
1584 sum_metric
= MetricCounter(
1585 'prometheus_collect_duration_seconds_sum',
1586 'The sum of seconds took to collect all metrics of this exporter',
1588 self
.metrics
['prometheus_collect_duration_seconds_sum'] = sum_metric
1589 if count_metric
is None:
1590 count_metric
= MetricCounter(
1591 'prometheus_collect_duration_seconds_count',
1592 'The amount of metrics gathered for this exporter',
1594 self
.metrics
['prometheus_collect_duration_seconds_count'] = count_metric
1596 # Collect all timing data and make it available as metric, excluding the
1597 # `collect` method because it has not finished at this point and hence
1598 # there's no `_execution_duration` attribute to be found. The
1599 # `_execution_duration` attribute is added by the `profile_method`
1601 for method_name
, method
in Module
.__dict
__.items():
1602 duration
= getattr(method
, '_execution_duration', None)
1603 if duration
is not None:
1604 cast(MetricCounter
, sum_metric
).add(duration
, (method_name
,))
1605 cast(MetricCounter
, count_metric
).add(1, (method_name
,))
def get_pool_repaired_objects(self) -> None:
    """Publish the repaired-object count for every pool found in pg_dump.

    One 'pool_objects_repaired' sample per pool, labelled with the pool id.
    """
    # The metric object is loop-invariant; look it up once.
    metric = self.metrics['pool_objects_repaired']
    for pool_stats in self.get('pg_dump')['pool_stats']:
        repaired = pool_stats['stat_sum']['num_objects_repaired']
        metric.set(repaired, labelvalues=(pool_stats['poolid'],))
def get_all_daemon_health_metrics(self) -> None:
    """Export per-daemon health metrics reported by the daemons themselves.

    Every entry returned by ``get_daemon_health_metrics()`` becomes one
    sample of the ``daemon_health_metrics`` metric, labelled with the
    metric type and the reporting daemon's name.
    """
    daemon_metrics = self.get_daemon_health_metrics()
    # Fixed: the previous log message contained leftover debug gibberish
    # ('metrics jeje %s'); use a meaningful message instead.
    self.log.debug('daemon health metrics %s' % (daemon_metrics,))
    # Metric path is loop-invariant; bind it once.
    path = 'daemon_health_metrics'
    for daemon_name, health_metrics in daemon_metrics.items():
        for health_metric in health_metrics:
            self.metrics[path].set(health_metric['value'], labelvalues=(
                health_metric['type'], daemon_name,))
1623 @profile_method(True)
1624 def collect(self
) -> str:
1625 # Clear the metrics before scraping
1626 for k
in self
.metrics
.keys():
1627 self
.metrics
[k
].clear()
1631 self
.get_osd_blocklisted_entries()
1632 self
.get_pool_stats()
1634 self
.get_osd_stats()
1635 self
.get_quorum_status()
1636 self
.get_mgr_status()
1637 self
.get_metadata_and_osd_status()
1638 self
.get_pg_status()
1639 self
.get_pool_repaired_objects()
1640 self
.get_num_objects()
1641 self
.get_all_daemon_health_metrics()
1643 for daemon
, counters
in self
.get_all_perf_counters().items():
1644 for path
, counter_info
in counters
.items():
1645 # Skip histograms, they are represented by long running avgs
1646 stattype
= self
._stattype
_to
_str
(counter_info
['type'])
1647 if not stattype
or stattype
== 'histogram':
1648 self
.log
.debug('ignoring %s, type %s' % (path
, stattype
))
1651 path
, label_names
, labels
= self
._perfpath
_to
_path
_labels
(
1654 # Get the value of the counter
1655 value
= self
._perfvalue
_to
_value
(
1656 counter_info
['type'], counter_info
['value'])
1658 # Represent the long running avgs as sum/count pairs
1659 if counter_info
['type'] & self
.PERFCOUNTER_LONGRUNAVG
:
1660 _path
= path
+ '_sum'
1661 if _path
not in self
.metrics
:
1662 self
.metrics
[_path
] = Metric(
1665 counter_info
['description'] + ' Total',
1668 self
.metrics
[_path
].set(value
, labels
)
1670 _path
= path
+ '_count'
1671 if _path
not in self
.metrics
:
1672 self
.metrics
[_path
] = Metric(
1675 counter_info
['description'] + ' Count',
1678 self
.metrics
[_path
].set(counter_info
['count'], labels
,)
1680 if path
not in self
.metrics
:
1681 self
.metrics
[path
] = Metric(
1684 counter_info
['description'],
1687 self
.metrics
[path
].set(value
, labels
)
1689 self
.add_fixed_name_metrics()
1690 self
.get_rbd_stats()
1692 self
.get_collect_time_metrics()
1694 # Return formatted metrics and clear no longer used data
1695 _metrics
= [m
.str_expfmt() for m
in self
.metrics
.values()]
1696 for k
in self
.metrics
.keys():
1697 self
.metrics
[k
].clear()
1699 return ''.join(_metrics
) + '\n'
1701 @CLIReadCommand('prometheus file_sd_config')
1702 def get_file_sd_config(self
) -> Tuple
[int, str, str]:
1704 Return file_sd compatible prometheus config for mgr cluster
1706 servers
= self
.list_servers()
1708 for server
in servers
:
1709 hostname
= server
.get('hostname', '')
1710 for service
in cast(List
[ServiceInfoT
], server
.get('services', [])):
1711 if service
['type'] != 'mgr':
1714 port
= self
._get
_module
_option
('server_port', DEFAULT_PORT
, id_
)
1715 targets
.append(f
'{hostname}:{port}')
1722 return 0, json
.dumps(ret
), ""
1724 def self_test(self
) -> None:
1726 self
.get_file_sd_config()
1728 def configure(self
, server_addr
: str, server_port
: int) -> None:
1729 # cephadm deployments have a TLS monitoring stack setup option.
1730 # If the cephadm module is on and the setting is true (defaults to false)
1731 # we should have prometheus be set up to interact with that
1732 cephadm_secure_monitoring_stack
= self
.get_module_option_ex(
1733 'cephadm', 'secure_monitoring_stack', False)
1734 if cephadm_secure_monitoring_stack
:
1736 self
.setup_cephadm_tls_config(server_addr
, server_port
)
1738 except Exception as e
:
1739 self
.log
.exception(f
'Failed to setup cephadm based secure monitoring stack: {e}\n',
1740 'Falling back to default configuration')
1741 self
.setup_default_config(server_addr
, server_port
)
def setup_default_config(self, server_addr: str, server_port: int) -> None:
    """Configure CherryPy for plain-HTTP serving and publish our URI."""
    http_config = {
        'server.socket_host': server_addr,
        'server.socket_port': server_port,
        'engine.autoreload.on': False,
        # TLS is explicitly disabled on the default (plain HTTP) endpoint.
        'server.ssl_module': None,
        'server.ssl_certificate': None,
        'server.ssl_private_key': None,
    }
    cherrypy.config.update(http_config)
    # Publish the URI that others may use to access the service we're
    # about to start serving.
    self.set_uri(build_url(scheme='http', host=self.get_server_addr(),
                           port=server_port, path='/'))
1756 def setup_cephadm_tls_config(self
, server_addr
: str, server_port
: int) -> None:
1757 from cephadm
.ssl_cert_utils
import SSLCerts
1758 # the ssl certs utils uses a NamedTemporaryFile for the cert files
1759 # generated with generate_cert_files function. We need the SSLCerts
1760 # object to not be cleaned up in order to have those temp files not
1761 # be cleaned up, so making it an attribute of the module instead
1762 # of just a standalone object
1763 self
.cephadm_monitoring_tls_ssl_certs
= SSLCerts()
1764 host
= self
.get_mgr_ip()
1766 old_cert
= self
.get_store('root/cert')
1767 old_key
= self
.get_store('root/key')
1768 if not old_cert
or not old_key
:
1769 raise Exception('No old credentials for mgr-prometheus endpoint')
1770 self
.cephadm_monitoring_tls_ssl_certs
.load_root_credentials(old_cert
, old_key
)
1772 self
.cephadm_monitoring_tls_ssl_certs
.generate_root_cert(host
)
1773 self
.set_store('root/cert', self
.cephadm_monitoring_tls_ssl_certs
.get_root_cert())
1774 self
.set_store('root/key', self
.cephadm_monitoring_tls_ssl_certs
.get_root_key())
1776 cert_file_path
, key_file_path
= self
.cephadm_monitoring_tls_ssl_certs
.generate_cert_files(
1777 self
.get_hostname(), host
)
1779 cherrypy
.config
.update({
1780 'server.socket_host': server_addr
,
1781 'server.socket_port': server_port
,
1782 'engine.autoreload.on': False,
1783 'server.ssl_module': 'builtin',
1784 'server.ssl_certificate': cert_file_path
,
1785 'server.ssl_private_key': key_file_path
,
1787 # Publish the URI that others may use to access the service we're about to start serving
1788 self
.set_uri(build_url(scheme
='https', host
=self
.get_server_addr(),
1789 port
=server_port
, path
='/'))
1791 def serve(self
) -> None:
1795 # collapse everything to '/'
1796 def _cp_dispatch(self
, vpath
: str) -> 'Root':
1797 cherrypy
.request
.path
= ''
1801 def index(self
) -> str:
1802 return '''<!DOCTYPE html>
1804 <head><title>Ceph Exporter</title></head>
1806 <h1>Ceph Exporter</h1>
1807 <p><a href='/metrics'>Metrics</a></p>
def metrics(self) -> Optional[str]:
    """Serve /metrics: delegate to _metrics() while holding the module-wide collect lock."""
    instance = _global_instance
    # Only a fully initialised active Module may serve metrics.
    assert isinstance(instance, Module)
    with instance.collect_lock:
        return self._metrics(instance)
1819 def _metrics(instance
: 'Module') -> Optional
[str]:
1821 self
.log
.debug('Cache disabled, collecting and returning without cache')
1822 cherrypy
.response
.headers
['Content-Type'] = 'text/plain'
1823 return self
.collect()
1825 # Return cached data if available
1826 if not instance
.collect_cache
:
1827 raise cherrypy
.HTTPError(503, 'No cached data available yet')
1829 def respond() -> Optional
[str]:
1830 assert isinstance(instance
, Module
)
1831 cherrypy
.response
.headers
['Content-Type'] = 'text/plain'
1832 return instance
.collect_cache
1834 if instance
.collect_time
< instance
.scrape_interval
:
1835 # Respond if cache isn't stale
1838 if instance
.stale_cache_strategy
== instance
.STALE_CACHE_RETURN
:
1839 # Respond even if cache is stale
1841 'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
1842 'returning metrics from stale cache.'.format(
1843 instance
.collect_time
,
1844 instance
.collect_time
- instance
.scrape_interval
1849 if instance
.stale_cache_strategy
== instance
.STALE_CACHE_FAIL
:
1850 # Fail if cache is stale
1852 'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
1853 'returning "service unavailable".'.format(
1854 instance
.collect_time
,
1855 instance
.collect_time
- instance
.scrape_interval
,
1858 instance
.log
.error(msg
)
1859 raise cherrypy
.HTTPError(503, msg
)
1862 # Make the cache timeout for collecting configurable
1863 self
.scrape_interval
= cast(float, self
.get_localized_module_option('scrape_interval'))
1865 self
.stale_cache_strategy
= cast(
1866 str, self
.get_localized_module_option('stale_cache_strategy'))
1867 if self
.stale_cache_strategy
not in [self
.STALE_CACHE_FAIL
,
1868 self
.STALE_CACHE_RETURN
]:
1869 self
.stale_cache_strategy
= self
.STALE_CACHE_FAIL
1871 server_addr
= cast(str, self
.get_localized_module_option('server_addr', get_default_addr()))
1872 server_port
= cast(int, self
.get_localized_module_option('server_port', DEFAULT_PORT
))
1874 "server_addr: %s server_port: %s" %
1875 (server_addr
, server_port
)
1878 self
.cache
= cast(bool, self
.get_localized_module_option('cache', True))
1880 self
.log
.info('Cache enabled')
1881 self
.metrics_thread
.start()
1883 self
.log
.info('Cache disabled')
1885 self
.configure(server_addr
, server_port
)
1887 cherrypy
.tree
.mount(Root(), "/")
1888 self
.log
.info('Starting engine...')
1889 cherrypy
.engine
.start()
1890 self
.log
.info('Engine started.')
1892 # wait for the shutdown event
1893 self
.shutdown_event
.wait()
1894 self
.shutdown_event
.clear()
1895 # tell metrics collection thread to stop collecting new metrics
1896 self
.metrics_thread
.stop()
1897 cherrypy
.engine
.stop()
1898 cherrypy
.server
.httpserver
= None
1899 self
.log
.info('Engine stopped.')
1900 self
.shutdown_rbd_stats()
1901 # wait for the metrics collection thread to stop
1902 self
.metrics_thread
.join()
def shutdown(self) -> None:
    """Signal serve() to stop the CherryPy engine and exit."""
    self.log.info('Stopping engine...')
    event = self.shutdown_event
    event.set()
@CLIReadCommand('healthcheck history ls')
def _list_healthchecks(self, format: Format = Format.plain) -> HandleCommandResult:
    """List all the healthchecks being tracked.

    The format options are parsed in ceph_argparse before they get evaluated
    here, so we can safely assume that what we have to process is valid;
    ceph_argparse will throw a ValueError if the cast to our Format class fails.

    Args:
        format (Format, optional): output format. Defaults to Format.plain.

    Returns:
        HandleCommandResult: return code, stdout and stderr returned to the caller
    """
    if format == Format.yaml:
        rendered = self.health_history.as_yaml()
    elif format == Format.plain:
        rendered = str(self.health_history)
    else:
        rendered = self.health_history.as_json(format == Format.json_pretty)
    return HandleCommandResult(retval=0, stdout=rendered)
@CLIWriteCommand('healthcheck history clear')
def _clear_healthchecks(self) -> HandleCommandResult:
    """Wipe the tracked healthcheck history."""
    self.health_history.reset()
    return HandleCommandResult(retval=0, stdout="healthcheck history cleared")
1940 class StandbyModule(MgrStandbyModule
):
1942 MODULE_OPTIONS
= Module
.MODULE_OPTIONS
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the standby module and the event used to stop serve().

    Idiom fix: use the Python 3 zero-argument ``super()`` instead of the
    legacy two-argument form.
    """
    super().__init__(*args, **kwargs)
    # Signalled by shutdown() to make serve() return.
    self.shutdown_event = threading.Event()
1948 def serve(self
) -> None:
1949 server_addr
= self
.get_localized_module_option(
1950 'server_addr', get_default_addr())
1951 server_port
= self
.get_localized_module_option(
1952 'server_port', DEFAULT_PORT
)
1953 self
.log
.info("server_addr: %s server_port: %s" %
1954 (server_addr
, server_port
))
1955 cherrypy
.config
.update({
1956 'server.socket_host': server_addr
,
1957 'server.socket_port': server_port
,
1958 'engine.autoreload.on': False,
1959 'request.show_tracebacks': False
1966 def index(self
) -> str:
1967 standby_behaviour
= module
.get_module_option('standby_behaviour')
1968 if standby_behaviour
== 'default':
1969 active_uri
= module
.get_active_uri()
1970 return '''<!DOCTYPE html>
1972 <head><title>Ceph Exporter</title></head>
1974 <h1>Ceph Exporter</h1>
1975 <p><a href='{}metrics'>Metrics</a></p>
1977 </html>'''.format(active_uri
)
1979 status
= module
.get_module_option('standby_error_status_code')
1980 raise cherrypy
.HTTPError(status
, message
="Keep on looking")
1983 def metrics(self
) -> str:
1984 cherrypy
.response
.headers
['Content-Type'] = 'text/plain'
1987 cherrypy
.tree
.mount(Root(), '/', {})
1988 self
.log
.info('Starting engine...')
1989 cherrypy
.engine
.start()
1990 self
.log
.info('Engine started.')
1991 # Wait for shutdown event
1992 self
.shutdown_event
.wait()
1993 self
.shutdown_event
.clear()
1994 cherrypy
.engine
.stop()
1995 cherrypy
.server
.httpserver
= None
1996 self
.log
.info('Engine stopped.')
def shutdown(self) -> None:
    """Signal the standby serve() loop to stop."""
    log = self.log
    log.info("Stopping engine...")
    self.shutdown_event.set()
    log.info("Stopped engine")