# ceph/src/pybind/mgr/prometheus/module.py

import enum
import json
import math
import os
import re
import threading
import time

import yaml

from collections import defaultdict, namedtuple
from distutils.version import StrictVersion

try:
    import cherrypy
except ImportError:
    # allow this module to be imported (e.g. for docs builds) without cherrypy
    cherrypy = None

from mgr_module import (CLIReadCommand, CLIWriteCommand, HandleCommandResult,
                        MgrModule, MgrStandbyModule, Option, PG_STATES,
                        ServiceInfoT)
from mgr_util import build_url, get_default_addr, profile_method
from rbd import RBD

from typing import (Any, Callable, DefaultDict, Dict, List, Optional, Set,
                    Tuple, Union, cast)

LabelValues = Tuple[str, ...]
Number = Union[int, float]
MetricValue = Dict[LabelValues, Number]

# Defaults for the Prometheus HTTP server.  Can also be set in the config-key
# store; see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# for the Prometheus exporter port registry.

DEFAULT_PORT = 9283

# When the CherryPy server in 3.2.2 (and later) starts, it attempts to verify
# that the ports it is listening on are in fact bound. When using the any address
# "::" it tries both IPv4 and IPv6, and in some environments (e.g. Kubernetes)
# IPv6 isn't yet configured / supported and CherryPy throws an uncaught
# exception.
if cherrypy is not None:
    v = StrictVersion(cherrypy.__version__)
    # the issue was fixed in 3.2.3. it's present in 3.2.2 (the current version
    # on centos:7) and back to at least 3.0.0.
    if StrictVersion("3.1.2") <= v < StrictVersion("3.2.3"):
        # https://github.com/cherrypy/cherrypy/issues/1100
        from cherrypy.process import servers
        servers.wait_for_occupied_port = lambda host, port: None


# cherrypy likes to sys.exit on error.  Don't let it take us down too!
def os_exit_noop(status: int) -> None:
    pass


os._exit = os_exit_noop  # type: ignore

# _global_instance gives the nested Root controller access to the active
# Module instance (see serve() below).
_global_instance = None  # type: Optional[Module]
cherrypy.config.update({
    'response.headers.server': 'Ceph-Prometheus'
})


def health_status_to_number(status: str) -> int:
    if status == 'HEALTH_OK':
        return 0
    elif status == 'HEALTH_WARN':
        return 1
    elif status == 'HEALTH_ERR':
        return 2
    raise ValueError(f'unknown status "{status}"')
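
# A quick sanity check of the mapping above (illustrative only; these are the
# values the helper returns for each status):
#   health_status_to_number('HEALTH_OK')   -> 0
#   health_status_to_number('HEALTH_WARN') -> 1
#   health_status_to_number('HEALTH_ERR')  -> 2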

DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']

DF_POOL = ['max_avail', 'avail_raw', 'stored', 'stored_raw', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes',
           'compress_bytes_used', 'compress_under_bytes', 'bytes_used', 'percent_used']

OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
                  'recovering_keys_per_sec', 'num_objects_recovered',
                  'num_bytes_recovered', 'num_keys_recovered')

OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')

MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
                'ceph_version')

MON_METADATA = ('ceph_daemon', 'hostname',
                'public_addr', 'rank', 'ceph_version')

MGR_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

MGR_STATUS = ('ceph_daemon',)

MGR_MODULE_STATUS = ('name',)

MGR_MODULE_CAN_RUN = ('name',)

OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
                'front_iface', 'hostname', 'objectstore', 'public_addr',
                'ceph_version')

OSD_STATUS = ['weight', 'up', 'in']

OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name', 'type', 'description', 'compression_mode')

RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version', 'instance_id')

RBD_MIRROR_METADATA = ('ceph_daemon', 'id', 'instance_id', 'hostname',
                       'ceph_version')

DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
                   'wal_device', 'instance', 'devices', 'device_ids')

NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']

alert_metric = namedtuple('alert_metric', 'name description')
HEALTH_CHECKS = [
    alert_metric('SLOW_OPS', 'OSD or Monitor requests taking a long time to process'),
]

HEALTHCHECK_DETAIL = ('name', 'severity')


class Severity(enum.Enum):
    ok = "HEALTH_OK"
    warn = "HEALTH_WARN"
    error = "HEALTH_ERR"


class Format(enum.Enum):
    plain = 'plain'
    json = 'json'
    json_pretty = 'json-pretty'
    yaml = 'yaml'


class HealthCheckEvent:

    def __init__(self, name: str, severity: Severity, first_seen: float,
                 last_seen: float, count: int, active: bool = True):
        self.name = name
        self.severity = severity
        self.first_seen = first_seen
        self.last_seen = last_seen
        self.count = count
        self.active = active

    def as_dict(self) -> Dict[str, Any]:
        """Return the instance as a dictionary."""
        return self.__dict__


class HealthHistory:
    kv_name = 'health_history'
    titles = "{healthcheck_name:<24} {first_seen:<20} {last_seen:<20} {count:>5} {active:^6}"
    date_format = "%Y/%m/%d %H:%M:%S"

    def __init__(self, mgr: MgrModule):
        self.mgr = mgr
        self.lock = threading.Lock()
        self.healthcheck: Dict[str, HealthCheckEvent] = {}
        self._load()

    def _load(self) -> None:
        """Load the current state from the mons KV store."""
        data = self.mgr.get_store(self.kv_name)
        if data:
            try:
                healthcheck_data = json.loads(data)
            except json.JSONDecodeError:
                self.mgr.log.warning(
                    f"INVALID data read from mgr/prometheus/{self.kv_name}. Resetting")
                self.reset()
                return
            else:
                for k, v in healthcheck_data.items():
                    self.healthcheck[k] = HealthCheckEvent(
                        name=k,
                        severity=v.get('severity'),
                        first_seen=v.get('first_seen', 0),
                        last_seen=v.get('last_seen', 0),
                        count=v.get('count', 1),
                        active=v.get('active', True))
        else:
            self.reset()

    def reset(self) -> None:
        """Reset the healthcheck history."""
        with self.lock:
            self.mgr.set_store(self.kv_name, "{}")
            self.healthcheck = {}

    def save(self) -> None:
        """Save the current in-memory healthcheck history to the KV store."""
        with self.lock:
            self.mgr.set_store(self.kv_name, self.as_json())

    def check(self, health_checks: Dict[str, Any]) -> None:
        """Look at the current health checks and compare them with the existing history.

        Args:
            health_checks (Dict[str, Any]): current health check data
        """
        current_checks = health_checks.get('checks', {})
        changes_made = False

        # first turn off any active states we're tracking
        for seen_check in self.healthcheck:
            check = self.healthcheck[seen_check]
            if check.active and seen_check not in current_checks:
                check.active = False
                changes_made = True

        # now look for any additions to track
        now = time.time()
        for name, info in current_checks.items():
            if name not in self.healthcheck:
                # this healthcheck is new, so start tracking it
                changes_made = True
                self.healthcheck[name] = HealthCheckEvent(
                    name=name,
                    severity=info.get('severity'),
                    first_seen=now,
                    last_seen=now,
                    count=1)
            else:
                # seen it before, so update its metadata
                check = self.healthcheck[name]
                if check.active:
                    # check has been registered as active already, so skip
                    continue

                check.last_seen = now
                check.active = True
                check.count += 1
                changes_made = True

        if changes_made:
            self.save()

    def __str__(self) -> str:
        """Render the healthcheck history.

        Returns:
            str: human readable representation of the healthcheck history
        """
        out = []
        if len(self.healthcheck.keys()) == 0:
            out.append("No healthchecks have been recorded")
        else:
            out.append(self.titles.format(
                healthcheck_name="Healthcheck Name",
                first_seen="First Seen (UTC)",
                last_seen="Last seen (UTC)",
                count="Count",
                active="Active")
            )
            for k in sorted(self.healthcheck.keys()):
                check = self.healthcheck[k]
                out.append(self.titles.format(
                    healthcheck_name=check.name,
                    first_seen=time.strftime(self.date_format, time.localtime(check.first_seen)),
                    last_seen=time.strftime(self.date_format, time.localtime(check.last_seen)),
                    count=check.count,
                    active="Yes" if check.active else "No")
                )
            out.extend([f"{len(self.healthcheck)} health check(s) listed", ""])

        return "\n".join(out)

    def as_dict(self) -> Dict[str, Any]:
        """Return the history as a dictionary.

        Returns:
            Dict[str, Any]: dictionary indexed by the healthcheck name
        """
        return {name: self.healthcheck[name].as_dict() for name in self.healthcheck}

    def as_json(self, pretty: bool = False) -> str:
        """Return the healthcheck history as a JSON string.

        Args:
            pretty (bool, optional): whether to JSON pretty-print the history. Defaults to False.

        Returns:
            str: representation of the healthcheck history in JSON format
        """
        if pretty:
            return json.dumps(self.as_dict(), indent=2)
        return json.dumps(self.as_dict())

    def as_yaml(self) -> str:
        """Return the healthcheck history in YAML format.

        Returns:
            str: YAML representation of the healthcheck history
        """
        return yaml.safe_dump(self.as_dict(), explicit_start=True, default_flow_style=False)


class Metric(object):
    def __init__(self, mtype: str, name: str, desc: str,
                 labels: Optional[LabelValues] = None) -> None:
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value: Dict[LabelValues, Number] = {}

    def clear(self) -> None:
        self.value = {}

    def set(self, value: Number, labelvalues: Optional[LabelValues] = None) -> None:
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] = value
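
    # Minimal usage sketch for Metric (names and values here are illustrative,
    # not taken from a live cluster):
    #
    #   m = Metric('gauge', 'osd_weight', 'OSD weight', ('ceph_daemon',))
    #   m.set(1.0, ('osd.0',))
    #   m.value  # -> {('osd.0',): 1.0}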

    def str_expfmt(self) -> str:

        def promethize(path: str) -> str:
            ''' replace illegal metric name characters '''
            result = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus')

            # Hyphens usually turn into underscores, unless they are
            # trailing
            if result.endswith("-"):
                result = result[0:-1] + "_minus"
            else:
                result = result.replace("-", "_")

            return "ceph_{0}".format(result)

        def floatstr(value: float) -> str:
            ''' represent as Go-compatible float '''
            if value == float('inf'):
                return '+Inf'
            if value == float('-inf'):
                return '-Inf'
            if math.isnan(value):
                return 'NaN'
            return repr(float(value))

        name = promethize(self.name)
        expfmt = '''
# HELP {name} {desc}
# TYPE {name} {mtype}'''.format(
            name=name,
            desc=self.desc,
            mtype=self.mtype,
        )

        for labelvalues, value in self.value.items():
            if self.labelnames:
                labels_list = zip(self.labelnames, labelvalues)
                labels = ','.join('%s="%s"' % (k, v) for k, v in labels_list)
            else:
                labels = ''
            if labels:
                fmtstr = '\n{name}{{{labels}}} {value}'
            else:
                fmtstr = '\n{name} {value}'
            expfmt += fmtstr.format(
                name=name,
                labels=labels,
                value=floatstr(value),
            )
        return expfmt
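
    # Rough sketch of the exposition text str_expfmt() emits for the metric
    # sketched above (values illustrative):
    #
    #   # HELP ceph_osd_weight OSD weight
    #   # TYPE ceph_osd_weight gauge
    #   ceph_osd_weight{ceph_daemon="osd.0"} 1.0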

    def group_by(self,
                 keys: List[str],
                 joins: Dict[str, Callable[[List[str]], str]],
                 name: Optional[str] = None,
                 ) -> "Metric":
        """
        Groups data by label names.

        Label names that are not passed are removed from the resulting metric,
        but by providing a join function, the labels of grouped metrics can be
        combined.

        The purpose of this method is to provide a version of a metric that can
        be used in matching where otherwise multiple results would be returned.

        As grouping is possible in Prometheus, the only additional value of this
        method is the possibility to join labels when grouping. For that reason,
        passing joins is required. Please use PromQL expressions in all other
        cases.

        >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
        >>> m.value = {
        ...     ('foo', 'x'): 1,
        ...     ('foo', 'y'): 1,
        ... }
        >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
        {('foo', 'x,y'): 1}

        The functionality of group_by could roughly be compared with Prometheus'

            group (ceph_disk_occupation) by (device, instance)

        with the exception that the labels which aren't used as a condition to
        group a metric are not all discarded: their values can be joined and the
        label is thereby preserved.

        This function takes the value of the first entry of a found group to be
        used for the resulting value of the grouping operation.

        >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
        >>> m.value = {
        ...     ('foo', 'x'): 555,
        ...     ('foo', 'y'): 10,
        ... }
        >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
        {('foo', 'x,y'): 555}
        """
        assert self.labelnames, "cannot match keys without label names"
        for key in keys:
            assert key in self.labelnames, "unknown key: {}".format(key)
        assert joins, "joins must not be empty"
        assert all(callable(c) for c in joins.values()), "joins must be callable"

        # group the values by the configured keys
        grouped: Dict[LabelValues, List[Tuple[Dict[str, str], Number]]] = defaultdict(list)
        for label_values, metric_value in self.value.items():
            labels = dict(zip(self.labelnames, label_values))
            if not all(k in labels for k in keys):
                continue
            group_key = tuple(labels[k] for k in keys)
            grouped[group_key].append((labels, metric_value))

        # as there is nothing specified on how to join labels that are not equal
        # and Prometheus' `group` aggregation function behaves similarly, we
        # simply drop those labels.
        labelnames = tuple(
            label for label in self.labelnames if label in keys or label in joins
        )
        superfluous_labelnames = [
            label for label in self.labelnames if label not in labelnames
        ]

        # iterate and convert groups with more than one member into a single
        # entry
        values: MetricValue = {}
        for group in grouped.values():
            labels, metric_value = group[0]

            for label in superfluous_labelnames:
                del labels[label]

            # apply the join functions
            for key, fn in joins.items():
                labels[key] = fn(list(labels[key] for labels, _ in group))

            values[tuple(labels.values())] = metric_value

        new_metric = Metric(self.mtype, name if name else self.name, self.desc, labelnames)
        new_metric.value = values

        return new_metric


class MetricCounter(Metric):
    def __init__(self,
                 name: str,
                 desc: str,
                 labels: Optional[LabelValues] = None) -> None:
        super(MetricCounter, self).__init__('counter', name, desc, labels)
        self.value = defaultdict(lambda: 0)

    def clear(self) -> None:
        pass  # Skip calls to clear as we want to keep the counters here.

    def set(self,
            value: Number,
            labelvalues: Optional[LabelValues] = None) -> None:
        msg = 'This method must not be used for instances of MetricCounter class'
        raise NotImplementedError(msg)

    def add(self,
            value: Number,
            labelvalues: Optional[LabelValues] = None) -> None:
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] += value
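
# Usage sketch for MetricCounter (illustrative): unlike Metric, values
# accumulate across scrapes because clear() is a no-op:
#
#   c = MetricCounter('collect_count', 'Number of collections', ('method',))
#   c.add(1, ('get_health',))
#   c.add(2, ('get_health',))
#   c.value  # -> {('get_health',): 3}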


class MetricCollectionThread(threading.Thread):
    def __init__(self, module: 'Module') -> None:
        self.mod = module
        self.active = True
        self.event = threading.Event()
        super(MetricCollectionThread, self).__init__(target=self.collect)

    def collect(self) -> None:
        self.mod.log.info('starting metric collection thread')
        while self.active:
            self.mod.log.debug('collecting cache in thread')
            if self.mod.have_mon_connection():
                start_time = time.time()

                try:
                    data = self.mod.collect()
                except Exception:
                    # Log any issues encountered during the data collection and continue
                    self.mod.log.exception("failed to collect metrics:")
                    self.event.wait(self.mod.scrape_interval)
                    continue

                duration = time.time() - start_time
                self.mod.log.debug('collecting cache in thread done')

                sleep_time = self.mod.scrape_interval - duration
                if sleep_time < 0:
                    self.mod.log.warning(
                        'Collecting data took more time than the configured scrape interval. '
                        'This possibly results in stale data. Please check the '
                        '`stale_cache_strategy` configuration option. '
                        'Collecting data took {:.2f} seconds but the scrape interval is configured '
                        'to be {:.0f} seconds.'.format(
                            duration,
                            self.mod.scrape_interval,
                        )
                    )
                    sleep_time = 0

                with self.mod.collect_lock:
                    self.mod.collect_cache = data
                    self.mod.collect_time = duration

                self.event.wait(sleep_time)
            else:
                self.mod.log.error('No MON connection')
                self.event.wait(self.mod.scrape_interval)

    def stop(self) -> None:
        self.active = False
        self.event.set()


class Module(MgrModule):
    MODULE_OPTIONS = [
        Option(
            'server_addr',
            default=get_default_addr(),
            desc='the IPv4 or IPv6 address on which the module listens for HTTP requests',
        ),
        Option(
            'server_port',
            type='int',
            default=DEFAULT_PORT,
            desc='the port on which the module listens for HTTP requests',
        ),
        Option(
            'scrape_interval',
            type='float',
            default=15.0
        ),
        Option(
            'stale_cache_strategy',
            default='log'
        ),
        Option(
            'cache',
            type='bool',
            default=True,
        ),
        Option(
            'rbd_stats_pools',
            default=''
        ),
        Option(
            name='rbd_stats_pools_refresh_interval',
            type='int',
            default=300
        ),
        Option(
            name='standby_behaviour',
            type='str',
            default='default',
            enum_allowed=['default', 'error'],
            runtime=True
        ),
        Option(
            name='standby_error_status_code',
            type='int',
            default=500,
            min=400,
            max=599,
            runtime=True
        )
    ]

    STALE_CACHE_FAIL = 'fail'
    STALE_CACHE_RETURN = 'return'

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(Module, self).__init__(*args, **kwargs)
        self.metrics = self._setup_static_metrics()
        self.shutdown_event = threading.Event()
        self.collect_lock = threading.Lock()
        self.collect_time = 0.0
        self.scrape_interval: float = 15.0
        self.cache = True
        self.stale_cache_strategy: str = self.STALE_CACHE_FAIL
        self.collect_cache: Optional[str] = None
        self.rbd_stats = {
            'pools': {},
            'pools_refresh_time': 0,
            'counters_info': {
                'write_ops': {'type': self.PERFCOUNTER_COUNTER,
                              'desc': 'RBD image writes count'},
                'read_ops': {'type': self.PERFCOUNTER_COUNTER,
                             'desc': 'RBD image reads count'},
                'write_bytes': {'type': self.PERFCOUNTER_COUNTER,
                                'desc': 'RBD image bytes written'},
                'read_bytes': {'type': self.PERFCOUNTER_COUNTER,
                               'desc': 'RBD image bytes read'},
                'write_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                  'desc': 'RBD image writes latency (msec)'},
                'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                 'desc': 'RBD image reads latency (msec)'},
            },
        }  # type: Dict[str, Any]
        global _global_instance
        _global_instance = self
        self.metrics_thread = MetricCollectionThread(_global_instance)
        self.health_history = HealthHistory(self)

    def _setup_static_metrics(self) -> Dict[str, Metric]:
        metrics = {}
        metrics['health_status'] = Metric(
            'untyped',
            'health_status',
            'Cluster health status'
        )
        metrics['mon_quorum_status'] = Metric(
            'gauge',
            'mon_quorum_status',
            'Monitors in quorum',
            ('ceph_daemon',)
        )
        metrics['fs_metadata'] = Metric(
            'untyped',
            'fs_metadata',
            'FS Metadata',
            FS_METADATA
        )
        metrics['mds_metadata'] = Metric(
            'untyped',
            'mds_metadata',
            'MDS Metadata',
            MDS_METADATA
        )
        metrics['mon_metadata'] = Metric(
            'untyped',
            'mon_metadata',
            'MON Metadata',
            MON_METADATA
        )
        metrics['mgr_metadata'] = Metric(
            'gauge',
            'mgr_metadata',
            'MGR metadata',
            MGR_METADATA
        )
        metrics['mgr_status'] = Metric(
            'gauge',
            'mgr_status',
            'MGR status (0=standby, 1=active)',
            MGR_STATUS
        )
        metrics['mgr_module_status'] = Metric(
            'gauge',
            'mgr_module_status',
            'MGR module status (0=disabled, 1=enabled, 2=auto-enabled)',
            MGR_MODULE_STATUS
        )
        metrics['mgr_module_can_run'] = Metric(
            'gauge',
            'mgr_module_can_run',
            'MGR module runnable state i.e. can it run (0=no, 1=yes)',
            MGR_MODULE_CAN_RUN
        )
        metrics['osd_metadata'] = Metric(
            'untyped',
            'osd_metadata',
            'OSD Metadata',
            OSD_METADATA
        )

        # The reason for having this separate to OSD_METADATA is
        # so that we can stably use the same tag names that
        # the Prometheus node_exporter does
        metrics['disk_occupation'] = Metric(
            'untyped',
            'disk_occupation',
            'Associate Ceph daemon with disk used',
            DISK_OCCUPATION
        )

        metrics['disk_occupation_human'] = Metric(
            'untyped',
            'disk_occupation_human',
            'Associate Ceph daemon with disk used for displaying to humans,'
            ' not for joining tables (vector matching)',
            DISK_OCCUPATION,  # label names are automatically decimated on grouping
        )

        metrics['pool_metadata'] = Metric(
            'untyped',
            'pool_metadata',
            'POOL Metadata',
            POOL_METADATA
        )
        metrics['rgw_metadata'] = Metric(
            'untyped',
            'rgw_metadata',
            'RGW Metadata',
            RGW_METADATA
        )
        metrics['rbd_mirror_metadata'] = Metric(
            'untyped',
            'rbd_mirror_metadata',
            'RBD Mirror Metadata',
            RBD_MIRROR_METADATA
        )
        metrics['pg_total'] = Metric(
            'gauge',
            'pg_total',
            'PG Total Count per Pool',
            ('pool_id',)
        )
        metrics['health_detail'] = Metric(
            'gauge',
            'health_detail',
            'healthcheck status by type (0=inactive, 1=active)',
            HEALTHCHECK_DETAIL
        )

        for flag in OSD_FLAGS:
            path = 'osd_flag_{}'.format(flag)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD Flag {}'.format(flag)
            )
        for state in OSD_STATUS:
            path = 'osd_{}'.format(state)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD status {}'.format(state),
                ('ceph_daemon',)
            )
        for stat in OSD_STATS:
            path = 'osd_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD stat {}'.format(stat),
                ('ceph_daemon',)
            )
        for stat in OSD_POOL_STATS:
            path = 'pool_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                "OSD pool stats: {}".format(stat),
                ('pool_id',)
            )
        for state in PG_STATES:
            path = 'pg_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'PG {} per pool'.format(state),
                ('pool_id',)
            )
        for state in DF_CLUSTER:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
            )
            path = 'cluster_by_class_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
                ('device_class',)
            )
        for state in DF_POOL:
            path = 'pool_{}'.format(state)
            metrics[path] = Metric(
                'counter' if state in ('rd', 'rd_bytes', 'wr', 'wr_bytes') else 'gauge',
                path,
                'DF pool {}'.format(state),
                ('pool_id',)
            )
        for state in NUM_OBJECTS:
            path = 'num_objects_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'Number of {} objects'.format(state),
            )

        for check in HEALTH_CHECKS:
            path = 'healthcheck_{}'.format(check.name.lower())
            metrics[path] = Metric(
                'gauge',
                path,
                check.description,
            )

        return metrics

    def get_server_addr(self) -> str:
        """
        Return the current mgr server IP.
        """
        server_addr = cast(str, self.get_localized_module_option('server_addr', get_default_addr()))
        if server_addr in ['::', '0.0.0.0']:
            return self.get_mgr_ip()
        return server_addr

    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.
        """
        # https://stackoverflow.com/questions/7254845/change-cherrypy-port-and-restart-web-server
        # if we omit the line: cherrypy.server.httpserver = None
        # then the cherrypy server is not restarted correctly
        self.log.info('Restarting engine...')
        cherrypy.engine.stop()
        cherrypy.server.httpserver = None
        server_port = cast(int, self.get_localized_module_option('server_port', DEFAULT_PORT))
        self.set_uri(build_url(scheme='http', host=self.get_server_addr(), port=server_port, path='/'))
        cherrypy.config.update({'server.socket_port': server_port})
        cherrypy.engine.start()
        self.log.info('Engine started.')

    @profile_method()
    def get_health(self) -> None:

        def _get_value(message: str, delim: str = ' ', word_pos: int = 0) -> Tuple[int, int]:
            """Extract an integer value from a message (default is the 1st field)."""
            v_str = message.split(delim)[word_pos]
            if v_str.isdigit():
                return int(v_str), 0
            return 0, 1
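
        # Illustrative behaviour of the helper on a SLOW_OPS summary:
        #   _get_value("42 slow ops, oldest one blocked for 12 sec") -> (42, 0)
        #   _get_value("slow ops detected")                          -> (0, 1)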

        health = json.loads(self.get('health')['json'])
        # set the overall cluster health
        self.metrics['health_status'].set(
            health_status_to_number(health['status'])
        )

        # Examine the health to see if any health checks triggered need to
        # become a specific metric with a value from the health detail
        active_healthchecks = health.get('checks', {})
        active_names = active_healthchecks.keys()

        for check in HEALTH_CHECKS:
            path = 'healthcheck_{}'.format(check.name.lower())

            if path in self.metrics:

                if check.name in active_names:
                    check_data = active_healthchecks[check.name]
                    message = check_data['summary'].get('message', '')
                    v, err = 0, 0

                    if check.name == "SLOW_OPS":
                        # e.g. "42 slow ops, oldest one blocked for 12 sec, daemons [osd.0, osd.3] have
                        # slow ops."
                        v, err = _get_value(message)

                    if err:
                        self.log.error(
                            "healthcheck %s message format is incompatible and has been dropped",
                            check.name)
                        # drop the metric, so it's no longer emitted
                        del self.metrics[path]
                        continue

                    self.metrics[path].set(v)
                else:
                    # health check is not active, so give it a default of 0
                    self.metrics[path].set(0)

        self.health_history.check(health)
        for name, info in self.health_history.healthcheck.items():
            v = 1 if info.active else 0
            self.metrics['health_detail'].set(
                v, labelvalues=(name, str(info.severity)))

    @profile_method()
    def get_pool_stats(self) -> None:
        # retrieve pool stats to provide per pool recovery metrics
        # (osd_pool_stats moved to mgr in Mimic)
        pstats = self.get('osd_pool_stats')
        for pool in pstats['pool_stats']:
            for stat in OSD_POOL_STATS:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['recovery_rate'].get(stat, 0),
                    (pool['pool_id'],)
                )

    @profile_method()
    def get_df(self) -> None:
        # maybe get the to-be-exported metrics from a config?
        df = self.get('df')
        for stat in DF_CLUSTER:
            self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])
            for device_class in df['stats_by_class']:
                self.metrics['cluster_by_class_{}'.format(stat)].set(
                    df['stats_by_class'][device_class][stat], (device_class,))

        for pool in df['pools']:
            for stat in DF_POOL:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['stats'][stat],
                    (pool['id'],)
                )

    @profile_method()
    def get_fs(self) -> None:
        fs_map = self.get('fs_map')
        servers = self.get_service_list()
        self.log.debug('standbys: {}'.format(fs_map['standbys']))
        # export standby mds metadata, default standby fs_id is '-1'
        for standby in fs_map['standbys']:
            id_ = standby['name']
            host, version, _ = servers.get((id_, 'mds'), ('', '', ''))
            addr, rank = standby['addr'], standby['rank']
            self.metrics['mds_metadata'].set(1, (
                'mds.{}'.format(id_), '-1',
                host, addr, rank, version
            ))
        for fs in fs_map['filesystems']:
            # collect fs metadata
            data_pools = ",".join([str(pool)
                                   for pool in fs['mdsmap']['data_pools']])
            self.metrics['fs_metadata'].set(1, (
                data_pools,
                fs['id'],
                fs['mdsmap']['metadata_pool'],
                fs['mdsmap']['fs_name']
            ))
            self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
            for gid, daemon in fs['mdsmap']['info'].items():
                id_ = daemon['name']
                host, version, _ = servers.get((id_, 'mds'), ('', '', ''))
                self.metrics['mds_metadata'].set(1, (
                    'mds.{}'.format(id_), fs['id'],
                    host, daemon['addr'],
                    daemon['rank'], version
                ))

    @profile_method()
    def get_quorum_status(self) -> None:
        mon_status = json.loads(self.get('mon_status')['json'])
        servers = self.get_service_list()
        for mon in mon_status['monmap']['mons']:
            rank = mon['rank']
            id_ = mon['name']
            mon_version = servers.get((id_, 'mon'), ('', '', ''))
            self.metrics['mon_metadata'].set(1, (
                'mon.{}'.format(id_), mon_version[0],
                mon['public_addr'].rsplit(':', 1)[0], rank,
                mon_version[1]
            ))
            in_quorum = int(rank in mon_status['quorum'])
            self.metrics['mon_quorum_status'].set(in_quorum, (
                'mon.{}'.format(id_),
            ))

    @profile_method()
    def get_mgr_status(self) -> None:
        mgr_map = self.get('mgr_map')
        servers = self.get_service_list()

        active = mgr_map['active_name']
        standbys = [s.get('name') for s in mgr_map['standbys']]

        all_mgrs = list(standbys)
        all_mgrs.append(active)

        all_modules = {module.get('name'): module.get('can_run')
                       for module in mgr_map['available_modules']}

        for mgr in all_mgrs:
            host, version, _ = servers.get((mgr, 'mgr'), ('', '', ''))
            if mgr == active:
                _state = 1
            else:
                _state = 0

            self.metrics['mgr_metadata'].set(1, (
                f'mgr.{mgr}', host, version
            ))
            self.metrics['mgr_status'].set(_state, (
                f'mgr.{mgr}',
            ))
        always_on_modules = mgr_map['always_on_modules'].get(self.release_name, [])
        active_modules = list(always_on_modules)
        active_modules.extend(mgr_map['modules'])

        for mod_name in all_modules.keys():
            if mod_name in always_on_modules:
                _state = 2
            elif mod_name in active_modules:
                _state = 1
            else:
                _state = 0

            _can_run = 1 if all_modules[mod_name] else 0
            self.metrics['mgr_module_status'].set(_state, (mod_name,))
            self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))

    @profile_method()
    def get_pg_status(self) -> None:
        pg_summary = self.get('pg_summary')

        for pool in pg_summary['by_pool']:
            num_by_state = defaultdict(int)  # type: DefaultDict[str, int]

            for state_name, count in pg_summary['by_pool'][pool].items():
                for state in state_name.split('+'):
                    num_by_state[state] += count
                num_by_state['total'] += count

            for state, num in num_by_state.items():
                try:
                    self.metrics["pg_{}".format(state)].set(num, (pool,))
                except KeyError:
                    self.log.warning("skipping pg in unknown state {}".format(state))

    @profile_method()
    def get_osd_stats(self) -> None:
        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            id_ = osd['osd']
            for stat in OSD_STATS:
                val = osd['perf_stat'][stat]
                self.metrics['osd_{}'.format(stat)].set(val, (
                    'osd.{}'.format(id_),
                ))

    def get_service_list(self) -> Dict[Tuple[str, str], Tuple[str, str, str]]:
        ret = {}
        for server in self.list_servers():
            host = cast(str, server.get('hostname', ''))
            for service in cast(List[ServiceInfoT], server.get('services', [])):
                ret.update({(service['id'], service['type']): (
                    host, service['ceph_version'], service.get('name', ''))})
        return ret

    @profile_method()
    def get_metadata_and_osd_status(self) -> None:
        osd_map = self.get('osd_map')
        osd_flags = osd_map['flags'].split(',')
        for flag in OSD_FLAGS:
            self.metrics['osd_flag_{}'.format(flag)].set(
                int(flag in osd_flags)
            )

        osd_devices = self.get('osd_map_crush')['devices']
        servers = self.get_service_list()
        for osd in osd_map['osds']:
            # id can be used to link osd metrics and metadata
            id_ = osd['osd']
            # collect osd metadata
            p_addr = osd['public_addr'].rsplit(':', 1)[0]
            c_addr = osd['cluster_addr'].rsplit(':', 1)[0]
            if p_addr == "-" or c_addr == "-":
                self.log.info(
                    "Missing address metadata for osd {0}, skipping occupation"
                    " and metadata records for this osd".format(id_)
                )
                continue

            dev_class = None
            for osd_device in osd_devices:
                if osd_device['id'] == id_:
                    dev_class = osd_device.get('class', '')
                    break

            if dev_class is None:
                self.log.info("OSD {0} is missing from CRUSH map, "
                              "skipping output".format(id_))
                continue

            osd_version = servers.get((str(id_), 'osd'), ('', '', ''))

            # collect disk occupation metadata
            osd_metadata = self.get_metadata("osd", str(id_))
            if osd_metadata is None:
                continue

            obj_store = osd_metadata.get('osd_objectstore', '')
            f_iface = osd_metadata.get('front_iface', '')
            b_iface = osd_metadata.get('back_iface', '')

            self.metrics['osd_metadata'].set(1, (
                b_iface,
                'osd.{}'.format(id_),
                c_addr,
                dev_class,
                f_iface,
                osd_version[0],
                obj_store,
                p_addr,
                osd_version[1]
            ))

            # collect osd status
            for state in OSD_STATUS:
                status = osd[state]
                self.metrics['osd_{}'.format(state)].set(status, (
                    'osd.{}'.format(id_),
                ))

            osd_dev_node = None
            osd_wal_dev_node = ''
            osd_db_dev_node = ''
            if obj_store == "filestore":
                # collect filestore backend device
                osd_dev_node = osd_metadata.get(
                    'backend_filestore_dev_node', None)
                # collect filestore journal device
                osd_wal_dev_node = osd_metadata.get('osd_journal', '')
                osd_db_dev_node = ''
            elif obj_store == "bluestore":
                # collect bluestore backend device
                osd_dev_node = osd_metadata.get(
                    'bluestore_bdev_dev_node', None)
                # collect bluestore wal backend
                osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
                # collect bluestore db backend
                osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
            if osd_dev_node and osd_dev_node == "unknown":
                osd_dev_node = None

            # fetch the devices and ids (vendor, model, serial) from the
            # osd_metadata
            osd_devs = osd_metadata.get('devices', '') or 'N/A'
            osd_dev_ids = osd_metadata.get('device_ids', '') or 'N/A'

            osd_hostname = osd_metadata.get('hostname', None)
            if osd_dev_node and osd_hostname:
                self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                    id_, osd_hostname, osd_dev_node))
                self.metrics['disk_occupation'].set(1, (
                    "osd.{0}".format(id_),
                    osd_dev_node,
                    osd_db_dev_node,
                    osd_wal_dev_node,
                    osd_hostname,
                    osd_devs,
                    osd_dev_ids,
                ))
            else:
                self.log.info("Missing dev node metadata for osd {0}, skipping "
                              "occupation record for this osd".format(id_))

        if 'disk_occupation' in self.metrics:
            try:
                self.metrics['disk_occupation_human'] = \
                    self.metrics['disk_occupation'].group_by(
                        ['device', 'instance'],
                        {'ceph_daemon': lambda daemons: ', '.join(daemons)},
                        name='disk_occupation_human',
                )
            except Exception as e:
                self.log.error(e)

        ec_profiles = osd_map.get('erasure_code_profiles', {})

        def _get_pool_info(pool: Dict[str, Any]) -> Tuple[str, str]:
            pool_type = 'unknown'
            description = 'unknown'

            if pool['type'] == 1:
                pool_type = "replicated"
                description = f"replica:{pool['size']}"
            elif pool['type'] == 3:
                pool_type = "erasure"
                name = pool.get('erasure_code_profile', '')
                profile = ec_profiles.get(name, {})
                if profile:
                    description = f"ec:{profile['k']}+{profile['m']}"
                else:
                    description = "ec:unknown"

            return pool_type, description

        for pool in osd_map['pools']:
            compression_mode = 'none'
            pool_type, pool_description = _get_pool_info(pool)

            if 'options' in pool:
                compression_mode = pool['options'].get('compression_mode', 'none')

            self.metrics['pool_metadata'].set(
                1, (
                    pool['pool'],
                    pool['pool_name'],
                    pool_type,
                    pool_description,
                    compression_mode)
            )

        # Populate other servers metadata
        for key, value in servers.items():
            service_id, service_type = key
            if service_type == 'rgw':
                hostname, version, name = value
                self.metrics['rgw_metadata'].set(
                    1,
                    ('{}.{}'.format(service_type, name),
                     hostname, version, service_id)
                )
            elif service_type == 'rbd-mirror':
                mirror_metadata = self.get_metadata('rbd-mirror', service_id)
                if mirror_metadata is None:
                    continue
                mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
                                                                service_id)
                rbd_mirror_metadata = cast(LabelValues,
                                           tuple(mirror_metadata.get(k, '')
                                                 for k in RBD_MIRROR_METADATA))
                self.metrics['rbd_mirror_metadata'].set(
                    1, rbd_mirror_metadata
                )

    @profile_method()
    def get_num_objects(self) -> None:
        pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
        for obj in NUM_OBJECTS:
            stat = 'num_objects_{}'.format(obj)
            self.metrics[stat].set(pg_sum[stat])

    @profile_method()
    def get_rbd_stats(self) -> None:
        # Per-RBD-image stats are collected by registering a dynamic osd perf
        # stats query that tells OSDs to group stats for requests associated
        # with RBD objects by pool, namespace, and image id, which are
        # extracted from the request object names or other attributes.
        # The RBD object names have the following prefixes:
        #   - rbd_data.{image_id}. (data stored in the same pool as metadata)
        #   - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
        #   - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
        # The pool_id in the object name is the id of the pool with the image
        # metadata, and should be used in the image spec. If there is no pool_id
        # in the object name, the image pool is the pool where the object is
        # located.

        # Parse the rbd_stats_pools option, which is a comma- or space-separated
        # list of pool[/namespace] entries. If no namespace is specified, the
        # stats are collected for every namespace in the pool. The wildcard
        # '*' can be used to indicate all pools or namespaces.
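        # Illustrative values for the option (pool and namespace names are
        # made up):
        #   rbd_stats_pools = "rbd"             -> every namespace of pool 'rbd'
        #   rbd_stats_pools = "rbd/ns1,vms/ns2" -> only the listed namespaces
        #   rbd_stats_pools = "*"               -> all pools tagged with 'rbd'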
        pools_string = cast(str, self.get_localized_module_option('rbd_stats_pools'))
        pool_keys = []
        for x in re.split(r'[\s,]+', pools_string):
            if not x:
                continue

            s = x.split('/', 2)
            pool_name = s[0]
            namespace_name = None
            if len(s) == 2:
                namespace_name = s[1]

            if pool_name == "*":
                # collect for all pools
                osd_map = self.get('osd_map')
                for pool in osd_map['pools']:
                    if 'rbd' not in pool.get('application_metadata', {}):
                        continue
                    pool_keys.append((pool['pool_name'], namespace_name))
            else:
                pool_keys.append((pool_name, namespace_name))

        pools = {}  # type: Dict[str, Set[str]]
        for pool_key in pool_keys:
            pool_name = pool_key[0]
            namespace_name = pool_key[1]
            if not namespace_name or namespace_name == "*":
                # empty set means collect for all namespaces
                pools[pool_name] = set()
                continue
            if pool_name not in pools:
                pools[pool_name] = set()
            elif not pools[pool_name]:
                continue
            pools[pool_name].add(namespace_name)

        rbd_stats_pools = {}
        # iterate over a copy of the keys: entries may be deleted below
        for pool_id in list(self.rbd_stats['pools']):
            name = self.rbd_stats['pools'][pool_id]['name']
            if name not in pools:
                del self.rbd_stats['pools'][pool_id]
            else:
                rbd_stats_pools[name] = \
                    self.rbd_stats['pools'][pool_id]['ns_names']

        pools_refreshed = False
        if pools:
            next_refresh = self.rbd_stats['pools_refresh_time'] + \
                self.get_localized_module_option(
                    'rbd_stats_pools_refresh_interval', 300)
            if rbd_stats_pools != pools or time.time() >= next_refresh:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True

        pool_ids = list(self.rbd_stats['pools'])
        pool_ids.sort()
        pool_id_regex = '^(' + '|'.join([str(x) for x in pool_ids]) + ')$'

        nspace_names = []
        for pool_id, pool in self.rbd_stats['pools'].items():
            if pool['ns_names']:
                nspace_names.extend(pool['ns_names'])
            else:
                nspace_names = []
                break
        if nspace_names:
            namespace_regex = '^(' + \
                              "|".join([re.escape(x)
                                        for x in set(nspace_names)]) + ')$'
        else:
            namespace_regex = '^(.*)$'

        if ('query' in self.rbd_stats
                and (pool_id_regex != self.rbd_stats['query']['key_descriptor'][0]['regex']
                     or namespace_regex != self.rbd_stats['query']['key_descriptor'][1]['regex'])):
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']

        if not self.rbd_stats['pools']:
            return

        counters_info = self.rbd_stats['counters_info']

        if 'query_id' not in self.rbd_stats:
            query = {
                'key_descriptor': [
                    {'type': 'pool_id', 'regex': pool_id_regex},
                    {'type': 'namespace', 'regex': namespace_regex},
                    {'type': 'object_name',
                     'regex': r'^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
                ],
                'performance_counter_descriptors': list(counters_info),
            }
            query_id = self.add_osd_perf_query(query)
            if query_id is None:
                self.log.error('failed to add query %s' % query)
                return
            self.rbd_stats['query'] = query
            self.rbd_stats['query_id'] = query_id

        res = self.get_osd_perf_counters(self.rbd_stats['query_id'])
        for c in res['counters']:
            # if the pool id is not found in the object name, use the id of the
            # pool where the object is located
            if c['k'][2][0]:
                pool_id = int(c['k'][2][0])
            else:
                pool_id = int(c['k'][0][0])
            if pool_id not in self.rbd_stats['pools'] and not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True
            if pool_id not in self.rbd_stats['pools']:
                continue
            pool = self.rbd_stats['pools'][pool_id]
            nspace_name = c['k'][1][0]
            if nspace_name not in pool['images']:
                continue
            image_id = c['k'][2][1]
            if image_id not in pool['images'][nspace_name] and \
               not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pool = self.rbd_stats['pools'][pool_id]
                pools_refreshed = True
            if image_id not in pool['images'][nspace_name]:
                continue
            counters = pool['images'][nspace_name][image_id]['c']
            for i in range(len(c['c'])):
                counters[i][0] += c['c'][i][0]
                counters[i][1] += c['c'][i][1]

        label_names = ("pool", "namespace", "image")
        for pool_id, pool in self.rbd_stats['pools'].items():
            pool_name = pool['name']
            for nspace_name, images in pool['images'].items():
                for image_id in images:
                    image_name = images[image_id]['n']
                    counters = images[image_id]['c']
                    i = 0
                    for key in counters_info:
                        counter_info = counters_info[key]
                        stattype = self._stattype_to_str(counter_info['type'])
                        labels = (pool_name, nspace_name, image_name)
                        if counter_info['type'] == self.PERFCOUNTER_COUNTER:
                            path = 'rbd_' + key
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'],
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                        elif counter_info['type'] == self.PERFCOUNTER_LONGRUNAVG:
                            path = 'rbd_' + key + '_sum'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'] + ' Total',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                            path = 'rbd_' + key + '_count'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    'counter',
                                    path,
                                    counter_info['desc'] + ' Count',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][1], labels)
                        i += 1

    def refresh_rbd_stats_pools(self, pools: Dict[str, Set[str]]) -> None:
        self.log.debug('refreshing rbd pools %s' % (pools))

        rbd = RBD()
        counters_info = self.rbd_stats['counters_info']
        for pool_name, cfg_ns_names in pools.items():
            try:
                pool_id = self.rados.pool_lookup(pool_name)
                with self.rados.open_ioctx(pool_name) as ioctx:
                    if pool_id not in self.rbd_stats['pools']:
                        self.rbd_stats['pools'][pool_id] = {'images': {}}
                    pool = self.rbd_stats['pools'][pool_id]
                    pool['name'] = pool_name
                    pool['ns_names'] = cfg_ns_names
                    if cfg_ns_names:
                        nspace_names = list(cfg_ns_names)
                    else:
                        nspace_names = [''] + rbd.namespace_list(ioctx)
                    # iterate over a copy: entries may be removed below
                    for nspace_name in list(pool['images']):
                        if nspace_name not in nspace_names:
                            del pool['images'][nspace_name]
                    for nspace_name in nspace_names:
                        if (nspace_name and
                                not rbd.namespace_exists(ioctx, nspace_name)):
                            self.log.debug('unknown namespace %s for pool %s' %
                                           (nspace_name, pool_name))
                            continue
                        ioctx.set_namespace(nspace_name)
                        if nspace_name not in pool['images']:
                            pool['images'][nspace_name] = {}
                        namespace = pool['images'][nspace_name]
                        images = {}
                        for image_meta in RBD().list2(ioctx):
                            image = {'n': image_meta['name']}
                            image_id = image_meta['id']
                            if image_id in namespace:
                                image['c'] = namespace[image_id]['c']
                            else:
                                image['c'] = [[0, 0] for x in counters_info]
                            images[image_id] = image
                        pool['images'][nspace_name] = images
            except Exception as e:
                self.log.error('failed listing pool %s: %s' % (pool_name, e))
        self.rbd_stats['pools_refresh_time'] = time.time()

    def shutdown_rbd_stats(self) -> None:
        if 'query_id' in self.rbd_stats:
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']
        self.rbd_stats['pools'].clear()

    def add_fixed_name_metrics(self) -> None:
        """
        Add fixed name metrics from existing ones that have details in their names
        that should be in labels (not in the name).
        For backward compatibility, a new fixed name metric is created (instead of replacing)
        and the details are put in new labels.
        Intended for RGW sync perf. counters but extendable as required.
        See: https://tracker.ceph.com/issues/45311
        """
        new_metrics = {}
        for metric_path, metrics in self.metrics.items():
            # Address RGW sync perf. counters.
            match = re.search(r'^data-sync-from-(.*)\.', metric_path)
            if match:
                new_path = re.sub('from-([^.]*)', 'from-zone', metric_path)
                if new_path not in new_metrics:
                    new_metrics[new_path] = Metric(
                        metrics.mtype,
                        new_path,
                        metrics.desc,
                        cast(LabelValues, metrics.labelnames) + ('source_zone',)
                    )
                for label_values, value in metrics.value.items():
                    new_metrics[new_path].set(value, label_values + (match.group(1),))

        self.metrics.update(new_metrics)
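
    # Illustrative effect of the rewrite above (the zone name is made up): a
    # perf counter path like 'data-sync-from-zone2.fetch_bytes' is additionally
    # exported under the fixed path 'data-sync-from-zone.fetch_bytes' with the
    # extra label source_zone="zone2", so queries can match it for any zone.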

    def get_collect_time_metrics(self) -> None:
        sum_metric = self.metrics.get('prometheus_collect_duration_seconds_sum')
        count_metric = self.metrics.get('prometheus_collect_duration_seconds_count')
        if sum_metric is None:
            sum_metric = MetricCounter(
                'prometheus_collect_duration_seconds_sum',
                'The sum of seconds it took to collect all metrics of this exporter',
                ('method',))
            self.metrics['prometheus_collect_duration_seconds_sum'] = sum_metric
        if count_metric is None:
            count_metric = MetricCounter(
                'prometheus_collect_duration_seconds_count',
                'The number of metrics gathered for this exporter',
                ('method',))
            self.metrics['prometheus_collect_duration_seconds_count'] = count_metric

        # Collect all timing data and make it available as a metric, excluding the
        # `collect` method because it has not finished at this point and hence
        # there's no `_execution_duration` attribute to be found. The
        # `_execution_duration` attribute is added by the `profile_method`
        # decorator.
        for method_name, method in Module.__dict__.items():
            duration = getattr(method, '_execution_duration', None)
            if duration is not None:
                cast(MetricCounter, sum_metric).add(duration, (method_name,))
                cast(MetricCounter, count_metric).add(1, (method_name,))

    @profile_method(True)
    def collect(self) -> str:
        # Clear the metrics before scraping
        for k in self.metrics.keys():
            self.metrics[k].clear()

        self.get_health()
        self.get_df()
        self.get_pool_stats()
        self.get_fs()
        self.get_osd_stats()
        self.get_quorum_status()
        self.get_mgr_status()
        self.get_metadata_and_osd_status()
        self.get_pg_status()
        self.get_num_objects()

        for daemon, counters in self.get_all_perf_counters().items():
            for path, counter_info in counters.items():
                # Skip histograms, they are represented by long running avgs
                stattype = self._stattype_to_str(counter_info['type'])
                if not stattype or stattype == 'histogram':
                    self.log.debug('ignoring %s, type %s' % (path, stattype))
                    continue

                path, label_names, labels = self._perfpath_to_path_labels(
                    daemon, path)

                # Get the value of the counter
                value = self._perfvalue_to_value(
                    counter_info['type'], counter_info['value'])

                # Represent the long running avgs as sum/count pairs
                if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    _path = path + '_sum'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            stattype,
                            _path,
                            counter_info['description'] + ' Total',
                            label_names,
                        )
                    self.metrics[_path].set(value, labels)

                    _path = path + '_count'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            'counter',
                            _path,
                            counter_info['description'] + ' Count',
                            label_names,
                        )
                    self.metrics[_path].set(counter_info['count'], labels)
                else:
                    if path not in self.metrics:
                        self.metrics[path] = Metric(
                            stattype,
                            path,
                            counter_info['description'],
                            label_names,
                        )
                    self.metrics[path].set(value, labels)

        self.add_fixed_name_metrics()
        self.get_rbd_stats()

        self.get_collect_time_metrics()

        # Return the formatted metrics and clear the no-longer-used data
        _metrics = [m.str_expfmt() for m in self.metrics.values()]
        for k in self.metrics.keys():
            self.metrics[k].clear()

        return ''.join(_metrics) + '\n'

    @CLIReadCommand('prometheus file_sd_config')
    def get_file_sd_config(self) -> Tuple[int, str, str]:
        '''
        Return a file_sd-compatible Prometheus config for the mgr cluster
        '''
        servers = self.list_servers()
        targets = []
        for server in servers:
            hostname = server.get('hostname', '')
            for service in cast(List[ServiceInfoT], server.get('services', [])):
                if service['type'] != 'mgr':
                    continue
                id_ = service['id']
                port = self._get_module_option('server_port', DEFAULT_PORT, id_)
                targets.append(f'{hostname}:{port}')
        ret = [
            {
                "targets": targets,
                "labels": {}
            }
        ]
        return 0, json.dumps(ret), ""
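
    # Example of the JSON payload returned above (the hostname is illustrative):
    #   [{"targets": ["mgr-host-a:9283"], "labels": {}}]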

    def self_test(self) -> None:
        self.collect()
        self.get_file_sd_config()

    def serve(self) -> None:

        class Root(object):

            # collapse everything to '/'
            def _cp_dispatch(self, vpath: str) -> 'Root':
                cherrypy.request.path = ''
                return self

            @cherrypy.expose
            def index(self) -> str:
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='/metrics'>Metrics</a></p>
    </body>
</html>'''

            @cherrypy.expose
            def metrics(self) -> Optional[str]:
                # Lock the function execution
                assert isinstance(_global_instance, Module)
                with _global_instance.collect_lock:
                    return self._metrics(_global_instance)

            @staticmethod
            def _metrics(instance: 'Module') -> Optional[str]:
                if not instance.cache:
                    instance.log.debug('Cache disabled, collecting and returning without cache')
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect()

                # Return cached data if available
                if not instance.collect_cache:
                    raise cherrypy.HTTPError(503, 'No cached data available yet')

                def respond() -> Optional[str]:
                    assert isinstance(instance, Module)
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache

                if instance.collect_time < instance.scrape_interval:
                    # Respond if the cache isn't stale
                    return respond()

                if instance.stale_cache_strategy == instance.STALE_CACHE_RETURN:
                    # Respond even if the cache is stale
                    instance.log.info(
                        'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
                        'returning metrics from stale cache.'.format(
                            instance.collect_time,
                            instance.collect_time - instance.scrape_interval
                        )
                    )
                    return respond()

                if instance.stale_cache_strategy == instance.STALE_CACHE_FAIL:
                    # Fail if the cache is stale
                    msg = (
                        'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
                        'returning "service unavailable".'.format(
                            instance.collect_time,
                            instance.collect_time - instance.scrape_interval,
                        )
                    )
                    instance.log.error(msg)
                    raise cherrypy.HTTPError(503, msg)
                return None

        # Make the cache timeout for collecting configurable
        self.scrape_interval = cast(float, self.get_localized_module_option('scrape_interval'))

        self.stale_cache_strategy = cast(
            str, self.get_localized_module_option('stale_cache_strategy'))
        if self.stale_cache_strategy not in [self.STALE_CACHE_FAIL,
                                             self.STALE_CACHE_RETURN]:
            self.stale_cache_strategy = self.STALE_CACHE_FAIL

        server_addr = cast(str, self.get_localized_module_option(
            'server_addr', get_default_addr()))
        server_port = cast(int, self.get_localized_module_option(
            'server_port', DEFAULT_PORT))
        self.log.info(
            "server_addr: %s server_port: %s" %
            (server_addr, server_port)
        )

        self.cache = cast(bool, self.get_localized_module_option('cache', True))
        if self.cache:
            self.log.info('Cache enabled')
            self.metrics_thread.start()
        else:
            self.log.info('Cache disabled')

        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': server_port,
            'engine.autoreload.on': False
        })
        # Publish the URI that others may use to access the service we're
        # about to start serving
        self.set_uri(build_url(scheme='http', host=self.get_server_addr(), port=server_port, path='/'))

        cherrypy.tree.mount(Root(), "/")
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # wait for the shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        # tell the metrics collection thread to stop collecting new metrics
        self.metrics_thread.stop()
        cherrypy.engine.stop()
        cherrypy.server.httpserver = None
        self.log.info('Engine stopped.')
        self.shutdown_rbd_stats()
        # wait for the metrics collection thread to stop
        self.metrics_thread.join()

    def shutdown(self) -> None:
        self.log.info('Stopping engine...')
        self.shutdown_event.set()

    @CLIReadCommand('healthcheck history ls')
    def _list_healthchecks(self, format: Format = Format.plain) -> HandleCommandResult:
        """List all the healthchecks being tracked.

        The format options are parsed in ceph_argparse before they get evaluated here, so
        we can safely assume that what we have to process is valid. ceph_argparse will throw
        a ValueError if the cast to our Format class fails.

        Args:
            format (Format, optional): output format. Defaults to Format.plain.

        Returns:
            HandleCommandResult: return code, stdout and stderr returned to the caller
        """

        if format == Format.plain:
            out = str(self.health_history)
        elif format == Format.yaml:
            out = self.health_history.as_yaml()
        else:
            out = self.health_history.as_json(format == Format.json_pretty)

        return HandleCommandResult(retval=0, stdout=out)

    @CLIWriteCommand('healthcheck history clear')
    def _clear_healthchecks(self) -> HandleCommandResult:
        """Clear the healthcheck history"""
        self.health_history.reset()
        return HandleCommandResult(retval=0, stdout="healthcheck history cleared")
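
    # CLI usage sketch for the two commands above (output formats follow the
    # Format enum defined earlier in this module):
    #   ceph healthcheck history ls
    #   ceph healthcheck history ls --format json-pretty
    #   ceph healthcheck history clear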


class StandbyModule(MgrStandbyModule):

    MODULE_OPTIONS = Module.MODULE_OPTIONS

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(StandbyModule, self).__init__(*args, **kwargs)
        self.shutdown_event = threading.Event()

    def serve(self) -> None:
        server_addr = self.get_localized_module_option(
            'server_addr', get_default_addr())
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info("server_addr: %s server_port: %s" %
                      (server_addr, server_port))
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': server_port,
            'engine.autoreload.on': False,
            'request.show_tracebacks': False
        })

        module = self

        class Root(object):
            @cherrypy.expose
            def index(self) -> str:
                standby_behaviour = module.get_module_option('standby_behaviour')
                if standby_behaviour == 'default':
                    active_uri = module.get_active_uri()
                    return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='{}metrics'>Metrics</a></p>
    </body>
</html>'''.format(active_uri)
                else:
                    status = module.get_module_option('standby_error_status_code')
                    raise cherrypy.HTTPError(status, message="Keep on looking")

            @cherrypy.expose
            def metrics(self) -> str:
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return ''

        cherrypy.tree.mount(Root(), '/', {})
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # Wait for the shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        cherrypy.server.httpserver = None
        self.log.info('Engine stopped.')

    def shutdown(self) -> None:
        self.log.info("Stopping engine...")
        self.shutdown_event.set()
        self.log.info("Stopped engine")