# ceph/src/pybind/mgr/prometheus/module.py
import cherrypy
import enum
import json
import math
import os
import re
import threading
import time
import yaml

from collections import defaultdict, namedtuple
from distutils.version import StrictVersion
from mgr_module import CLIReadCommand, MgrModule, MgrStandbyModule, PG_STATES, \
    Option, ServiceInfoT, HandleCommandResult, CLIWriteCommand
from mgr_util import get_default_addr, profile_method, build_url
from rbd import RBD

from typing import DefaultDict, Optional, Dict, Any, Set, cast, Tuple, Union, List, Callable
LabelValues = Tuple[str, ...]
Number = Union[int, float]
MetricValue = Dict[LabelValues, Number]
# Defaults for the Prometheus HTTP server.  Can also be set via config-key;
# see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# for the Prometheus exporter port registry.
DEFAULT_PORT = 9283
# When the CherryPy server in 3.2.2 (and later) starts it attempts to verify
# that the ports it's listening on are in fact bound. When using the any address
# "::" it tries both ipv4 and ipv6, and in some environments (e.g. kubernetes)
# ipv6 isn't yet configured / supported and CherryPy throws an uncaught
# exception.
if cherrypy is not None:
    v = StrictVersion(cherrypy.__version__)
    # the issue was fixed in 3.2.3. it's present in 3.2.2 (current version on
    # centos:7) and back to at least 3.0.0.
    if StrictVersion("3.1.2") <= v < StrictVersion("3.2.3"):
        # https://github.com/cherrypy/cherrypy/issues/1100
        from cherrypy.process import servers
        servers.wait_for_occupied_port = lambda host, port: None
# cherrypy likes to sys.exit on error.  don't let it take us down too!
def os_exit_noop(status: int) -> None:
    pass


os._exit = os_exit_noop  # type: ignore
# to access things in class Module from subclass Root.  Because
# it's a dict, the writer doesn't need to declare 'global' for access
_global_instance = None  # type: Optional[Module]
cherrypy.config.update({
    'response.headers.server': 'Ceph-Prometheus'
})
def health_status_to_number(status: str) -> int:
    if status == 'HEALTH_OK':
        return 0
    elif status == 'HEALTH_WARN':
        return 1
    elif status == 'HEALTH_ERR':
        return 2
    raise ValueError(f'unknown status "{status}"')
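
# A quick sanity check of the mapping above (illustrative doctest):
#
#   >>> health_status_to_number('HEALTH_OK')
#   0
#   >>> health_status_to_number('HEALTH_WARN')
#   1
#   >>> health_status_to_number('HEALTH_ERR')
#   2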
DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']

DF_POOL = ['max_avail', 'stored', 'stored_raw', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes',
           'compress_bytes_used', 'compress_under_bytes', 'bytes_used', 'percent_used']

OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
                  'recovering_keys_per_sec', 'num_objects_recovered',
                  'num_bytes_recovered', 'num_keys_recovered')

OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')

MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
                'ceph_version')

MON_METADATA = ('ceph_daemon', 'hostname',
                'public_addr', 'rank', 'ceph_version')

MGR_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

MGR_STATUS = ('ceph_daemon',)

MGR_MODULE_STATUS = ('name',)

MGR_MODULE_CAN_RUN = ('name',)

OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
                'front_iface', 'hostname', 'objectstore', 'public_addr',
                'ceph_version')

OSD_STATUS = ['weight', 'up', 'in']

OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name', 'type', 'description', 'compression_mode')

RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version', 'instance_id')

RBD_MIRROR_METADATA = ('ceph_daemon', 'id', 'instance_id', 'hostname',
                       'ceph_version')

DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
                   'wal_device', 'instance', 'devices', 'device_ids')

NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
alert_metric = namedtuple('alert_metric', 'name description')
HEALTH_CHECKS = [
    alert_metric('SLOW_OPS', 'OSD or Monitor requests taking a long time to process'),
]

HEALTHCHECK_DETAIL = ('name', 'severity')
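
# Each alert_metric entry in HEALTH_CHECKS is surfaced as its own gauge named
# 'healthcheck_<name.lower()>' (see _setup_static_metrics below), so the
# SLOW_OPS entry above is exported as 'ceph_healthcheck_slow_ops' once
# Metric.str_expfmt() applies the 'ceph_' prefix.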
class Severity(enum.Enum):
    ok = "HEALTH_OK"
    warn = "HEALTH_WARN"
    error = "HEALTH_ERR"


class Format(enum.Enum):
    plain = 'plain'
    json = 'json'
    json_pretty = 'json-pretty'
    yaml = 'yaml'
class HealthCheckEvent:

    def __init__(self, name: str, severity: Severity, first_seen: float,
                 last_seen: float, count: int, active: bool = True):
        self.name = name
        self.severity = severity
        self.first_seen = first_seen
        self.last_seen = last_seen
        self.count = count
        self.active = active

    def as_dict(self) -> Dict[str, Any]:
        """Return the instance as a dictionary."""
        return self.__dict__
class HealthHistory:
    kv_name = 'health_history'
    titles = "{healthcheck_name:<24} {first_seen:<20} {last_seen:<20} {count:>5} {active:^6}"
    date_format = "%Y/%m/%d %H:%M:%S"

    def __init__(self, mgr: MgrModule):
        self.mgr = mgr
        self.lock = threading.Lock()
        self.healthcheck: Dict[str, HealthCheckEvent] = {}
        self._load()

    def _load(self) -> None:
        """Load the current state from the mons KV store."""
        data = self.mgr.get_store(self.kv_name)
        if data:
            try:
                healthcheck_data = json.loads(data)
            except json.JSONDecodeError:
                self.mgr.log.warn(
                    f"INVALID data read from mgr/prometheus/{self.kv_name}. Resetting")
                self.reset()
                return
            else:
                for k, v in healthcheck_data.items():
                    self.healthcheck[k] = HealthCheckEvent(
                        name=k,
                        severity=v.get('severity'),
                        first_seen=v.get('first_seen', 0),
                        last_seen=v.get('last_seen', 0),
                        count=v.get('count', 1),
                        active=v.get('active', True))
        else:
            self.reset()

    def reset(self) -> None:
        """Reset the healthcheck history."""
        with self.lock:
            self.mgr.set_store(self.kv_name, "{}")
            self.healthcheck = {}

    def save(self) -> None:
        """Save the current in-memory healthcheck history to the KV store."""
        with self.lock:
            self.mgr.set_store(self.kv_name, self.as_json())

    def check(self, health_checks: Dict[str, Any]) -> None:
        """Look at the current health checks and compare against the existing history.

        Args:
            health_checks (Dict[str, Any]): current health check data
        """

        current_checks = health_checks.get('checks', {})
        changes_made = False

        # first turn off any active states we're tracking
        for seen_check in self.healthcheck:
            check = self.healthcheck[seen_check]
            if check.active and seen_check not in current_checks:
                check.active = False
                changes_made = True

        # now look for any additions to track
        now = time.time()
        for name, info in current_checks.items():
            if name not in self.healthcheck:
                # this healthcheck is new, so start tracking it
                changes_made = True
                self.healthcheck[name] = HealthCheckEvent(
                    name=name,
                    severity=info.get('severity'),
                    first_seen=now,
                    last_seen=now,
                    count=1,
                    active=True)
            else:
                # seen it before, so update its metadata
                check = self.healthcheck[name]
                if check.active:
                    # check has been registered as active already, so skip
                    continue
                else:
                    check.last_seen = now
                    check.count += 1
                    check.active = True
                    changes_made = True

        if changes_made:
            self.save()

    def __str__(self) -> str:
        """Print the healthcheck history.

        Returns:
            str: Human readable representation of the healthcheck history
        """
        out = []
        if len(self.healthcheck.keys()) == 0:
            out.append("No healthchecks have been recorded")
        else:
            out.append(self.titles.format(
                healthcheck_name="Healthcheck Name",
                first_seen="First Seen (UTC)",
                last_seen="Last seen (UTC)",
                count="Count",
                active="Active"))
            for k in sorted(self.healthcheck.keys()):
                check = self.healthcheck[k]
                out.append(self.titles.format(
                    healthcheck_name=check.name,
                    first_seen=time.strftime(self.date_format, time.localtime(check.first_seen)),
                    last_seen=time.strftime(self.date_format, time.localtime(check.last_seen)),
                    count=check.count,
                    active="Yes" if check.active else "No"))
            out.extend([f"{len(self.healthcheck)} health check(s) listed", ""])

        return "\n".join(out)

    def as_dict(self) -> Dict[str, Any]:
        """Return the history in a dictionary.

        Returns:
            Dict[str, Any]: dictionary indexed by the healthcheck name
        """
        return {name: self.healthcheck[name].as_dict() for name in self.healthcheck}

    def as_json(self, pretty: bool = False) -> str:
        """Return the healthcheck history as JSON.

        Args:
            pretty (bool, optional): whether to json pretty print the history. Defaults to False.

        Returns:
            str: str representation of the healthcheck in JSON format
        """
        if pretty:
            return json.dumps(self.as_dict(), indent=2)
        else:
            return json.dumps(self.as_dict())

    def as_yaml(self) -> str:
        """Return the healthcheck history in yaml format.

        Returns:
            str: YAML representation of the healthcheck history
        """
        return yaml.safe_dump(self.as_dict(), explicit_start=True, default_flow_style=False)
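
# For orientation, a sketch of the per-check record that HealthHistory
# as_dict()/as_json() emit, keyed by healthcheck name (values illustrative):
#
#   {
#       "SLOW_OPS": {
#           "name": "SLOW_OPS",
#           "severity": "HEALTH_WARN",
#           "first_seen": 1600000000.0,
#           "last_seen": 1600000060.0,
#           "count": 2,
#           "active": true
#       }
#   }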

class Metric(object):
    def __init__(self, mtype: str, name: str, desc: str,
                 labels: Optional[LabelValues] = None) -> None:
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value: Dict[LabelValues, Number] = {}

    def clear(self) -> None:
        self.value = {}

    def set(self, value: Number, labelvalues: Optional[LabelValues] = None) -> None:
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] = value

    def str_expfmt(self) -> str:

        def promethize(path: str) -> str:
            ''' replace illegal metric name characters '''
            result = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus')

            # Hyphens usually turn into underscores, unless they are
            # trailing
            if result.endswith("-"):
                result = result[0:-1] + "_minus"
            else:
                result = result.replace("-", "_")

            return "ceph_{0}".format(result)

        def floatstr(value: float) -> str:
            ''' represent as Go-compatible float '''
            if value == float('inf'):
                return '+Inf'
            if value == float('-inf'):
                return '-Inf'
            if math.isnan(value):
                return 'NaN'
            return repr(float(value))

        name = promethize(self.name)
        expfmt = '''
# HELP {name} {desc}
# TYPE {name} {mtype}'''.format(
            name=name,
            desc=self.desc,
            mtype=self.mtype,
        )

        for labelvalues, value in self.value.items():
            if self.labelnames:
                labels_list = zip(self.labelnames, labelvalues)
                labels = ','.join('%s="%s"' % (k, v) for k, v in labels_list)
            else:
                labels = ''
            if labels:
                fmtstr = '\n{name}{{{labels}}} {value}'
            else:
                fmtstr = '\n{name} {value}'
            expfmt += fmtstr.format(
                name=name,
                labels=labels,
                value=floatstr(value),
            )
        return expfmt

    def group_by(self,
                 keys: List[str],
                 joins: Dict[str, Callable[[List[str]], str]],
                 name: Optional[str] = None,
                 ) -> "Metric":
        """
        Groups data by label names.

        Label names not passed are being removed from the resulting metric but
        by providing a join function, labels of metrics can be grouped.

        The purpose of this method is to provide a version of a metric that can
        be used in matching where otherwise multiple results would be returned.

        As grouping is possible in Prometheus, the only additional value of this
        method is the possibility to join labels when grouping. For that reason,
        passing joins is required. Please use PromQL expressions in all other
        cases.

        >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
        >>> m.value = {
        ...     ('foo', 'x'): 1,
        ...     ('foo', 'y'): 1,
        ... }
        >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
        {('foo', 'x,y'): 1}

        The functionality of group by could roughly be compared with Prometheus'

            group (ceph_disk_occupation) by (device, instance)

        with the exception that not all labels which aren't used as a condition
        to group a metric are discarded; their values can be joined and the
        label is thereby preserved.

        This function takes the value of the first entry of a found group to be
        used for the resulting value of the grouping operation.

        >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
        >>> m.value = {
        ...     ('foo', 'x'): 555,
        ...     ('foo', 'y'): 10,
        ... }
        >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
        {('foo', 'x,y'): 555}
        """
        assert self.labelnames, "cannot match keys without label names"
        for key in keys:
            assert key in self.labelnames, "unknown key: {}".format(key)
        assert joins, "joins must not be empty"
        assert all(callable(c) for c in joins.values()), "joins must be callable"

        # group the metric by the given keys
        grouped: Dict[LabelValues, List[Tuple[Dict[str, str], Number]]] = defaultdict(list)
        for label_values, metric_value in self.value.items():
            labels = dict(zip(self.labelnames, label_values))
            if not all(k in labels for k in keys):
                continue
            group_key = tuple(labels[k] for k in keys)
            grouped[group_key].append((labels, metric_value))

        # as there is nothing specified on how to join labels that are not equal
        # and Prometheus `group` aggregation functions similarly, we simply drop
        # those labels.
        labelnames = tuple(
            label for label in self.labelnames if label in keys or label in joins
        )
        superfluous_labelnames = [
            label for label in self.labelnames if label not in labelnames
        ]

        # iterate and convert groups with more than one member into a single
        # entry
        values: MetricValue = {}
        for group in grouped.values():
            labels, metric_value = group[0]

            for label in superfluous_labelnames:
                del labels[label]

            # apply joins
            for key, fn in joins.items():
                labels[key] = fn(list(labels[key] for labels, _ in group))

            values[tuple(labels.values())] = metric_value

        new_metric = Metric(self.mtype, name if name else self.name, self.desc, labelnames)
        new_metric.value = values

        return new_metric
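
# For reference, a minimal sketch of the exposition text Metric.str_expfmt()
# renders for a single-sample metric (metric name and value are illustrative):
#
#   # HELP ceph_osd_up OSD status up
#   # TYPE ceph_osd_up untyped
#   ceph_osd_up{ceph_daemon="osd.0"} 1.0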

class MetricCounter(Metric):
    def __init__(self,
                 name: str,
                 desc: str,
                 labels: Optional[LabelValues] = None) -> None:
        super(MetricCounter, self).__init__('counter', name, desc, labels)
        self.value = defaultdict(lambda: 0)

    def clear(self) -> None:
        pass  # Skip calls to clear as we want to keep the counters here.

    def set(self,
            value: Number,
            labelvalues: Optional[LabelValues] = None) -> None:
        msg = 'This method must not be used for instances of MetricCounter class'
        raise NotImplementedError(msg)

    def add(self,
            value: Number,
            labelvalues: Optional[LabelValues] = None) -> None:
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] += value
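
# Unlike a plain Metric, a MetricCounter survives the per-scrape clear() calls,
# so repeated add() calls accumulate (illustrative doctest, names invented):
#
#   >>> c = MetricCounter('example_total', 'an example counter', ('method',))
#   >>> c.add(1, ('collect',))
#   >>> c.add(1, ('collect',))
#   >>> c.value[('collect',)]
#   2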

class MetricCollectionThread(threading.Thread):
    def __init__(self, module: 'Module') -> None:
        self.mod = module
        self.active = True
        self.event = threading.Event()
        super(MetricCollectionThread, self).__init__(target=self.collect)

    def collect(self) -> None:
        self.mod.log.info('starting metric collection thread')
        while self.active:
            self.mod.log.debug('collecting cache in thread')
            if self.mod.have_mon_connection():
                start_time = time.time()

                try:
                    data = self.mod.collect()
                except Exception:
                    # Log any issues encountered during the data collection and continue
                    self.mod.log.exception("failed to collect metrics:")
                    self.event.wait(self.mod.scrape_interval)
                    continue

                duration = time.time() - start_time
                self.mod.log.debug('collecting cache in thread done')

                sleep_time = self.mod.scrape_interval - duration
                if sleep_time < 0:
                    self.mod.log.warning(
                        'Collecting data took more time than configured scrape interval. '
                        'This possibly results in stale data. Please check the '
                        '`stale_cache_strategy` configuration option. '
                        'Collecting data took {:.2f} seconds but scrape interval is configured '
                        'to be {:.0f} seconds.'.format(
                            duration,
                            self.mod.scrape_interval,
                        )
                    )
                    sleep_time = 0

                with self.mod.collect_lock:
                    self.mod.collect_cache = data
                    self.mod.collect_time = duration

                self.event.wait(sleep_time)
            else:
                self.mod.log.error('No MON connection')
                self.event.wait(self.mod.scrape_interval)

    def stop(self) -> None:
        self.active = False
        self.event.set()

class Module(MgrModule):
    MODULE_OPTIONS = [
        Option(
            name='server_addr',
            default=get_default_addr(),
            desc='the IPv4 or IPv6 address on which the module listens for HTTP requests',
        ),
        Option(
            name='server_port',
            type='int',
            default=DEFAULT_PORT,
            desc='the port on which the module listens for HTTP requests'
        ),
        Option(
            name='scrape_interval',
            type='float',
            default=15.0
        ),
        Option(
            name='stale_cache_strategy',
            default='log'
        ),
        Option(
            name='cache',
            type='bool',
            default=True,
        ),
        Option(
            name='rbd_stats_pools',
            default=''
        ),
        Option(
            name='rbd_stats_pools_refresh_interval',
            type='int',
            default=300
        ),
        Option(
            name='standby_behaviour',
            type='str',
            default='default',
            enum_allowed=['default', 'error'],
            runtime=True
        ),
        Option(
            name='standby_error_status_code',
            type='int',
            default=500,
            min=400,
            max=599,
            runtime=True
        )
    ]

    STALE_CACHE_FAIL = 'fail'
    STALE_CACHE_RETURN = 'return'

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(Module, self).__init__(*args, **kwargs)
        self.metrics = self._setup_static_metrics()
        self.shutdown_event = threading.Event()
        self.collect_lock = threading.Lock()
        self.collect_time = 0.0
        self.scrape_interval: float = 15.0
        self.cache = True
        self.stale_cache_strategy: str = self.STALE_CACHE_FAIL
        self.collect_cache: Optional[str] = None
        self.rbd_stats = {
            'pools': {},
            'pools_refresh_time': 0,
            'counters_info': {
                'write_ops': {'type': self.PERFCOUNTER_COUNTER,
                              'desc': 'RBD image writes count'},
                'read_ops': {'type': self.PERFCOUNTER_COUNTER,
                             'desc': 'RBD image reads count'},
                'write_bytes': {'type': self.PERFCOUNTER_COUNTER,
                                'desc': 'RBD image bytes written'},
                'read_bytes': {'type': self.PERFCOUNTER_COUNTER,
                               'desc': 'RBD image bytes read'},
                'write_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                  'desc': 'RBD image writes latency (msec)'},
                'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                 'desc': 'RBD image reads latency (msec)'},
            },
        }  # type: Dict[str, Any]
        global _global_instance
        _global_instance = self
        self.metrics_thread = MetricCollectionThread(_global_instance)
        self.health_history = HealthHistory(self)

    def _setup_static_metrics(self) -> Dict[str, Metric]:
        metrics = {}
        metrics['health_status'] = Metric(
            'untyped',
            'health_status',
            'Cluster health status'
        )
        metrics['mon_quorum_status'] = Metric(
            'gauge',
            'mon_quorum_status',
            'Monitors in quorum',
            ('ceph_daemon',)
        )
        metrics['fs_metadata'] = Metric(
            'untyped',
            'fs_metadata',
            'FS Metadata',
            FS_METADATA
        )
        metrics['mds_metadata'] = Metric(
            'untyped',
            'mds_metadata',
            'MDS Metadata',
            MDS_METADATA
        )
        metrics['mon_metadata'] = Metric(
            'untyped',
            'mon_metadata',
            'MON Metadata',
            MON_METADATA
        )
        metrics['mgr_metadata'] = Metric(
            'gauge',
            'mgr_metadata',
            'MGR metadata',
            MGR_METADATA
        )
        metrics['mgr_status'] = Metric(
            'gauge',
            'mgr_status',
            'MGR status (0=standby, 1=active)',
            MGR_STATUS
        )
        metrics['mgr_module_status'] = Metric(
            'gauge',
            'mgr_module_status',
            'MGR module status (0=disabled, 1=enabled, 2=auto-enabled)',
            MGR_MODULE_STATUS
        )
        metrics['mgr_module_can_run'] = Metric(
            'gauge',
            'mgr_module_can_run',
            'MGR module runnable state i.e. can it run (0=no, 1=yes)',
            MGR_MODULE_CAN_RUN
        )
        metrics['osd_metadata'] = Metric(
            'untyped',
            'osd_metadata',
            'OSD Metadata',
            OSD_METADATA
        )

        # The reason for having this separate to OSD_METADATA is
        # so that we can stably use the same tag names that
        # the Prometheus node_exporter does
        metrics['disk_occupation'] = Metric(
            'untyped',
            'disk_occupation',
            'Associate Ceph daemon with disk used',
            DISK_OCCUPATION
        )

        metrics['disk_occupation_human'] = Metric(
            'untyped',
            'disk_occupation_human',
            'Associate Ceph daemon with disk used for displaying to humans,'
            ' not for joining tables (vector matching)',
            DISK_OCCUPATION,  # label names are automatically decimated on grouping
        )

        metrics['pool_metadata'] = Metric(
            'untyped',
            'pool_metadata',
            'POOL Metadata',
            POOL_METADATA
        )
        metrics['rgw_metadata'] = Metric(
            'untyped',
            'rgw_metadata',
            'RGW Metadata',
            RGW_METADATA
        )
        metrics['rbd_mirror_metadata'] = Metric(
            'untyped',
            'rbd_mirror_metadata',
            'RBD Mirror Metadata',
            RBD_MIRROR_METADATA
        )
        metrics['pg_total'] = Metric(
            'gauge',
            'pg_total',
            'PG Total Count per Pool',
            ('pool_id',)
        )
        metrics['health_detail'] = Metric(
            'gauge',
            'health_detail',
            'healthcheck status by type (0=inactive, 1=active)',
            HEALTHCHECK_DETAIL
        )

        for flag in OSD_FLAGS:
            path = 'osd_flag_{}'.format(flag)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD Flag {}'.format(flag)
            )
        for state in OSD_STATUS:
            path = 'osd_{}'.format(state)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD status {}'.format(state),
                ('ceph_daemon',)
            )
        for stat in OSD_STATS:
            path = 'osd_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD stat {}'.format(stat),
                ('ceph_daemon',)
            )
        for stat in OSD_POOL_STATS:
            path = 'pool_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                "OSD pool stats: {}".format(stat),
                ('pool_id',)
            )
        for state in PG_STATES:
            path = 'pg_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'PG {} per pool'.format(state),
                ('pool_id',)
            )
        for state in DF_CLUSTER:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
            )
        for state in DF_POOL:
            path = 'pool_{}'.format(state)
            metrics[path] = Metric(
                'counter' if state in ('rd', 'rd_bytes', 'wr', 'wr_bytes') else 'gauge',
                path,
                'DF pool {}'.format(state),
                ('pool_id',)
            )
        for state in NUM_OBJECTS:
            path = 'num_objects_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'Number of {} objects'.format(state),
            )

        for check in HEALTH_CHECKS:
            path = 'healthcheck_{}'.format(check.name.lower())
            metrics[path] = Metric(
                'gauge',
                path,
                check.description,
            )

        return metrics

    @profile_method()
    def get_health(self) -> None:

        def _get_value(message: str, delim: str = ' ', word_pos: int = 0) -> Tuple[int, int]:
            """Extract value from message (default is 1st field)"""
            v_str = message.split(delim)[word_pos]
            if v_str.isdigit():
                return int(v_str), 0
            return 0, 1

        health = json.loads(self.get('health')['json'])
        # set overall health
        self.metrics['health_status'].set(
            health_status_to_number(health['status'])
        )

        # Examine the health to see if any health checks triggered need to
        # become a specific metric with a value from the health detail
        active_healthchecks = health.get('checks', {})
        active_names = active_healthchecks.keys()

        for check in HEALTH_CHECKS:
            path = 'healthcheck_{}'.format(check.name.lower())

            if path in self.metrics:

                if check.name in active_names:
                    check_data = active_healthchecks[check.name]
                    message = check_data['summary'].get('message', '')
                    v, err = 0, 0

                    if check.name == "SLOW_OPS":
                        # 42 slow ops, oldest one blocked for 12 sec, daemons [osd.0, osd.3] have
                        # slow ops.
                        v, err = _get_value(message)

                    if err:
                        self.log.error(
                            "healthcheck %s message format is incompatible and has been dropped",
                            check.name)
                        # drop the metric, so it's no longer emitted
                        del self.metrics[path]
                        continue
                    else:
                        self.metrics[path].set(v)
                else:
                    # health check is not active, so give it a default of 0
                    self.metrics[path].set(0)

        self.health_history.check(health)
        for name, info in self.health_history.healthcheck.items():
            v = 1 if info.active else 0
            self.metrics['health_detail'].set(
                v, (
                    name,
                    str(info.severity))
            )
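
    # An illustrative walk-through of the SLOW_OPS handling above: for a summary
    # like "42 slow ops, oldest one blocked for 12 sec, daemons [osd.0] have
    # slow ops.", _get_value() extracts the first whitespace-delimited field, so
    # ceph_healthcheck_slow_ops is set to 42; a summary that does not start with
    # a number yields err=1 and the metric is dropped instead.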

    @profile_method()
    def get_pool_stats(self) -> None:
        # retrieve pool stats to provide per pool recovery metrics
        # (osd_pool_stats moved to mgr in Mimic)
        pstats = self.get('osd_pool_stats')
        for pool in pstats['pool_stats']:
            for stat in OSD_POOL_STATS:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['recovery_rate'].get(stat, 0),
                    (pool['pool_id'],)
                )

    @profile_method()
    def get_df(self) -> None:
        # maybe get the to-be-exported metrics from a config?
        df = self.get('df')
        for stat in DF_CLUSTER:
            self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])

        for pool in df['pools']:
            for stat in DF_POOL:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['stats'][stat],
                    (pool['id'],)
                )

    @profile_method()
    def get_fs(self) -> None:
        fs_map = self.get('fs_map')
        servers = self.get_service_list()
        self.log.debug('standbys: {}'.format(fs_map['standbys']))
        # export standby mds metadata, default standby fs_id is '-1'
        for standby in fs_map['standbys']:
            id_ = standby['name']
            host, version, _ = servers.get((id_, 'mds'), ('', '', ''))
            addr, rank = standby['addr'], standby['rank']
            self.metrics['mds_metadata'].set(1, (
                'mds.{}'.format(id_), '-1',
                host, addr, rank, version
            ))
        for fs in fs_map['filesystems']:
            # collect fs metadata
            data_pools = ",".join([str(pool)
                                   for pool in fs['mdsmap']['data_pools']])
            self.metrics['fs_metadata'].set(1, (
                data_pools,
                fs['id'],
                fs['mdsmap']['metadata_pool'],
                fs['mdsmap']['fs_name']
            ))
            self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
            for gid, daemon in fs['mdsmap']['info'].items():
                id_ = daemon['name']
                host, version, _ = servers.get((id_, 'mds'), ('', '', ''))
                self.metrics['mds_metadata'].set(1, (
                    'mds.{}'.format(id_), fs['id'],
                    host, daemon['addr'],
                    daemon['rank'], version
                ))

    @profile_method()
    def get_quorum_status(self) -> None:
        mon_status = json.loads(self.get('mon_status')['json'])
        servers = self.get_service_list()
        for mon in mon_status['monmap']['mons']:
            rank = mon['rank']
            id_ = mon['name']
            host_version = servers.get((id_, 'mon'), ('', '', ''))
            self.metrics['mon_metadata'].set(1, (
                'mon.{}'.format(id_), host_version[0],
                mon['public_addr'].rsplit(':', 1)[0], rank,
                host_version[1]
            ))
            in_quorum = int(rank in mon_status['quorum'])
            self.metrics['mon_quorum_status'].set(in_quorum, (
                'mon.{}'.format(id_),
            ))

    @profile_method()
    def get_mgr_status(self) -> None:
        mgr_map = self.get('mgr_map')
        servers = self.get_service_list()

        active = mgr_map['active_name']
        standbys = [s.get('name') for s in mgr_map['standbys']]

        all_mgrs = list(standbys)
        all_mgrs.append(active)

        all_modules = {module.get('name'): module.get('can_run')
                       for module in mgr_map['available_modules']}

        for mgr in all_mgrs:
            host, version, _ = servers.get((mgr, 'mgr'), ('', '', ''))
            if mgr == active:
                _state = 1
            else:
                _state = 0

            self.metrics['mgr_metadata'].set(1, (
                f'mgr.{mgr}', host, version
            ))
            self.metrics['mgr_status'].set(_state, (
                f'mgr.{mgr}',))
        always_on_modules = mgr_map['always_on_modules'].get(self.release_name, [])
        active_modules = list(always_on_modules)
        active_modules.extend(mgr_map['modules'])

        for mod_name in all_modules.keys():
            if mod_name in always_on_modules:
                _state = 2
            elif mod_name in active_modules:
                _state = 1
            else:
                _state = 0

            _can_run = 1 if all_modules[mod_name] else 0
            self.metrics['mgr_module_status'].set(_state, (mod_name,))
            self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))

    @profile_method()
    def get_pg_status(self) -> None:
        pg_summary = self.get('pg_summary')

        for pool in pg_summary['by_pool']:
            num_by_state = defaultdict(int)  # type: DefaultDict[str, int]

            for state_name, count in pg_summary['by_pool'][pool].items():
                for state in state_name.split('+'):
                    num_by_state[state] += count
                num_by_state['total'] += count

            for state, num in num_by_state.items():
                try:
                    self.metrics["pg_{}".format(state)].set(num, (pool,))
                except KeyError:
                    self.log.warning("skipping pg in unknown state {}".format(state))

    @profile_method()
    def get_osd_stats(self) -> None:
        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            id_ = osd['osd']
            for stat in OSD_STATS:
                val = osd['perf_stat'][stat]
                self.metrics['osd_{}'.format(stat)].set(val, (
                    'osd.{}'.format(id_),
                ))

    def get_service_list(self) -> Dict[Tuple[str, str], Tuple[str, str, str]]:
        ret = {}
        for server in self.list_servers():
            version = cast(str, server.get('ceph_version', ''))
            host = cast(str, server.get('hostname', ''))
            for service in cast(List[ServiceInfoT], server.get('services', [])):
                ret.update({(service['id'], service['type']): (host, version,
                            service.get('name', ''))})
        return ret
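
    # The mapping returned above is keyed by (service id, service type), e.g.
    # (hostnames and version strings illustrative):
    #
    #   {('0', 'osd'): ('ceph-node1', 'ceph version 16.2.0 (...)', ''),
    #    ('rgw0', 'rgw'): ('ceph-node2', 'ceph version 16.2.0 (...)', 'rgw.rgw0')}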

    @profile_method()
    def get_metadata_and_osd_status(self) -> None:
        osd_map = self.get('osd_map')
        osd_flags = osd_map['flags'].split(',')
        for flag in OSD_FLAGS:
            self.metrics['osd_flag_{}'.format(flag)].set(
                int(flag in osd_flags)
            )

        osd_devices = self.get('osd_map_crush')['devices']
        servers = self.get_service_list()
        for osd in osd_map['osds']:
            # id can be used to link osd metrics and metadata
            id_ = osd['osd']
            # collect osd metadata
            p_addr = osd['public_addr'].rsplit(':', 1)[0]
            c_addr = osd['cluster_addr'].rsplit(':', 1)[0]
            if p_addr == "-" or c_addr == "-":
                self.log.info(
                    "Missing address metadata for osd {0}, skipping occupation"
                    " and metadata records for this osd".format(id_)
                )
                continue

            dev_class = None
            for osd_device in osd_devices:
                if osd_device['id'] == id_:
                    dev_class = osd_device.get('class', '')
                    break

            if dev_class is None:
                self.log.info("OSD {0} is missing from CRUSH map, "
                              "skipping output".format(id_))
                continue

            host_version = servers.get((str(id_), 'osd'), ('', '', ''))

            # collect disk occupation metadata
            osd_metadata = self.get_metadata("osd", str(id_))
            if osd_metadata is None:
                continue

            obj_store = osd_metadata.get('osd_objectstore', '')
            f_iface = osd_metadata.get('front_iface', '')
            b_iface = osd_metadata.get('back_iface', '')

            self.metrics['osd_metadata'].set(1, (
                b_iface,
                'osd.{}'.format(id_),
                c_addr,
                dev_class,
                f_iface,
                host_version[0],
                obj_store,
                p_addr,
                host_version[1]
            ))

            # collect osd status
            for state in OSD_STATUS:
                status = osd[state]
                self.metrics['osd_{}'.format(state)].set(status, (
                    'osd.{}'.format(id_),
                ))

            osd_dev_node = None
            osd_wal_dev_node = ''
            osd_db_dev_node = ''
            if obj_store == "filestore":
                # collect filestore backend device
                osd_dev_node = osd_metadata.get(
                    'backend_filestore_dev_node', None)
                # collect filestore journal device
                osd_wal_dev_node = osd_metadata.get('osd_journal', '')
                osd_db_dev_node = ''
            elif obj_store == "bluestore":
                # collect bluestore backend device
                osd_dev_node = osd_metadata.get(
                    'bluestore_bdev_dev_node', None)
                # collect bluestore wal backend
                osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
                # collect bluestore db backend
                osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
            if osd_dev_node and osd_dev_node == "unknown":
                osd_dev_node = None

            # fetch the devices and ids (vendor, model, serial) from the
            # osd metadata
            osd_devs = osd_metadata.get('devices', '') or 'N/A'
            osd_dev_ids = osd_metadata.get('device_ids', '') or 'N/A'

            osd_hostname = osd_metadata.get('hostname', None)
            if osd_dev_node and osd_hostname:
                self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                    id_, osd_hostname, osd_dev_node))
                self.metrics['disk_occupation'].set(1, (
                    "osd.{0}".format(id_),
                    osd_dev_node,
                    osd_db_dev_node,
                    osd_wal_dev_node,
                    osd_hostname,
                    osd_devs,
                    osd_dev_ids,
                ))
            else:
                self.log.info("Missing dev node metadata for osd {0}, skipping "
                              "occupation record for this osd".format(id_))

        if 'disk_occupation' in self.metrics:
            try:
                self.metrics['disk_occupation_human'] = \
                    self.metrics['disk_occupation'].group_by(
                        ['device', 'instance'],
                        {'ceph_daemon': lambda daemons: ', '.join(daemons)},
                        name='disk_occupation_human',
                    )
            except Exception as e:
                self.log.error(e)

        ec_profiles = osd_map.get('erasure_code_profiles', {})

        def _get_pool_info(pool: Dict[str, Any]) -> Tuple[str, str]:
            pool_type = 'unknown'
            description = 'unknown'

            if pool['type'] == 1:
                pool_type = "replicated"
                description = f"replica:{pool['size']}"
            elif pool['type'] == 3:
                pool_type = "erasure"
                name = pool.get('erasure_code_profile', '')
                profile = ec_profiles.get(name, {})
                if profile:
                    description = f"ec:{profile['k']}+{profile['m']}"
                else:
                    description = "ec:unknown"

            return pool_type, description

        for pool in osd_map['pools']:

            compression_mode = 'none'
            pool_type, pool_description = _get_pool_info(pool)

            if 'options' in pool:
                compression_mode = pool['options'].get('compression_mode', 'none')

            self.metrics['pool_metadata'].set(
                1, (
                    pool['pool'],
                    pool['pool_name'],
                    pool_type,
                    pool_description,
                    compression_mode)
            )

        # Populate other servers metadata
        for key, value in servers.items():
            service_id, service_type = key
            if service_type == 'rgw':
                hostname, version, name = value
                self.metrics['rgw_metadata'].set(
                    1,
                    ('{}.{}'.format(service_type, name),
                     hostname, version, service_id)
                )
            elif service_type == 'rbd-mirror':
                mirror_metadata = self.get_metadata('rbd-mirror', service_id)
                if mirror_metadata is None:
                    continue
                mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
                                                                service_id)
                rbd_mirror_metadata = cast(LabelValues,
                                           (mirror_metadata.get(k, '')
                                            for k in RBD_MIRROR_METADATA))
                self.metrics['rbd_mirror_metadata'].set(
                    1, rbd_mirror_metadata
                )

    @profile_method()
    def get_num_objects(self) -> None:
        pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
        for obj in NUM_OBJECTS:
            stat = 'num_objects_{}'.format(obj)
            self.metrics[stat].set(pg_sum[stat])

    @profile_method()
    def get_rbd_stats(self) -> None:
        # Per RBD image stats is collected by registering a dynamic osd perf
        # stats query that tells OSDs to group stats for requests associated
        # with RBD objects by pool, namespace, and image id, which are
        # extracted from the request object names or other attributes.
        # The RBD object names have the following prefixes:
        #   - rbd_data.{image_id}. (data stored in the same pool as metadata)
        #   - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
        #   - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
        # The pool_id in the object name is the id of the pool with the image
        # metadata, and should be used in the image spec. If there is no pool_id
        # in the object name, the image pool is the pool where the object is
        # located.

        # Parse rbd_stats_pools option, which is a comma or space separated
        # list of pool[/namespace] entries. If no namespace is specified the
        # stats are collected for every namespace in the pool. The wildcard
        # '*' can be used to indicate all pools or namespaces.
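        # (e.g. rbd_stats_pools = "rbd vms/prod images/*" collects all
        # namespaces of 'rbd', only 'prod' in 'vms', and every namespace of
        # 'images'; an illustrative value matching the parsing below)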
        pools_string = cast(str, self.get_localized_module_option('rbd_stats_pools'))
        pool_keys = []
        for x in re.split(r'[\s,]+', pools_string):
            if not x:
                continue

            s = x.split('/', 2)
            pool_name = s[0]
            namespace_name = None
            if len(s) == 2:
                namespace_name = s[1]

            if pool_name == "*":
                # collect for all pools
                osd_map = self.get('osd_map')
                for pool in osd_map['pools']:
                    if 'rbd' not in pool.get('application_metadata', {}):
                        continue
                    pool_keys.append((pool['pool_name'], namespace_name))
            else:
                pool_keys.append((pool_name, namespace_name))

        pools = {}  # type: Dict[str, Set[str]]
        for pool_key in pool_keys:
            pool_name = pool_key[0]
            namespace_name = pool_key[1]
            if not namespace_name or namespace_name == "*":
                # empty set means collect for all namespaces
                pools[pool_name] = set()
                continue

            if pool_name not in pools:
                pools[pool_name] = set()
            elif not pools[pool_name]:
                continue
            pools[pool_name].add(namespace_name)

        rbd_stats_pools = {}
        for pool_id in list(self.rbd_stats['pools']):
            name = self.rbd_stats['pools'][pool_id]['name']
            if name not in pools:
                del self.rbd_stats['pools'][pool_id]
            else:
                rbd_stats_pools[name] = \
                    self.rbd_stats['pools'][pool_id]['ns_names']

        pools_refreshed = False
        if pools:
            next_refresh = self.rbd_stats['pools_refresh_time'] + \
                self.get_localized_module_option(
                    'rbd_stats_pools_refresh_interval', 300)
            if rbd_stats_pools != pools or time.time() >= next_refresh:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True

        pool_ids = list(self.rbd_stats['pools'])
        pool_ids.sort()
        pool_id_regex = '^(' + '|'.join([str(x) for x in pool_ids]) + ')$'

        nspace_names = []
        for pool_id, pool in self.rbd_stats['pools'].items():
            if pool['ns_names']:
                nspace_names.extend(pool['ns_names'])
            else:
                nspace_names = []
                break
        if nspace_names:
            namespace_regex = '^(' + \
                              "|".join([re.escape(x)
                                        for x in set(nspace_names)]) + ')$'
        else:
            namespace_regex = '^(.*)$'

        if ('query' in self.rbd_stats
                and (pool_id_regex != self.rbd_stats['query']['key_descriptor'][0]['regex']
                     or namespace_regex != self.rbd_stats['query']['key_descriptor'][1]['regex'])):
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']

        if not self.rbd_stats['pools']:
            return

        counters_info = self.rbd_stats['counters_info']

        if 'query_id' not in self.rbd_stats:
            query = {
                'key_descriptor': [
                    {'type': 'pool_id', 'regex': pool_id_regex},
                    {'type': 'namespace', 'regex': namespace_regex},
                    {'type': 'object_name',
                     'regex': r'^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
                ],
                'performance_counter_descriptors': list(counters_info),
            }
            query_id = self.add_osd_perf_query(query)
            if query_id is None:
                self.log.error('failed to add query %s' % query)
                return
            self.rbd_stats['query'] = query
            self.rbd_stats['query_id'] = query_id

        res = self.get_osd_perf_counters(self.rbd_stats['query_id'])
        assert res
        for c in res['counters']:
            # if the pool id is not found in the object name use id of the
            # pool where the object is located
            if c['k'][2][0]:
                pool_id = int(c['k'][2][0])
            else:
                pool_id = int(c['k'][0][0])
            if pool_id not in self.rbd_stats['pools'] and not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True
            if pool_id not in self.rbd_stats['pools']:
                continue
            pool = self.rbd_stats['pools'][pool_id]
            nspace_name = c['k'][1][0]
            if nspace_name not in pool['images']:
                continue
            image_id = c['k'][2][1]
            if image_id not in pool['images'][nspace_name] and \
               not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pool = self.rbd_stats['pools'][pool_id]
                pools_refreshed = True
            if image_id not in pool['images'][nspace_name]:
                continue
            counters = pool['images'][nspace_name][image_id]['c']
            for i in range(len(c['c'])):
                counters[i][0] += c['c'][i][0]
                counters[i][1] += c['c'][i][1]

        label_names = ("pool", "namespace", "image")
        for pool_id, pool in self.rbd_stats['pools'].items():
            pool_name = pool['name']
            for nspace_name, images in pool['images'].items():
                for image_id in images:
                    image_name = images[image_id]['n']
                    counters = images[image_id]['c']
                    i = 0
                    for key in counters_info:
                        counter_info = counters_info[key]
                        stattype = self._stattype_to_str(counter_info['type'])
                        labels = (pool_name, nspace_name, image_name)
                        if counter_info['type'] == self.PERFCOUNTER_COUNTER:
                            path = 'rbd_' + key
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'],
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                        elif counter_info['type'] == self.PERFCOUNTER_LONGRUNAVG:
                            path = 'rbd_' + key + '_sum'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'] + ' Total',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                            path = 'rbd_' + key + '_count'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    'counter',
                                    path,
                                    counter_info['desc'] + ' Count',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][1], labels)
                        i += 1

    def refresh_rbd_stats_pools(self, pools: Dict[str, Set[str]]) -> None:
        self.log.debug('refreshing rbd pools %s' % (pools))

        rbd = RBD()
        counters_info = self.rbd_stats['counters_info']
        for pool_name, cfg_ns_names in pools.items():
            try:
                pool_id = self.rados.pool_lookup(pool_name)
                with self.rados.open_ioctx(pool_name) as ioctx:
                    if pool_id not in self.rbd_stats['pools']:
                        self.rbd_stats['pools'][pool_id] = {'images': {}}
                    pool = self.rbd_stats['pools'][pool_id]
                    pool['name'] = pool_name
                    pool['ns_names'] = cfg_ns_names
                    if cfg_ns_names:
                        nspace_names = list(cfg_ns_names)
                    else:
                        nspace_names = [''] + rbd.namespace_list(ioctx)
                    for nspace_name in list(pool['images']):
                        if nspace_name not in nspace_names:
                            del pool['images'][nspace_name]
                    for nspace_name in nspace_names:
                        if nspace_name and \
                           not rbd.namespace_exists(ioctx, nspace_name):
                            self.log.debug('unknown namespace %s for pool %s' %
                                           (nspace_name, pool_name))
                            continue
                        ioctx.set_namespace(nspace_name)
                        if nspace_name not in pool['images']:
                            pool['images'][nspace_name] = {}
                        namespace = pool['images'][nspace_name]
                        images = {}
                        for image_meta in RBD().list2(ioctx):
                            image = {'n': image_meta['name']}
                            image_id = image_meta['id']
                            if image_id in namespace:
                                image['c'] = namespace[image_id]['c']
                            else:
                                image['c'] = [[0, 0] for x in counters_info]
                            images[image_id] = image
                        pool['images'][nspace_name] = images
            except Exception as e:
                self.log.error('failed listing pool %s: %s' % (pool_name, e))
        self.rbd_stats['pools_refresh_time'] = time.time()

    def shutdown_rbd_stats(self) -> None:
        if 'query_id' in self.rbd_stats:
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']
        self.rbd_stats['pools'].clear()

    def add_fixed_name_metrics(self) -> None:
        """
        Add fixed name metrics from existing ones that have details in their names
        that should be in labels (not in name).
        For backward compatibility, a new fixed name metric is created (instead of replacing)
        and details are put in new labels.
        Intended for RGW sync perf. counters but extendable as required.
        See: https://tracker.ceph.com/issues/45311
        """
        new_metrics = {}
        for metric_path, metrics in self.metrics.items():
            # Address RGW sync perf. counters.
            match = re.search(r'^data-sync-from-(.*)\.', metric_path)
            if match:
                new_path = re.sub('from-([^.]*)', 'from-zone', metric_path)
                if new_path not in new_metrics:
                    new_metrics[new_path] = Metric(
                        metrics.mtype,
                        new_path,
                        metrics.desc,
                        cast(LabelValues, metrics.labelnames) + ('source_zone',)
                    )
                for label_values, value in metrics.value.items():
                    new_metrics[new_path].set(value, label_values + (match.group(1),))

        self.metrics.update(new_metrics)
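
    # Illustrative effect of the rename above: a per-zone counter such as
    # 'data-sync-from-zone2.fetch_bytes' (counter name invented for the example)
    # is mirrored as 'data-sync-from-zone.fetch_bytes' with an added label
    # source_zone="zone2", giving dashboards one fixed metric name to match.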

    def get_collect_time_metrics(self) -> None:
        sum_metric = self.metrics.get('prometheus_collect_duration_seconds_sum')
        count_metric = self.metrics.get('prometheus_collect_duration_seconds_count')
        if sum_metric is None:
            sum_metric = MetricCounter(
                'prometheus_collect_duration_seconds_sum',
                'The sum of seconds it took to collect all metrics of this exporter',
                ('method',))
            self.metrics['prometheus_collect_duration_seconds_sum'] = sum_metric
        if count_metric is None:
            count_metric = MetricCounter(
                'prometheus_collect_duration_seconds_count',
                'The number of metrics gathered for this exporter',
                ('method',))
            self.metrics['prometheus_collect_duration_seconds_count'] = count_metric

        # Collect all timing data and make it available as metric, excluding the
        # `collect` method because it has not finished at this point and hence
        # there's no `_execution_duration` attribute to be found. The
        # `_execution_duration` attribute is added by the `profile_method`
        # decorator.
        for method_name, method in Module.__dict__.items():
            duration = getattr(method, '_execution_duration', None)
            if duration is not None:
                cast(MetricCounter, sum_metric).add(duration, (method_name,))
                cast(MetricCounter, count_metric).add(1, (method_name,))

    @profile_method(True)
    def collect(self) -> str:
        # Clear the metrics before scraping
        for k in self.metrics.keys():
            self.metrics[k].clear()

        self.get_health()
        self.get_df()
        self.get_pool_stats()
        self.get_fs()
        self.get_osd_stats()
        self.get_quorum_status()
        self.get_mgr_status()
        self.get_metadata_and_osd_status()
        self.get_pg_status()
        self.get_num_objects()

        for daemon, counters in self.get_all_perf_counters().items():
            for path, counter_info in counters.items():
                # Skip histograms, they are represented by long running avgs
                stattype = self._stattype_to_str(counter_info['type'])
                if not stattype or stattype == 'histogram':
                    self.log.debug('ignoring %s, type %s' % (path, stattype))
                    continue

                path, label_names, labels = self._perfpath_to_path_labels(
                    daemon, path)

                # Get the value of the counter
                value = self._perfvalue_to_value(
                    counter_info['type'], counter_info['value'])

                # Represent the long running avgs as sum/count pairs
                if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    _path = path + '_sum'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            stattype,
                            _path,
                            counter_info['description'] + ' Total',
                            label_names,
                        )
                    self.metrics[_path].set(value, labels)

                    _path = path + '_count'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            'counter',
                            _path,
                            counter_info['description'] + ' Count',
                            label_names,
                        )
                    self.metrics[_path].set(counter_info['count'], labels,)
                else:
                    if path not in self.metrics:
                        self.metrics[path] = Metric(
                            stattype,
                            path,
                            counter_info['description'],
                            label_names,
                        )
                    self.metrics[path].set(value, labels)

        self.add_fixed_name_metrics()
        self.get_rbd_stats()

        self.get_collect_time_metrics()

        # Return formatted metrics and clear no longer used data
        _metrics = [m.str_expfmt() for m in self.metrics.values()]
        for k in self.metrics.keys():
            self.metrics[k].clear()

        return ''.join(_metrics) + '\n'

    @CLIReadCommand('prometheus file_sd_config')
    def get_file_sd_config(self) -> Tuple[int, str, str]:
        '''
        Return file_sd compatible prometheus config for mgr cluster
        '''
        servers = self.list_servers()
        targets = []
        for server in servers:
            hostname = server.get('hostname', '')
            for service in cast(List[ServiceInfoT], server.get('services', [])):
                if service['type'] != 'mgr':
                    continue
                id_ = service['id']
                port = self._get_module_option('server_port', DEFAULT_PORT, id_)
                targets.append(f'{hostname}:{port}')
        ret = [
            {
                "targets": targets,
                "labels": {}
            }
        ]
        return 0, json.dumps(ret), ""

    def self_test(self) -> None:
        self.collect()
        self.get_file_sd_config()

    def serve(self) -> None:

        class Root(object):

            # collapse everything to '/'
            def _cp_dispatch(self, vpath: str) -> 'Root':
                cherrypy.request.path = ''
                return self

            @cherrypy.expose
            def index(self) -> str:
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='/metrics'>Metrics</a></p>
    </body>
</html>'''

            @cherrypy.expose
            def metrics(self) -> Optional[str]:
                # Lock the function execution
                assert isinstance(_global_instance, Module)
                with _global_instance.collect_lock:
                    return self._metrics(_global_instance)

            @staticmethod
            def _metrics(instance: 'Module') -> Optional[str]:
                if not instance.cache:
                    instance.log.debug('Cache disabled, collecting and returning without cache')
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect()

                # Return cached data if available
                if not instance.collect_cache:
                    raise cherrypy.HTTPError(503, 'No cached data available yet')

                def respond() -> Optional[str]:
                    assert isinstance(instance, Module)
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache

                if instance.collect_time < instance.scrape_interval:
                    # Respond if cache isn't stale
                    return respond()

                if instance.stale_cache_strategy == instance.STALE_CACHE_RETURN:
                    # Respond even if cache is stale
                    instance.log.info(
                        'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
                        'returning metrics from stale cache.'.format(
                            instance.collect_time,
                            instance.collect_time - instance.scrape_interval
                        )
                    )
                    return respond()

                if instance.stale_cache_strategy == instance.STALE_CACHE_FAIL:
                    # Fail if cache is stale
                    msg = (
                        'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
                        'returning "service unavailable".'.format(
                            instance.collect_time,
                            instance.collect_time - instance.scrape_interval,
                        )
                    )
                    instance.log.error(msg)
                    raise cherrypy.HTTPError(503, msg)
                return None

        # Make the cache timeout for collecting configurable
        self.scrape_interval = cast(float, self.get_localized_module_option('scrape_interval'))

        self.stale_cache_strategy = cast(
            str, self.get_localized_module_option('stale_cache_strategy'))
        if self.stale_cache_strategy not in [self.STALE_CACHE_FAIL,
                                             self.STALE_CACHE_RETURN]:
            self.stale_cache_strategy = self.STALE_CACHE_FAIL

        server_addr = cast(str, self.get_localized_module_option(
            'server_addr', get_default_addr()))
        server_port = cast(int, self.get_localized_module_option(
            'server_port', DEFAULT_PORT))
        self.log.info(
            "server_addr: %s server_port: %s" %
            (server_addr, server_port)
        )

        self.cache = cast(bool, self.get_localized_module_option('cache', True))
        if self.cache:
            self.log.info('Cache enabled')
            self.metrics_thread.start()
        else:
            self.log.info('Cache disabled')

        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': server_port,
            'engine.autoreload.on': False
        })
        # Publish the URI that others may use to access the service we're
        # about to start serving
        if server_addr in ['::', '0.0.0.0']:
            server_addr = self.get_mgr_ip()
        self.set_uri(build_url(scheme='http', host=server_addr, port=server_port, path='/'))

        cherrypy.tree.mount(Root(), "/")
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # wait for the shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        # tell metrics collection thread to stop collecting new metrics
        self.metrics_thread.stop()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')
        self.shutdown_rbd_stats()
        # wait for the metrics collection thread to stop
        self.metrics_thread.join()

    def shutdown(self) -> None:
        self.log.info('Stopping engine...')
        self.shutdown_event.set()

    @CLIReadCommand('healthcheck history ls')
    def _list_healthchecks(self, format: Format = Format.plain) -> HandleCommandResult:
        """List all the healthchecks being tracked

        The format options are parsed in ceph_argparse, before they get evaluated here so
        we can safely assume that what we have to process is valid. ceph_argparse will throw
        a ValueError if the cast to our Format class fails.

        Args:
            format (Format, optional): output format. Defaults to Format.plain.

        Returns:
            HandleCommandResult: return code, stdout and stderr returned to the caller
        """

        if format == Format.plain:
            out = str(self.health_history)
        elif format == Format.yaml:
            out = self.health_history.as_yaml()
        else:
            out = self.health_history.as_json(format == Format.json_pretty)

        return HandleCommandResult(retval=0, stdout=out)

    @CLIWriteCommand('healthcheck history clear')
    def _clear_healthchecks(self) -> HandleCommandResult:
        """Clear the healthcheck history"""
        self.health_history.reset()
        return HandleCommandResult(retval=0, stdout="healthcheck history cleared")

class StandbyModule(MgrStandbyModule):

    MODULE_OPTIONS = Module.MODULE_OPTIONS

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(StandbyModule, self).__init__(*args, **kwargs)
        self.shutdown_event = threading.Event()

    def serve(self) -> None:
        server_addr = self.get_localized_module_option(
            'server_addr', get_default_addr())
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info("server_addr: %s server_port: %s" %
                      (server_addr, server_port))
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': server_port,
            'engine.autoreload.on': False,
            'request.show_tracebacks': False
        })

        module = self

        class Root(object):
            @cherrypy.expose
            def index(self) -> str:
                standby_behaviour = module.get_module_option('standby_behaviour')
                if standby_behaviour == 'default':
                    active_uri = module.get_active_uri()
                    return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='{}metrics'>Metrics</a></p>
    </body>
</html>'''.format(active_uri)
                else:
                    status = module.get_module_option('standby_error_status_code')
                    raise cherrypy.HTTPError(status, message="Keep on looking")

            @cherrypy.expose
            def metrics(self) -> str:
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return ''

        cherrypy.tree.mount(Root(), '/', {})
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # Wait for shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')

    def shutdown(self) -> None:
        self.log.info("Stopping engine...")
        self.shutdown_event.set()
        self.log.info("Stopped engine")