]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/prometheus/module.py
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / pybind / mgr / prometheus / module.py
1 import cherrypy
2 import yaml
3 from collections import defaultdict
4 from pkg_resources import packaging # type: ignore
5 import json
6 import math
7 import os
8 import re
9 import threading
10 import time
11 import enum
12 from collections import namedtuple
13
14 from mgr_module import CLIReadCommand, MgrModule, MgrStandbyModule, PG_STATES, Option, ServiceInfoT, HandleCommandResult, CLIWriteCommand
15 from mgr_util import get_default_addr, profile_method, build_url
16 from rbd import RBD
17
18 from typing import DefaultDict, Optional, Dict, Any, Set, cast, Tuple, Union, List, Callable
19
20 LabelValues = Tuple[str, ...]
21 Number = Union[int, float]
22 MetricValue = Dict[LabelValues, Number]
23
24 # Defaults for the Prometheus HTTP server. Can also set in config-key
25 # see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
26 # for Prometheus exporter port registry
27
28 DEFAULT_PORT = 9283
29
30 # When the CherryPy server in 3.2.2 (and later) starts it attempts to verify
31 # that the ports its listening on are in fact bound. When using the any address
32 # "::" it tries both ipv4 and ipv6, and in some environments (e.g. kubernetes)
33 # ipv6 isn't yet configured / supported and CherryPy throws an uncaught
34 # exception.
35 if cherrypy is not None:
36 Version = packaging.version.Version
37 v = Version(cherrypy.__version__)
38 # the issue was fixed in 3.2.3. it's present in 3.2.2 (current version on
39 # centos:7) and back to at least 3.0.0.
40 if Version("3.1.2") <= v < Version("3.2.3"):
41 # https://github.com/cherrypy/cherrypy/issues/1100
42 from cherrypy.process import servers
43 servers.wait_for_occupied_port = lambda host, port: None
44
45
46 # cherrypy likes to sys.exit on error. don't let it take us down too!
47 def os_exit_noop(status: int) -> None:
48 pass
49
50
51 os._exit = os_exit_noop # type: ignore
52
53 # to access things in class Module from subclass Root. Because
54 # it's a dict, the writer doesn't need to declare 'global' for access
55
56 _global_instance = None # type: Optional[Module]
57 cherrypy.config.update({
58 'response.headers.server': 'Ceph-Prometheus'
59 })
60
61
62 def health_status_to_number(status: str) -> int:
63 if status == 'HEALTH_OK':
64 return 0
65 elif status == 'HEALTH_WARN':
66 return 1
67 elif status == 'HEALTH_ERR':
68 return 2
69 raise ValueError(f'unknown status "{status}"')
70
71
72 DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']
73
74 OSD_BLOCKLIST = ['osd_blocklist_count']
75
76 DF_POOL = ['max_avail', 'avail_raw', 'stored', 'stored_raw', 'objects', 'dirty',
77 'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes',
78 'compress_bytes_used', 'compress_under_bytes', 'bytes_used', 'percent_used']
79
80 OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
81 'recovering_keys_per_sec', 'num_objects_recovered',
82 'num_bytes_recovered', 'num_bytes_recovered')
83
84 OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
85 'norecover', 'noscrub', 'nodeep-scrub')
86
87 FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')
88
89 MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
90 'ceph_version')
91
92 MON_METADATA = ('ceph_daemon', 'hostname',
93 'public_addr', 'rank', 'ceph_version')
94
95 MGR_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')
96
97 MGR_STATUS = ('ceph_daemon',)
98
99 MGR_MODULE_STATUS = ('name',)
100
101 MGR_MODULE_CAN_RUN = ('name',)
102
103 OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
104 'front_iface', 'hostname', 'objectstore', 'public_addr',
105 'ceph_version')
106
107 OSD_STATUS = ['weight', 'up', 'in']
108
109 OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']
110
111 POOL_METADATA = ('pool_id', 'name', 'type', 'description', 'compression_mode')
112
113 RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version', 'instance_id')
114
115 RBD_MIRROR_METADATA = ('ceph_daemon', 'id', 'instance_id', 'hostname',
116 'ceph_version')
117
118 DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
119 'wal_device', 'instance', 'devices', 'device_ids')
120
121 NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
122
123 alert_metric = namedtuple('alert_metric', 'name description')
124 HEALTH_CHECKS = [
125 alert_metric('SLOW_OPS', 'OSD or Monitor requests taking a long time to process'),
126 ]
127
128 HEALTHCHECK_DETAIL = ('name', 'severity')
129
130
131 class Severity(enum.Enum):
132 ok = "HEALTH_OK"
133 warn = "HEALTH_WARN"
134 error = "HEALTH_ERR"
135
136
137 class Format(enum.Enum):
138 plain = 'plain'
139 json = 'json'
140 json_pretty = 'json-pretty'
141 yaml = 'yaml'
142
143
144 class HealthCheckEvent:
145
146 def __init__(self, name: str, severity: Severity, first_seen: float, last_seen: float, count: int, active: bool = True):
147 self.name = name
148 self.severity = severity
149 self.first_seen = first_seen
150 self.last_seen = last_seen
151 self.count = count
152 self.active = active
153
154 def as_dict(self) -> Dict[str, Any]:
155 """Return the instance as a dictionary."""
156 return self.__dict__
157
158
159 class HealthHistory:
160 kv_name = 'health_history'
161 titles = "{healthcheck_name:<24} {first_seen:<20} {last_seen:<20} {count:>5} {active:^6}"
162 date_format = "%Y/%m/%d %H:%M:%S"
163
164 def __init__(self, mgr: MgrModule):
165 self.mgr = mgr
166 self.lock = threading.Lock()
167 self.healthcheck: Dict[str, HealthCheckEvent] = {}
168 self._load()
169
170 def _load(self) -> None:
171 """Load the current state from the mons KV store."""
172 data = self.mgr.get_store(self.kv_name)
173 if data:
174 try:
175 healthcheck_data = json.loads(data)
176 except json.JSONDecodeError:
177 self.mgr.log.warn(
178 f"INVALID data read from mgr/prometheus/{self.kv_name}. Resetting")
179 self.reset()
180 return
181 else:
182 for k, v in healthcheck_data.items():
183 self.healthcheck[k] = HealthCheckEvent(
184 name=k,
185 severity=v.get('severity'),
186 first_seen=v.get('first_seen', 0),
187 last_seen=v.get('last_seen', 0),
188 count=v.get('count', 1),
189 active=v.get('active', True))
190 else:
191 self.reset()
192
193 def reset(self) -> None:
194 """Reset the healthcheck history."""
195 with self.lock:
196 self.mgr.set_store(self.kv_name, "{}")
197 self.healthcheck = {}
198
199 def save(self) -> None:
200 """Save the current in-memory healthcheck history to the KV store."""
201 with self.lock:
202 self.mgr.set_store(self.kv_name, self.as_json())
203
204 def check(self, health_checks: Dict[str, Any]) -> None:
205 """Look at the current health checks and compare existing the history.
206
207 Args:
208 health_checks (Dict[str, Any]): current health check data
209 """
210
211 current_checks = health_checks.get('checks', {})
212 changes_made = False
213
214 # first turn off any active states we're tracking
215 for seen_check in self.healthcheck:
216 check = self.healthcheck[seen_check]
217 if check.active and seen_check not in current_checks:
218 check.active = False
219 changes_made = True
220
221 # now look for any additions to track
222 now = time.time()
223 for name, info in current_checks.items():
224 if name not in self.healthcheck:
225 # this healthcheck is new, so start tracking it
226 changes_made = True
227 self.healthcheck[name] = HealthCheckEvent(
228 name=name,
229 severity=info.get('severity'),
230 first_seen=now,
231 last_seen=now,
232 count=1,
233 active=True
234 )
235 else:
236 # seen it before, so update its metadata
237 check = self.healthcheck[name]
238 if check.active:
239 # check has been registered as active already, so skip
240 continue
241 else:
242 check.last_seen = now
243 check.count += 1
244 check.active = True
245 changes_made = True
246
247 if changes_made:
248 self.save()
249
250 def __str__(self) -> str:
251 """Print the healthcheck history.
252
253 Returns:
254 str: Human readable representation of the healthcheck history
255 """
256 out = []
257
258 if len(self.healthcheck.keys()) == 0:
259 out.append("No healthchecks have been recorded")
260 else:
261 out.append(self.titles.format(
262 healthcheck_name="Healthcheck Name",
263 first_seen="First Seen (UTC)",
264 last_seen="Last seen (UTC)",
265 count="Count",
266 active="Active")
267 )
268 for k in sorted(self.healthcheck.keys()):
269 check = self.healthcheck[k]
270 out.append(self.titles.format(
271 healthcheck_name=check.name,
272 first_seen=time.strftime(self.date_format, time.localtime(check.first_seen)),
273 last_seen=time.strftime(self.date_format, time.localtime(check.last_seen)),
274 count=check.count,
275 active="Yes" if check.active else "No")
276 )
277 out.extend([f"{len(self.healthcheck)} health check(s) listed", ""])
278
279 return "\n".join(out)
280
281 def as_dict(self) -> Dict[str, Any]:
282 """Return the history in a dictionary.
283
284 Returns:
285 Dict[str, Any]: dictionary indexed by the healthcheck name
286 """
287 return {name: self.healthcheck[name].as_dict() for name in self.healthcheck}
288
289 def as_json(self, pretty: bool = False) -> str:
290 """Return the healthcheck history object as a dict (JSON).
291
292 Args:
293 pretty (bool, optional): whether to json pretty print the history. Defaults to False.
294
295 Returns:
296 str: str representation of the healthcheck in JSON format
297 """
298 if pretty:
299 return json.dumps(self.as_dict(), indent=2)
300 else:
301 return json.dumps(self.as_dict())
302
303 def as_yaml(self) -> str:
304 """Return the healthcheck history in yaml format.
305
306 Returns:
307 str: YAML representation of the healthcheck history
308 """
309 return yaml.safe_dump(self.as_dict(), explicit_start=True, default_flow_style=False)
310
311
312 class Metric(object):
313 def __init__(self, mtype: str, name: str, desc: str, labels: Optional[LabelValues] = None) -> None:
314 self.mtype = mtype
315 self.name = name
316 self.desc = desc
317 self.labelnames = labels # tuple if present
318 self.value: Dict[LabelValues, Number] = {}
319
320 def clear(self) -> None:
321 self.value = {}
322
323 def set(self, value: Number, labelvalues: Optional[LabelValues] = None) -> None:
324 # labelvalues must be a tuple
325 labelvalues = labelvalues or ('',)
326 self.value[labelvalues] = value
327
328 def str_expfmt(self) -> str:
329
330 # Must be kept in sync with promethize() in src/exporter/util.cc
331 def promethize(path: str) -> str:
332 ''' replace illegal metric name characters '''
333 result = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus')
334
335 # Hyphens usually turn into underscores, unless they are
336 # trailing
337 if result.endswith("-"):
338 result = result[0:-1] + "_minus"
339 else:
340 result = result.replace("-", "_")
341
342 return "ceph_{0}".format(result)
343
344 def floatstr(value: float) -> str:
345 ''' represent as Go-compatible float '''
346 if value == float('inf'):
347 return '+Inf'
348 if value == float('-inf'):
349 return '-Inf'
350 if math.isnan(value):
351 return 'NaN'
352 return repr(float(value))
353
354 name = promethize(self.name)
355 expfmt = '''
356 # HELP {name} {desc}
357 # TYPE {name} {mtype}'''.format(
358 name=name,
359 desc=self.desc,
360 mtype=self.mtype,
361 )
362
363 for labelvalues, value in self.value.items():
364 if self.labelnames:
365 labels_list = zip(self.labelnames, labelvalues)
366 labels = ','.join('%s="%s"' % (k, v) for k, v in labels_list)
367 else:
368 labels = ''
369 if labels:
370 fmtstr = '\n{name}{{{labels}}} {value}'
371 else:
372 fmtstr = '\n{name} {value}'
373 expfmt += fmtstr.format(
374 name=name,
375 labels=labels,
376 value=floatstr(value),
377 )
378 return expfmt
379
380 def group_by(
381 self,
382 keys: List[str],
383 joins: Dict[str, Callable[[List[str]], str]],
384 name: Optional[str] = None,
385 ) -> "Metric":
386 """
387 Groups data by label names.
388
389 Label names not passed are being removed from the resulting metric but
390 by providing a join function, labels of metrics can be grouped.
391
392 The purpose of this method is to provide a version of a metric that can
393 be used in matching where otherwise multiple results would be returned.
394
395 As grouping is possible in Prometheus, the only additional value of this
396 method is the possibility to join labels when grouping. For that reason,
397 passing joins is required. Please use PromQL expressions in all other
398 cases.
399
400 >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
401 >>> m.value = {
402 ... ('foo', 'x'): 1,
403 ... ('foo', 'y'): 1,
404 ... }
405 >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
406 {('foo', 'x,y'): 1}
407
408 The functionality of group by could roughly be compared with Prometheus'
409
410 group (ceph_disk_occupation) by (device, instance)
411
412 with the exception that not all labels which aren't used as a condition
413 to group a metric are discarded, but their values can are joined and the
414 label is thereby preserved.
415
416 This function takes the value of the first entry of a found group to be
417 used for the resulting value of the grouping operation.
418
419 >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
420 >>> m.value = {
421 ... ('foo', 'x'): 555,
422 ... ('foo', 'y'): 10,
423 ... }
424 >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
425 {('foo', 'x,y'): 555}
426 """
427 assert self.labelnames, "cannot match keys without label names"
428 for key in keys:
429 assert key in self.labelnames, "unknown key: {}".format(key)
430 assert joins, "joins must not be empty"
431 assert all(callable(c) for c in joins.values()), "joins must be callable"
432
433 # group
434 grouped: Dict[LabelValues, List[Tuple[Dict[str, str], Number]]] = defaultdict(list)
435 for label_values, metric_value in self.value.items():
436 labels = dict(zip(self.labelnames, label_values))
437 if not all(k in labels for k in keys):
438 continue
439 group_key = tuple(labels[k] for k in keys)
440 grouped[group_key].append((labels, metric_value))
441
442 # as there is nothing specified on how to join labels that are not equal
443 # and Prometheus `group` aggregation functions similarly, we simply drop
444 # those labels.
445 labelnames = tuple(
446 label for label in self.labelnames if label in keys or label in joins
447 )
448 superfluous_labelnames = [
449 label for label in self.labelnames if label not in labelnames
450 ]
451
452 # iterate and convert groups with more than one member into a single
453 # entry
454 values: MetricValue = {}
455 for group in grouped.values():
456 labels, metric_value = group[0]
457
458 for label in superfluous_labelnames:
459 del labels[label]
460
461 if len(group) > 1:
462 for key, fn in joins.items():
463 labels[key] = fn(list(labels[key] for labels, _ in group))
464
465 values[tuple(labels.values())] = metric_value
466
467 new_metric = Metric(self.mtype, name if name else self.name, self.desc, labelnames)
468 new_metric.value = values
469
470 return new_metric
471
472
473 class MetricCounter(Metric):
474 def __init__(self,
475 name: str,
476 desc: str,
477 labels: Optional[LabelValues] = None) -> None:
478 super(MetricCounter, self).__init__('counter', name, desc, labels)
479 self.value = defaultdict(lambda: 0)
480
481 def clear(self) -> None:
482 pass # Skip calls to clear as we want to keep the counters here.
483
484 def set(self,
485 value: Number,
486 labelvalues: Optional[LabelValues] = None) -> None:
487 msg = 'This method must not be used for instances of MetricCounter class'
488 raise NotImplementedError(msg)
489
490 def add(self,
491 value: Number,
492 labelvalues: Optional[LabelValues] = None) -> None:
493 # labelvalues must be a tuple
494 labelvalues = labelvalues or ('',)
495 self.value[labelvalues] += value
496
497
498 class MetricCollectionThread(threading.Thread):
499 def __init__(self, module: 'Module') -> None:
500 self.mod = module
501 self.active = True
502 self.event = threading.Event()
503 super(MetricCollectionThread, self).__init__(target=self.collect)
504
505 def collect(self) -> None:
506 self.mod.log.info('starting metric collection thread')
507 while self.active:
508 self.mod.log.debug('collecting cache in thread')
509 if self.mod.have_mon_connection():
510 start_time = time.time()
511
512 try:
513 data = self.mod.collect()
514 except Exception:
515 # Log any issues encountered during the data collection and continue
516 self.mod.log.exception("failed to collect metrics:")
517 self.event.wait(self.mod.scrape_interval)
518 continue
519
520 duration = time.time() - start_time
521 self.mod.log.debug('collecting cache in thread done')
522
523 sleep_time = self.mod.scrape_interval - duration
524 if sleep_time < 0:
525 self.mod.log.warning(
526 'Collecting data took more time than configured scrape interval. '
527 'This possibly results in stale data. Please check the '
528 '`stale_cache_strategy` configuration option. '
529 'Collecting data took {:.2f} seconds but scrape interval is configured '
530 'to be {:.0f} seconds.'.format(
531 duration,
532 self.mod.scrape_interval,
533 )
534 )
535 sleep_time = 0
536
537 with self.mod.collect_lock:
538 self.mod.collect_cache = data
539 self.mod.collect_time = duration
540
541 self.event.wait(sleep_time)
542 else:
543 self.mod.log.error('No MON connection')
544 self.event.wait(self.mod.scrape_interval)
545
546 def stop(self) -> None:
547 self.active = False
548 self.event.set()
549
550
551 class Module(MgrModule):
552 MODULE_OPTIONS = [
553 Option(
554 'server_addr',
555 default=get_default_addr(),
556 desc='the IPv4 or IPv6 address on which the module listens for HTTP requests',
557 ),
558 Option(
559 'server_port',
560 type='int',
561 default=DEFAULT_PORT,
562 desc='the port on which the module listens for HTTP requests',
563 runtime=True
564 ),
565 Option(
566 'scrape_interval',
567 type='float',
568 default=15.0
569 ),
570 Option(
571 'stale_cache_strategy',
572 default='log'
573 ),
574 Option(
575 'cache',
576 type='bool',
577 default=True,
578 ),
579 Option(
580 'rbd_stats_pools',
581 default=''
582 ),
583 Option(
584 name='rbd_stats_pools_refresh_interval',
585 type='int',
586 default=300
587 ),
588 Option(
589 name='standby_behaviour',
590 type='str',
591 default='default',
592 enum_allowed=['default', 'error'],
593 runtime=True
594 ),
595 Option(
596 name='standby_error_status_code',
597 type='int',
598 default=500,
599 min=400,
600 max=599,
601 runtime=True
602 )
603 ]
604
605 STALE_CACHE_FAIL = 'fail'
606 STALE_CACHE_RETURN = 'return'
607
608 def __init__(self, *args: Any, **kwargs: Any) -> None:
609 super(Module, self).__init__(*args, **kwargs)
610 self.metrics = self._setup_static_metrics()
611 self.shutdown_event = threading.Event()
612 self.collect_lock = threading.Lock()
613 self.collect_time = 0.0
614 self.scrape_interval: float = 15.0
615 self.cache = True
616 self.stale_cache_strategy: str = self.STALE_CACHE_FAIL
617 self.collect_cache: Optional[str] = None
618 self.rbd_stats = {
619 'pools': {},
620 'pools_refresh_time': 0,
621 'counters_info': {
622 'write_ops': {'type': self.PERFCOUNTER_COUNTER,
623 'desc': 'RBD image writes count'},
624 'read_ops': {'type': self.PERFCOUNTER_COUNTER,
625 'desc': 'RBD image reads count'},
626 'write_bytes': {'type': self.PERFCOUNTER_COUNTER,
627 'desc': 'RBD image bytes written'},
628 'read_bytes': {'type': self.PERFCOUNTER_COUNTER,
629 'desc': 'RBD image bytes read'},
630 'write_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
631 'desc': 'RBD image writes latency (msec)'},
632 'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
633 'desc': 'RBD image reads latency (msec)'},
634 },
635 } # type: Dict[str, Any]
636 global _global_instance
637 _global_instance = self
638 self.metrics_thread = MetricCollectionThread(_global_instance)
639 self.health_history = HealthHistory(self)
640
641 def _setup_static_metrics(self) -> Dict[str, Metric]:
642 metrics = {}
643 metrics['health_status'] = Metric(
644 'untyped',
645 'health_status',
646 'Cluster health status'
647 )
648 metrics['mon_quorum_status'] = Metric(
649 'gauge',
650 'mon_quorum_status',
651 'Monitors in quorum',
652 ('ceph_daemon',)
653 )
654 metrics['fs_metadata'] = Metric(
655 'untyped',
656 'fs_metadata',
657 'FS Metadata',
658 FS_METADATA
659 )
660 metrics['mds_metadata'] = Metric(
661 'untyped',
662 'mds_metadata',
663 'MDS Metadata',
664 MDS_METADATA
665 )
666 metrics['mon_metadata'] = Metric(
667 'untyped',
668 'mon_metadata',
669 'MON Metadata',
670 MON_METADATA
671 )
672 metrics['mgr_metadata'] = Metric(
673 'gauge',
674 'mgr_metadata',
675 'MGR metadata',
676 MGR_METADATA
677 )
678 metrics['mgr_status'] = Metric(
679 'gauge',
680 'mgr_status',
681 'MGR status (0=standby, 1=active)',
682 MGR_STATUS
683 )
684 metrics['mgr_module_status'] = Metric(
685 'gauge',
686 'mgr_module_status',
687 'MGR module status (0=disabled, 1=enabled, 2=auto-enabled)',
688 MGR_MODULE_STATUS
689 )
690 metrics['mgr_module_can_run'] = Metric(
691 'gauge',
692 'mgr_module_can_run',
693 'MGR module runnable state i.e. can it run (0=no, 1=yes)',
694 MGR_MODULE_CAN_RUN
695 )
696 metrics['osd_metadata'] = Metric(
697 'untyped',
698 'osd_metadata',
699 'OSD Metadata',
700 OSD_METADATA
701 )
702
703 # The reason for having this separate to OSD_METADATA is
704 # so that we can stably use the same tag names that
705 # the Prometheus node_exporter does
706 metrics['disk_occupation'] = Metric(
707 'untyped',
708 'disk_occupation',
709 'Associate Ceph daemon with disk used',
710 DISK_OCCUPATION
711 )
712
713 metrics['disk_occupation_human'] = Metric(
714 'untyped',
715 'disk_occupation_human',
716 'Associate Ceph daemon with disk used for displaying to humans,'
717 ' not for joining tables (vector matching)',
718 DISK_OCCUPATION, # label names are automatically decimated on grouping
719 )
720
721 metrics['pool_metadata'] = Metric(
722 'untyped',
723 'pool_metadata',
724 'POOL Metadata',
725 POOL_METADATA
726 )
727
728 metrics['rgw_metadata'] = Metric(
729 'untyped',
730 'rgw_metadata',
731 'RGW Metadata',
732 RGW_METADATA
733 )
734
735 metrics['rbd_mirror_metadata'] = Metric(
736 'untyped',
737 'rbd_mirror_metadata',
738 'RBD Mirror Metadata',
739 RBD_MIRROR_METADATA
740 )
741
742 metrics['pg_total'] = Metric(
743 'gauge',
744 'pg_total',
745 'PG Total Count per Pool',
746 ('pool_id',)
747 )
748
749 metrics['health_detail'] = Metric(
750 'gauge',
751 'health_detail',
752 'healthcheck status by type (0=inactive, 1=active)',
753 HEALTHCHECK_DETAIL
754 )
755
756 metrics['pool_objects_repaired'] = Metric(
757 'counter',
758 'pool_objects_repaired',
759 'Number of objects repaired in a pool',
760 ('pool_id',)
761 )
762
763 metrics['daemon_health_metrics'] = Metric(
764 'gauge',
765 'daemon_health_metrics',
766 'Health metrics for Ceph daemons',
767 ('type', 'ceph_daemon',)
768 )
769
770 for flag in OSD_FLAGS:
771 path = 'osd_flag_{}'.format(flag)
772 metrics[path] = Metric(
773 'untyped',
774 path,
775 'OSD Flag {}'.format(flag)
776 )
777 for state in OSD_STATUS:
778 path = 'osd_{}'.format(state)
779 metrics[path] = Metric(
780 'untyped',
781 path,
782 'OSD status {}'.format(state),
783 ('ceph_daemon',)
784 )
785 for stat in OSD_STATS:
786 path = 'osd_{}'.format(stat)
787 metrics[path] = Metric(
788 'gauge',
789 path,
790 'OSD stat {}'.format(stat),
791 ('ceph_daemon',)
792 )
793 for stat in OSD_POOL_STATS:
794 path = 'pool_{}'.format(stat)
795 metrics[path] = Metric(
796 'gauge',
797 path,
798 "OSD pool stats: {}".format(stat),
799 ('pool_id',)
800 )
801 for state in PG_STATES:
802 path = 'pg_{}'.format(state)
803 metrics[path] = Metric(
804 'gauge',
805 path,
806 'PG {} per pool'.format(state),
807 ('pool_id',)
808 )
809 for state in DF_CLUSTER:
810 path = 'cluster_{}'.format(state)
811 metrics[path] = Metric(
812 'gauge',
813 path,
814 'DF {}'.format(state),
815 )
816 path = 'cluster_by_class_{}'.format(state)
817 metrics[path] = Metric(
818 'gauge',
819 path,
820 'DF {}'.format(state),
821 ('device_class',)
822 )
823 for state in DF_POOL:
824 path = 'pool_{}'.format(state)
825 metrics[path] = Metric(
826 'counter' if state in ('rd', 'rd_bytes', 'wr', 'wr_bytes') else 'gauge',
827 path,
828 'DF pool {}'.format(state),
829 ('pool_id',)
830 )
831 for state in OSD_BLOCKLIST:
832 path = 'cluster_{}'.format(state)
833 metrics[path] = Metric(
834 'gauge',
835 path,
836 'OSD Blocklist Count {}'.format(state),
837 )
838 for state in NUM_OBJECTS:
839 path = 'num_objects_{}'.format(state)
840 metrics[path] = Metric(
841 'gauge',
842 path,
843 'Number of {} objects'.format(state),
844 )
845
846 for check in HEALTH_CHECKS:
847 path = 'healthcheck_{}'.format(check.name.lower())
848 metrics[path] = Metric(
849 'gauge',
850 path,
851 check.description,
852 )
853
854 return metrics
855
856 def get_server_addr(self) -> str:
857 """
858 Return the current mgr server IP.
859 """
860 server_addr = cast(str, self.get_localized_module_option('server_addr', get_default_addr()))
861 if server_addr in ['::', '0.0.0.0']:
862 return self.get_mgr_ip()
863 return server_addr
864
865 def config_notify(self) -> None:
866 """
867 This method is called whenever one of our config options is changed.
868 """
869 # https://stackoverflow.com/questions/7254845/change-cherrypy-port-and-restart-web-server
870 # if we omit the line: cherrypy.server.httpserver = None
871 # then the cherrypy server is not restarted correctly
872 self.log.info('Restarting engine...')
873 cherrypy.engine.stop()
874 cherrypy.server.httpserver = None
875 server_addr = cast(str, self.get_localized_module_option('server_addr', get_default_addr()))
876 server_port = cast(int, self.get_localized_module_option('server_port', DEFAULT_PORT))
877 self.configure(server_addr, server_port)
878 cherrypy.engine.start()
879 self.log.info('Engine started.')
880
881 @profile_method()
882 def get_health(self) -> None:
883
884 def _get_value(message: str, delim: str = ' ', word_pos: int = 0) -> Tuple[int, int]:
885 """Extract value from message (default is 1st field)"""
886 v_str = message.split(delim)[word_pos]
887 if v_str.isdigit():
888 return int(v_str), 0
889 return 0, 1
890
891 health = json.loads(self.get('health')['json'])
892 # set overall health
893 self.metrics['health_status'].set(
894 health_status_to_number(health['status'])
895 )
896
897 # Examine the health to see if any health checks triggered need to
898 # become a specific metric with a value from the health detail
899 active_healthchecks = health.get('checks', {})
900 active_names = active_healthchecks.keys()
901
902 for check in HEALTH_CHECKS:
903 path = 'healthcheck_{}'.format(check.name.lower())
904
905 if path in self.metrics:
906
907 if check.name in active_names:
908 check_data = active_healthchecks[check.name]
909 message = check_data['summary'].get('message', '')
910 v, err = 0, 0
911
912 if check.name == "SLOW_OPS":
913 # 42 slow ops, oldest one blocked for 12 sec, daemons [osd.0, osd.3] have
914 # slow ops.
915 v, err = _get_value(message)
916
917 if err:
918 self.log.error(
919 "healthcheck %s message format is incompatible and has been dropped",
920 check.name)
921 # drop the metric, so it's no longer emitted
922 del self.metrics[path]
923 continue
924 else:
925 self.metrics[path].set(v)
926 else:
927 # health check is not active, so give it a default of 0
928 self.metrics[path].set(0)
929
930 self.health_history.check(health)
931 for name, info in self.health_history.healthcheck.items():
932 v = 1 if info.active else 0
933 self.metrics['health_detail'].set(
934 v, (
935 name,
936 str(info.severity))
937 )
938
939 @profile_method()
940 def get_pool_stats(self) -> None:
941 # retrieve pool stats to provide per pool recovery metrics
942 # (osd_pool_stats moved to mgr in Mimic)
943 pstats = self.get('osd_pool_stats')
944 for pool in pstats['pool_stats']:
945 for stat in OSD_POOL_STATS:
946 self.metrics['pool_{}'.format(stat)].set(
947 pool['recovery_rate'].get(stat, 0),
948 (pool['pool_id'],)
949 )
950
951 @profile_method()
952 def get_df(self) -> None:
953 # maybe get the to-be-exported metrics from a config?
954 df = self.get('df')
955 for stat in DF_CLUSTER:
956 self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])
957 for device_class in df['stats_by_class']:
958 self.metrics['cluster_by_class_{}'.format(stat)].set(
959 df['stats_by_class'][device_class][stat], (device_class,))
960
961 for pool in df['pools']:
962 for stat in DF_POOL:
963 self.metrics['pool_{}'.format(stat)].set(
964 pool['stats'][stat],
965 (pool['id'],)
966 )
967
968 @profile_method()
969 def get_osd_blocklisted_entries(self) -> None:
970 r = self.mon_command({
971 'prefix': 'osd blocklist ls',
972 'format': 'json'
973 })
974 blocklist_entries = r[2].split(' ')
975 blocklist_count = blocklist_entries[1]
976 for stat in OSD_BLOCKLIST:
977 self.metrics['cluster_{}'.format(stat)].set(int(blocklist_count))
978
979 @profile_method()
980 def get_fs(self) -> None:
981 fs_map = self.get('fs_map')
982 servers = self.get_service_list()
983 self.log.debug('standbys: {}'.format(fs_map['standbys']))
984 # export standby mds metadata, default standby fs_id is '-1'
985 for standby in fs_map['standbys']:
986 id_ = standby['name']
987 host, version, _ = servers.get((id_, 'mds'), ('', '', ''))
988 addr, rank = standby['addr'], standby['rank']
989 self.metrics['mds_metadata'].set(1, (
990 'mds.{}'.format(id_), '-1',
991 cast(str, host),
992 cast(str, addr),
993 cast(str, rank),
994 cast(str, version)
995 ))
996 for fs in fs_map['filesystems']:
997 # collect fs metadata
998 data_pools = ",".join([str(pool)
999 for pool in fs['mdsmap']['data_pools']])
1000 self.metrics['fs_metadata'].set(1, (
1001 data_pools,
1002 fs['id'],
1003 fs['mdsmap']['metadata_pool'],
1004 fs['mdsmap']['fs_name']
1005 ))
1006 self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
1007 for gid, daemon in fs['mdsmap']['info'].items():
1008 id_ = daemon['name']
1009 host, version, _ = servers.get((id_, 'mds'), ('', '', ''))
1010 self.metrics['mds_metadata'].set(1, (
1011 'mds.{}'.format(id_), fs['id'],
1012 host, daemon['addr'],
1013 daemon['rank'], version
1014 ))
1015
1016 @profile_method()
1017 def get_quorum_status(self) -> None:
1018 mon_status = json.loads(self.get('mon_status')['json'])
1019 servers = self.get_service_list()
1020 for mon in mon_status['monmap']['mons']:
1021 rank = mon['rank']
1022 id_ = mon['name']
1023 mon_version = servers.get((id_, 'mon'), ('', '', ''))
1024 self.metrics['mon_metadata'].set(1, (
1025 'mon.{}'.format(id_), mon_version[0],
1026 mon['public_addr'].rsplit(':', 1)[0], rank,
1027 mon_version[1]
1028 ))
1029 in_quorum = int(rank in mon_status['quorum'])
1030 self.metrics['mon_quorum_status'].set(in_quorum, (
1031 'mon.{}'.format(id_),
1032 ))
1033
1034 @profile_method()
1035 def get_mgr_status(self) -> None:
1036 mgr_map = self.get('mgr_map')
1037 servers = self.get_service_list()
1038
1039 active = mgr_map['active_name']
1040 standbys = [s.get('name') for s in mgr_map['standbys']]
1041
1042 all_mgrs = list(standbys)
1043 all_mgrs.append(active)
1044
1045 all_modules = {module.get('name'): module.get('can_run')
1046 for module in mgr_map['available_modules']}
1047
1048 for mgr in all_mgrs:
1049 host, version, _ = servers.get((mgr, 'mgr'), ('', '', ''))
1050 if mgr == active:
1051 _state = 1
1052 else:
1053 _state = 0
1054
1055 self.metrics['mgr_metadata'].set(1, (
1056 f'mgr.{mgr}', host, version
1057 ))
1058 self.metrics['mgr_status'].set(_state, (
1059 f'mgr.{mgr}',))
1060 always_on_modules = mgr_map['always_on_modules'].get(self.release_name, [])
1061 active_modules = list(always_on_modules)
1062 active_modules.extend(mgr_map['modules'])
1063
1064 for mod_name in all_modules.keys():
1065
1066 if mod_name in always_on_modules:
1067 _state = 2
1068 elif mod_name in active_modules:
1069 _state = 1
1070 else:
1071 _state = 0
1072
1073 _can_run = 1 if all_modules[mod_name] else 0
1074 self.metrics['mgr_module_status'].set(_state, (mod_name,))
1075 self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))
1076
1077 @profile_method()
1078 def get_pg_status(self) -> None:
1079
1080 pg_summary = self.get('pg_summary')
1081
1082 for pool in pg_summary['by_pool']:
1083 num_by_state: DefaultDict[str, int] = defaultdict(int)
1084 for state in PG_STATES:
1085 num_by_state[state] = 0
1086
1087 for state_name, count in pg_summary['by_pool'][pool].items():
1088 for state in state_name.split('+'):
1089 num_by_state[state] += count
1090 num_by_state['total'] += count
1091
1092 for state, num in num_by_state.items():
1093 try:
1094 self.metrics["pg_{}".format(state)].set(num, (pool,))
1095 except KeyError:
1096 self.log.warning("skipping pg in unknown state {}".format(state))
1097
1098 @profile_method()
1099 def get_osd_stats(self) -> None:
1100 osd_stats = self.get('osd_stats')
1101 for osd in osd_stats['osd_stats']:
1102 id_ = osd['osd']
1103 for stat in OSD_STATS:
1104 val = osd['perf_stat'][stat]
1105 self.metrics['osd_{}'.format(stat)].set(val, (
1106 'osd.{}'.format(id_),
1107 ))
1108
1109 def get_service_list(self) -> Dict[Tuple[str, str], Tuple[str, str, str]]:
1110 ret = {}
1111 for server in self.list_servers():
1112 host = cast(str, server.get('hostname', ''))
1113 for service in cast(List[ServiceInfoT], server.get('services', [])):
1114 ret.update({(service['id'], service['type']): (host,
1115 service.get('ceph_version', 'unknown'),
1116 service.get('name', ''))})
1117 return ret
1118
1119 @profile_method()
1120 def get_metadata_and_osd_status(self) -> None:
1121 osd_map = self.get('osd_map')
1122 osd_flags = osd_map['flags'].split(',')
1123 for flag in OSD_FLAGS:
1124 self.metrics['osd_flag_{}'.format(flag)].set(
1125 int(flag in osd_flags)
1126 )
1127
1128 osd_devices = self.get('osd_map_crush')['devices']
1129 servers = self.get_service_list()
1130 for osd in osd_map['osds']:
1131 # id can be used to link osd metrics and metadata
1132 id_ = osd['osd']
1133 # collect osd metadata
1134 p_addr = osd['public_addr'].rsplit(':', 1)[0]
1135 c_addr = osd['cluster_addr'].rsplit(':', 1)[0]
1136 if p_addr == "-" or c_addr == "-":
1137 self.log.info(
1138 "Missing address metadata for osd {0}, skipping occupation"
1139 " and metadata records for this osd".format(id_)
1140 )
1141 continue
1142
1143 dev_class = None
1144 for osd_device in osd_devices:
1145 if osd_device['id'] == id_:
1146 dev_class = osd_device.get('class', '')
1147 break
1148
1149 if dev_class is None:
1150 self.log.info("OSD {0} is missing from CRUSH map, "
1151 "skipping output".format(id_))
1152 continue
1153
1154 osd_version = servers.get((str(id_), 'osd'), ('', '', ''))
1155
1156 # collect disk occupation metadata
1157 osd_metadata = self.get_metadata("osd", str(id_))
1158 if osd_metadata is None:
1159 continue
1160
1161 obj_store = osd_metadata.get('osd_objectstore', '')
1162 f_iface = osd_metadata.get('front_iface', '')
1163 b_iface = osd_metadata.get('back_iface', '')
1164
1165 self.metrics['osd_metadata'].set(1, (
1166 b_iface,
1167 'osd.{}'.format(id_),
1168 c_addr,
1169 dev_class,
1170 f_iface,
1171 osd_version[0],
1172 obj_store,
1173 p_addr,
1174 osd_version[1]
1175 ))
1176
1177 # collect osd status
1178 for state in OSD_STATUS:
1179 status = osd[state]
1180 self.metrics['osd_{}'.format(state)].set(status, (
1181 'osd.{}'.format(id_),
1182 ))
1183
1184 osd_dev_node = None
1185 osd_wal_dev_node = ''
1186 osd_db_dev_node = ''
1187 if obj_store == "filestore":
1188 # collect filestore backend device
1189 osd_dev_node = osd_metadata.get(
1190 'backend_filestore_dev_node', None)
1191 # collect filestore journal device
1192 osd_wal_dev_node = osd_metadata.get('osd_journal', '')
1193 osd_db_dev_node = ''
1194 elif obj_store == "bluestore":
1195 # collect bluestore backend device
1196 osd_dev_node = osd_metadata.get(
1197 'bluestore_bdev_dev_node', None)
1198 # collect bluestore wal backend
1199 osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
1200 # collect bluestore db backend
1201 osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
1202 if osd_dev_node and osd_dev_node == "unknown":
1203 osd_dev_node = None
1204
1205 # fetch the devices and ids (vendor, model, serial) from the
1206 # osd_metadata
1207 osd_devs = osd_metadata.get('devices', '') or 'N/A'
1208 osd_dev_ids = osd_metadata.get('device_ids', '') or 'N/A'
1209
1210 osd_hostname = osd_metadata.get('hostname', None)
1211 if osd_dev_node and osd_hostname:
1212 self.log.debug("Got dev for osd {0}: {1}/{2}".format(
1213 id_, osd_hostname, osd_dev_node))
1214 self.metrics['disk_occupation'].set(1, (
1215 "osd.{0}".format(id_),
1216 osd_dev_node,
1217 osd_db_dev_node,
1218 osd_wal_dev_node,
1219 osd_hostname,
1220 osd_devs,
1221 osd_dev_ids,
1222 ))
1223 else:
1224 self.log.info("Missing dev node metadata for osd {0}, skipping "
1225 "occupation record for this osd".format(id_))
1226
1227 if 'disk_occupation' in self.metrics:
1228 try:
1229 self.metrics['disk_occupation_human'] = \
1230 self.metrics['disk_occupation'].group_by(
1231 ['device', 'instance'],
1232 {'ceph_daemon': lambda daemons: ', '.join(daemons)},
1233 name='disk_occupation_human',
1234 )
1235 except Exception as e:
1236 self.log.error(e)
1237
1238 ec_profiles = osd_map.get('erasure_code_profiles', {})
1239
1240 def _get_pool_info(pool: Dict[str, Any]) -> Tuple[str, str]:
1241 pool_type = 'unknown'
1242 description = 'unknown'
1243
1244 if pool['type'] == 1:
1245 pool_type = "replicated"
1246 description = f"replica:{pool['size']}"
1247 elif pool['type'] == 3:
1248 pool_type = "erasure"
1249 name = pool.get('erasure_code_profile', '')
1250 profile = ec_profiles.get(name, {})
1251 if profile:
1252 description = f"ec:{profile['k']}+{profile['m']}"
1253 else:
1254 description = "ec:unknown"
1255
1256 return pool_type, description
1257
1258 for pool in osd_map['pools']:
1259
1260 compression_mode = 'none'
1261 pool_type, pool_description = _get_pool_info(pool)
1262
1263 if 'options' in pool:
1264 compression_mode = pool['options'].get('compression_mode', 'none')
1265
1266 self.metrics['pool_metadata'].set(
1267 1, (
1268 pool['pool'],
1269 pool['pool_name'],
1270 pool_type,
1271 pool_description,
1272 compression_mode)
1273 )
1274
1275 # Populate other servers metadata
1276 for key, value in servers.items():
1277 service_id, service_type = key
1278 if service_type == 'rgw':
1279 hostname, version, name = value
1280 self.metrics['rgw_metadata'].set(
1281 1,
1282 ('{}.{}'.format(service_type, name),
1283 hostname, version, service_id)
1284 )
1285 elif service_type == 'rbd-mirror':
1286 mirror_metadata = self.get_metadata('rbd-mirror', service_id)
1287 if mirror_metadata is None:
1288 continue
1289 mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
1290 service_id)
1291 rbd_mirror_metadata = cast(LabelValues,
1292 (mirror_metadata.get(k, '')
1293 for k in RBD_MIRROR_METADATA))
1294 self.metrics['rbd_mirror_metadata'].set(
1295 1, rbd_mirror_metadata
1296 )
1297
1298 @profile_method()
1299 def get_num_objects(self) -> None:
1300 pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
1301 for obj in NUM_OBJECTS:
1302 stat = 'num_objects_{}'.format(obj)
1303 self.metrics[stat].set(pg_sum[stat])
1304
1305 @profile_method()
1306 def get_rbd_stats(self) -> None:
1307 # Per RBD image stats is collected by registering a dynamic osd perf
1308 # stats query that tells OSDs to group stats for requests associated
1309 # with RBD objects by pool, namespace, and image id, which are
1310 # extracted from the request object names or other attributes.
1311 # The RBD object names have the following prefixes:
1312 # - rbd_data.{image_id}. (data stored in the same pool as metadata)
1313 # - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
1314 # - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
1315 # The pool_id in the object name is the id of the pool with the image
1316 # metdata, and should be used in the image spec. If there is no pool_id
1317 # in the object name, the image pool is the pool where the object is
1318 # located.
1319
1320 # Parse rbd_stats_pools option, which is a comma or space separated
1321 # list of pool[/namespace] entries. If no namespace is specifed the
1322 # stats are collected for every namespace in the pool. The wildcard
1323 # '*' can be used to indicate all pools or namespaces
1324 pools_string = cast(str, self.get_localized_module_option('rbd_stats_pools'))
1325 pool_keys = set()
1326 osd_map = self.get('osd_map')
1327 rbd_pools = [pool['pool_name'] for pool in osd_map['pools']
1328 if 'rbd' in pool.get('application_metadata', {})]
1329 for x in re.split(r'[\s,]+', pools_string):
1330 if not x:
1331 continue
1332
1333 s = x.split('/', 2)
1334 pool_name = s[0]
1335 namespace_name = None
1336 if len(s) == 2:
1337 namespace_name = s[1]
1338
1339 if pool_name == "*":
1340 # collect for all pools
1341 for pool in rbd_pools:
1342 pool_keys.add((pool, namespace_name))
1343 else:
1344 if pool_name in rbd_pools:
1345 pool_keys.add((pool_name, namespace_name)) # avoids adding deleted pool
1346
1347 pools = {} # type: Dict[str, Set[str]]
1348 for pool_key in pool_keys:
1349 pool_name = pool_key[0]
1350 namespace_name = pool_key[1]
1351 if not namespace_name or namespace_name == "*":
1352 # empty set means collect for all namespaces
1353 pools[pool_name] = set()
1354 continue
1355
1356 if pool_name not in pools:
1357 pools[pool_name] = set()
1358 elif not pools[pool_name]:
1359 continue
1360 pools[pool_name].add(namespace_name)
1361
1362 rbd_stats_pools = {}
1363 for pool_id in self.rbd_stats['pools'].keys():
1364 name = self.rbd_stats['pools'][pool_id]['name']
1365 if name not in pools:
1366 del self.rbd_stats['pools'][pool_id]
1367 else:
1368 rbd_stats_pools[name] = \
1369 self.rbd_stats['pools'][pool_id]['ns_names']
1370
1371 pools_refreshed = False
1372 if pools:
1373 next_refresh = self.rbd_stats['pools_refresh_time'] + \
1374 self.get_localized_module_option(
1375 'rbd_stats_pools_refresh_interval', 300)
1376 if rbd_stats_pools != pools or time.time() >= next_refresh:
1377 self.refresh_rbd_stats_pools(pools)
1378 pools_refreshed = True
1379
1380 pool_ids = list(self.rbd_stats['pools'])
1381 pool_ids.sort()
1382 pool_id_regex = '^(' + '|'.join([str(x) for x in pool_ids]) + ')$'
1383
1384 nspace_names = []
1385 for pool_id, pool in self.rbd_stats['pools'].items():
1386 if pool['ns_names']:
1387 nspace_names.extend(pool['ns_names'])
1388 else:
1389 nspace_names = []
1390 break
1391 if nspace_names:
1392 namespace_regex = '^(' + \
1393 "|".join([re.escape(x)
1394 for x in set(nspace_names)]) + ')$'
1395 else:
1396 namespace_regex = '^(.*)$'
1397
1398 if ('query' in self.rbd_stats
1399 and (pool_id_regex != self.rbd_stats['query']['key_descriptor'][0]['regex']
1400 or namespace_regex != self.rbd_stats['query']['key_descriptor'][1]['regex'])):
1401 self.remove_osd_perf_query(self.rbd_stats['query_id'])
1402 del self.rbd_stats['query_id']
1403 del self.rbd_stats['query']
1404
1405 if not self.rbd_stats['pools']:
1406 return
1407
1408 counters_info = self.rbd_stats['counters_info']
1409
1410 if 'query_id' not in self.rbd_stats:
1411 query = {
1412 'key_descriptor': [
1413 {'type': 'pool_id', 'regex': pool_id_regex},
1414 {'type': 'namespace', 'regex': namespace_regex},
1415 {'type': 'object_name',
1416 'regex': r'^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
1417 ],
1418 'performance_counter_descriptors': list(counters_info),
1419 }
1420 query_id = self.add_osd_perf_query(query)
1421 if query_id is None:
1422 self.log.error('failed to add query %s' % query)
1423 return
1424 self.rbd_stats['query'] = query
1425 self.rbd_stats['query_id'] = query_id
1426
1427 res = self.get_osd_perf_counters(self.rbd_stats['query_id'])
1428 assert res
1429 for c in res['counters']:
1430 # if the pool id is not found in the object name use id of the
1431 # pool where the object is located
1432 if c['k'][2][0]:
1433 pool_id = int(c['k'][2][0])
1434 else:
1435 pool_id = int(c['k'][0][0])
1436 if pool_id not in self.rbd_stats['pools'] and not pools_refreshed:
1437 self.refresh_rbd_stats_pools(pools)
1438 pools_refreshed = True
1439 if pool_id not in self.rbd_stats['pools']:
1440 continue
1441 pool = self.rbd_stats['pools'][pool_id]
1442 nspace_name = c['k'][1][0]
1443 if nspace_name not in pool['images']:
1444 continue
1445 image_id = c['k'][2][1]
1446 if image_id not in pool['images'][nspace_name] and \
1447 not pools_refreshed:
1448 self.refresh_rbd_stats_pools(pools)
1449 pool = self.rbd_stats['pools'][pool_id]
1450 pools_refreshed = True
1451 if image_id not in pool['images'][nspace_name]:
1452 continue
1453 counters = pool['images'][nspace_name][image_id]['c']
1454 for i in range(len(c['c'])):
1455 counters[i][0] += c['c'][i][0]
1456 counters[i][1] += c['c'][i][1]
1457
1458 label_names = ("pool", "namespace", "image")
1459 for pool_id, pool in self.rbd_stats['pools'].items():
1460 pool_name = pool['name']
1461 for nspace_name, images in pool['images'].items():
1462 for image_id in images:
1463 image_name = images[image_id]['n']
1464 counters = images[image_id]['c']
1465 i = 0
1466 for key in counters_info:
1467 counter_info = counters_info[key]
1468 stattype = self._stattype_to_str(counter_info['type'])
1469 labels = (pool_name, nspace_name, image_name)
1470 if counter_info['type'] == self.PERFCOUNTER_COUNTER:
1471 path = 'rbd_' + key
1472 if path not in self.metrics:
1473 self.metrics[path] = Metric(
1474 stattype,
1475 path,
1476 counter_info['desc'],
1477 label_names,
1478 )
1479 self.metrics[path].set(counters[i][0], labels)
1480 elif counter_info['type'] == self.PERFCOUNTER_LONGRUNAVG:
1481 path = 'rbd_' + key + '_sum'
1482 if path not in self.metrics:
1483 self.metrics[path] = Metric(
1484 stattype,
1485 path,
1486 counter_info['desc'] + ' Total',
1487 label_names,
1488 )
1489 self.metrics[path].set(counters[i][0], labels)
1490 path = 'rbd_' + key + '_count'
1491 if path not in self.metrics:
1492 self.metrics[path] = Metric(
1493 'counter',
1494 path,
1495 counter_info['desc'] + ' Count',
1496 label_names,
1497 )
1498 self.metrics[path].set(counters[i][1], labels)
1499 i += 1
1500
1501 def refresh_rbd_stats_pools(self, pools: Dict[str, Set[str]]) -> None:
1502 self.log.debug('refreshing rbd pools %s' % (pools))
1503
1504 rbd = RBD()
1505 counters_info = self.rbd_stats['counters_info']
1506 for pool_name, cfg_ns_names in pools.items():
1507 try:
1508 pool_id = self.rados.pool_lookup(pool_name)
1509 with self.rados.open_ioctx(pool_name) as ioctx:
1510 if pool_id not in self.rbd_stats['pools']:
1511 self.rbd_stats['pools'][pool_id] = {'images': {}}
1512 pool = self.rbd_stats['pools'][pool_id]
1513 pool['name'] = pool_name
1514 pool['ns_names'] = cfg_ns_names
1515 if cfg_ns_names:
1516 nspace_names = list(cfg_ns_names)
1517 else:
1518 nspace_names = [''] + rbd.namespace_list(ioctx)
1519 for nspace_name in pool['images']:
1520 if nspace_name not in nspace_names:
1521 del pool['images'][nspace_name]
1522 for nspace_name in nspace_names:
1523 if nspace_name and\
1524 not rbd.namespace_exists(ioctx, nspace_name):
1525 self.log.debug('unknown namespace %s for pool %s' %
1526 (nspace_name, pool_name))
1527 continue
1528 ioctx.set_namespace(nspace_name)
1529 if nspace_name not in pool['images']:
1530 pool['images'][nspace_name] = {}
1531 namespace = pool['images'][nspace_name]
1532 images = {}
1533 for image_meta in RBD().list2(ioctx):
1534 image = {'n': image_meta['name']}
1535 image_id = image_meta['id']
1536 if image_id in namespace:
1537 image['c'] = namespace[image_id]['c']
1538 else:
1539 image['c'] = [[0, 0] for x in counters_info]
1540 images[image_id] = image
1541 pool['images'][nspace_name] = images
1542 except Exception as e:
1543 self.log.error('failed listing pool %s: %s' % (pool_name, e))
1544 self.rbd_stats['pools_refresh_time'] = time.time()
1545
1546 def shutdown_rbd_stats(self) -> None:
1547 if 'query_id' in self.rbd_stats:
1548 self.remove_osd_perf_query(self.rbd_stats['query_id'])
1549 del self.rbd_stats['query_id']
1550 del self.rbd_stats['query']
1551 self.rbd_stats['pools'].clear()
1552
1553 def add_fixed_name_metrics(self) -> None:
1554 """
1555 Add fixed name metrics from existing ones that have details in their names
1556 that should be in labels (not in name).
1557 For backward compatibility, a new fixed name metric is created (instead of replacing)
1558 and details are put in new labels.
1559 Intended for RGW sync perf. counters but extendable as required.
1560 See: https://tracker.ceph.com/issues/45311
1561 """
1562 new_metrics = {}
1563 for metric_path, metrics in self.metrics.items():
1564 # Address RGW sync perf. counters.
1565 match = re.search(r'^data-sync-from-(.*)\.', metric_path)
1566 if match:
1567 new_path = re.sub('from-([^.]*)', 'from-zone', metric_path)
1568 if new_path not in new_metrics:
1569 new_metrics[new_path] = Metric(
1570 metrics.mtype,
1571 new_path,
1572 metrics.desc,
1573 cast(LabelValues, metrics.labelnames) + ('source_zone',)
1574 )
1575 for label_values, value in metrics.value.items():
1576 new_metrics[new_path].set(value, label_values + (match.group(1),))
1577
1578 self.metrics.update(new_metrics)
1579
1580 def get_collect_time_metrics(self) -> None:
1581 sum_metric = self.metrics.get('prometheus_collect_duration_seconds_sum')
1582 count_metric = self.metrics.get('prometheus_collect_duration_seconds_count')
1583 if sum_metric is None:
1584 sum_metric = MetricCounter(
1585 'prometheus_collect_duration_seconds_sum',
1586 'The sum of seconds took to collect all metrics of this exporter',
1587 ('method',))
1588 self.metrics['prometheus_collect_duration_seconds_sum'] = sum_metric
1589 if count_metric is None:
1590 count_metric = MetricCounter(
1591 'prometheus_collect_duration_seconds_count',
1592 'The amount of metrics gathered for this exporter',
1593 ('method',))
1594 self.metrics['prometheus_collect_duration_seconds_count'] = count_metric
1595
1596 # Collect all timing data and make it available as metric, excluding the
1597 # `collect` method because it has not finished at this point and hence
1598 # there's no `_execution_duration` attribute to be found. The
1599 # `_execution_duration` attribute is added by the `profile_method`
1600 # decorator.
1601 for method_name, method in Module.__dict__.items():
1602 duration = getattr(method, '_execution_duration', None)
1603 if duration is not None:
1604 cast(MetricCounter, sum_metric).add(duration, (method_name,))
1605 cast(MetricCounter, count_metric).add(1, (method_name,))
1606
1607 def get_pool_repaired_objects(self) -> None:
1608 dump = self.get('pg_dump')
1609 for stats in dump['pool_stats']:
1610 path = 'pool_objects_repaired'
1611 self.metrics[path].set(stats['stat_sum']['num_objects_repaired'],
1612 labelvalues=(stats['poolid'],))
1613
1614 def get_all_daemon_health_metrics(self) -> None:
1615 daemon_metrics = self.get_daemon_health_metrics()
1616 self.log.debug('metrics jeje %s' % (daemon_metrics))
1617 for daemon_name, health_metrics in daemon_metrics.items():
1618 for health_metric in health_metrics:
1619 path = 'daemon_health_metrics'
1620 self.metrics[path].set(health_metric['value'], labelvalues=(
1621 health_metric['type'], daemon_name,))
1622
1623 @profile_method(True)
1624 def collect(self) -> str:
1625 # Clear the metrics before scraping
1626 for k in self.metrics.keys():
1627 self.metrics[k].clear()
1628
1629 self.get_health()
1630 self.get_df()
1631 self.get_osd_blocklisted_entries()
1632 self.get_pool_stats()
1633 self.get_fs()
1634 self.get_osd_stats()
1635 self.get_quorum_status()
1636 self.get_mgr_status()
1637 self.get_metadata_and_osd_status()
1638 self.get_pg_status()
1639 self.get_pool_repaired_objects()
1640 self.get_num_objects()
1641 self.get_all_daemon_health_metrics()
1642
1643 for daemon, counters in self.get_all_perf_counters().items():
1644 for path, counter_info in counters.items():
1645 # Skip histograms, they are represented by long running avgs
1646 stattype = self._stattype_to_str(counter_info['type'])
1647 if not stattype or stattype == 'histogram':
1648 self.log.debug('ignoring %s, type %s' % (path, stattype))
1649 continue
1650
1651 path, label_names, labels = self._perfpath_to_path_labels(
1652 daemon, path)
1653
1654 # Get the value of the counter
1655 value = self._perfvalue_to_value(
1656 counter_info['type'], counter_info['value'])
1657
1658 # Represent the long running avgs as sum/count pairs
1659 if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
1660 _path = path + '_sum'
1661 if _path not in self.metrics:
1662 self.metrics[_path] = Metric(
1663 stattype,
1664 _path,
1665 counter_info['description'] + ' Total',
1666 label_names,
1667 )
1668 self.metrics[_path].set(value, labels)
1669
1670 _path = path + '_count'
1671 if _path not in self.metrics:
1672 self.metrics[_path] = Metric(
1673 'counter',
1674 _path,
1675 counter_info['description'] + ' Count',
1676 label_names,
1677 )
1678 self.metrics[_path].set(counter_info['count'], labels,)
1679 else:
1680 if path not in self.metrics:
1681 self.metrics[path] = Metric(
1682 stattype,
1683 path,
1684 counter_info['description'],
1685 label_names,
1686 )
1687 self.metrics[path].set(value, labels)
1688
1689 self.add_fixed_name_metrics()
1690 self.get_rbd_stats()
1691
1692 self.get_collect_time_metrics()
1693
1694 # Return formatted metrics and clear no longer used data
1695 _metrics = [m.str_expfmt() for m in self.metrics.values()]
1696 for k in self.metrics.keys():
1697 self.metrics[k].clear()
1698
1699 return ''.join(_metrics) + '\n'
1700
1701 @CLIReadCommand('prometheus file_sd_config')
1702 def get_file_sd_config(self) -> Tuple[int, str, str]:
1703 '''
1704 Return file_sd compatible prometheus config for mgr cluster
1705 '''
1706 servers = self.list_servers()
1707 targets = []
1708 for server in servers:
1709 hostname = server.get('hostname', '')
1710 for service in cast(List[ServiceInfoT], server.get('services', [])):
1711 if service['type'] != 'mgr':
1712 continue
1713 id_ = service['id']
1714 port = self._get_module_option('server_port', DEFAULT_PORT, id_)
1715 targets.append(f'{hostname}:{port}')
1716 ret = [
1717 {
1718 "targets": targets,
1719 "labels": {}
1720 }
1721 ]
1722 return 0, json.dumps(ret), ""
1723
1724 def self_test(self) -> None:
1725 self.collect()
1726 self.get_file_sd_config()
1727
1728 def configure(self, server_addr: str, server_port: int) -> None:
1729 # cephadm deployments have a TLS monitoring stack setup option.
1730 # If the cephadm module is on and the setting is true (defaults to false)
1731 # we should have prometheus be set up to interact with that
1732 cephadm_secure_monitoring_stack = self.get_module_option_ex(
1733 'cephadm', 'secure_monitoring_stack', False)
1734 if cephadm_secure_monitoring_stack:
1735 try:
1736 self.setup_cephadm_tls_config(server_addr, server_port)
1737 return
1738 except Exception as e:
1739 self.log.exception(f'Failed to setup cephadm based secure monitoring stack: {e}\n',
1740 'Falling back to default configuration')
1741 self.setup_default_config(server_addr, server_port)
1742
1743 def setup_default_config(self, server_addr: str, server_port: int) -> None:
1744 cherrypy.config.update({
1745 'server.socket_host': server_addr,
1746 'server.socket_port': server_port,
1747 'engine.autoreload.on': False,
1748 'server.ssl_module': None,
1749 'server.ssl_certificate': None,
1750 'server.ssl_private_key': None,
1751 })
1752 # Publish the URI that others may use to access the service we're about to start serving
1753 self.set_uri(build_url(scheme='http', host=self.get_server_addr(),
1754 port=server_port, path='/'))
1755
1756 def setup_cephadm_tls_config(self, server_addr: str, server_port: int) -> None:
1757 from cephadm.ssl_cert_utils import SSLCerts
1758 # the ssl certs utils uses a NamedTemporaryFile for the cert files
1759 # generated with generate_cert_files function. We need the SSLCerts
1760 # object to not be cleaned up in order to have those temp files not
1761 # be cleaned up, so making it an attribute of the module instead
1762 # of just a standalone object
1763 self.cephadm_monitoring_tls_ssl_certs = SSLCerts()
1764 host = self.get_mgr_ip()
1765 try:
1766 old_cert = self.get_store('root/cert')
1767 old_key = self.get_store('root/key')
1768 if not old_cert or not old_key:
1769 raise Exception('No old credentials for mgr-prometheus endpoint')
1770 self.cephadm_monitoring_tls_ssl_certs.load_root_credentials(old_cert, old_key)
1771 except Exception:
1772 self.cephadm_monitoring_tls_ssl_certs.generate_root_cert(host)
1773 self.set_store('root/cert', self.cephadm_monitoring_tls_ssl_certs.get_root_cert())
1774 self.set_store('root/key', self.cephadm_monitoring_tls_ssl_certs.get_root_key())
1775
1776 cert_file_path, key_file_path = self.cephadm_monitoring_tls_ssl_certs.generate_cert_files(
1777 self.get_hostname(), host)
1778
1779 cherrypy.config.update({
1780 'server.socket_host': server_addr,
1781 'server.socket_port': server_port,
1782 'engine.autoreload.on': False,
1783 'server.ssl_module': 'builtin',
1784 'server.ssl_certificate': cert_file_path,
1785 'server.ssl_private_key': key_file_path,
1786 })
1787 # Publish the URI that others may use to access the service we're about to start serving
1788 self.set_uri(build_url(scheme='https', host=self.get_server_addr(),
1789 port=server_port, path='/'))
1790
1791 def serve(self) -> None:
1792
1793 class Root(object):
1794
1795 # collapse everything to '/'
1796 def _cp_dispatch(self, vpath: str) -> 'Root':
1797 cherrypy.request.path = ''
1798 return self
1799
1800 @cherrypy.expose
1801 def index(self) -> str:
1802 return '''<!DOCTYPE html>
1803 <html>
1804 <head><title>Ceph Exporter</title></head>
1805 <body>
1806 <h1>Ceph Exporter</h1>
1807 <p><a href='/metrics'>Metrics</a></p>
1808 </body>
1809 </html>'''
1810
1811 @cherrypy.expose
1812 def metrics(self) -> Optional[str]:
1813 # Lock the function execution
1814 assert isinstance(_global_instance, Module)
1815 with _global_instance.collect_lock:
1816 return self._metrics(_global_instance)
1817
1818 @staticmethod
1819 def _metrics(instance: 'Module') -> Optional[str]:
1820 if not self.cache:
1821 self.log.debug('Cache disabled, collecting and returning without cache')
1822 cherrypy.response.headers['Content-Type'] = 'text/plain'
1823 return self.collect()
1824
1825 # Return cached data if available
1826 if not instance.collect_cache:
1827 raise cherrypy.HTTPError(503, 'No cached data available yet')
1828
1829 def respond() -> Optional[str]:
1830 assert isinstance(instance, Module)
1831 cherrypy.response.headers['Content-Type'] = 'text/plain'
1832 return instance.collect_cache
1833
1834 if instance.collect_time < instance.scrape_interval:
1835 # Respond if cache isn't stale
1836 return respond()
1837
1838 if instance.stale_cache_strategy == instance.STALE_CACHE_RETURN:
1839 # Respond even if cache is stale
1840 instance.log.info(
1841 'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
1842 'returning metrics from stale cache.'.format(
1843 instance.collect_time,
1844 instance.collect_time - instance.scrape_interval
1845 )
1846 )
1847 return respond()
1848
1849 if instance.stale_cache_strategy == instance.STALE_CACHE_FAIL:
1850 # Fail if cache is stale
1851 msg = (
1852 'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
1853 'returning "service unavailable".'.format(
1854 instance.collect_time,
1855 instance.collect_time - instance.scrape_interval,
1856 )
1857 )
1858 instance.log.error(msg)
1859 raise cherrypy.HTTPError(503, msg)
1860 return None
1861
1862 # Make the cache timeout for collecting configurable
1863 self.scrape_interval = cast(float, self.get_localized_module_option('scrape_interval'))
1864
1865 self.stale_cache_strategy = cast(
1866 str, self.get_localized_module_option('stale_cache_strategy'))
1867 if self.stale_cache_strategy not in [self.STALE_CACHE_FAIL,
1868 self.STALE_CACHE_RETURN]:
1869 self.stale_cache_strategy = self.STALE_CACHE_FAIL
1870
1871 server_addr = cast(str, self.get_localized_module_option('server_addr', get_default_addr()))
1872 server_port = cast(int, self.get_localized_module_option('server_port', DEFAULT_PORT))
1873 self.log.info(
1874 "server_addr: %s server_port: %s" %
1875 (server_addr, server_port)
1876 )
1877
1878 self.cache = cast(bool, self.get_localized_module_option('cache', True))
1879 if self.cache:
1880 self.log.info('Cache enabled')
1881 self.metrics_thread.start()
1882 else:
1883 self.log.info('Cache disabled')
1884
1885 self.configure(server_addr, server_port)
1886
1887 cherrypy.tree.mount(Root(), "/")
1888 self.log.info('Starting engine...')
1889 cherrypy.engine.start()
1890 self.log.info('Engine started.')
1891
1892 # wait for the shutdown event
1893 self.shutdown_event.wait()
1894 self.shutdown_event.clear()
1895 # tell metrics collection thread to stop collecting new metrics
1896 self.metrics_thread.stop()
1897 cherrypy.engine.stop()
1898 cherrypy.server.httpserver = None
1899 self.log.info('Engine stopped.')
1900 self.shutdown_rbd_stats()
1901 # wait for the metrics collection thread to stop
1902 self.metrics_thread.join()
1903
1904 def shutdown(self) -> None:
1905 self.log.info('Stopping engine...')
1906 self.shutdown_event.set()
1907
1908 @CLIReadCommand('healthcheck history ls')
1909 def _list_healthchecks(self, format: Format = Format.plain) -> HandleCommandResult:
1910 """List all the healthchecks being tracked
1911
1912 The format options are parsed in ceph_argparse, before they get evaluated here so
1913 we can safely assume that what we have to process is valid. ceph_argparse will throw
1914 a ValueError if the cast to our Format class fails.
1915
1916 Args:
1917 format (Format, optional): output format. Defaults to Format.plain.
1918
1919 Returns:
1920 HandleCommandResult: return code, stdout and stderr returned to the caller
1921 """
1922
1923 out = ""
1924 if format == Format.plain:
1925 out = str(self.health_history)
1926 elif format == Format.yaml:
1927 out = self.health_history.as_yaml()
1928 else:
1929 out = self.health_history.as_json(format == Format.json_pretty)
1930
1931 return HandleCommandResult(retval=0, stdout=out)
1932
1933 @CLIWriteCommand('healthcheck history clear')
1934 def _clear_healthchecks(self) -> HandleCommandResult:
1935 """Clear the healthcheck history"""
1936 self.health_history.reset()
1937 return HandleCommandResult(retval=0, stdout="healthcheck history cleared")
1938
1939
1940 class StandbyModule(MgrStandbyModule):
1941
1942 MODULE_OPTIONS = Module.MODULE_OPTIONS
1943
1944 def __init__(self, *args: Any, **kwargs: Any) -> None:
1945 super(StandbyModule, self).__init__(*args, **kwargs)
1946 self.shutdown_event = threading.Event()
1947
1948 def serve(self) -> None:
1949 server_addr = self.get_localized_module_option(
1950 'server_addr', get_default_addr())
1951 server_port = self.get_localized_module_option(
1952 'server_port', DEFAULT_PORT)
1953 self.log.info("server_addr: %s server_port: %s" %
1954 (server_addr, server_port))
1955 cherrypy.config.update({
1956 'server.socket_host': server_addr,
1957 'server.socket_port': server_port,
1958 'engine.autoreload.on': False,
1959 'request.show_tracebacks': False
1960 })
1961
1962 module = self
1963
1964 class Root(object):
1965 @cherrypy.expose
1966 def index(self) -> str:
1967 standby_behaviour = module.get_module_option('standby_behaviour')
1968 if standby_behaviour == 'default':
1969 active_uri = module.get_active_uri()
1970 return '''<!DOCTYPE html>
1971 <html>
1972 <head><title>Ceph Exporter</title></head>
1973 <body>
1974 <h1>Ceph Exporter</h1>
1975 <p><a href='{}metrics'>Metrics</a></p>
1976 </body>
1977 </html>'''.format(active_uri)
1978 else:
1979 status = module.get_module_option('standby_error_status_code')
1980 raise cherrypy.HTTPError(status, message="Keep on looking")
1981
1982 @cherrypy.expose
1983 def metrics(self) -> str:
1984 cherrypy.response.headers['Content-Type'] = 'text/plain'
1985 return ''
1986
1987 cherrypy.tree.mount(Root(), '/', {})
1988 self.log.info('Starting engine...')
1989 cherrypy.engine.start()
1990 self.log.info('Engine started.')
1991 # Wait for shutdown event
1992 self.shutdown_event.wait()
1993 self.shutdown_event.clear()
1994 cherrypy.engine.stop()
1995 cherrypy.server.httpserver = None
1996 self.log.info('Engine stopped.')
1997
1998 def shutdown(self) -> None:
1999 self.log.info("Stopping engine...")
2000 self.shutdown_event.set()
2001 self.log.info("Stopped engine")