import cherrypy
from collections import defaultdict
from pkg_resources import packaging  # type: ignore
import json
import math
import os
import re
import threading
import time
import enum
from collections import namedtuple

import yaml

from mgr_module import CLIReadCommand, MgrModule, MgrStandbyModule, PG_STATES, \
    Option, ServiceInfoT, HandleCommandResult, CLIWriteCommand
from mgr_util import get_default_addr, profile_method, build_url
from orchestrator import OrchestratorClientMixin, raise_if_exception, OrchestratorError
from rbd import RBD
from typing import DefaultDict, Optional, Dict, Any, Set, cast, Tuple, Union, List, Callable

LabelValues = Tuple[str, ...]
Number = Union[int, float]
MetricValue = Dict[LabelValues, Number]
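# Illustrative note: a MetricValue maps label-value tuples to samples, e.g.
# {('osd.0', 'hdd'): 1.0, ('osd.1', 'ssd'): 1.0} for label names
# ('ceph_daemon', 'device_class').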
# Defaults for the Prometheus HTTP server. Can also be set in the config-key
# store; see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# for the Prometheus exporter port registry

DEFAULT_PORT = 9283

# When the CherryPy server in 3.2.2 (and later) starts it attempts to verify
# that the ports it's listening on are in fact bound. When using the any address
# "::" it tries both ipv4 and ipv6, and in some environments (e.g. kubernetes)
# ipv6 isn't yet configured / supported and CherryPy throws an uncaught
# exception.
if cherrypy is not None:
    Version = packaging.version.Version
    v = Version(cherrypy.__version__)
    # the issue was fixed in 3.2.3. it's present in 3.2.2 (current version on
    # centos:7) and back to at least 3.0.0.
    if Version("3.1.2") <= v < Version("3.2.3"):
        # https://github.com/cherrypy/cherrypy/issues/1100
        from cherrypy.process import servers
        servers.wait_for_occupied_port = lambda host, port: None


# cherrypy likes to sys.exit on error. don't let it take us down too!
def os_exit_noop(status: int) -> None:
    pass


os._exit = os_exit_noop  # type: ignore

# to access things in class Module from subclass Root. Because
# it's a dict, the writer doesn't need to declare 'global' for access

_global_instance = None  # type: Optional[Module]
cherrypy.config.update({
    'response.headers.server': 'Ceph-Prometheus'
})
def health_status_to_number(status: str) -> int:
    if status == 'HEALTH_OK':
        return 0
    elif status == 'HEALTH_WARN':
        return 1
    elif status == 'HEALTH_ERR':
        return 2
    raise ValueError(f'unknown status "{status}"')
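# Doctest-style sketch of the mapping above (illustrative only):
#
#   >>> health_status_to_number('HEALTH_WARN')
#   1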
DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']

OSD_BLOCKLIST = ['osd_blocklist_count']

DF_POOL = ['max_avail', 'avail_raw', 'stored', 'stored_raw', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes',
           'compress_bytes_used', 'compress_under_bytes', 'bytes_used', 'percent_used']

OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
                  'recovering_keys_per_sec', 'num_objects_recovered',
                  'num_bytes_recovered')

OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')

MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
                'ceph_version')

MON_METADATA = ('ceph_daemon', 'hostname',
                'public_addr', 'rank', 'ceph_version')

MGR_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

MGR_STATUS = ('ceph_daemon',)

MGR_MODULE_STATUS = ('name',)

MGR_MODULE_CAN_RUN = ('name',)

OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
                'front_iface', 'hostname', 'objectstore', 'public_addr',
                'ceph_version')

OSD_STATUS = ['weight', 'up', 'in']

OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name', 'type', 'description', 'compression_mode')

RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version', 'instance_id')

RBD_MIRROR_METADATA = ('ceph_daemon', 'id', 'instance_id', 'hostname',
                       'ceph_version')

DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
                   'wal_device', 'instance', 'devices', 'device_ids')

NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']

alert_metric = namedtuple('alert_metric', 'name description')
HEALTH_CHECKS = [
    alert_metric('SLOW_OPS', 'OSD or Monitor requests taking a long time to process'),
]

HEALTHCHECK_DETAIL = ('name', 'severity')
class Severity(enum.Enum):
    ok = "HEALTH_OK"
    warn = "HEALTH_WARN"
    error = "HEALTH_ERR"


class Format(enum.Enum):
    plain = 'plain'
    json = 'json'
    json_pretty = 'json-pretty'
    yaml = 'yaml'
class HealthCheckEvent:

    def __init__(self, name: str, severity: Severity, first_seen: float,
                 last_seen: float, count: int, active: bool = True):
        self.name = name
        self.severity = severity
        self.first_seen = first_seen
        self.last_seen = last_seen
        self.count = count
        self.active = active

    def as_dict(self) -> Dict[str, Any]:
        """Return the instance as a dictionary."""
        return self.__dict__
class HealthHistory:
    kv_name = 'health_history'
    titles = "{healthcheck_name:<24} {first_seen:<20} {last_seen:<20} {count:>5} {active:^6}"
    date_format = "%Y/%m/%d %H:%M:%S"

    def __init__(self, mgr: MgrModule):
        self.mgr = mgr
        self.lock = threading.Lock()
        self.healthcheck: Dict[str, HealthCheckEvent] = {}
        self._load()

    def _load(self) -> None:
        """Load the current state from the mons KV store."""
        data = self.mgr.get_store(self.kv_name)
        if not data:
            return

        try:
            healthcheck_data = json.loads(data)
        except json.JSONDecodeError:
            self.mgr.log.warning(
                f"INVALID data read from mgr/prometheus/{self.kv_name}. Resetting")
            self.reset()
            return

        for k, v in healthcheck_data.items():
            self.healthcheck[k] = HealthCheckEvent(
                name=k,
                severity=v.get('severity'),
                first_seen=v.get('first_seen', 0),
                last_seen=v.get('last_seen', 0),
                count=v.get('count', 1),
                active=v.get('active', True))
    def reset(self) -> None:
        """Reset the healthcheck history."""
        with self.lock:
            self.mgr.set_store(self.kv_name, "{}")
            self.healthcheck = {}

    def save(self) -> None:
        """Save the current in-memory healthcheck history to the KV store."""
        with self.lock:
            self.mgr.set_store(self.kv_name, self.as_json())
    def check(self, health_checks: Dict[str, Any]) -> None:
        """Look at the current health checks and compare them against the history.

        Args:
            health_checks (Dict[str, Any]): current health check data
        """
        current_checks = health_checks.get('checks', {})
        changes_made = False

        # first turn off any active states we're tracking
        for seen_check in self.healthcheck:
            check = self.healthcheck[seen_check]
            if check.active and seen_check not in current_checks:
                check.active = False
                changes_made = True

        # now look for any additions to track
        now = time.time()
        for name, info in current_checks.items():
            if name not in self.healthcheck:
                # this healthcheck is new, so start tracking it
                changes_made = True
                self.healthcheck[name] = HealthCheckEvent(
                    name=name,
                    severity=info.get('severity'),
                    first_seen=now,
                    last_seen=now,
                    count=1,
                    active=True)
            else:
                # seen it before, so update its metadata
                check = self.healthcheck[name]
                if check.active:
                    # check has been registered as active already, so skip
                    continue
                check.last_seen = now
                check.count += 1
                check.active = True
                changes_made = True

        if changes_made:
            self.save()
    def __str__(self) -> str:
        """Render the healthcheck history.

        Returns:
            str: Human readable representation of the healthcheck history
        """
        out = []
        if len(self.healthcheck.keys()) == 0:
            out.append("No healthchecks have been recorded")
        else:
            out.append(self.titles.format(
                healthcheck_name="Healthcheck Name",
                first_seen="First Seen (UTC)",
                last_seen="Last seen (UTC)",
                count="Count",
                active="Active"))
            for k in sorted(self.healthcheck.keys()):
                check = self.healthcheck[k]
                out.append(self.titles.format(
                    healthcheck_name=check.name,
                    first_seen=time.strftime(self.date_format, time.localtime(check.first_seen)),
                    last_seen=time.strftime(self.date_format, time.localtime(check.last_seen)),
                    count=check.count,
                    active="Yes" if check.active else "No"))
            out.extend([f"{len(self.healthcheck)} health check(s) listed", ""])

        return "\n".join(out)
    def as_dict(self) -> Dict[str, Any]:
        """Return the history in a dictionary.

        Returns:
            Dict[str, Any]: dictionary indexed by the healthcheck name
        """
        return {name: self.healthcheck[name].as_dict() for name in self.healthcheck}

    def as_json(self, pretty: bool = False) -> str:
        """Return the healthcheck history as a JSON string.

        Args:
            pretty (bool, optional): whether to pretty print the JSON output. Defaults to False.

        Returns:
            str: str representation of the healthcheck in JSON format
        """
        if pretty:
            return json.dumps(self.as_dict(), indent=2)
        return json.dumps(self.as_dict())

    def as_yaml(self) -> str:
        """Return the healthcheck history in yaml format.

        Returns:
            str: YAML representation of the healthcheck history
        """
        return yaml.safe_dump(self.as_dict(), explicit_start=True, default_flow_style=False)
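# Illustrative usage sketch (not executed; assumes a running MgrModule
# instance named `mgr`): the tracker records first/last seen times per check.
#
#   >>> hh = HealthHistory(mgr)
#   >>> hh.check({'checks': {'SLOW_OPS': {'severity': 'HEALTH_WARN'}}})
#   >>> 'SLOW_OPS' in hh.as_dict()
#   True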
class Metric(object):
    def __init__(self, mtype: str, name: str, desc: str,
                 labels: Optional[LabelValues] = None) -> None:
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value: Dict[LabelValues, Number] = {}

    def clear(self) -> None:
        self.value = {}

    def set(self, value: Number, labelvalues: Optional[LabelValues] = None) -> None:
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] = value
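    # Doctest-style usage sketch (illustrative only):
    #
    #   >>> m = Metric('gauge', 'osd_up', 'OSD status up', ('ceph_daemon',))
    #   >>> m.set(1, ('osd.0',))
    #   >>> m.value
    #   {('osd.0',): 1}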
    def str_expfmt(self) -> str:

        # Must be kept in sync with promethize() in src/exporter/util.cc
        def promethize(path: str) -> str:
            ''' replace illegal metric name characters '''
            result = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus')

            # Hyphens usually turn into underscores, unless they are
            # trailing
            if result.endswith("-"):
                result = result[0:-1] + "_minus"
            else:
                result = result.replace("-", "_")

            return "ceph_{0}".format(result)

        def floatstr(value: float) -> str:
            ''' represent as Go-compatible float '''
            if value == float('inf'):
                return '+Inf'
            if value == float('-inf'):
                return '-Inf'
            if math.isnan(value):
                return 'NaN'
            return repr(float(value))

        name = promethize(self.name)
        expfmt = '''
# HELP {name} {desc}
# TYPE {name} {mtype}'''.format(
            name=name,
            desc=self.desc,
            mtype=self.mtype,
        )

        for labelvalues, value in self.value.items():
            if self.labelnames:
                labels_list = zip(self.labelnames, labelvalues)
                labels = ','.join('%s="%s"' % (k, v) for k, v in labels_list)
            else:
                labels = ''
            if labels:
                fmtstr = '\n{name}{{{labels}}} {value}'
            else:
                fmtstr = '\n{name} {value}'
            expfmt += fmtstr.format(
                name=name,
                labels=labels,
                value=floatstr(value),
            )
        return expfmt
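    # Illustrative output sketch (not executed): an unlabeled 'untyped' metric
    # renders in the Prometheus exposition format roughly as
    #
    #   # HELP ceph_health_status Cluster health status
    #   # TYPE ceph_health_status untyped
    #   ceph_health_status 0.0
    #
    # floatstr() above guarantees Go-compatible renderings such as +Inf and NaN.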
    def group_by(
        self,
        keys: List[str],
        joins: Dict[str, Callable[[List[str]], str]],
        name: Optional[str] = None,
    ) -> "Metric":
        """
        Groups data by label names.

        Label names not passed are removed from the resulting metric, but
        by providing a join function, labels of metrics can be grouped.

        The purpose of this method is to provide a version of a metric that can
        be used in matching where otherwise multiple results would be returned.

        As grouping is possible in Prometheus, the only additional value of this
        method is the possibility to join labels when grouping. For that reason,
        passing joins is required. Please use PromQL expressions in all other
        cases.

        >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
        >>> m.value = {
        ...     ('foo', 'x'): 1,
        ...     ('foo', 'y'): 1,
        ... }
        >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
        {('foo', 'x,y'): 1}

        The functionality of group by could roughly be compared with Prometheus'

            group (ceph_disk_occupation) by (device, instance)

        with the exception that not all labels which aren't used as a condition
        to group a metric are discarded; their values are joined and the
        label is thereby preserved.

        This function takes the value of the first entry of a found group to be
        used for the resulting value of the grouping operation.

        >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
        >>> m.value = {
        ...     ('foo', 'x'): 555,
        ...     ('foo', 'y'): 10,
        ... }
        >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
        {('foo', 'x,y'): 555}
        """
        assert self.labelnames, "cannot match keys without label names"
        for key in keys:
            assert key in self.labelnames, "unknown key: {}".format(key)
        assert joins, "joins must not be empty"
        assert all(callable(c) for c in joins.values()), "joins must be callable"

        # group the values by the key labels
        grouped: Dict[LabelValues, List[Tuple[Dict[str, str], Number]]] = defaultdict(list)
        for label_values, metric_value in self.value.items():
            labels = dict(zip(self.labelnames, label_values))
            if not all(k in labels for k in keys):
                continue
            group_key = tuple(labels[k] for k in keys)
            grouped[group_key].append((labels, metric_value))

        # as there is nothing specified on how to join labels that are not equal
        # and Prometheus `group` aggregation functions similarly, we simply drop
        # those labels.
        labelnames = tuple(
            label for label in self.labelnames if label in keys or label in joins
        )
        superfluous_labelnames = [
            label for label in self.labelnames if label not in labelnames
        ]

        # iterate and convert groups with more than one member into a single
        # metric value
        values: MetricValue = {}
        for group in grouped.values():
            labels, metric_value = group[0]

            for label in superfluous_labelnames:
                del labels[label]

            if len(group) > 1:
                for key, fn in joins.items():
                    labels[key] = fn(list(labels[key] for labels, _ in group))

            values[tuple(labels.values())] = metric_value

        new_metric = Metric(self.mtype, name if name else self.name, self.desc, labelnames)
        new_metric.value = values

        return new_metric
class MetricCounter(Metric):
    def __init__(self,
                 name: str,
                 desc: str,
                 labels: Optional[LabelValues] = None) -> None:
        super(MetricCounter, self).__init__('counter', name, desc, labels)
        self.value = defaultdict(lambda: 0)

    def clear(self) -> None:
        pass  # Skip calls to clear as we want to keep the counters here.

    def set(self,
            value: Number,
            labelvalues: Optional[LabelValues] = None) -> None:
        msg = 'This method must not be used for instances of MetricCounter class'
        raise NotImplementedError(msg)

    def add(self,
            value: Number,
            labelvalues: Optional[LabelValues] = None) -> None:
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] += value
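# Doctest-style sketch (illustrative only): unlike Metric.set(), add()
# accumulates, so repeated calls with the same label values sum up.
#
#   >>> c = MetricCounter('ops', 'op count', ('ceph_daemon',))
#   >>> c.add(2, ('osd.0',)); c.add(3, ('osd.0',))
#   >>> c.value[('osd.0',)]
#   5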
class MetricCollectionThread(threading.Thread):
    def __init__(self, module: 'Module') -> None:
        self.mod = module
        self.active = True
        self.event = threading.Event()
        super(MetricCollectionThread, self).__init__(target=self.collect)

    def collect(self) -> None:
        self.mod.log.info('starting metric collection thread')
        while self.active:
            self.mod.log.debug('collecting cache in thread')
            if self.mod.have_mon_connection():
                start_time = time.time()

                try:
                    data = self.mod.collect()
                except Exception:
                    # Log any issues encountered during the data collection and continue
                    self.mod.log.exception("failed to collect metrics:")
                    self.event.wait(self.mod.scrape_interval)
                    continue

                duration = time.time() - start_time
                self.mod.log.debug('collecting cache in thread done')

                sleep_time = self.mod.scrape_interval - duration
                if sleep_time < 0:
                    self.mod.log.warning(
                        'Collecting data took more time than configured scrape interval. '
                        'This possibly results in stale data. Please check the '
                        '`stale_cache_strategy` configuration option. '
                        'Collecting data took {:.2f} seconds but scrape interval is configured '
                        'to be {:.0f} seconds.'.format(
                            duration,
                            self.mod.scrape_interval,
                        )
                    )
                    sleep_time = 0

                with self.mod.collect_lock:
                    self.mod.collect_cache = data
                    self.mod.collect_time = duration

                self.event.wait(sleep_time)
            else:
                self.mod.log.error('No MON connection')
                self.event.wait(self.mod.scrape_interval)

    def stop(self) -> None:
        self.active = False
        self.event.set()
class Module(MgrModule, OrchestratorClientMixin):
    MODULE_OPTIONS = [
        Option(
            'server_addr',
            default=get_default_addr(),
            desc='the IPv4 or IPv6 address on which the module listens for HTTP requests',
        ),
        Option(
            'server_port',
            type='int',
            default=DEFAULT_PORT,
            desc='the port on which the module listens for HTTP requests',
        ),
        Option(
            'scrape_interval',
            type='float',
            default=15.0,
        ),
        Option(
            'stale_cache_strategy',
            default='log',
        ),
        Option(
            'cache',
            type='bool',
            default=True,
        ),
        Option(
            'rbd_stats_pools',
            default='',
        ),
        Option(
            name='rbd_stats_pools_refresh_interval',
            type='int',
            default=300,
        ),
        Option(
            name='standby_behaviour',
            type='str',
            default='default',
            enum_allowed=['default', 'error'],
            runtime=True,
        ),
        Option(
            name='standby_error_status_code',
            type='int',
            default=500,
            min=400,
            max=599,
            runtime=True,
        ),
        Option(
            name='exclude_perf_counters',
            type='bool',
            default=True,
            desc='Do not include perf-counters in the metrics output',
            long_desc='Gathering perf-counters from a single Prometheus exporter can '
                      'degrade ceph-mgr performance, especially in large clusters. '
                      'Instead, Ceph-exporter daemons are now used by default for '
                      'perf-counter gathering. This should only be disabled when no '
                      'ceph-exporters are deployed.',
            runtime=True,
        ),
    ]

    STALE_CACHE_FAIL = 'fail'
    STALE_CACHE_RETURN = 'return'
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(Module, self).__init__(*args, **kwargs)
        self.metrics = self._setup_static_metrics()
        self.shutdown_event = threading.Event()
        self.collect_lock = threading.Lock()
        self.collect_time = 0.0
        self.scrape_interval: float = 15.0
        self.cache = True
        self.stale_cache_strategy: str = self.STALE_CACHE_FAIL
        self.collect_cache: Optional[str] = None
        self.rbd_stats = {
            'pools': {},
            'pools_refresh_time': 0,
            'counters_info': {
                'write_ops': {'type': self.PERFCOUNTER_COUNTER,
                              'desc': 'RBD image writes count'},
                'read_ops': {'type': self.PERFCOUNTER_COUNTER,
                             'desc': 'RBD image reads count'},
                'write_bytes': {'type': self.PERFCOUNTER_COUNTER,
                                'desc': 'RBD image bytes written'},
                'read_bytes': {'type': self.PERFCOUNTER_COUNTER,
                               'desc': 'RBD image bytes read'},
                'write_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                  'desc': 'RBD image writes latency (msec)'},
                'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                 'desc': 'RBD image reads latency (msec)'},
            },
        }  # type: Dict[str, Any]
        global _global_instance
        _global_instance = self
        self.metrics_thread = MetricCollectionThread(_global_instance)
        self.health_history = HealthHistory(self)
    def _setup_static_metrics(self) -> Dict[str, Metric]:
        metrics = {}
        metrics['health_status'] = Metric(
            'untyped',
            'health_status',
            'Cluster health status'
        )
        metrics['mon_quorum_status'] = Metric(
            'gauge',
            'mon_quorum_status',
            'Monitors in quorum',
            ('ceph_daemon',)
        )
        metrics['fs_metadata'] = Metric(
            'untyped',
            'fs_metadata',
            'FS Metadata',
            FS_METADATA
        )
        metrics['mds_metadata'] = Metric(
            'untyped',
            'mds_metadata',
            'MDS Metadata',
            MDS_METADATA
        )
        metrics['mon_metadata'] = Metric(
            'untyped',
            'mon_metadata',
            'MON Metadata',
            MON_METADATA
        )
        metrics['mgr_metadata'] = Metric(
            'gauge',
            'mgr_metadata',
            'MGR metadata',
            MGR_METADATA
        )
        metrics['mgr_status'] = Metric(
            'gauge',
            'mgr_status',
            'MGR status (0=standby, 1=active)',
            MGR_STATUS
        )
        metrics['mgr_module_status'] = Metric(
            'gauge',
            'mgr_module_status',
            'MGR module status (0=disabled, 1=enabled, 2=auto-enabled)',
            MGR_MODULE_STATUS
        )
        metrics['mgr_module_can_run'] = Metric(
            'gauge',
            'mgr_module_can_run',
            'MGR module runnable state i.e. can it run (0=no, 1=yes)',
            MGR_MODULE_CAN_RUN
        )
        metrics['osd_metadata'] = Metric(
            'untyped',
            'osd_metadata',
            'OSD Metadata',
            OSD_METADATA
        )

        # The reason for having this separate to OSD_METADATA is
        # so that we can stably use the same tag names that
        # the Prometheus node_exporter does
        metrics['disk_occupation'] = Metric(
            'untyped',
            'disk_occupation',
            'Associate Ceph daemon with disk used',
            DISK_OCCUPATION
        )

        metrics['disk_occupation_human'] = Metric(
            'untyped',
            'disk_occupation_human',
            'Associate Ceph daemon with disk used for displaying to humans,'
            ' not for joining tables (vector matching)',
            DISK_OCCUPATION,  # label names are automatically decimated on grouping
        )

        metrics['pool_metadata'] = Metric(
            'untyped',
            'pool_metadata',
            'POOL Metadata',
            POOL_METADATA
        )

        metrics['rgw_metadata'] = Metric(
            'untyped',
            'rgw_metadata',
            'RGW Metadata',
            RGW_METADATA
        )

        metrics['rbd_mirror_metadata'] = Metric(
            'untyped',
            'rbd_mirror_metadata',
            'RBD Mirror Metadata',
            RBD_MIRROR_METADATA
        )

        metrics['pg_total'] = Metric(
            'gauge',
            'pg_total',
            'PG Total Count per Pool',
            ('pool_id',)
        )

        metrics['health_detail'] = Metric(
            'gauge',
            'health_detail',
            'healthcheck status by type (0=inactive, 1=active)',
            HEALTHCHECK_DETAIL
        )

        metrics['pool_objects_repaired'] = Metric(
            'counter',
            'pool_objects_repaired',
            'Number of objects repaired in a pool',
            ('pool_id',)
        )

        metrics['daemon_health_metrics'] = Metric(
            'gauge',
            'daemon_health_metrics',
            'Health metrics for Ceph daemons',
            ('type', 'ceph_daemon',)
        )
        for flag in OSD_FLAGS:
            path = 'osd_flag_{}'.format(flag)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD Flag {}'.format(flag)
            )
        for state in OSD_STATUS:
            path = 'osd_{}'.format(state)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD status {}'.format(state),
                ('ceph_daemon',)
            )
        for stat in OSD_STATS:
            path = 'osd_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD stat {}'.format(stat),
                ('ceph_daemon',)
            )
        for stat in OSD_POOL_STATS:
            path = 'pool_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                "OSD pool stats: {}".format(stat),
                ('pool_id',)
            )
        for state in PG_STATES:
            path = 'pg_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'PG {} per pool'.format(state),
                ('pool_id',)
            )
        for state in DF_CLUSTER:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
            )
            path = 'cluster_by_class_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
                ('device_class',)
            )
        for state in DF_POOL:
            path = 'pool_{}'.format(state)
            metrics[path] = Metric(
                'counter' if state in ('rd', 'rd_bytes', 'wr', 'wr_bytes') else 'gauge',
                path,
                'DF pool {}'.format(state),
                ('pool_id',)
            )
        for state in OSD_BLOCKLIST:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD Blocklist Count {}'.format(state),
            )
        for state in NUM_OBJECTS:
            path = 'num_objects_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'Number of {} objects'.format(state),
            )

        for check in HEALTH_CHECKS:
            path = 'healthcheck_{}'.format(check.name.lower())
            metrics[path] = Metric(
                'gauge',
                path,
                check.description,
            )

        return metrics
    def orch_is_available(self) -> bool:
        try:
            return self.available()[0]
        except (RuntimeError, OrchestratorError, ImportError):
            # import error could happen during startup in case
            # orchestrator has not been loaded yet by the mgr
            return False

    def get_server_addr(self) -> str:
        """
        Return the current mgr server IP.
        """
        server_addr = cast(str, self.get_localized_module_option('server_addr', get_default_addr()))
        if server_addr in ['::', '0.0.0.0']:
            return self.get_mgr_ip()
        return server_addr
    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.
        """
        # https://stackoverflow.com/questions/7254845/change-cherrypy-port-and-restart-web-server
        # if we omit the line: cherrypy.server.httpserver = None
        # then the cherrypy server is not restarted correctly
        self.log.info('Restarting engine...')
        cherrypy.engine.stop()
        cherrypy.server.httpserver = None
        server_addr = cast(str, self.get_localized_module_option('server_addr', get_default_addr()))
        server_port = cast(int, self.get_localized_module_option('server_port', DEFAULT_PORT))
        self.configure(server_addr, server_port)
        cherrypy.engine.start()
        self.log.info('Engine started.')
    @profile_method()
    def get_health(self) -> None:

        def _get_value(message: str, delim: str = ' ', word_pos: int = 0) -> Tuple[int, int]:
            """Extract value from message (default is 1st field)"""
            v_str = message.split(delim)[word_pos]
            if v_str.isdigit():
                return int(v_str), 0
            return 0, 1

        health = json.loads(self.get('health')['json'])
        # set overall health
        self.metrics['health_status'].set(
            health_status_to_number(health['status'])
        )

        # Examine the health to see if any health checks triggered need to
        # become a specific metric with a value from the health detail
        active_healthchecks = health.get('checks', {})
        active_names = active_healthchecks.keys()

        for check in HEALTH_CHECKS:
            path = 'healthcheck_{}'.format(check.name.lower())

            if path in self.metrics:

                if check.name in active_names:
                    check_data = active_healthchecks[check.name]
                    message = check_data['summary'].get('message', '')
                    v, err = 0, 0

                    if check.name == "SLOW_OPS":
                        # e.g. "42 slow ops, oldest one blocked for 12 sec, daemons [osd.0, osd.3] have
                        # slow ops."
                        v, err = _get_value(message)

                    if err:
                        self.log.error(
                            "healthcheck %s message format is incompatible and has been dropped",
                            check.name)
                        # drop the metric, so it's no longer emitted
                        del self.metrics[path]
                        continue

                    self.metrics[path].set(v)
                else:
                    # health check is not active, so give it a default of 0
                    self.metrics[path].set(0)

        self.health_history.check(health)
        for name, info in self.health_history.healthcheck.items():
            v = 1 if info.active else 0
            self.metrics['health_detail'].set(
                v, (name, str(info.severity)))
    @profile_method()
    def get_pool_stats(self) -> None:
        # retrieve pool stats to provide per pool recovery metrics
        # (osd_pool_stats moved to mgr in Mimic)
        pstats = self.get('osd_pool_stats')
        for pool in pstats['pool_stats']:
            for stat in OSD_POOL_STATS:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['recovery_rate'].get(stat, 0),
                    (pool['pool_id'],))
    @profile_method()
    def get_df(self) -> None:
        # maybe get the to-be-exported metrics from a config?
        df = self.get('df')
        for stat in DF_CLUSTER:
            self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])
            for device_class in df['stats_by_class']:
                self.metrics['cluster_by_class_{}'.format(stat)].set(
                    df['stats_by_class'][device_class][stat], (device_class,))

        for pool in df['pools']:
            for stat in DF_POOL:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['stats'][stat],
                    (pool['id'],))
    @profile_method()
    def get_osd_blocklisted_entries(self) -> None:
        r = self.mon_command({
            'prefix': 'osd blocklist ls',
            'format': 'json'
        })
        blocklist_entries = r[2].split(' ')
        blocklist_count = blocklist_entries[1]
        for stat in OSD_BLOCKLIST:
            self.metrics['cluster_{}'.format(stat)].set(int(blocklist_count))
    @profile_method()
    def get_fs(self) -> None:
        fs_map = self.get('fs_map')
        servers = self.get_service_list()
        self.log.debug('standbys: {}'.format(fs_map['standbys']))
        # export standby mds metadata, default standby fs_id is '-1'
        for standby in fs_map['standbys']:
            id_ = standby['name']
            host, version, _ = servers.get((id_, 'mds'), ('', '', ''))
            addr, rank = standby['addr'], standby['rank']
            self.metrics['mds_metadata'].set(1, (
                'mds.{}'.format(id_), '-1',
                host, addr, rank, version
            ))
        for fs in fs_map['filesystems']:
            # collect fs metadata
            data_pools = ",".join([str(pool)
                                   for pool in fs['mdsmap']['data_pools']])
            self.metrics['fs_metadata'].set(1, (
                data_pools,
                fs['id'],
                fs['mdsmap']['metadata_pool'],
                fs['mdsmap']['fs_name']
            ))
            self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
            for gid, daemon in fs['mdsmap']['info'].items():
                id_ = daemon['name']
                host, version, _ = servers.get((id_, 'mds'), ('', '', ''))
                self.metrics['mds_metadata'].set(1, (
                    'mds.{}'.format(id_), fs['id'],
                    host, daemon['addr'],
                    daemon['rank'], version
                ))
    @profile_method()
    def get_quorum_status(self) -> None:
        mon_status = json.loads(self.get('mon_status')['json'])
        servers = self.get_service_list()
        for mon in mon_status['monmap']['mons']:
            rank = mon['rank']
            id_ = mon['name']
            mon_version = servers.get((id_, 'mon'), ('', '', ''))
            self.metrics['mon_metadata'].set(1, (
                'mon.{}'.format(id_), mon_version[0],
                mon['public_addr'].rsplit(':', 1)[0], rank,
                mon_version[1]
            ))
            in_quorum = int(rank in mon_status['quorum'])
            self.metrics['mon_quorum_status'].set(in_quorum, (
                'mon.{}'.format(id_),
            ))
    @profile_method()
    def get_mgr_status(self) -> None:
        mgr_map = self.get('mgr_map')
        servers = self.get_service_list()

        active = mgr_map['active_name']
        standbys = [s.get('name') for s in mgr_map['standbys']]

        all_mgrs = list(standbys)
        all_mgrs.append(active)

        all_modules = {module.get('name'): module.get('can_run')
                       for module in mgr_map['available_modules']}

        for mgr in all_mgrs:
            host, version, _ = servers.get((mgr, 'mgr'), ('', '', ''))
            if mgr == active:
                _state = 1
            else:
                _state = 0

            self.metrics['mgr_metadata'].set(1, (
                f'mgr.{mgr}', host, version
            ))
            self.metrics['mgr_status'].set(_state, (
                f'mgr.{mgr}',
            ))
        always_on_modules = mgr_map['always_on_modules'].get(self.release_name, [])
        active_modules = list(always_on_modules)
        active_modules.extend(mgr_map['modules'])

        for mod_name in all_modules.keys():

            if mod_name in always_on_modules:
                _state = 2
            elif mod_name in active_modules:
                _state = 1
            else:
                _state = 0

            _can_run = 1 if all_modules[mod_name] else 0
            self.metrics['mgr_module_status'].set(_state, (mod_name,))
            self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))
    @profile_method()
    def get_pg_status(self) -> None:

        pg_summary = self.get('pg_summary')

        for pool in pg_summary['by_pool']:
            num_by_state: DefaultDict[str, int] = defaultdict(int)
            for state in PG_STATES:
                num_by_state[state] = 0

            for state_name, count in pg_summary['by_pool'][pool].items():
                for state in state_name.split('+'):
                    num_by_state[state] += count
                num_by_state['total'] += count

            for state, num in num_by_state.items():
                try:
                    self.metrics["pg_{}".format(state)].set(num, (pool,))
                except KeyError:
                    self.log.warning("skipping pg in unknown state {}".format(state))
    @profile_method()
    def get_osd_stats(self) -> None:
        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            id_ = osd['osd']
            for stat in OSD_STATS:
                val = osd['perf_stat'][stat]
                self.metrics['osd_{}'.format(stat)].set(val, (
                    'osd.{}'.format(id_),
                ))
    def get_service_list(self) -> Dict[Tuple[str, str], Tuple[str, str, str]]:
        ret = {}
        for server in self.list_servers():
            host = cast(str, server.get('hostname', ''))
            for service in cast(List[ServiceInfoT], server.get('services', [])):
                ret.update({(service['id'], service['type']): (host,
                                                               service.get('ceph_version', 'unknown'),
                                                               service.get('name', ''))})
        return ret
    @profile_method()
    def get_metadata_and_osd_status(self) -> None:
        osd_map = self.get('osd_map')
        osd_flags = osd_map['flags'].split(',')
        for flag in OSD_FLAGS:
            self.metrics['osd_flag_{}'.format(flag)].set(
                int(flag in osd_flags)
            )

        osd_devices = self.get('osd_map_crush')['devices']
        servers = self.get_service_list()
        for osd in osd_map['osds']:
            # id can be used to link osd metrics and metadata
            id_ = osd['osd']
            # collect osd metadata
            p_addr = osd['public_addr'].rsplit(':', 1)[0]
            c_addr = osd['cluster_addr'].rsplit(':', 1)[0]
            if p_addr == "-" or c_addr == "-":
                self.log.info(
                    "Missing address metadata for osd {0}, skipping occupation"
                    " and metadata records for this osd".format(id_)
                )
                continue

            dev_class = None
            for osd_device in osd_devices:
                if osd_device['id'] == id_:
                    dev_class = osd_device.get('class', '')
                    break

            if dev_class is None:
                self.log.info("OSD {0} is missing from CRUSH map, "
                              "skipping output".format(id_))
                continue

            osd_version = servers.get((str(id_), 'osd'), ('', '', ''))

            # collect disk occupation metadata
            osd_metadata = self.get_metadata("osd", str(id_))
            if osd_metadata is None:
                continue

            obj_store = osd_metadata.get('osd_objectstore', '')
            f_iface = osd_metadata.get('front_iface', '')
            b_iface = osd_metadata.get('back_iface', '')

            self.metrics['osd_metadata'].set(1, (
                b_iface,
                'osd.{}'.format(id_),
                c_addr,
                dev_class,
                f_iface,
                osd_version[0],
                obj_store,
                p_addr,
                osd_version[1]
            ))

            # collect osd status
            for state in OSD_STATUS:
                status = osd[state]
                self.metrics['osd_{}'.format(state)].set(status, (
                    'osd.{}'.format(id_),
                ))

            osd_dev_node = None
            osd_wal_dev_node = ''
            osd_db_dev_node = ''
            if obj_store == "filestore":
                # collect filestore backend device
                osd_dev_node = osd_metadata.get(
                    'backend_filestore_dev_node', None)
                # collect filestore journal device
                osd_wal_dev_node = osd_metadata.get('osd_journal', '')
                osd_db_dev_node = ''
            elif obj_store == "bluestore":
                # collect bluestore backend device
                osd_dev_node = osd_metadata.get(
                    'bluestore_bdev_dev_node', None)
                # collect bluestore wal backend
                osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
                # collect bluestore db backend
                osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
            if osd_dev_node and osd_dev_node == "unknown":
                osd_dev_node = None

            # fetch the devices and ids (vendor, model, serial) from the
            # osd_metadata
            osd_devs = osd_metadata.get('devices', '') or 'N/A'
            osd_dev_ids = osd_metadata.get('device_ids', '') or 'N/A'

            osd_hostname = osd_metadata.get('hostname', None)
            if osd_dev_node and osd_hostname:
                self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                    id_, osd_hostname, osd_dev_node))
                self.metrics['disk_occupation'].set(1, (
                    "osd.{0}".format(id_),
                    osd_dev_node,
                    osd_db_dev_node,
                    osd_wal_dev_node,
                    osd_hostname,
                    osd_devs,
                    osd_dev_ids,
                ))
            else:
                self.log.info("Missing dev node metadata for osd {0}, skipping "
                              "occupation record for this osd".format(id_))

        if 'disk_occupation' in self.metrics:
            try:
                self.metrics['disk_occupation_human'] = \
                    self.metrics['disk_occupation'].group_by(
                        ['device', 'instance'],
                        {'ceph_daemon': lambda daemons: ', '.join(daemons)},
                        name='disk_occupation_human',
                )
            except Exception as e:
                self.log.error(e)
        ec_profiles = osd_map.get('erasure_code_profiles', {})

        def _get_pool_info(pool: Dict[str, Any]) -> Tuple[str, str]:
            pool_type = 'unknown'
            description = 'unknown'

            if pool['type'] == 1:
                pool_type = "replicated"
                description = f"replica:{pool['size']}"
            elif pool['type'] == 3:
                pool_type = "erasure"
                name = pool.get('erasure_code_profile', '')
                profile = ec_profiles.get(name, {})
                if profile:
                    description = f"ec:{profile['k']}+{profile['m']}"
                else:
                    description = "ec:unknown"

            return pool_type, description
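        # Illustrative sketch (not executed): a replicated pool of size 3
        # yields ('replicated', 'replica:3'); an erasure-coded pool whose
        # profile has k=4, m=2 yields ('erasure', 'ec:4+2').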
        for pool in osd_map['pools']:

            compression_mode = 'none'
            pool_type, pool_description = _get_pool_info(pool)

            if 'options' in pool:
                compression_mode = pool['options'].get('compression_mode', 'none')

            self.metrics['pool_metadata'].set(
                1, (
                    pool['pool'],
                    pool['pool_name'],
                    pool_type,
                    pool_description,
                    compression_mode)
            )
        # Populate other servers metadata
        # If orchestrator is available and ceph-exporter is running modify rgw instance id
        # to match the one from exporter
        modify_instance_id = self.orch_is_available() and self.get_module_option('exclude_perf_counters')
        if modify_instance_id:
            daemons = raise_if_exception(self.list_daemons(daemon_type='rgw'))
            for daemon in daemons:
                if daemon.daemon_id and '.' in daemon.daemon_id:
                    instance_id = daemon.daemon_id.split(".")[2]
                else:
                    instance_id = daemon.daemon_id if daemon.daemon_id else ""
                self.metrics['rgw_metadata'].set(1,
                                                 (f"{daemon.daemon_type}.{daemon.daemon_id}",
                                                  str(daemon.hostname),
                                                  str(daemon.version),
                                                  instance_id))
        for key, value in servers.items():
            service_id, service_type = key
            if service_type == 'rgw' and not modify_instance_id:
                hostname, version, name = value
                self.metrics['rgw_metadata'].set(
                    1,
                    ('{}.{}'.format(service_type, name),
                     hostname, version, service_id)
                )
            elif service_type == 'rbd-mirror':
                mirror_metadata = self.get_metadata('rbd-mirror', service_id)
                if mirror_metadata is None:
                    continue
                mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
                                                                service_id)
                rbd_mirror_metadata = cast(LabelValues,
                                           (mirror_metadata.get(k, '')
                                            for k in RBD_MIRROR_METADATA))
                self.metrics['rbd_mirror_metadata'].set(
                    1, rbd_mirror_metadata
                )
    @profile_method()
    def get_num_objects(self) -> None:
        pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
        for obj in NUM_OBJECTS:
            stat = 'num_objects_{}'.format(obj)
            self.metrics[stat].set(pg_sum[stat])
    @profile_method()
    def get_rbd_stats(self) -> None:
        # Per-RBD-image stats are collected by registering a dynamic osd perf
        # stats query that tells OSDs to group stats for requests associated
        # with RBD objects by pool, namespace, and image id, which are
        # extracted from the request object names or other attributes.
        # The RBD object names have the following prefixes:
        #   - rbd_data.{image_id}. (data stored in the same pool as metadata)
        #   - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
        #   - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
        # The pool_id in the object name is the id of the pool with the image
        # metadata, and should be used in the image spec. If there is no pool_id
        # in the object name, the image pool is the pool where the object is
        # located.

        # Parse rbd_stats_pools option, which is a comma or space separated
        # list of pool[/namespace] entries. If no namespace is specified the
        # stats are collected for every namespace in the pool. The wildcard
        # '*' can be used to indicate all pools or namespaces
        pools_string = cast(str, self.get_localized_module_option('rbd_stats_pools'))
        pool_keys = set()
        osd_map = self.get('osd_map')
        rbd_pools = [pool['pool_name'] for pool in osd_map['pools']
                     if 'rbd' in pool.get('application_metadata', {})]
        for x in re.split(r'[\s,]+', pools_string):
            if not x:
                continue

            s = x.split('/', 2)
            pool_name = s[0]
            namespace_name = None
            if len(s) >= 2:
                namespace_name = s[1]

            if pool_name == "*":
                # collect for all pools
                for pool in rbd_pools:
                    pool_keys.add((pool, namespace_name))
            else:
                if pool_name in rbd_pools:
                    pool_keys.add((pool_name, namespace_name))  # avoids adding deleted pool
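        # Illustrative sketch (not executed): with rbd_stats_pools = "rbd1 rbd2/ns1 *",
        # pool_keys would contain ('rbd1', None), ('rbd2', 'ns1') and one
        # (pool, None) entry for every pool tagged with the 'rbd' application.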
        pools = {}  # type: Dict[str, Set[str]]
        for pool_key in pool_keys:
            pool_name = pool_key[0]
            namespace_name = pool_key[1]
            if not namespace_name or namespace_name == "*":
                # empty set means collect for all namespaces
                pools[pool_name] = set()
                continue
            if pool_name not in pools:
                pools[pool_name] = set()
            elif not pools[pool_name]:
                continue
            pools[pool_name].add(namespace_name)

        rbd_stats_pools = {}
        for pool_id in list(self.rbd_stats['pools']):
            name = self.rbd_stats['pools'][pool_id]['name']
            if name not in pools:
                del self.rbd_stats['pools'][pool_id]
            else:
                rbd_stats_pools[name] = \
                    self.rbd_stats['pools'][pool_id]['ns_names']

        pools_refreshed = False
        if pools:
            next_refresh = self.rbd_stats['pools_refresh_time'] + \
                self.get_localized_module_option(
                    'rbd_stats_pools_refresh_interval', 300)
            if rbd_stats_pools != pools or time.time() >= next_refresh:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True

        pool_ids = list(self.rbd_stats['pools'])
        pool_ids.sort()
        pool_id_regex = '^(' + '|'.join([str(x) for x in pool_ids]) + ')$'

        nspace_names = []
        for pool_id, pool in self.rbd_stats['pools'].items():
            if pool['ns_names']:
                nspace_names.extend(pool['ns_names'])
            else:
                nspace_names = []
                break
        if nspace_names:
            namespace_regex = '^(' + \
                              "|".join([re.escape(x)
                                        for x in set(nspace_names)]) + ')$'
        else:
            namespace_regex = '^(.*)$'

        if ('query' in self.rbd_stats
                and (pool_id_regex != self.rbd_stats['query']['key_descriptor'][0]['regex']
                     or namespace_regex != self.rbd_stats['query']['key_descriptor'][1]['regex'])):
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']

        if not self.rbd_stats['pools']:
            return

        counters_info = self.rbd_stats['counters_info']

        if 'query_id' not in self.rbd_stats:
            query = {
                'key_descriptor': [
                    {'type': 'pool_id', 'regex': pool_id_regex},
                    {'type': 'namespace', 'regex': namespace_regex},
                    {'type': 'object_name',
                     'regex': r'^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
                ],
                'performance_counter_descriptors': list(counters_info),
            }
            query_id = self.add_osd_perf_query(query)
            if query_id is None:
                self.log.error('failed to add query %s' % query)
                return
            self.rbd_stats['query'] = query
            self.rbd_stats['query_id'] = query_id
        res = self.get_osd_perf_counters(self.rbd_stats['query_id'])
        assert res
        for c in res['counters']:
            # if the pool id is not found in the object name use id of the
            # pool where the object is located
            if c['k'][2][0]:
                pool_id = int(c['k'][2][0])
            else:
                pool_id = int(c['k'][0][0])
            if pool_id not in self.rbd_stats['pools'] and not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True
            if pool_id not in self.rbd_stats['pools']:
                continue
            pool = self.rbd_stats['pools'][pool_id]
            nspace_name = c['k'][1][0]
            if nspace_name not in pool['images']:
                continue
            image_id = c['k'][2][1]
            if image_id not in pool['images'][nspace_name] and \
               not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pool = self.rbd_stats['pools'][pool_id]
                pools_refreshed = True
            if image_id not in pool['images'][nspace_name]:
                continue
            counters = pool['images'][nspace_name][image_id]['c']
            for i in range(len(c['c'])):
                counters[i][0] += c['c'][i][0]
                counters[i][1] += c['c'][i][1]

        label_names = ("pool", "namespace", "image")
        for pool_id, pool in self.rbd_stats['pools'].items():
            pool_name = pool['name']
            for nspace_name, images in pool['images'].items():
                for image_id in images:
                    image_name = images[image_id]['n']
                    counters = images[image_id]['c']
                    i = 0
                    for key in counters_info:
                        counter_info = counters_info[key]
                        stattype = self._stattype_to_str(counter_info['type'])
                        labels = (pool_name, nspace_name, image_name)
                        if counter_info['type'] == self.PERFCOUNTER_COUNTER:
                            path = 'rbd_' + key
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'],
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                        elif counter_info['type'] == self.PERFCOUNTER_LONGRUNAVG:
                            path = 'rbd_' + key + '_sum'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'] + ' Total',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                            path = 'rbd_' + key + '_count'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    'counter',
                                    path,
                                    counter_info['desc'] + ' Count',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][1], labels)
                        i += 1
    def refresh_rbd_stats_pools(self, pools: Dict[str, Set[str]]) -> None:
        self.log.debug('refreshing rbd pools %s' % (pools))

        rbd = RBD()
        counters_info = self.rbd_stats['counters_info']
        for pool_name, cfg_ns_names in pools.items():
            try:
                pool_id = self.rados.pool_lookup(pool_name)
                with self.rados.open_ioctx(pool_name) as ioctx:
                    if pool_id not in self.rbd_stats['pools']:
                        self.rbd_stats['pools'][pool_id] = {'images': {}}
                    pool = self.rbd_stats['pools'][pool_id]
                    pool['name'] = pool_name
                    pool['ns_names'] = cfg_ns_names
                    if cfg_ns_names:
                        nspace_names = list(cfg_ns_names)
                    else:
                        nspace_names = [''] + rbd.namespace_list(ioctx)
                    for nspace_name in list(pool['images']):
                        if nspace_name not in nspace_names:
                            del pool['images'][nspace_name]
                    for nspace_name in nspace_names:
                        if nspace_name and \
                           not rbd.namespace_exists(ioctx, nspace_name):
                            self.log.debug('unknown namespace %s for pool %s' %
                                           (nspace_name, pool_name))
                            continue
                        ioctx.set_namespace(nspace_name)
                        if nspace_name not in pool['images']:
                            pool['images'][nspace_name] = {}
                        namespace = pool['images'][nspace_name]
                        images = {}
                        for image_meta in RBD().list2(ioctx):
                            image = {'n': image_meta['name']}
                            image_id = image_meta['id']
                            if image_id in namespace:
                                image['c'] = namespace[image_id]['c']
                            else:
                                image['c'] = [[0, 0] for x in counters_info]
                            images[image_id] = image
                        pool['images'][nspace_name] = images
            except Exception as e:
                self.log.error('failed listing pool %s: %s' % (pool_name, e))
        self.rbd_stats['pools_refresh_time'] = time.time()
    def shutdown_rbd_stats(self) -> None:
        if 'query_id' in self.rbd_stats:
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']
        self.rbd_stats['pools'].clear()
    def add_fixed_name_metrics(self) -> None:
        """
        Add fixed name metrics from existing ones that have details in their names
        that should be in labels (not in name).
        For backward compatibility, a new fixed name metric is created (instead of replacing)
        and details are put in new labels.
        Intended for RGW sync perf. counters but extendable as required.
        See: https://tracker.ceph.com/issues/45311
        """
        new_metrics = {}
        for metric_path, metrics in self.metrics.items():
            # Address RGW sync perf. counters.
            match = re.search(r'^data-sync-from-(.*)\.', metric_path)
            if match:
                new_path = re.sub('from-([^.]*)', 'from-zone', metric_path)
                if new_path not in new_metrics:
                    new_metrics[new_path] = Metric(
                        metrics.mtype,
                        new_path,
                        metrics.desc,
                        cast(LabelValues, metrics.labelnames) + ('source_zone',)
                    )
                for label_values, value in metrics.value.items():
                    new_metrics[new_path].set(value, label_values + (match.group(1),))

        self.metrics.update(new_metrics)
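    # Illustrative sketch (not executed): a perf counter path like
    # 'data-sync-from-zone2.fetch_bytes' is mirrored under the fixed name
    # 'data-sync-from-zone.fetch_bytes' with an extra label source_zone="zone2",
    # so dashboards can match a stable metric name across zones.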
    def get_collect_time_metrics(self) -> None:
        sum_metric = self.metrics.get('prometheus_collect_duration_seconds_sum')
        count_metric = self.metrics.get('prometheus_collect_duration_seconds_count')
        if sum_metric is None:
            sum_metric = MetricCounter(
                'prometheus_collect_duration_seconds_sum',
                'The sum of seconds it took to collect all metrics of this exporter',
                ('method',))
            self.metrics['prometheus_collect_duration_seconds_sum'] = sum_metric
        if count_metric is None:
            count_metric = MetricCounter(
                'prometheus_collect_duration_seconds_count',
                'The number of metrics gathered for this exporter',
                ('method',))
            self.metrics['prometheus_collect_duration_seconds_count'] = count_metric

        # Collect all timing data and make it available as metric, excluding the
        # `collect` method because it has not finished at this point and hence
        # there's no `_execution_duration` attribute to be found. The
        # `_execution_duration` attribute is added by the `profile_method`
        # decorator.
        for method_name, method in Module.__dict__.items():
            duration = getattr(method, '_execution_duration', None)
            if duration is not None:
                cast(MetricCounter, sum_metric).add(duration, (method_name,))
                cast(MetricCounter, count_metric).add(1, (method_name,))
    @profile_method()
    def get_pool_repaired_objects(self) -> None:
        dump = self.get('pg_dump')
        for stats in dump['pool_stats']:
            path = 'pool_objects_repaired'
            self.metrics[path].set(stats['stat_sum']['num_objects_repaired'],
                                   labelvalues=(stats['poolid'],))

    @profile_method()
    def get_all_daemon_health_metrics(self) -> None:
        daemon_metrics = self.get_daemon_health_metrics()
        self.log.debug('daemon health metrics: %s' % (daemon_metrics))
        for daemon_name, health_metrics in daemon_metrics.items():
            for health_metric in health_metrics:
                path = 'daemon_health_metrics'
                self.metrics[path].set(health_metric['value'], labelvalues=(
                    health_metric['type'], daemon_name,))
    def get_perf_counters(self) -> None:
        """
        Get the perf counters for all daemons
        """
        for daemon, counters in self.get_unlabeled_perf_counters().items():
            for path, counter_info in counters.items():
                # Skip histograms, they are represented by long running avgs
                stattype = self._stattype_to_str(counter_info['type'])
                if not stattype or stattype == 'histogram':
                    self.log.debug('ignoring %s, type %s' % (path, stattype))
                    continue

                path, label_names, labels = self._perfpath_to_path_labels(
                    daemon, path)

                # Get the value of the counter
                value = self._perfvalue_to_value(
                    counter_info['type'], counter_info['value'])

                # Represent the long running avgs as sum/count pairs
                if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    _path = path + '_sum'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            stattype,
                            _path,
                            counter_info['description'] + ' Total',
                            label_names,
                        )
                    self.metrics[_path].set(value, labels)

                    _path = path + '_count'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            'counter',
                            _path,
                            counter_info['description'] + ' Count',
                            label_names,
                        )
                    self.metrics[_path].set(counter_info['count'], labels,)
                else:
                    if path not in self.metrics:
                        self.metrics[path] = Metric(
                            stattype,
                            path,
                            counter_info['description'],
                            label_names,
                        )
                    self.metrics[path].set(value, labels)

        self.add_fixed_name_metrics()
    @profile_method(True)
    def collect(self) -> str:
        # Clear the metrics before scraping
        for k in self.metrics.keys():
            self.metrics[k].clear()

        self.get_health()
        self.get_df()
        self.get_osd_blocklisted_entries()
        self.get_pool_stats()
        self.get_fs()
        self.get_osd_stats()
        self.get_quorum_status()
        self.get_mgr_status()
        self.get_metadata_and_osd_status()
        self.get_pg_status()
        self.get_pool_repaired_objects()
        self.get_num_objects()
        self.get_all_daemon_health_metrics()

        if not self.get_module_option('exclude_perf_counters'):
            self.get_perf_counters()
        self.get_rbd_stats()

        self.get_collect_time_metrics()

        # Return formatted metrics and clear no longer used data
        _metrics = [m.str_expfmt() for m in self.metrics.values()]
        for k in self.metrics.keys():
            self.metrics[k].clear()

        return ''.join(_metrics) + '\n'
    @CLIReadCommand('prometheus file_sd_config')
    def get_file_sd_config(self) -> Tuple[int, str, str]:
        '''
        Return file_sd compatible prometheus config for mgr cluster
        '''
        servers = self.list_servers()
        targets = []
        for server in servers:
            hostname = server.get('hostname', '')
            for service in cast(List[ServiceInfoT], server.get('services', [])):
                if service['type'] != 'mgr':
                    continue
                id_ = service['id']
                port = self._get_module_option('server_port', DEFAULT_PORT, id_)
                targets.append(f'{hostname}:{port}')
        ret = [
            {
                "targets": targets,
                "labels": {}
            }
        ]
        return 0, json.dumps(ret), ""
    def self_test(self) -> None:
        self.collect()
        self.get_file_sd_config()
    def configure(self, server_addr: str, server_port: int) -> None:
        # cephadm deployments have a TLS monitoring stack setup option.
        # If the cephadm module is on and the setting is true (defaults to false)
        # we should have prometheus be set up to interact with that
        cephadm_secure_monitoring_stack = self.get_module_option_ex(
            'cephadm', 'secure_monitoring_stack', False)
        if cephadm_secure_monitoring_stack:
            try:
                self.setup_cephadm_tls_config(server_addr, server_port)
                return
            except Exception as e:
                self.log.exception(f'Failed to setup cephadm based secure monitoring stack: {e}\n'
                                   'Falling back to default configuration')
        self.setup_default_config(server_addr, server_port)
    def setup_default_config(self, server_addr: str, server_port: int) -> None:
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': server_port,
            'engine.autoreload.on': False,
            'server.ssl_module': None,
            'server.ssl_certificate': None,
            'server.ssl_private_key': None,
        })
        # Publish the URI that others may use to access the service we're about to start serving
        self.set_uri(build_url(scheme='http', host=self.get_server_addr(),
                               port=server_port, path='/'))
    def setup_cephadm_tls_config(self, server_addr: str, server_port: int) -> None:
        from cephadm.ssl_cert_utils import SSLCerts
        # the ssl certs utils uses a NamedTemporaryFile for the cert files
        # generated with generate_cert_files function. We need the SSLCerts
        # object to not be cleaned up in order to have those temp files not
        # be cleaned up, so making it an attribute of the module instead
        # of just a standalone object
        self.cephadm_monitoring_tls_ssl_certs = SSLCerts()
        host = self.get_mgr_ip()
        try:
            old_cert = self.get_store('root/cert')
            old_key = self.get_store('root/key')
            if not old_cert or not old_key:
                raise Exception('No old credentials for mgr-prometheus endpoint')
            self.cephadm_monitoring_tls_ssl_certs.load_root_credentials(old_cert, old_key)
        except Exception:
            self.cephadm_monitoring_tls_ssl_certs.generate_root_cert(host)
            self.set_store('root/cert', self.cephadm_monitoring_tls_ssl_certs.get_root_cert())
            self.set_store('root/key', self.cephadm_monitoring_tls_ssl_certs.get_root_key())

        cert_file_path, key_file_path = self.cephadm_monitoring_tls_ssl_certs.generate_cert_files(
            self.get_hostname(), host)

        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': server_port,
            'engine.autoreload.on': False,
            'server.ssl_module': 'builtin',
            'server.ssl_certificate': cert_file_path,
            'server.ssl_private_key': key_file_path,
        })
        # Publish the URI that others may use to access the service we're about to start serving
        self.set_uri(build_url(scheme='https', host=self.get_server_addr(),
                               port=server_port, path='/'))
    def serve(self) -> None:

        class Root(object):

            # collapse everything to '/'
            def _cp_dispatch(self, vpath: str) -> 'Root':
                cherrypy.request.path = ''
                return self

            @cherrypy.expose
            def index(self) -> str:
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='/metrics'>Metrics</a></p>
    </body>
</html>'''

            @cherrypy.expose
            def metrics(self) -> Optional[str]:
                # Lock the function execution
                assert isinstance(_global_instance, Module)
                with _global_instance.collect_lock:
                    return self._metrics(_global_instance)

            @staticmethod
            def _metrics(instance: 'Module') -> Optional[str]:
                if not instance.cache:
                    instance.log.debug('Cache disabled, collecting and returning without cache')
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect()

                # Return cached data if available
                if not instance.collect_cache:
                    raise cherrypy.HTTPError(503, 'No cached data available yet')

                def respond() -> Optional[str]:
                    assert isinstance(instance, Module)
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache

                if instance.collect_time < instance.scrape_interval:
                    # Respond if cache isn't stale
                    return respond()

                if instance.stale_cache_strategy == instance.STALE_CACHE_RETURN:
                    # Respond even if cache is stale
                    instance.log.info(
                        'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
                        'returning metrics from stale cache.'.format(
                            instance.collect_time,
                            instance.collect_time - instance.scrape_interval
                        )
                    )
                    return respond()

                if instance.stale_cache_strategy == instance.STALE_CACHE_FAIL:
                    # Fail if cache is stale
                    msg = 'Gathering data took {:.2f} seconds, metrics are stale ' \
                          'for {:.2f} seconds, returning "service unavailable".'.format(
                              instance.collect_time,
                              instance.collect_time - instance.scrape_interval,
                          )
                    instance.log.error(msg)
                    raise cherrypy.HTTPError(503, msg)
                return None
        # Make the cache timeout for collecting configurable
        self.scrape_interval = cast(float, self.get_localized_module_option('scrape_interval'))

        self.stale_cache_strategy = cast(
            str, self.get_localized_module_option('stale_cache_strategy'))
        if self.stale_cache_strategy not in [self.STALE_CACHE_FAIL,
                                             self.STALE_CACHE_RETURN]:
            self.stale_cache_strategy = self.STALE_CACHE_FAIL

        server_addr = cast(str, self.get_localized_module_option('server_addr', get_default_addr()))
        server_port = cast(int, self.get_localized_module_option('server_port', DEFAULT_PORT))
        self.log.info(
            "server_addr: %s server_port: %s" %
            (server_addr, server_port)
        )

        self.cache = cast(bool, self.get_localized_module_option('cache', True))
        if self.cache:
            self.log.info('Cache enabled')
            self.metrics_thread.start()
        else:
            self.log.info('Cache disabled')

        self.configure(server_addr, server_port)

        cherrypy.tree.mount(Root(), "/")
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')

        # wait for the shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        # tell metrics collection thread to stop collecting new metrics
        self.metrics_thread.stop()
        cherrypy.engine.stop()
        cherrypy.server.httpserver = None
        self.log.info('Engine stopped.')
        self.shutdown_rbd_stats()
        # wait for the metrics collection thread to stop
        self.metrics_thread.join()

    def shutdown(self) -> None:
        self.log.info('Stopping engine...')
        self.shutdown_event.set()
    @CLIReadCommand('healthcheck history ls')
    def _list_healthchecks(self, format: Format = Format.plain) -> HandleCommandResult:
        """List all the healthchecks being tracked

        The format options are parsed in ceph_argparse, before they get evaluated here so
        we can safely assume that what we have to process is valid. ceph_argparse will throw
        a ValueError if the cast to our Format class fails.

        Args:
            format (Format, optional): output format. Defaults to Format.plain.

        Returns:
            HandleCommandResult: return code, stdout and stderr returned to the caller
        """

        if format == Format.plain:
            out = str(self.health_history)
        elif format == Format.yaml:
            out = self.health_history.as_yaml()
        else:
            out = self.health_history.as_json(format == Format.json_pretty)

        return HandleCommandResult(retval=0, stdout=out)

    @CLIWriteCommand('healthcheck history clear')
    def _clear_healthchecks(self) -> HandleCommandResult:
        """Clear the healthcheck history"""
        self.health_history.reset()
        return HandleCommandResult(retval=0, stdout="healthcheck history cleared")
class StandbyModule(MgrStandbyModule):

    MODULE_OPTIONS = Module.MODULE_OPTIONS

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(StandbyModule, self).__init__(*args, **kwargs)
        self.shutdown_event = threading.Event()

    def serve(self) -> None:
        server_addr = self.get_localized_module_option(
            'server_addr', get_default_addr())
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info("server_addr: %s server_port: %s" %
                      (server_addr, server_port))
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': server_port,
            'engine.autoreload.on': False,
            'request.show_tracebacks': False
        })

        module = self

        class Root(object):
            @cherrypy.expose
            def index(self) -> str:
                standby_behaviour = module.get_module_option('standby_behaviour')
                if standby_behaviour == 'default':
                    active_uri = module.get_active_uri()
                    return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='{}metrics'>Metrics</a></p>
    </body>
</html>'''.format(active_uri)
                else:
                    status = module.get_module_option('standby_error_status_code')
                    raise cherrypy.HTTPError(status, message="Keep on looking")

            @cherrypy.expose
            def metrics(self) -> str:
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return ''

        cherrypy.tree.mount(Root(), '/', {})
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # Wait for shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        cherrypy.server.httpserver = None
        self.log.info('Engine stopped.')

    def shutdown(self) -> None:
        self.log.info("Stopping engine...")
        self.shutdown_event.set()
        self.log.info("Stopped engine")